Commit 43a48bf5 authored by john selbie's avatar john selbie

First attempt at enabling multi-threading

parent b08a72f0
.\" Automatically generated by Pandoc 1.16.0.2 .\" Automatically generated by Pandoc 1.19.2.4
.\" .\"
.TH "STUNCLIENT" "1" "" "January 22, 2012" "User Manual" .TH "STUNCLIENT" "1" "" "January 22, 2012" "User Manual"
.hy .hy
......
.\" Automatically generated by Pandoc 1.16.0.2 .\" Automatically generated by Pandoc 1.19.2.4
.\" .\"
.TH "STUNSERVER" "1" "" "January 22, 2012" "User Manual" .TH "STUNSERVER" "1" "" "January 22, 2012" "User Manual"
.hy .hy
...@@ -31,7 +31,9 @@ The following options are supported. ...@@ -31,7 +31,9 @@ The following options are supported.
\-\-ddp \-\-ddp
\-\-primaryadvertised \-\-primaryadvertised
\-\-altadvertised \-\-altadvertised
\-\-threading\ THREADCOUNT
\-\-configfile \-\-configfile
\-\-reuseaddr
\-\-help \-\-help
\f[] \f[]
.fi .fi
...@@ -206,6 +208,18 @@ correctly set these parameters for use within Amazon EC2. ...@@ -206,6 +208,18 @@ correctly set these parameters for use within Amazon EC2.
.PP .PP
* * * * * * * * * *
.PP .PP
\f[B]\-\-threading\f[] THREADCOUNT
.PP
The \-\-threading switch specifies the number of threads to use per
socket.
If 0 is specified for THREADCOUNT, this indicates the default mode,
which is that all sockets share a single thread.
This is the recommended value for most scenarios.
Specifying a higher number of threads is useful in scenarios where a
single core is not sufficient to process the entire network load.
.PP
* * * * *
.PP
\f[B]\-\-configfile\f[] FILENAME \f[B]\-\-configfile\f[] FILENAME
.PP .PP
The \-\-configfile switch allows the server to be configured with a JSON The \-\-configfile switch allows the server to be configured with a JSON
......
...@@ -29,7 +29,9 @@ The following options are supported. ...@@ -29,7 +29,9 @@ The following options are supported.
--ddp --ddp
--primaryadvertised --primaryadvertised
--altadvertised --altadvertised
--threading THREADCOUNT
--configfile --configfile
--reuseaddr
--help --help
Details of each option are as follows. Details of each option are as follows.
...@@ -186,6 +188,14 @@ For more details, visit www.stunprotocol.org for details on how to correctly set ...@@ -186,6 +188,14 @@ For more details, visit www.stunprotocol.org for details on how to correctly set
____ ____
**--threading** THREADCOUNT
The --threading switch specifies the number of threads to use per socket. If 0 is specified for THREADCOUNT, this indicates the default mode, which is
that all sockets share a single thread. This is the recommended value for most scenarios. Specifying a higher number of threads is useful in scenarios where a single
core is not sufficient to process the entire network load.
____
**--configfile** FILENAME **--configfile** FILENAME
The --configfile switch allows the server to be configured with a JSON configuration file rather The --configfile switch allows the server to be configured with a JSON configuration file rather
......
This diff is collapsed.
...@@ -129,6 +129,7 @@ struct StartupArgs ...@@ -129,6 +129,7 @@ struct StartupArgs
std::string strDosProtect; std::string strDosProtect;
std::string strConfigFile; std::string strConfigFile;
std::string strReuseAddr; std::string strReuseAddr;
std::string strThreading;
}; };
...@@ -151,6 +152,7 @@ void DumpStartupArgs(StartupArgs& args) ...@@ -151,6 +152,7 @@ void DumpStartupArgs(StartupArgs& args)
PRINTARG(strMaxConnections); PRINTARG(strMaxConnections);
PRINTARG(strDosProtect); PRINTARG(strDosProtect);
PRINTARG(strReuseAddr); PRINTARG(strReuseAddr);
PRINTARG(strThreading);
Logging::LogMsg(LL_DEBUG, "--------------------------\n"); Logging::LogMsg(LL_DEBUG, "--------------------------\n");
} }
...@@ -243,6 +245,8 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig* ...@@ -243,6 +245,8 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig*
bool fHasAtLeastTwoAdapters = false; bool fHasAtLeastTwoAdapters = false;
CStunServerConfig config; CStunServerConfig config;
int nMaxConnections = 0; int nMaxConnections = 0;
int threadcount = 0;
const char* pszPrimaryAdvertised = argsIn.strPrimaryAdvertised.c_str(); const char* pszPrimaryAdvertised = argsIn.strPrimaryAdvertised.c_str();
const char* pszAltAdvertised = argsIn.strAlternateAdvertised.c_str(); const char* pszAltAdvertised = argsIn.strAlternateAdvertised.c_str();
...@@ -279,6 +283,7 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig* ...@@ -279,6 +283,7 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig*
StringHelper::Trim(args.strPrimaryAdvertised); StringHelper::Trim(args.strPrimaryAdvertised);
StringHelper::Trim(args.strAlternateAdvertised); StringHelper::Trim(args.strAlternateAdvertised);
StringHelper::Trim(args.strThreading);
...@@ -513,6 +518,19 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig* ...@@ -513,6 +518,19 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig*
// ---- REUSE ADDRESS SWITCH ------------------------------------------- // ---- REUSE ADDRESS SWITCH -------------------------------------------
config.fReuseAddr = (argsIn.strReuseAddr.length() > 0); config.fReuseAddr = (argsIn.strReuseAddr.length() > 0);
// ---- THREADING ------------------------------------------------------
if (argsIn.strThreading.size() > 0)
{
hr = StringHelper::ValidateNumberString(argsIn.strThreading.c_str(), 0, 64, &threadcount);
if (FAILED(hr))
{
Logging::LogMsg(LL_ALWAYS, "Error with --threading. required argument must be between 0 - 64");
Chk(hr);
}
}
config.nThreadsPerSocket = threadcount;
*pConfigOut = config; *pConfigOut = config;
hr = S_OK; hr = S_OK;
...@@ -542,6 +560,7 @@ HRESULT ParseCommandLineArgs(int argc, char** argv, int startindex, StartupArgs* ...@@ -542,6 +560,7 @@ HRESULT ParseCommandLineArgs(int argc, char** argv, int startindex, StartupArgs*
cmdline.AddOption("ddp", no_argument, &pStartupArgs->strDosProtect); cmdline.AddOption("ddp", no_argument, &pStartupArgs->strDosProtect);
cmdline.AddOption("configfile", required_argument, &pStartupArgs->strConfigFile); cmdline.AddOption("configfile", required_argument, &pStartupArgs->strConfigFile);
cmdline.AddOption("reuseaddr", no_argument, &pStartupArgs->strReuseAddr); cmdline.AddOption("reuseaddr", no_argument, &pStartupArgs->strReuseAddr);
cmdline.AddOption("threading", required_argument, &pStartupArgs->strThreading);
cmdline.ParseCommandLine(argc, argv, startindex, &fError); cmdline.ParseCommandLine(argc, argv, startindex, &fError);
...@@ -598,6 +617,7 @@ HRESULT LoadConfigsFromFile(const std::string& filename, std::vector<StartupArgs ...@@ -598,6 +617,7 @@ HRESULT LoadConfigsFromFile(const std::string& filename, std::vector<StartupArgs
args.strMaxConnections = child.get("maxconn", ""); args.strMaxConnections = child.get("maxconn", "");
args.strDosProtect = child.get("ddp", ""); args.strDosProtect = child.get("ddp", "");
args.strReuseAddr = child.get("reuseaddr", ""); args.strReuseAddr = child.get("reuseaddr", "");
args.strThreading = child.get("threading", "");
configurations.push_back(args); configurations.push_back(args);
} }
...@@ -739,8 +759,6 @@ int main(int argc, char** argv) ...@@ -739,8 +759,6 @@ int main(int argc, char** argv)
BlockSignal(SIGTERM); BlockSignal(SIGTERM);
BlockSignal(SIGINT); BlockSignal(SIGINT);
ASSERT(false);
#ifdef DEBUG #ifdef DEBUG
Logging::SetLogLevel(LL_DEBUG); Logging::SetLogLevel(LL_DEBUG);
......
...@@ -29,7 +29,7 @@ fHasPP(false), ...@@ -29,7 +29,7 @@ fHasPP(false),
fHasPA(false), fHasPA(false),
fHasAP(false), fHasAP(false),
fHasAA(false), fHasAA(false),
fMultiThreadedMode(false), nThreadsPerSocket(0),
fTCP(false), fTCP(false),
nMaxConnections(0), // zero means default nMaxConnections(0), // zero means default
fEnableDosProtection(false), fEnableDosProtection(false),
...@@ -139,10 +139,11 @@ HRESULT CStunServer::Initialize(const CStunServerConfig& config) ...@@ -139,10 +139,11 @@ HRESULT CStunServer::Initialize(const CStunServerConfig& config)
{ {
Logging::LogMsg(LL_DEBUG, "Creating rate limiter for ddos protection\n"); Logging::LogMsg(LL_DEBUG, "Creating rate limiter for ddos protection\n");
// hard coding to 25000 ip addresses // hard coding to 25000 ip addresses
spLimiter = std::shared_ptr<RateLimiter>(new RateLimiter(25000, config.fMultiThreadedMode)); bool fMultiThreaded = (config.nThreadsPerSocket > 0);
spLimiter = std::shared_ptr<RateLimiter>(new RateLimiter(25000, fMultiThreaded));
} }
if (config.fMultiThreadedMode == false) if (config.nThreadsPerSocket <= 0)
{ {
Logging::LogMsg(LL_DEBUG, "Configuring single threaded mode\n"); Logging::LogMsg(LL_DEBUG, "Configuring single threaded mode\n");
...@@ -156,9 +157,9 @@ HRESULT CStunServer::Initialize(const CStunServerConfig& config) ...@@ -156,9 +157,9 @@ HRESULT CStunServer::Initialize(const CStunServerConfig& config)
} }
else else
{ {
Logging::LogMsg(LL_DEBUG, "Configuring multi-threaded mode\n"); Logging::LogMsg(LL_DEBUG, "Configuring multi-threaded mode with %d threads per socket\n", config.nThreadsPerSocket);
// one thread for every socket // N threads for every socket
CStunSocketThread* pThread = NULL; CStunSocketThread* pThread = NULL;
for (size_t index = 0; index < ARRAYSIZE(_arrSockets); index++) for (size_t index = 0; index < ARRAYSIZE(_arrSockets); index++)
{ {
...@@ -166,6 +167,8 @@ HRESULT CStunServer::Initialize(const CStunServerConfig& config) ...@@ -166,6 +167,8 @@ HRESULT CStunServer::Initialize(const CStunServerConfig& config)
{ {
SocketRole rolePrimaryRecv = _arrSockets[index].GetRole(); SocketRole rolePrimaryRecv = _arrSockets[index].GetRole();
ASSERT(rolePrimaryRecv == (SocketRole)index); ASSERT(rolePrimaryRecv == (SocketRole)index);
for (int t = 0; t < config.nThreadsPerSocket; t++)
{
pThread = new CStunSocketThread(); pThread = new CStunSocketThread();
ChkIf(pThread==NULL, E_OUTOFMEMORY); ChkIf(pThread==NULL, E_OUTOFMEMORY);
_threads.push_back(pThread); _threads.push_back(pThread);
...@@ -173,6 +176,7 @@ HRESULT CStunServer::Initialize(const CStunServerConfig& config) ...@@ -173,6 +176,7 @@ HRESULT CStunServer::Initialize(const CStunServerConfig& config)
} }
} }
} }
}
Cleanup: Cleanup:
......
...@@ -35,7 +35,7 @@ public: ...@@ -35,7 +35,7 @@ public:
bool fHasAP; // AP: Alternate ip, Primary port bool fHasAP; // AP: Alternate ip, Primary port
bool fHasAA; // AA: Alternate ip, Alternate port bool fHasAA; // AA: Alternate ip, Alternate port
bool fMultiThreadedMode; // if true, one thread for each socket int nThreadsPerSocket; // when set to > 0, each socket gets N threads assigned to it, otherwise, all sockets on 1 thread
bool fTCP; // if true, then use TCP instead of UDP bool fTCP; // if true, then use TCP instead of UDP
uint32_t nMaxConnections; // only valid for TCP (on a per-thread basis) uint32_t nMaxConnections; // only valid for TCP (on a per-thread basis)
......
...@@ -282,6 +282,7 @@ void CStunSocketThread::Run() ...@@ -282,6 +282,7 @@ void CStunSocketThread::Run()
char szIPRemote[100] = {}; char szIPRemote[100] = {};
char szIPLocal[100] = {}; char szIPLocal[100] = {};
bool allowed_to_pass = true; bool allowed_to_pass = true;
pthread_t threadid = pthread_self();
int sendsocketcount = 0; int sendsocketcount = 0;
...@@ -339,7 +340,7 @@ void CStunSocketThread::Run() ...@@ -339,7 +340,7 @@ void CStunSocketThread::Run()
szIPLocal[0] = '\0'; szIPLocal[0] = '\0';
} }
Logging::LogMsg(LL_VERBOSE, "recvfrom returns %d from %s on local interface %s", ret, szIPRemote, szIPLocal); Logging::LogMsg(LL_VERBOSE, "recvfrom returns %d from %s on local interface %s on thread %lu", ret, szIPRemote, szIPLocal, (unsigned long)threadid);
allowed_to_pass = (_spLimiter.get() != NULL) ? _spLimiter->RateCheck(_msgIn.addrRemote) : true; allowed_to_pass = (_spLimiter.get() != NULL) ? _spLimiter->RateCheck(_msgIn.addrRemote) : true;
......
...@@ -833,6 +833,7 @@ HRESULT CTCPServer::Initialize(const CStunServerConfig& config) ...@@ -833,6 +833,7 @@ HRESULT CTCPServer::Initialize(const CStunServerConfig& config)
TransportAddressSet tsaListenAll; TransportAddressSet tsaListenAll;
TransportAddressSet tsaHandler; TransportAddressSet tsaHandler;
std::shared_ptr<RateLimiter> spLimiter; std::shared_ptr<RateLimiter> spLimiter;
bool fMultithreaded = false; // hardwire TCP to be non-threaded until we can figure this all out
ChkIfA(_threads[0] != NULL, E_UNEXPECTED); // we can't already be initialized, right? ChkIfA(_threads[0] != NULL, E_UNEXPECTED); // we can't already be initialized, right?
...@@ -855,10 +856,10 @@ HRESULT CTCPServer::Initialize(const CStunServerConfig& config) ...@@ -855,10 +856,10 @@ HRESULT CTCPServer::Initialize(const CStunServerConfig& config)
if (config.fEnableDosProtection) if (config.fEnableDosProtection)
{ {
spLimiter = std::make_shared<RateLimiter>(20000, config.fMultiThreadedMode); spLimiter = std::make_shared<RateLimiter>(20000, fMultithreaded);
} }
if (config.fMultiThreadedMode == false) if (fMultithreaded == false)
{ {
_threads[0] = new CTCPStunThread(); _threads[0] = new CTCPStunThread();
......
...@@ -31,7 +31,7 @@ _allocatedSize(0) ...@@ -31,7 +31,7 @@ _allocatedSize(0)
void CBuffer::Reset() void CBuffer::Reset()
{ {
_spAllocation.clear(); _vec.clear();
_data = nullptr; _data = nullptr;
_size = 0; _size = 0;
_allocatedSize = 0; _allocatedSize = 0;
...@@ -47,25 +47,16 @@ CBuffer::CBuffer(size_t nSize) ...@@ -47,25 +47,16 @@ CBuffer::CBuffer(size_t nSize)
HRESULT CBuffer::InitWithAllocation(size_t size) HRESULT CBuffer::InitWithAllocation(size_t size)
{ {
Reset(); Reset();
// deliberately not checking for 0. Ok to allocate a 0 byte array // deliberately not checking for 0. Ok to allocate a 0 byte array.
std::vector<uint8_t> spAlloc(size+2); // add two bytes for null termination (makes debugging ascii and unicode strings easier), but these two bytes are invisible to the caller (not included in _allocatedSize) // Plus two bytes for null termination (useful for debugging strings)
_vec.resize(size+2);
_spAllocation.swap(spAlloc); _vec[size] = 0;
_vec[size+1] = 0;
spAlloc.clear();
_data = _spAllocation.data();
if (_data) _data = _vec.data();
{ _size = size;
_data[size] = 0;
_data[size+1] = 0;
}
_size = (_data != nullptr) ? size : 0;
_allocatedSize = _size; _allocatedSize = _size;
return (_data != nullptr) ? S_OK : E_FAIL; return (_data != nullptr) ? S_OK : E_FAIL;
......
...@@ -30,7 +30,7 @@ private: ...@@ -30,7 +30,7 @@ private:
uint8_t* _data; uint8_t* _data;
size_t _size; size_t _size;
size_t _allocatedSize; size_t _allocatedSize;
std::vector<uint8_t> _spAllocation; std::vector<uint8_t> _vec;
// disallow copy and assignment. // disallow copy and assignment.
CBuffer(const CBuffer&); CBuffer(const CBuffer&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment