GrabBag/VrNets/tcpServer/Src/CYTCPServer.cpp

420 lines
9.1 KiB
C++
Raw Normal View History

2025-09-14 14:51:38 +08:00
#include "CYTCPServer.h"
#include <stdio.h>
#include <thread>
#include <algorithm>
#include <cmath>
#include <cstring>
#ifdef _WIN32
#include <windows.h>
#include <WS2tcpip.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <sys/time.h>
#include <functional>
#include <linux/tcp.h>
#endif // _WIN32
CYTCPServer::CYTCPServer()
: m_eWorkStats(WORK_INIT)
, m_nSocket(INVALID_SOCKET)
, m_bWork(false)
, m_fRecv(nullptr)
, m_fEvent(nullptr)
, m_bCreateRecv(false)
, m_bOffNagle(false)
{
m_vTCPClient.clear();
}
CYTCPServer::~CYTCPServer()
{
Close();
}
///<2F><>ʼ<EFBFBD><CABC>socket
bool CYTCPServer::Init(const int port, bool bOffNagle)
{
bool bRet = false;
#ifdef _WIN32
WORD ver = MAKEWORD(2, 2);
WSADATA data;
WSAStartup(ver, &data);
#endif // _WIN32
m_nSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
if (m_nSocket != INVALID_SOCKET)
{
bRet = true;
sockaddr_in sSockAddr;
sSockAddr.sin_family = AF_INET;
sSockAddr.sin_port = htons(port);//host to net unsigned short
#ifdef _WIN32
sSockAddr.sin_addr.S_un.S_addr = INADDR_ANY;
#else
sSockAddr.sin_addr.s_addr = INADDR_ANY;
memset(sSockAddr.sin_zero, 0, 8);
int opt = SO_REUSEADDR;
int len = sizeof(opt);
setsockopt(m_nSocket, SOL_SOCKET, SO_REUSEADDR, &opt, len);
if(bOffNagle)
{
int on = 1; //<2F>Ƿ<EFBFBD><C7B7>nagle<6C><EFBFBD><E3B7A8> //1 <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>͵<EFBFBD><CDB5><EFBFBD>tcp<63><70> 0<><30><EFBFBD><EFBFBD>ӳٷ<D3B3><D9B7><EFBFBD>tcp<63><70>
setsockopt(m_nSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&on, sizeof(on));
}
#endif // _WIN32
int nRet = bind(m_nSocket, (struct sockaddr*)&sSockAddr, sizeof(sockaddr_in));
if (SOCKET_ERROR != nRet)
{
nRet = listen(m_nSocket, 50);
if (SOCKET_ERROR == nRet)
{
bRet = false;
}
}
else
{
printf("bind %d failed \n", port);
bRet = false;
}
if (!bRet)
{
#ifdef _WIN32
closesocket(m_nSocket);
#else
close(m_nSocket);
#endif
m_nSocket = INVALID_SOCKET;
}
}
else
{
bRet = false;
}
return bRet;
}
bool CYTCPServer::Start(FunTCPServerRecv fRecv, bool bRecvSelfProtocol)
{
//1<><31><EFBFBD><EFBFBD>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD>
m_bWork = true;
//2<><32><EFBFBD><EFBFBD>ֵ<EFBFBD>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD>
m_fRecv = fRecv;
//3<><33>ע<EFBFBD><D7A2>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̣߳<DFB3><CCA3><EFBFBD>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD>Ϊnull <20><><EFBFBD><EFBFBD>Ҫ<EFBFBD><D2AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>RecvData<74>ӿ<EFBFBD>
if (!m_bCreateRecv)
{
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳̽<DFB3><CCBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ظ<EFBFBD><D8B8><EFBFBD><EFBFBD><EFBFBD>
std::thread tLinkThread(std::bind(&CYTCPServer::_OnMonitorLink, this));
tLinkThread.detach();
//<2F>״ν<D7B4><CEBD><EFBFBD>ֱ<EFBFBD>ӽ<EFBFBD><D3BD><EFBFBD>runing ״̬
m_eWorkStats = WORK_RUNING;
m_bCreateRecv = true;
///<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Task
std::lock_guard<std::mutex> oLck(m_mVectorTask);
for (int i = (int)floor(MAX_CLIENT_NUM / FD_SETSIZE); i >= 0; i--)
{
CYServerTask* pServerTask = new CYServerTask();
m_vServerTask.push_back(pServerTask);
pServerTask->StartTask(fRecv, bRecvSelfProtocol);
// 设置异常处理回调指向CYTCPServer的_Exception方法
pServerTask->SetExceptionCallback([this](const TCPClient* pClient) {
this->_Exception(pClient);
});
}
}
else
{
//<2F><><EFBFBD><EFBFBD><EFBFBD>źŽ<C5BA><C5BD>п<EFBFBD>ʼ
while (WORK_RUNING != m_eWorkStats)
{
std::unique_lock<std::mutex> lock(m_mutexRecv);
m_cvRecv.notify_one();
}
}
return true;
}
void CYTCPServer::SetEventCallback(FunTCPServerEvent fEvent)
{
m_fEvent = fEvent;
}
bool CYTCPServer::Stop()
{
m_bWork = false;
///<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>˿<EFBFBD>ʼ<EFBFBD>ŵȴ<C5B5><C8B4><EFBFBD><EFBFBD><EFBFBD>
if(m_bCreateRecv)
{
while (WORK_WAITSINGAL != m_eWorkStats)
{
std::chrono::milliseconds milTime(1);
std::this_thread::sleep_for(milTime);
}
m_fRecv = nullptr;
///<2F>رշ<D8B1><D5B7><EFBFBD><EFBFBD>Task
std::lock_guard<std::mutex> oLck(m_mVectorTask);
std::vector<CYServerTask*>::iterator iter = m_vServerTask.begin();
while(iter != m_vServerTask.end())
{
(*iter)->StopTask();
iter = m_vServerTask.erase(iter);
}
}
m_bCreateRecv = false;
return true;
}
///<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ
bool CYTCPServer::SendData(const TCPClient* pClient, const char* nBuf, const int nLen)
{
if (nullptr == pClient || pClient->m_nFD == INVALID_SOCKET)
{
return false;
}
std::lock_guard<std::mutex> mSocketLock(m_mSocketMutex);
bool bRet = false;
std::vector<TCPClient*> vTCPClient;
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
vTCPClient = m_vTCPClient;
}
for (int i = (int)vTCPClient.size() - 1; i >= 0; i--)
{
if (pClient->m_nFD == vTCPClient[i]->m_nFD)
{
#if 0
int* intBuf = (int *)nBuf;
printf("send : %x len : %d Alleln : %d\n", *(intBuf + 2), *(intBuf + 4), nLen);
#endif
int nSendLen = 0;
while (nSendLen < nLen)
{
int len = send(pClient->m_nFD, nBuf + nSendLen, nLen - nSendLen, 0);
if (len <= 0)
{
bRet = false;
break;
}
nSendLen += len;
}
if (nSendLen == nLen)
{
bRet = true;
}
}
}
return bRet;
}
///<2F><><EFBFBD>͸<EFBFBD><CDB8><EFBFBD><EFBFBD>пͻ<D0BF><CDBB><EFBFBD>
bool CYTCPServer::SendAllData(const char* nBuf, const int nLen)
{
bool bRet = false;
std::vector<TCPClient*> vTCPClient;
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
vTCPClient = m_vTCPClient;
}
for (int i = (int)vTCPClient.size() - 1; i >= 0; i--)
{
bRet = SendData(vTCPClient[i], nBuf, nLen);
}
return bRet;
}
///<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
bool CYTCPServer::RecvData(unsigned char** ppData, unsigned int* nLen)
{
bool bRet = true;
return bRet;
}
bool CYTCPServer::Close()
{
if (INVALID_SOCKET != m_nSocket)
{
if (WORK_RUNING == m_eWorkStats)
{
Stop();
}
#ifdef _WIN32
closesocket(m_nSocket);
WSACleanup();
#else
close(m_nSocket);
#endif
m_nSocket = INVALID_SOCKET;
m_eWorkStats = WORK_CLOSE;
while (m_eWorkStats != WORK_EXIT)
{
std::unique_lock<std::mutex> lock(m_mutexRecv);
m_cvRecv.notify_one();
std::chrono::milliseconds milTime(1);
std::this_thread::sleep_for(milTime);
}
}
return true;
}
void CYTCPServer::_AddClient(const SOCKET nSocket, const sockaddr_in sClientAddr)
{
printf("index : %d welcome [%d]: %s\n", (int)m_vTCPClient.size(), (int)nSocket, inet_ntoa(sClientAddr.sin_addr));
TCPClient* pTCPClient = new TCPClient;
pTCPClient->m_nFD = (int)nSocket;
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
m_vTCPClient.push_back(pTCPClient);
}
{
std::lock_guard<std::mutex> taskLock(m_mVectorTask);
std::vector<CYServerTask*>::iterator iterMin = m_vServerTask.begin();
std::vector<CYServerTask*>::iterator iter = m_vServerTask.begin();
while (iter != m_vServerTask.end())
{
iterMin = (*iterMin)->GetClientNum() <= (*iter)->GetClientNum() ? iterMin : iter;
iter++;
}
if (iterMin != m_vServerTask.end())
{
pTCPClient->m_Task = (*iterMin);
(*iterMin)->AddClient(pTCPClient);
}
}
// 触发客户端连接事件
if (m_fEvent)
{
m_fEvent(pTCPClient, TCP_EVENT_CLIENT_CONNECTED);
}
}
void CYTCPServer::_CloseClient(const TCPClient* pClient)
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
std::vector<TCPClient*>::iterator iter = m_vTCPClient.begin();
while(iter != m_vTCPClient.end())
{
if(*iter == pClient)
{
printf("client exit [%d]\n", (int)pClient->m_nFD);
// 触发客户端断开连接事件
if (m_fEvent)
{
m_fEvent(pClient, TCP_EVENT_CLIENT_DISCONNECTED);
}
delete pClient;
m_vTCPClient.erase(iter);
break;
}
else
{
iter++;
}
}
}
void CYTCPServer::_OnMonitorLink()
{
//select
fd_set fdRead;
fd_set fdExp;
while (true)
{
if (!m_bWork)
{
m_eWorkStats = WORK_WAITSINGAL;
std::unique_lock<std::mutex> lock(m_mutexRecv);
m_cvRecv.wait(lock);
if (WORK_CLOSE == m_eWorkStats)
{
break;
}
else
{
m_eWorkStats = WORK_RUNING;
}
}
else
{
FD_ZERO(&fdRead);
FD_ZERO(&fdExp);
FD_SET(m_nSocket, &fdRead);
FD_SET(m_nSocket, &fdExp);
struct timeval sWaitTime = {0, 1000};
int nCount = select((int)m_nSocket + 1, &fdRead, nullptr, &fdExp, &sWaitTime);
if (nCount <= 0)
{
continue;
}
if (FD_ISSET(m_nSocket, &fdRead))
{
FD_CLR(m_nSocket, &fdRead);
sockaddr_in sClientAddr;
int nAddrLen = sizeof(sockaddr_in);
SOCKET nSocket = accept(m_nSocket, (sockaddr*)&sClientAddr, (socklen_t *)&nAddrLen);
if (INVALID_SOCKET != nSocket)
{
_AddClient(nSocket, sClientAddr);
}
}
}
}
m_eWorkStats = WORK_EXIT;
}
/// <20><><EFBFBD><EFBFBD><EFBFBD>
void CYTCPServer::_Exception(const TCPClient* pClient)
{
// 触发客户端异常事件
if (m_fEvent)
{
m_fEvent(pClient, TCP_EVENT_CLIENT_EXCEPTION);
}
CYServerTask* p = (CYServerTask*)(pClient->m_Task);
p->DelClient(pClient);
_CloseClient(pClient);
}
bool VrCreatYTCPServer(IYTCPServer** ppIYTCPServer)
{
CYTCPServer* pYTCPServer = new CYTCPServer();
*ppIYTCPServer = pYTCPServer;
return true;
}