Commit d57c3eee authored by John Selbie's avatar John Selbie

Support multithreading with SO_REUSEPORT option and new threading model

parent 7b74a63a
......@@ -278,7 +278,7 @@ HRESULT CStunSocket::InitCommon(int socktype, const CSocketAddress& addrlocal, S
int sock = -1;
int ret;
HRESULT hr = S_OK;
ASSERT((socktype == SOCK_DGRAM)||(socktype==SOCK_STREAM));
sock = socket(addrlocal.GetFamily(), socktype, 0);
......@@ -295,8 +295,17 @@ HRESULT CStunSocket::InitCommon(int socktype, const CSocketAddress& addrlocal, S
if (fSetReuseFlag)
{
int socket_option_reuse = SO_REUSEADDR;
// for now, just do SO_REUSEPORT on the UDP thread
// There's still some validation and we need to do on the TCP side to decide how to enable threading
#ifdef SO_REUSEPORT
if (socktype == SOCK_DGRAM)
{
socket_option_reuse = SO_REUSEPORT;
}
#endif
int fAllow = 1;
ret = ::setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &fAllow, sizeof(fAllow));
ret = ::setsockopt(sock, SOL_SOCKET, socket_option_reuse, &fAllow, sizeof(fAllow));
ChkIf(ret == -1, ERRNOHR);
}
......@@ -329,4 +338,19 @@ HRESULT CStunSocket::TCPInit(const CSocketAddress& local, SocketRole role, bool
return InitCommon(SOCK_STREAM, local, role, fSetReuseFlag);
}
HRESULT CStunSocket::SetRecvTimeout(int milliseconds)
{
HRESULT hr = S_OK;
timeval tv = {};
int result = 0;
ChkIfA(_sock == -1, E_UNEXPECTED);
tv.tv_sec = milliseconds / 1000;
tv.tv_usec = (milliseconds % 1000) * 1000;
result = ::setsockopt(_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
ChkIf(result == -1, ERRNOHR);
hr = S_OK;
Cleanup:
return hr;
}
......@@ -58,6 +58,8 @@ public:
HRESULT EnablePktInfoOption(bool fEnable);
HRESULT SetNonBlocking(bool fEnable);
HRESULT SetRecvTimeout(int milliseconds);
void UpdateAddresses();
......
......@@ -163,22 +163,20 @@ void DumpConfig(CStunServerConfig &config)
std::string strSocket;
if (config.fHasPP)
{
config.addrPP.ToString(&strSocket);
Logging::LogMsg(LL_DEBUG, "PP = %s", strSocket.c_str());
}
if (config.fHasPA)
config.addrPP.ToString(&strSocket);
Logging::LogMsg(LL_DEBUG, "PP = %s", strSocket.c_str());
if (config.fIsFullMode)
{
config.addrPA.ToString(&strSocket);
Logging::LogMsg(LL_DEBUG, "PA = %s", strSocket.c_str());
}
if (config.fHasAP)
if (config.fIsFullMode)
{
config.addrAP.ToString(&strSocket);
Logging::LogMsg(LL_DEBUG, "AP = %s", strSocket.c_str());
}
if (config.fHasAA)
if (config.fIsFullMode)
{
config.addrAA.ToString(&strSocket);
Logging::LogMsg(LL_DEBUG, "AA = %s", strSocket.c_str());
......@@ -398,13 +396,13 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig*
if (mode == Basic)
{
uint16_t port = (uint16_t)((int16_t)nPrimaryPort);
config.fIsFullMode = false;
// in basic mode, if no adapter is specified, bind to all of them
if (args.strPrimaryInterface.length() == 0)
{
if (family == AF_INET)
{
config.addrPP = CSocketAddress(0, port);
config.fHasPP = true;
}
else if (family == AF_INET6)
{
......@@ -412,7 +410,6 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig*
addr6.sin6_family = AF_INET6;
config.addrPP = CSocketAddress(addr6);
config.addrPP.SetPort(port);
config.fHasPP = true;
}
}
else
......@@ -425,7 +422,6 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig*
Chk(hr);
}
config.addrPP = addr;
config.fHasPP = true;
}
}
else // Full mode
......@@ -466,19 +462,17 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig*
config.addrPP = addrPrimary;
config.addrPP.SetPort(portPrimary);
config.fHasPP = true;
config.addrPA = addrPrimary;
config.addrPA.SetPort(portAlternate);
config.fHasPA = true;
config.addrAP = addrAlternate;
config.addrAP.SetPort(portPrimary);
config.fHasAP = true;
config.addrAA = addrAlternate;
config.addrAA.SetPort(portAlternate);
config.fHasAA = true;
config.fIsFullMode = true;
}
......@@ -501,7 +495,7 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig*
if (mode != Full)
{
Logging::LogMsg(LL_ALWAYS, "Error. --altadvertised was specified, but --mode param was not set to FULL.");
ChkIf(config.fHasAA, E_INVALIDARG);
ChkIf(config.fIsFullMode, E_INVALIDARG);
}
hr = ::NumericIPToAddress(family, pszAltAdvertised, &config.addrAlternateAdvertised);
......@@ -528,7 +522,7 @@ HRESULT BuildServerConfigurationFromArgs(StartupArgs& argsIn, CStunServerConfig*
Logging::LogMsg(LL_ALWAYS, "Error with --threading. required argument must be between 0 - 64");
Chk(hr);
}
config.nThreadsPerSocket = threadcount;
config.nThreading = threadcount;
}
*pConfigOut = config;
......@@ -858,7 +852,7 @@ int main(int argc, char** argv)
}
Logging::LogMsg(LL_DEBUG, "Server is exiting");
Logging::LogMsg(LL_DEBUG, "Server is exiting. This may take a few seconds to complete.");
for (auto itor = udpServers.begin(); itor != udpServers.end(); itor++)
......
This diff is collapsed.
......@@ -14,7 +14,6 @@
limitations under the License.
*/
#ifndef STUN_SERVER_H
#define STUN_SERVER_H
......@@ -23,35 +22,26 @@
#include "stunauth.h"
#include "messagehandler.h"
class CStunServerConfig
{
public:
uint32_t nThreading; // when set to 0, all sockets on 1 thread. Otherwise, N threads per socket
bool fHasPP; // PP: Primary ip, Primary port
bool fHasPA; // PA: Primary ip, Alternate port
bool fHasAP; // AP: Alternate ip, Primary port
bool fHasAA; // AA: Alternate ip, Alternate port
int nThreadsPerSocket; // when set to > 0, each socket gets N threads assigned to it, otherwise, all sockets on 1 thread
bool fTCP; // if true, then use TCP instead of UDP
uint32_t nMaxConnections; // only valid for TCP (on a per-thread basis)
CSocketAddress addrPP; // address for PP
CSocketAddress addrPA; // address for PA
CSocketAddress addrAP; // address for AP
CSocketAddress addrAA; // address for AA
CSocketAddress addrPA; // address for PA, ignored if fIsFullMode==false
CSocketAddress addrAP; // address for AP, ignored if fIsFullMode==false
CSocketAddress addrAA; // address for AA, ignored if fIsFullMode==false
CSocketAddress addrPrimaryAdvertised; // public-IP for PP and PA (port is ignored)
CSocketAddress addrAlternateAdvertised; // public-IP for AP and AA (port is ignored)
bool fEnableDosProtection; // enable denial of service protection
bool fReuseAddr; // if true, the socket option SO_REUSEADDR will be set
bool fIsFullMode; // indicated that we are listening on PA, AP, and AA addresses above
bool fTCP; // if true, then use TCP instead of UDP
CStunServerConfig();
};
......@@ -59,13 +49,17 @@ public:
class CStunServer
{
private:
CStunSocket _arrSockets[4];
std::vector<std::shared_ptr<CStunSocket>> _sockets;
std::vector<CStunSocketThread*> _threads;
TransportAddressSet _tsa;
std::shared_ptr<IStunAuth> _spAuth;
HRESULT AddSocket(TransportAddressSet* pTSA, SocketRole role, const CSocketAddress& addrListen, const CSocketAddress& addrAdvertise, bool fSetReuseFlag);
HRESULT InitializeTSA(const CStunServerConfig& config);
HRESULT CreateSocket(SocketRole role, const CSocketAddress& addr, bool fReuseAddr);
HRESULT CreateSockets(const CStunServerConfig& config);
void PostWakeupMessages();
public:
CStunServer();
......@@ -78,7 +72,4 @@ public:
HRESULT Stop();
};
#endif /* SERVER_H */
......@@ -37,56 +37,64 @@ _tsa() // zero-init
CStunSocketThread::~CStunSocketThread()
{
SignalForStop(true);
SignalForStop();
WaitForStopAndClose();
}
void CStunSocketThread::ClearSocketArray()
{
_arrSendSockets = nullptr;
_arrSendSockets.clear();
_socks.clear();
}
HRESULT CStunSocketThread::Init(CStunSocket* arrayOfFourSockets, TransportAddressSet* pTSA, std::shared_ptr<IStunAuth> spAuth, SocketRole rolePrimaryRecv, std::shared_ptr<RateLimiter>& spLimiter)
void CStunSocketThread::DumpInitParams(std::vector<std::shared_ptr<CStunSocket>>& arrayOfFourSockets, const TransportAddressSet& tsa, SocketRole rolePrimaryRecv)
{
Logging::LogMsg(LL_VERBOSE, "CStunSocketThread initialized with:");
for (auto spStunSocket : arrayOfFourSockets)
{
Logging::LogMsg(LL_VERBOSE, "sock handle: %d", (spStunSocket == nullptr) ? -1 : (int)spStunSocket->GetSocketHandle());
}
}
HRESULT CStunSocketThread::Init(std::vector<std::shared_ptr<CStunSocket>>& arrayOfFourSockets, const TransportAddressSet& tsa, std::shared_ptr<IStunAuth> spAuth, SocketRole rolePrimaryRecv, std::shared_ptr<RateLimiter>& spLimiter)
{
HRESULT hr = S_OK;
DumpInitParams(arrayOfFourSockets, tsa, rolePrimaryRecv);
// if -1 was passed, then we are in "multi socket mode", otherwise, 1 socket to receive on
bool fSingleSocketRecv = ::IsValidSocketRole(rolePrimaryRecv);
ChkIfA(_fThreadIsValid, E_UNEXPECTED);
ChkIfA(arrayOfFourSockets == nullptr, E_INVALIDARG);
ChkIfA(pTSA == nullptr, E_INVALIDARG);
ChkIfA(arrayOfFourSockets.size() == 0, E_INVALIDARG);
// if this thread was configured to listen on a single socket (aka "multi-threaded mode"), then
// validate that it exists
if (fSingleSocketRecv)
{
ChkIfA(arrayOfFourSockets[rolePrimaryRecv].IsValid()==false, E_UNEXPECTED);
ChkIfA(arrayOfFourSockets[rolePrimaryRecv] == nullptr, E_UNEXPECTED);
ChkIfA(arrayOfFourSockets[rolePrimaryRecv]->IsValid()==false, E_UNEXPECTED);
}
_arrSendSockets = arrayOfFourSockets;
// initialize the TSA thing
_tsa = *pTSA;
_tsa = tsa;
if (fSingleSocketRecv)
{
// only one socket to listen on
_socks.push_back(&_arrSendSockets[rolePrimaryRecv]);
_socks.push_back(_arrSendSockets[rolePrimaryRecv]);
}
else
{
for (size_t i = 0; i < 4; i++)
for (auto spSocket : arrayOfFourSockets)
{
if (_arrSendSockets[i].IsValid())
if (spSocket != nullptr && spSocket->IsValid())
{
_socks.push_back(&_arrSendSockets[i]);
_socks.push_back(spSocket);
}
}
}
Chk(InitThreadBuffers());
......@@ -151,45 +159,10 @@ Cleanup:
return hr;
}
HRESULT CStunSocketThread::SignalForStop(bool fPostMessages)
HRESULT CStunSocketThread::SignalForStop()
{
HRESULT hr = S_OK;
_fNeedToExit = true;
// have the socket send a message to itself
// if another thread is sharing the same socket, this may wake that thread up to
// but all the threads should be started and shutdown together
if (fPostMessages)
{
for (size_t index = 0; index < _socks.size(); index++)
{
char data = 'x';
ASSERT(_socks[index] != nullptr);
CSocketAddress addr(_socks[index]->GetLocalAddress());
// If no specific adapter was binded to, IP will be 0.0.0.0
// Linux evidently treats 0.0.0.0 IP as loopback (and works)
// On Windows you can't send to 0.0.0.0. sendto will fail - switch to sending to localhost
if (addr.IsIPAddressZero())
{
CSocketAddress addrLocal;
CSocketAddress::GetLocalHost(addr.GetFamily(), &addrLocal);
addrLocal.SetPort(addr.GetPort());
addr = addrLocal;
}
::sendto(_socks[index]->GetSocketHandle(), &data, 1, 0, addr.GetSockAddr(), addr.GetSockAddrLength());
}
}
return hr;
}
......@@ -261,7 +234,7 @@ CStunSocket* CStunSocketThread::WaitForSocketData()
if (FD_ISSET(sock, &set))
{
pReadySocket = _socks[indexconverted];
pReadySocket = _socks[indexconverted].get(); // todo - let this method return a shared_ptr
break;
}
}
......@@ -277,7 +250,7 @@ void CStunSocketThread::Run()
size_t nSocketCount = _socks.size();
bool fMultiSocketMode = (nSocketCount > 1);
int recvflags = fMultiSocketMode ? MSG_DONTWAIT : 0;
CStunSocket* pSocket = _socks[0];
CStunSocket* pSocket = _socks[0].get();
int ret;
char szIPRemote[100] = {};
char szIPLocal[100] = {};
......@@ -321,14 +294,27 @@ void CStunSocketThread::Run()
_spBufferIn->SetSize(0);
ret = ::recvfromex(pSocket->GetSocketHandle(), _spBufferIn->GetData(), _spBufferIn->GetAllocatedSize(), recvflags, &_msgIn.addrRemote, &_msgIn.addrLocal);
if (ret < 0)
{
int err = errno;
if ((err == EAGAIN) || (err == EWOULDBLOCK))
{
Logging::LogMsg(LL_VERBOSE_EXTREME, "recvfromex returned timeout error");
}
else
{
Logging::LogMsg(LL_VERBOSE, "recvfromex returned error: %d", err);
}
continue;
}
// recvfromex no longer sets the port value on the local address
if (ret >= 0)
if (_fNeedToExit)
{
_msgIn.addrLocal.SetPort(pSocket->GetLocalAddress().GetPort());
break;
}
_msgIn.addrLocal.SetPort(pSocket->GetLocalAddress().GetPort());
if (Logging::GetLogLevel() >= LL_VERBOSE)
{
_msgIn.addrRemote.ToStringBuffer(szIPRemote, 100);
......@@ -340,28 +326,16 @@ void CStunSocketThread::Run()
szIPLocal[0] = '\0';
}
Logging::LogMsg(LL_VERBOSE, "recvfrom returns %d from %s on local interface %s on thread %lu", ret, szIPRemote, szIPLocal, (unsigned long)threadid);
Logging::LogMsg(LL_VERBOSE, "recvfrom returns %d from %s on local interface %s on thread %lu sr=%d", ret, szIPRemote, szIPLocal, (unsigned long)threadid, (int)pSocket->GetRole());
allowed_to_pass = (_spLimiter.get() != nullptr) ? _spLimiter->RateCheck(_msgIn.addrRemote) : true;
if (allowed_to_pass == false)
{
Logging::LogMsg(LL_VERBOSE, "RateLimiter signals false for packet from %s", szIPRemote);
}
if ((ret < 0) || (allowed_to_pass == false))
{
// error
continue;
}
if (_fNeedToExit)
{
break;
Logging::LogMsg(LL_VERBOSE, "RateLimiter signals false for packet from %s", szIPRemote);
continue;
}
_spBufferIn->SetSize(ret);
_msgIn.socketrole = pSocket->GetRole();
......@@ -395,8 +369,8 @@ HRESULT CStunSocketThread::ProcessRequestAndSendResponse()
Chk(CStunRequestHandler::ProcessRequest(_msgIn, _msgOut, &_tsa, _spAuth.get()));
ASSERT(_tsa.set[_msgOut.socketrole].fValid);
ASSERT(_arrSendSockets[_msgOut.socketrole].IsValid());
sockout = _arrSendSockets[_msgOut.socketrole].GetSocketHandle();
ASSERT(_arrSendSockets[_msgOut.socketrole]->IsValid());
sockout = _arrSendSockets[_msgOut.socketrole]->GetSocketHandle();
ASSERT(sockout != -1);
// find the socket that matches the role specified by msgOut
......
......@@ -14,8 +14,6 @@
limitations under the License.
*/
#ifndef STUNSOCKETTHREAD_H
#define STUNSOCKETTHREAD_H
......@@ -33,14 +31,12 @@ public:
CStunSocketThread();
~CStunSocketThread();
HRESULT Init(CStunSocket* arrayOfFourSockets, TransportAddressSet* pTSA, std::shared_ptr<IStunAuth> spAuth, SocketRole rolePrimaryRecv, std::shared_ptr<RateLimiter>& _spRateLimiter);
HRESULT Init(std::vector<std::shared_ptr<CStunSocket>>& arrayOfFourSockets, const TransportAddressSet& tsa, std::shared_ptr<IStunAuth> spAuth, SocketRole rolePrimaryRecv, std::shared_ptr<RateLimiter>& _spRateLimiter);
HRESULT Start();
HRESULT SignalForStop(bool fPostMessages);
HRESULT SignalForStop();
HRESULT WaitForStopAndClose();
private:
// this is the function that runs in a thread
......@@ -50,8 +46,8 @@ private:
CStunSocket* WaitForSocketData();
CStunSocket* _arrSendSockets; // matches CStunServer::_arrSockets
std::vector<CStunSocket*> _socks; // sockets for receiving on
std::vector<std::shared_ptr<CStunSocket>> _arrSendSockets; // 1 socket in basic mode. 4 sockets in full mode
std::vector<std::shared_ptr<CStunSocket>> _socks; // 1 socket in multi-threaded or basic mode. 4 sockets in single-threaded full mode
bool _fNeedToExit;
pthread_t _pthread;
......@@ -67,7 +63,7 @@ private:
CStunMessageReader _reader;
CRefCountedBuffer _spBufferReader; // buffer internal to the reader
CRefCountedBuffer _spBufferIn; // buffer we receive requests on
CRefCountedBuffer _spBufferOut; // buffer we send response on
CRefCountedBuffer _spBufferOut; // buffer we send responses on
StunMessageIn _msgIn;
StunMessageOut _msgOut;
......@@ -79,11 +75,10 @@ private:
HRESULT ProcessRequestAndSendResponse();
void ClearSocketArray();
void DumpInitParams(std::vector<std::shared_ptr<CStunSocket>>& arrayOfFourSockets, const TransportAddressSet& tsa, SocketRole rolePrimaryRecv);
};
#endif /* STUNSOCKETTHREAD_H */
......@@ -844,15 +844,15 @@ HRESULT CTCPServer::Initialize(const CStunServerConfig& config)
// tsaHandler is sort of a hack for TCP. It's really just a glorified indication to the the
// CStunRequestHandler code to figure out if it can offer a CHANGED-ADDRESS attribute.
InitTSA(&tsaHandler, RolePP, config.fHasPP, config.addrPP, config.addrPrimaryAdvertised);
InitTSA(&tsaHandler, RolePA, config.fHasPA, config.addrPA, config.addrPrimaryAdvertised);
InitTSA(&tsaHandler, RoleAP, config.fHasAP, config.addrAP, config.addrAlternateAdvertised);
InitTSA(&tsaHandler, RoleAA, config.fHasAA, config.addrAA, config.addrAlternateAdvertised);
InitTSA(&tsaListenAll, RolePP, config.fHasPP, config.addrPP, CSocketAddress());
InitTSA(&tsaListenAll, RolePA, config.fHasPA, config.addrPA, CSocketAddress());
InitTSA(&tsaListenAll, RoleAP, config.fHasAP, config.addrAP, CSocketAddress());
InitTSA(&tsaListenAll, RoleAA, config.fHasAA, config.addrAA, CSocketAddress());
InitTSA(&tsaHandler, RolePP, true, config.addrPP, config.addrPrimaryAdvertised);
InitTSA(&tsaHandler, RolePA, config.fIsFullMode, config.addrPA, config.addrPrimaryAdvertised);
InitTSA(&tsaHandler, RoleAP, config.fIsFullMode, config.addrAP, config.addrAlternateAdvertised);
InitTSA(&tsaHandler, RoleAA, config.fIsFullMode, config.addrAA, config.addrAlternateAdvertised);
InitTSA(&tsaListenAll, RolePP, true, config.addrPP, CSocketAddress());
InitTSA(&tsaListenAll, RolePA, config.fIsFullMode, config.addrPA, CSocketAddress());
InitTSA(&tsaListenAll, RoleAP, config.fIsFullMode, config.addrAP, CSocketAddress());
InitTSA(&tsaListenAll, RoleAA, config.fIsFullMode, config.addrAA, CSocketAddress());
if (config.fEnableDosProtection)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment