GrabBag/VrNets/tcpServer/Src/CYTCPServer.cpp
2025-10-24 23:19:44 +08:00

420 lines
8.8 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "CYTCPServer.h"
#include <stdio.h>
#include <thread>
#include <algorithm>
#include <cmath>
#include <cstring>
#ifdef _WIN32
#include <windows.h>
#include <WS2tcpip.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <sys/time.h>
#include <functional>
#include <linux/tcp.h>
#endif // _WIN32
/// Construct an idle server: state WORK_INIT, no listening socket, no
/// callbacks registered, and the worker flags cleared.
CYTCPServer::CYTCPServer()
    : m_eWorkStats(WORK_INIT)
    , m_nSocket(INVALID_SOCKET)
    , m_bWork(false)
    , m_fRecv(nullptr)
    , m_fEvent(nullptr)
    , m_bCreateRecv(false)
    , m_bOffNagle(false)
{
    // m_vTCPClient is a default-constructed (already empty) vector, so there
    // is nothing left to initialise in the body.
}
/// Destructor: all teardown is delegated to Close().
CYTCPServer::~CYTCPServer()
{
    (void)Close();
}
/// 初始化socket
bool CYTCPServer::Init(const int port, bool bOffNagle)
{
bool bRet = false;
#ifdef _WIN32
WORD ver = MAKEWORD(2, 2);
WSADATA data;
WSAStartup(ver, &data);
#endif // _WIN32
m_nSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
if (m_nSocket != INVALID_SOCKET)
{
bRet = true;
sockaddr_in sSockAddr;
sSockAddr.sin_family = AF_INET;
sSockAddr.sin_port = htons(port);//host to net unsigned short
#ifdef _WIN32
sSockAddr.sin_addr.S_un.S_addr = INADDR_ANY;
#else
sSockAddr.sin_addr.s_addr = INADDR_ANY;
memset(sSockAddr.sin_zero, 0, 8);
int opt = SO_REUSEADDR;
int len = sizeof(opt);
setsockopt(m_nSocket, SOL_SOCKET, SO_REUSEADDR, &opt, len);
if(bOffNagle)
{
int on = 1; //是否关闭nagle算法 //1 表示小包立即发送tcp帧 0表示延时发送tcp帧
setsockopt(m_nSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&on, sizeof(on));
}
#endif // _WIN32
int nRet = bind(m_nSocket, (struct sockaddr*)&sSockAddr, sizeof(sockaddr_in));
if (SOCKET_ERROR != nRet)
{
nRet = listen(m_nSocket, 50);
if (SOCKET_ERROR == nRet)
{
bRet = false;
}
}
else
{
printf("bind %d failed \n", port);
bRet = false;
}
if (!bRet)
{
#ifdef _WIN32
closesocket(m_nSocket);
#else
close(m_nSocket);
#endif
m_nSocket = INVALID_SOCKET;
}
}
else
{
bRet = false;
}
return bRet;
}
/// Start (or resume) accepting clients.
///
/// Stores the receive callback, creates the worker tasks if none exist, and
/// makes sure exactly one monitor thread (_OnMonitorLink) is running.
///
/// @param fRecv              Receive callback; stored in m_fRecv and passed to
///                           every task's StartTask(). The original comment
///                           says that when it is null the RecvData interface
///                           has to be overridden instead.
/// @param bRecvSelfProtocol  Forwarded unchanged to CYServerTask::StartTask().
/// @return Always true.
bool CYTCPServer::Start(FunTCPServerRecv fRecv, bool bRecvSelfProtocol)
{
    m_fRecv = fRecv;

    // Create the worker tasks before the monitor thread may accept anything,
    // so _AddClient() always finds a task to attach a new client to.
    if (!m_bCreateRecv)
    {
        m_bCreateRecv = true;
        std::lock_guard<std::mutex> oLck(m_mVectorTask);
        const int nTaskCount = (int)floor(MAX_CLIENT_NUM / FD_SETSIZE) + 1;
        for (int i = 0; i < nTaskCount; ++i)
        {
            CYServerTask* pServerTask = new CYServerTask();
            m_vServerTask.push_back(pServerTask);
            pServerTask->StartTask(fRecv, bRecvSelfProtocol);
            // Route task-level client failures to CYTCPServer::_Exception.
            pServerTask->SetExceptionCallback([this](const TCPClient* pClient) {
                this->_Exception(pClient);
            });
        }
    }

    m_bWork = true;

    // Stop() clears m_bCreateRecv but leaves the monitor thread parked on
    // m_cvRecv, so m_bCreateRecv cannot be used to decide whether a thread
    // exists. The thread is alive exactly while the state is RUNING or
    // WAITSINGAL; spawning unconditionally would leave two detached monitor
    // threads after a Stop()/Start() cycle.
    const bool bMonitorAlive =
        (WORK_RUNING == m_eWorkStats) || (WORK_WAITSINGAL == m_eWorkStats);
    if (!bMonitorAlive)
    {
        std::thread tLinkThread([this]() { _OnMonitorLink(); });
        tLinkThread.detach();
        // A freshly created thread goes straight to the running state.
        m_eWorkStats = WORK_RUNING;
    }
    else
    {
        // Wake the parked thread; repeat until it reports WORK_RUNING because
        // a single notification can be lost if it is not waiting yet.
        while (WORK_RUNING != m_eWorkStats)
        {
            {
                std::unique_lock<std::mutex> lock(m_mutexRecv);
                m_cvRecv.notify_one();
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
    return true;
}
/// Register the callback that receives client events (connected,
/// disconnected, exception).
/// @param fEvent  Callback to store in m_fEvent; events are only delivered
///                while the stored value tests true.
void CYTCPServer::SetEventCallback(FunTCPServerEvent fEvent)
{
    this->m_fEvent = fEvent;
}
bool CYTCPServer::Stop()
{
m_bWork = false;
///等待所有客户开始退出等待
if(m_bCreateRecv)
{
while (WORK_WAITSINGAL != m_eWorkStats)
{
std::chrono::milliseconds milTime(1);
std::this_thread::sleep_for(milTime);
}
m_fRecv = nullptr;
///关闭服务Task
std::lock_guard<std::mutex> oLck(m_mVectorTask);
std::vector<CYServerTask*>::iterator iter = m_vServerTask.begin();
while(iter != m_vServerTask.end())
{
(*iter)->StopTask();
iter = m_vServerTask.erase(iter);
}
}
m_bCreateRecv = false;
return true;
}
///发送消息
bool CYTCPServer::SendData(const TCPClient* pClient, const char* nBuf, const int nLen)
{
if (nullptr == pClient || pClient->m_nFD == INVALID_SOCKET)
{
return false;
}
std::lock_guard<std::mutex> mSocketLock(m_mSocketMutex);
bool bRet = false;
std::vector<TCPClient*> vTCPClient;
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
vTCPClient = m_vTCPClient;
}
for (int i = (int)vTCPClient.size() - 1; i >= 0; i--)
{
if (pClient->m_nFD == vTCPClient[i]->m_nFD)
{
#if 0
int* intBuf = (int *)nBuf;
printf("send : %x len : %d Alleln : %d\n", *(intBuf + 2), *(intBuf + 4), nLen);
#endif
int nSendLen = 0;
while (nSendLen < nLen)
{
int len = send(pClient->m_nFD, nBuf + nSendLen, nLen - nSendLen, 0);
if (len <= 0)
{
bRet = false;
break;
}
nSendLen += len;
}
if (nSendLen == nLen)
{
bRet = true;
}
}
}
return bRet;
}
///发送给所有客户端
bool CYTCPServer::SendAllData(const char* nBuf, const int nLen)
{
bool bRet = false;
std::vector<TCPClient*> vTCPClient;
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
vTCPClient = m_vTCPClient;
}
for (int i = (int)vTCPClient.size() - 1; i >= 0; i--)
{
bRet = SendData(vTCPClient[i], nBuf, nLen);
}
return bRet;
}
/// Receive-data hook. This implementation ignores both arguments and
/// performs no I/O.
/// @param ppData  Unused here.
/// @param nLen    Unused here.
/// @return Always true.
bool CYTCPServer::RecvData(unsigned char** ppData, unsigned int* nLen)
{
    (void)ppData;
    (void)nLen;
    return true;
}
bool CYTCPServer::Close()
{
if (INVALID_SOCKET != m_nSocket)
{
if (WORK_RUNING == m_eWorkStats)
{
Stop();
}
#ifdef _WIN32
closesocket(m_nSocket);
WSACleanup();
#else
close(m_nSocket);
#endif
m_nSocket = INVALID_SOCKET;
m_eWorkStats = WORK_CLOSE;
while (m_eWorkStats != WORK_EXIT)
{
std::unique_lock<std::mutex> lock(m_mutexRecv);
m_cvRecv.notify_one();
std::chrono::milliseconds milTime(1);
std::this_thread::sleep_for(milTime);
}
}
return true;
}
void CYTCPServer::_AddClient(const SOCKET nSocket, const sockaddr_in sClientAddr)
{
printf("index : %d welcome [%d]: %s\n", (int)m_vTCPClient.size(), (int)nSocket, inet_ntoa(sClientAddr.sin_addr));
TCPClient* pTCPClient = new TCPClient;
pTCPClient->m_nFD = (int)nSocket;
{
std::unique_lock<std::mutex> lock(m_mVectorSocket);
m_vTCPClient.push_back(pTCPClient);
}
{
std::lock_guard<std::mutex> taskLock(m_mVectorTask);
std::vector<CYServerTask*>::iterator iterMin = m_vServerTask.begin();
std::vector<CYServerTask*>::iterator iter = m_vServerTask.begin();
while (iter != m_vServerTask.end())
{
iterMin = (*iterMin)->GetClientNum() <= (*iter)->GetClientNum() ? iterMin : iter;
iter++;
}
if (iterMin != m_vServerTask.end())
{
pTCPClient->m_Task = (*iterMin);
(*iterMin)->AddClient(pTCPClient);
}
}
// 触发客户端连接事件
if (m_fEvent)
{
m_fEvent(pTCPClient, TCP_EVENT_CLIENT_CONNECTED);
}
}
/// Remove a client from m_vTCPClient and free it.
///
/// If the pointer is registered, TCP_EVENT_CLIENT_DISCONNECTED is raised
/// (while m_mVectorSocket is still held), the TCPClient is deleted and its
/// entry erased. Unknown pointers are ignored.
///
/// @param pClient  Client to remove; matched by pointer identity.
///
/// NOTE(review): the socket descriptor is not closed in this function;
/// confirm it is closed elsewhere.
void CYTCPServer::_CloseClient(const TCPClient* pClient)
{
    std::unique_lock<std::mutex> lock(m_mVectorSocket);

    const auto itFound = std::find(m_vTCPClient.begin(), m_vTCPClient.end(), pClient);
    if (itFound == m_vTCPClient.end())
    {
        return;
    }

    printf("client exit [%d]\n", (int)pClient->m_nFD);
    // Notify the owner before the object goes away.
    if (m_fEvent)
    {
        m_fEvent(pClient, TCP_EVENT_CLIENT_DISCONNECTED);
    }
    delete pClient;
    m_vTCPClient.erase(itFound);
}
void CYTCPServer::_OnMonitorLink()
{
//select
fd_set fdRead;
fd_set fdExp;
while (true)
{
if (!m_bWork)
{
m_eWorkStats = WORK_WAITSINGAL;
std::unique_lock<std::mutex> lock(m_mutexRecv);
m_cvRecv.wait(lock);
if (WORK_CLOSE == m_eWorkStats)
{
break;
}
else
{
m_eWorkStats = WORK_RUNING;
}
}
else
{
FD_ZERO(&fdRead);
FD_ZERO(&fdExp);
FD_SET(m_nSocket, &fdRead);
FD_SET(m_nSocket, &fdExp);
struct timeval sWaitTime = {0, 1000};
int nCount = select((int)m_nSocket + 1, &fdRead, nullptr, &fdExp, &sWaitTime);
if (nCount <= 0)
{
continue;
}
if (FD_ISSET(m_nSocket, &fdRead))
{
FD_CLR(m_nSocket, &fdRead);
sockaddr_in sClientAddr;
int nAddrLen = sizeof(sockaddr_in);
SOCKET nSocket = accept(m_nSocket, (sockaddr*)&sClientAddr, (socklen_t *)&nAddrLen);
if (INVALID_SOCKET != nSocket)
{
_AddClient(nSocket, sClientAddr);
}
}
}
}
m_eWorkStats = WORK_EXIT;
}
/// 处理异常
void CYTCPServer::_Exception(const TCPClient* pClient)
{
// 触发客户端异常事件
if (m_fEvent)
{
m_fEvent(pClient, TCP_EVENT_CLIENT_EXCEPTION);
}
CYServerTask* p = (CYServerTask*)(pClient->m_Task);
p->DelClient(pClient);
_CloseClient(pClient);
}
/// Factory: allocate a CYTCPServer and return it through the interface
/// pointer.
/// @param ppIYTCPServer  Out-parameter that receives the new server.
/// @return false when ppIYTCPServer is null (nothing is allocated), true
///         otherwise.
bool VrCreatYTCPServer(IYTCPServer** ppIYTCPServer)
{
    // Reject a null out-parameter instead of dereferencing it.
    if (nullptr == ppIYTCPServer)
    {
        return false;
    }
    *ppIYTCPServer = new CYTCPServer();
    return true;
}