Commit 0b00491a authored by jselbie's avatar jselbie

TCP mode supports sockets staying open longer and handling multiple requests

parent f5bf8486
...@@ -436,6 +436,7 @@ void TcpClientLoop(StunClientLogicConfig& config, ClientSocketConfig& socketconf ...@@ -436,6 +436,7 @@ void TcpClientLoop(StunClientLogicConfig& config, ClientSocketConfig& socketconf
{ {
// server cut us off before we got all the bytes we thought we were supposed to get? // server cut us off before we got all the bytes we thought we were supposed to get?
ASSERT(false); ASSERT(false);
Logging::LogMsg(LL_ALWAYS, "Lost connection");
break; break;
} }
if (ret < 0) if (ret < 0)
......
...@@ -92,3 +92,11 @@ void CConnectionPool::ReleaseConnection(StunConnection* pConn) ...@@ -92,3 +92,11 @@ void CConnectionPool::ReleaseConnection(StunConnection* pConn)
_freelist = pConn; _freelist = pConn;
} }
void CConnectionPool::ResetConnection(StunConnection* pConn)
{
pConn->_reader.Reset();
pConn->_reader.GetStream().Attach(pConn->_spReaderBuffer, true);
pConn->_state = ConnectionState_Receiving;
pConn->_txCount = 0;
}
...@@ -9,8 +9,7 @@ enum StunConnectionState ...@@ -9,8 +9,7 @@ enum StunConnectionState
{ {
ConnectionState_Idle, ConnectionState_Idle,
ConnectionState_Receiving, ConnectionState_Receiving,
ConnectionState_Transmitting, ConnectionState_Transmitting
ConnectionState_Closing, // shutdown has been called, waiting for close notification on other end
}; };
...@@ -45,6 +44,10 @@ public: ...@@ -45,6 +44,10 @@ public:
StunConnection* GetConnection(int sock, SocketRole role); StunConnection* GetConnection(int sock, SocketRole role);
void ReleaseConnection(StunConnection* pConn); void ReleaseConnection(StunConnection* pConn);
// ResetConnection resets streams to handle a subsequent incoming packet
// It's a lighter version of calling ReleaseConnection followed by GetConnection
void ResetConnection(StunConnection* pConn);
void Reset(); void Reset();
}; };
......
...@@ -21,10 +21,10 @@ ...@@ -21,10 +21,10 @@
#include "stunsocketthread.h" #include "stunsocketthread.h"
// client sockets are edge triggered // client sockets are now level triggered
const uint32_t EPOLL_CLIENT_READ_EVENT_SET = IPOLLING_EDGETRIGGER | IPOLLING_READ | IPOLLING_RDHUP; const uint32_t EPOLL_CLIENT_READ_EVENT_SET = IPOLLING_READ | IPOLLING_RDHUP;
const uint32_t EPOLL_CLIENT_WRITE_EVENT_SET = IPOLLING_EDGETRIGGER | IPOLLING_WRITE; const uint32_t EPOLL_CLIENT_WRITE_EVENT_SET = IPOLLING_WRITE;
// listen sockets are always level triggered (that way, when we recover from // listen sockets are always level triggered (that way, when we recover from
// hitting a max connections condition, we don't have to worry about // hitting a max connections condition, we don't have to worry about
...@@ -456,14 +456,30 @@ void CTCPStunThread::ProcessConnectionEvent(int sock, uint32_t eventflags) ...@@ -456,14 +456,30 @@ void CTCPStunThread::ProcessConnectionEvent(int sock, uint32_t eventflags)
{ {
WriteBytesForConnection(pConn); WriteBytesForConnection(pConn);
} }
else if (pConn->_state == ConnectionState_Closing)
{
ConsumeRemoteClose(pConn);
}
} }
bool CTCPStunThread::RateCheck(const CSocketAddress& addr)
{
bool result = true;
if (_spLimiter.get())
{
result = _spLimiter->RateCheck(addr);
if (result == false)
{
if (Logging::GetLogLevel() >= LL_VERBOSE)
{
char szIP[100];
addr.ToStringBuffer(szIP, 100);
Logging::LogMsg(LL_VERBOSE, "Rate Limiter has blocked incoming connection from IP %s", szIP);
}
}
}
return result;
}
StunConnection* CTCPStunThread::AcceptConnection(CStunSocket* pListenSocket) StunConnection* CTCPStunThread::AcceptConnection(CStunSocket* pListenSocket)
{ {
...@@ -477,6 +493,7 @@ StunConnection* CTCPStunThread::AcceptConnection(CStunSocket* pListenSocket) ...@@ -477,6 +493,7 @@ StunConnection* CTCPStunThread::AcceptConnection(CStunSocket* pListenSocket)
HRESULT hr = S_OK; HRESULT hr = S_OK;
int insertresult; int insertresult;
int err; int err;
bool allowed_to_pass = true;
ASSERT(listensock != -1); ASSERT(listensock != -1);
ASSERT(::IsValidSocketRole(role)); ASSERT(::IsValidSocketRole(role));
...@@ -488,22 +505,8 @@ StunConnection* CTCPStunThread::AcceptConnection(CStunSocket* pListenSocket) ...@@ -488,22 +505,8 @@ StunConnection* CTCPStunThread::AcceptConnection(CStunSocket* pListenSocket)
ChkIfA(socktmp == -1, E_FAIL); ChkIfA(socktmp == -1, E_FAIL);
// --- rate limit check------- // --- rate limit check-------
if (_spLimiter.get()) allowed_to_pass = RateCheck(CSocketAddress(addrClient));
{ ChkIf(allowed_to_pass==false, E_FAIL); // this will trigger the socket to be immediately closed
CSocketAddress addr = CSocketAddress(addrClient);
bool allowed_to_pass = _spLimiter->RateCheck(addr);
if (allowed_to_pass == false)
{
if (Logging::GetLogLevel() >= LL_VERBOSE)
{
char szIP[100];
addr.ToStringBuffer(szIP, 100);
Logging::LogMsg(LL_VERBOSE, "Rate Limiter has blocked incoming connection from IP %s", szIP);
}
ChkIf(false, E_FAIL); // this will trigger the socket to be immediately closed
}
}
// -------------------------- // --------------------------
...@@ -557,6 +560,7 @@ HRESULT CTCPStunThread::ReceiveBytesForConnection(StunConnection* pConn) ...@@ -557,6 +560,7 @@ HRESULT CTCPStunThread::ReceiveBytesForConnection(StunConnection* pConn)
HRESULT hr = S_OK; HRESULT hr = S_OK;
CStunMessageReader::ReaderParseState readerstate; CStunMessageReader::ReaderParseState readerstate;
int err; int err;
bool allowed_to_pass = true;
int sock = pConn->_stunsocket.GetSocketHandle(); int sock = pConn->_stunsocket.GetSocketHandle();
...@@ -604,6 +608,9 @@ HRESULT CTCPStunThread::ReceiveBytesForConnection(StunConnection* pConn) ...@@ -604,6 +608,9 @@ HRESULT CTCPStunThread::ReceiveBytesForConnection(StunConnection* pConn)
msgOut.spBufferOut = pConn->_spOutputBuffer; msgOut.spBufferOut = pConn->_spOutputBuffer;
allowed_to_pass = this->RateCheck(msgIn.addrRemote);
ChkIf(allowed_to_pass == false, E_FAIL);
Chk(CStunRequestHandler::ProcessRequest(msgIn, msgOut, &_tsa, _spAuth)); Chk(CStunRequestHandler::ProcessRequest(msgIn, msgOut, &_tsa, _spAuth));
// success - transition to the response state // success - transition to the response state
...@@ -615,7 +622,6 @@ HRESULT CTCPStunThread::ReceiveBytesForConnection(StunConnection* pConn) ...@@ -615,7 +622,6 @@ HRESULT CTCPStunThread::ReceiveBytesForConnection(StunConnection* pConn)
// optimization - go ahead and try to send the response // optimization - go ahead and try to send the response
WriteBytesForConnection(pConn); WriteBytesForConnection(pConn);
// WriteBytesForConnection will close the connection on error // WriteBytesForConnection will close the connection on error
// And it might call ConsumeRemoteClose, which will also null it out
// so we can't assume the connection is still alive. And if it's not alive, pConn likely got deleted // so we can't assume the connection is still alive. And if it's not alive, pConn likely got deleted
// either refetch from the hash tables, or invent an out parameter on WriteBytesForConnection and ConsumeRemoteClose to better propagate the close state of the connection // either refetch from the hash tables, or invent an out parameter on WriteBytesForConnection and ConsumeRemoteClose to better propagate the close state of the connection
...@@ -683,20 +689,13 @@ HRESULT CTCPStunThread::WriteBytesForConnection(StunConnection* pConn) ...@@ -683,20 +689,13 @@ HRESULT CTCPStunThread::WriteBytesForConnection(StunConnection* pConn)
if (pConn->_txCount >= bytestotal) if (pConn->_txCount >= bytestotal)
{ {
pConn->_state = ConnectionState_Closing; pConn->_state = ConnectionState_Receiving;
_connectionpool.ResetConnection(pConn);
shutdown(sock, SHUT_WR);
// go back to listening for read events // go back to listening for read events
ChkA(_spPolling->ChangeEventSet(sock, EPOLL_CLIENT_READ_EVENT_SET)); ChkA(_spPolling->ChangeEventSet(sock, EPOLL_CLIENT_READ_EVENT_SET));
ConsumeRemoteClose(pConn);
// so we can't assume the connection is still alive. And if it's not alive, pConn likely got deleted
// either refetch from the hash tables, or invent an out parameter on WriteBytesForConnection and ConsumeRemoteClose to better propagate the close state of the connection
pConn = NULL; pConn = NULL;
break; break;
} }
// loop back and try to send the remaining bytes // loop back and try to send the remaining bytes
...@@ -711,47 +710,6 @@ Cleanup: ...@@ -711,47 +710,6 @@ Cleanup:
return hr; return hr;
} }
HRESULT CTCPStunThread::ConsumeRemoteClose(StunConnection* pConn)
{
uint8_t buffer[MAX_STUN_MESSAGE_SIZE];
HRESULT hr = S_OK;
int ret;
int err;
ASSERT(pConn != NULL);
int sock = pConn->_stunsocket.GetSocketHandle();
ASSERT(sock != -1);
while (true)
{
ret = ::recv(sock, buffer, sizeof(buffer), 0);
err = errno;
if ((ret < 0) && ((errno == EWOULDBLOCK) || (errno == EAGAIN)))
{
// still waiting
hr = S_FALSE;
break;
}
Logging::LogMsg(LL_VERBOSE, "ConsumeRemoteClose. recv for socket %d returned %d (errno=%d)", sock, ret, (ret<0)?err:0);
if (ret <= 0)
{
// whether it was a clean error (0) or some other error, we are done
// that's it, we're done
CloseConnection(pConn);
pConn = NULL;
break;
}
}
return hr;
}
void CTCPStunThread::CloseConnection(StunConnection* pConn) void CTCPStunThread::CloseConnection(StunConnection* pConn)
{ {
if (pConn) if (pConn)
......
...@@ -96,7 +96,6 @@ class CTCPStunThread ...@@ -96,7 +96,6 @@ class CTCPStunThread
HRESULT ReceiveBytesForConnection(StunConnection* pConn); HRESULT ReceiveBytesForConnection(StunConnection* pConn);
HRESULT WriteBytesForConnection(StunConnection* pConn); HRESULT WriteBytesForConnection(StunConnection* pConn);
HRESULT ConsumeRemoteClose(StunConnection* pConn);
void CloseAllConnections(StunThreadConnectionMap* pConnMap); void CloseAllConnections(StunThreadConnectionMap* pConnMap);
void SweepDeadConnections(); void SweepDeadConnections();
...@@ -104,6 +103,7 @@ class CTCPStunThread ...@@ -104,6 +103,7 @@ class CTCPStunThread
int GetTimeoutSeconds(); int GetTimeoutSeconds();
bool IsConnectionCountAtMax(); bool IsConnectionCountAtMax();
void CloseConnection(StunConnection* pConn); void CloseConnection(StunConnection* pConn);
bool RateCheck(const CSocketAddress& addr);
// thread members // thread members
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment