GrabBag/VrNets/tcpServer/Src/CYTCPServer.cpp

430 lines
9.2 KiB
C++
Raw Normal View History

2025-09-14 14:51:38 +08:00
#include "CYTCPServer.h"
#include <stdio.h>
#include <thread>
#include <algorithm>
#include <cmath>
#include <cstring>
#ifdef _WIN32
#include <windows.h>
#include <WS2tcpip.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <sys/time.h>
#include <functional>
#include <linux/tcp.h>
#endif // _WIN32
/// Construct a server in the WORK_INIT state: no listening socket, no
/// callbacks registered, and no worker thread/tasks created yet.
CYTCPServer::CYTCPServer()
    : m_eWorkStats(WORK_INIT)
    , m_nSocket(INVALID_SOCKET)
    , m_bWork(false)
    , m_fRecv(nullptr)
    , m_fEvent(nullptr)
    , m_bCreateRecv(false)
    , m_bOffNagle(false)
{
    // m_vTCPClient is default-constructed empty; nothing else to set up.
}
/// Destructor: delegates all teardown to Close().
CYTCPServer::~CYTCPServer()
{
    Close();
}
2025-10-24 23:19:44 +08:00
/// 初始化socket
2025-09-14 14:51:38 +08:00
bool CYTCPServer::Init(const int port, bool bOffNagle)
{
bool bRet = false;
#ifdef _WIN32
WORD ver = MAKEWORD(2, 2);
WSADATA data;
WSAStartup(ver, &data);
#endif // _WIN32
m_nSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
if (m_nSocket != INVALID_SOCKET)
{
bRet = true;
sockaddr_in sSockAddr;
sSockAddr.sin_family = AF_INET;
sSockAddr.sin_port = htons(port);//host to net unsigned short
#ifdef _WIN32
sSockAddr.sin_addr.S_un.S_addr = INADDR_ANY;
// 设置 SO_REUSEADDR 允许重新绑定
int opt = 1;
setsockopt(m_nSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt));
if(bOffNagle)
{
int on = 1; //是否关闭nagle算法 //1 表示小包立即发送tcp帧 0表示延时发送tcp帧
setsockopt(m_nSocket, IPPROTO_TCP, TCP_NODELAY, (const char*)&on, sizeof(on));
}
2025-09-14 14:51:38 +08:00
#else
sSockAddr.sin_addr.s_addr = INADDR_ANY;
memset(sSockAddr.sin_zero, 0, 8);
int opt = SO_REUSEADDR;
int len = sizeof(opt);
setsockopt(m_nSocket, SOL_SOCKET, SO_REUSEADDR, &opt, len);
if(bOffNagle)
{
2025-10-24 23:19:44 +08:00
int on = 1; //是否关闭nagle算法 //1 表示小包立即发送tcp帧 0表示延时发送tcp帧
2025-09-14 14:51:38 +08:00
setsockopt(m_nSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&on, sizeof(on));
}
#endif // _WIN32
int nRet = bind(m_nSocket, (struct sockaddr*)&sSockAddr, sizeof(sockaddr_in));
if (SOCKET_ERROR != nRet)
{
nRet = listen(m_nSocket, 50);
if (SOCKET_ERROR == nRet)
{
bRet = false;
}
}
else
{
printf("bind %d failed \n", port);
bRet = false;
}
if (!bRet)
{
#ifdef _WIN32
closesocket(m_nSocket);
#else
close(m_nSocket);
#endif
m_nSocket = INVALID_SOCKET;
}
}
else
{
bRet = false;
}
return bRet;
}
/// Start (or resume) accepting clients.
///
/// The monitor thread running _OnMonitorLink() is created only when none is
/// alive. A thread parked by Stop() (state WORK_WAITSINGAL) is woken instead
/// of spawning a second detached thread next to it. The worker tasks are
/// (re)created whenever m_bCreateRecv is false, i.e. on first start and after
/// every Stop().
///
/// @param fRecv              Receive callback, stored in m_fRecv and handed to every task.
/// @param bRecvSelfProtocol  Forwarded unchanged to CYServerTask::StartTask().
/// @return Always true.
bool CYTCPServer::Start(FunTCPServerRecv fRecv, bool bRecvSelfProtocol)
{
    // Sample the state first: the monitor thread exists exactly while the
    // state is WORK_RUNING or WORK_WAITSINGAL (it sets WORK_EXIT when leaving).
    const bool bMonitorAlive =
        (WORK_RUNING == m_eWorkStats) || (WORK_WAITSINGAL == m_eWorkStats);

    // 1. Enable the work flag polled by the monitor thread.
    m_bWork = true;
    // 2. Remember the receive callback.
    m_fRecv = fRecv;

    if (bMonitorAlive)
    {
        // 3a. Wake the parked monitor thread and wait until it reports running.
        //     The lock is released before sleeping so the woken thread can
        //     reacquire it, and the sleep avoids a hot spin.
        while (WORK_RUNING != m_eWorkStats)
        {
            {
                std::unique_lock<std::mutex> lock(m_mutexRecv);
                m_cvRecv.notify_one();
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
    else
    {
        // 3b. No monitor thread yet: create it and enter the running state directly.
        std::thread tLinkThread([this]() { this->_OnMonitorLink(); });
        tLinkThread.detach();
        m_eWorkStats = WORK_RUNING;
    }

    if (!m_bCreateRecv)
    {
        m_bCreateRecv = true;

        // 4. Create floor(MAX_CLIENT_NUM / FD_SETSIZE) + 1 worker tasks.
        std::lock_guard<std::mutex> oLck(m_mVectorTask);
        for (int i = (int)floor(MAX_CLIENT_NUM / FD_SETSIZE); i >= 0; i--)
        {
            CYServerTask* pServerTask = new CYServerTask();
            m_vServerTask.push_back(pServerTask);
            pServerTask->StartTask(fRecv, bRecvSelfProtocol);
            // Route task-level client exceptions to CYTCPServer::_Exception.
            pServerTask->SetExceptionCallback([this](const TCPClient* pClient) {
                this->_Exception(pClient);
            });
        }
    }
    return true;
}
/// Register the callback invoked on client events (connected, disconnected,
/// exception). Replaces any previously stored callback.
///
/// @param fEvent  Callback to store in m_fEvent.
void CYTCPServer::SetEventCallback(FunTCPServerEvent fEvent)
{
    m_fEvent = fEvent;
}
bool CYTCPServer::Stop()
{
m_bWork = false;
2025-10-24 23:19:44 +08:00
///等待所有客户开始退出等待
2025-09-14 14:51:38 +08:00
if(m_bCreateRecv)
{
while (WORK_WAITSINGAL != m_eWorkStats)
{
std::chrono::milliseconds milTime(1);
std::this_thread::sleep_for(milTime);
}
m_fRecv = nullptr;
2025-10-24 23:19:44 +08:00
///关闭服务Task
2025-09-14 14:51:38 +08:00
std::lock_guard<std::mutex> oLck(m_mVectorTask);
std::vector<CYServerTask*>::iterator iter = m_vServerTask.begin();
while(iter != m_vServerTask.end())
{
(*iter)->StopTask();
iter = m_vServerTask.erase(iter);
}
}
m_bCreateRecv = false;
return true;
}
2025-10-24 23:19:44 +08:00
///发送消息
2025-09-14 14:51:38 +08:00
bool CYTCPServer::SendData(const TCPClient* pClient, const char* nBuf, const int nLen)
{
if (nullptr == pClient || pClient->m_nFD == INVALID_SOCKET)
{
return false;
}
std::lock_guard<std::mutex> mSocketLock(m_mSocketMutex);
bool bRet = false;
std::vector<TCPClient*> vTCPClient;
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
vTCPClient = m_vTCPClient;
}
for (int i = (int)vTCPClient.size() - 1; i >= 0; i--)
{
if (pClient->m_nFD == vTCPClient[i]->m_nFD)
{
#if 0
int* intBuf = (int *)nBuf;
printf("send : %x len : %d Alleln : %d\n", *(intBuf + 2), *(intBuf + 4), nLen);
#endif
int nSendLen = 0;
while (nSendLen < nLen)
{
int len = send(pClient->m_nFD, nBuf + nSendLen, nLen - nSendLen, 0);
if (len <= 0)
{
bRet = false;
break;
}
nSendLen += len;
}
if (nSendLen == nLen)
{
bRet = true;
}
}
}
return bRet;
}
2025-10-24 23:19:44 +08:00
///发送给所有客户端
2025-09-14 14:51:38 +08:00
bool CYTCPServer::SendAllData(const char* nBuf, const int nLen)
{
bool bRet = false;
std::vector<TCPClient*> vTCPClient;
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
vTCPClient = m_vTCPClient;
}
for (int i = (int)vTCPClient.size() - 1; i >= 0; i--)
{
bRet = SendData(vTCPClient[i], nBuf, nLen);
}
return bRet;
}
2025-10-24 23:19:44 +08:00
///接收数据
2025-09-14 14:51:38 +08:00
/// Receive hook. This implementation does nothing with its arguments and
/// reports success.
///
/// @param ppData  Unused here.
/// @param nLen    Unused here.
/// @return Always true.
bool CYTCPServer::RecvData(unsigned char** ppData, unsigned int* nLen)
{
    (void)ppData;
    (void)nLen;
    return true;
}
bool CYTCPServer::Close()
{
if (INVALID_SOCKET != m_nSocket)
{
if (WORK_RUNING == m_eWorkStats)
{
Stop();
}
#ifdef _WIN32
closesocket(m_nSocket);
WSACleanup();
#else
close(m_nSocket);
#endif
m_nSocket = INVALID_SOCKET;
m_eWorkStats = WORK_CLOSE;
while (m_eWorkStats != WORK_EXIT)
{
std::unique_lock<std::mutex> lock(m_mutexRecv);
m_cvRecv.notify_one();
std::chrono::milliseconds milTime(1);
std::this_thread::sleep_for(milTime);
}
}
return true;
}
void CYTCPServer::_AddClient(const SOCKET nSocket, const sockaddr_in sClientAddr)
{
printf("index : %d welcome [%d]: %s\n", (int)m_vTCPClient.size(), (int)nSocket, inet_ntoa(sClientAddr.sin_addr));
TCPClient* pTCPClient = new TCPClient;
pTCPClient->m_nFD = (int)nSocket;
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
m_vTCPClient.push_back(pTCPClient);
}
{
std::lock_guard<std::mutex> taskLock(m_mVectorTask);
std::vector<CYServerTask*>::iterator iterMin = m_vServerTask.begin();
std::vector<CYServerTask*>::iterator iter = m_vServerTask.begin();
while (iter != m_vServerTask.end())
{
iterMin = (*iterMin)->GetClientNum() <= (*iter)->GetClientNum() ? iterMin : iter;
iter++;
}
if (iterMin != m_vServerTask.end())
{
pTCPClient->m_Task = (*iterMin);
(*iterMin)->AddClient(pTCPClient);
}
}
// 触发客户端连接事件
if (m_fEvent)
{
m_fEvent(pTCPClient, TCP_EVENT_CLIENT_CONNECTED);
}
}
/// Remove a client from m_vTCPClient, fire the TCP_EVENT_CLIENT_DISCONNECTED
/// event if a callback is set, and delete the client object.
///
/// Does nothing when the pointer is not in the list, so a second call for the
/// same client is ignored.
///
/// @param pClient  Client to remove; it is deleted when found in the list.
void CYTCPServer::_CloseClient(const TCPClient* pClient)
{
    {
        std::unique_lock<std::mutex> lock(m_mVectorSocket);
        std::vector<TCPClient*>::iterator iter =
            std::find(m_vTCPClient.begin(), m_vTCPClient.end(), pClient);
        if (iter == m_vTCPClient.end())
        {
            return;
        }
        m_vTCPClient.erase(iter);
    }

    // The callback runs with m_mVectorSocket released: SendData() and
    // SendAllData() take that mutex, so calling them from the callback while
    // it is held would deadlock.
    printf("client exit [%d]\n", (int)pClient->m_nFD);
    if (m_fEvent)
    {
        m_fEvent(pClient, TCP_EVENT_CLIENT_DISCONNECTED);
    }
    delete pClient;
}
void CYTCPServer::_OnMonitorLink()
{
//select
fd_set fdRead;
fd_set fdExp;
while (true)
{
if (!m_bWork)
{
m_eWorkStats = WORK_WAITSINGAL;
std::unique_lock<std::mutex> lock(m_mutexRecv);
m_cvRecv.wait(lock);
if (WORK_CLOSE == m_eWorkStats)
{
break;
}
else
{
m_eWorkStats = WORK_RUNING;
}
}
else
{
FD_ZERO(&fdRead);
FD_ZERO(&fdExp);
FD_SET(m_nSocket, &fdRead);
FD_SET(m_nSocket, &fdExp);
struct timeval sWaitTime = {0, 1000};
int nCount = select((int)m_nSocket + 1, &fdRead, nullptr, &fdExp, &sWaitTime);
if (nCount <= 0)
{
continue;
}
if (FD_ISSET(m_nSocket, &fdRead))
{
FD_CLR(m_nSocket, &fdRead);
sockaddr_in sClientAddr;
int nAddrLen = sizeof(sockaddr_in);
SOCKET nSocket = accept(m_nSocket, (sockaddr*)&sClientAddr, (socklen_t *)&nAddrLen);
if (INVALID_SOCKET != nSocket)
{
_AddClient(nSocket, sClientAddr);
}
}
}
}
m_eWorkStats = WORK_EXIT;
}
2025-10-24 23:19:44 +08:00
/// 处理异常
2025-09-14 14:51:38 +08:00
void CYTCPServer::_Exception(const TCPClient* pClient)
{
// 触发客户端异常事件
if (m_fEvent)
{
m_fEvent(pClient, TCP_EVENT_CLIENT_EXCEPTION);
}
CYServerTask* p = (CYServerTask*)(pClient->m_Task);
p->DelClient(pClient);
_CloseClient(pClient);
}
/// Factory: allocate a CYTCPServer and hand it back through the out-parameter.
///
/// @param ppIYTCPServer  Receives the new server instance; must not be null.
/// @return true when the instance was created and stored; false when
///         ppIYTCPServer is null.
bool VrCreatYTCPServer(IYTCPServer** ppIYTCPServer)
{
    if (nullptr == ppIYTCPServer)
    {
        return false;
    }
    *ppIYTCPServer = new CYTCPServer();
    return true;
}