GrabBag/VrNets/tcpServer/Src/CYServerTask.cpp

264 lines
5.1 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "CYServerTask.h"
CYServerTask::CYServerTask()
: m_maxSocket(INVALID_SOCKET)
, m_tTask(nullptr)
, m_bWork(false)
, m_eWorkStatus(WORK_INIT)
, m_fRecv(nullptr)
, m_fException(nullptr)
, m_bUseProtocol(false)
{
m_vClient.clear();
FD_ZERO(&m_fdRead);
FD_ZERO(&m_fdExp);
m_pRecvBuf = new char[RECV_DATA_LEN];
m_pProtocalHead = new ProtocolHead;
}
CYServerTask::~CYServerTask()
{
delete m_tTask;
m_tTask = nullptr;
m_vClient.clear();
FD_ZERO(&m_fdRead);
FD_ZERO(&m_fdExp);
delete[] m_pRecvBuf;
delete m_pProtocalHead;
}
bool CYServerTask::StartTask(FunTCPServerRecv fRecv, bool bRecvSelfProtocol)
{
//1初始化线程
m_bWork = true;
//2赋值回调函数
m_fRecv = fRecv;
m_bUseProtocol = bRecvSelfProtocol;
if (!m_tTask)
{
m_tTask = new std::thread(std::mem_fn(&CYServerTask::_OnProcessEvent), this);
m_tTask->detach();
}
else
{
//发送信号进行初始化
while (WORK_RUNING != m_eWorkStatus)
{
std::unique_lock<std::mutex> lock(m_mutexWork);
m_cvWork.notify_one();
}
}
return true;
}
void CYServerTask::SetExceptionCallback(std::function<void(const TCPClient*)> fException)
{
m_fException = fException;
}
bool CYServerTask::StopTask()
{
m_bWork = false;
///等待线程开始信号等待处理
if (m_tTask)
{
while (WORK_WAITSINGAL != m_eWorkStatus)
{
std::chrono::milliseconds milTime(10);
std::this_thread::sleep_for(milTime);
}
m_fRecv = nullptr;
delete m_tTask;
m_tTask = nullptr;
}
return true;
}
///添加客户端
bool CYServerTask::AddClient(TCPClient * pClient)
{
if(nullptr != pClient && m_vClient.size() < FD_SETSIZE)
{
std::lock_guard<std::mutex> mLck(m_mClient);
//<2F><>¼Task<73>еĿͻ<C4BF><CDBB><EFBFBD>
m_vClient.push_back(pClient);
//<2F><><EFBFBD>ӵ<EFBFBD>select<63><74>fd<66><64><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
FD_SET(pClient->m_nFD, &m_fdRead);
FD_SET(pClient->m_nFD, &m_fdExp);
//<2F>ҵ<EFBFBD><D2B5><EFBFBD><EFBFBD>FD
m_maxSocket = m_maxSocket > pClient->m_nFD ? m_maxSocket : pClient->m_nFD;
return true;
}
else
{
return false;
}
}
///移除客户端
bool CYServerTask::DelClient(const TCPClient * pClient)
{
bool bRet = false;
std::lock_guard<std::mutex> mLck(m_mClient);
std::vector<TCPClient*>::iterator iter = m_vClient.begin();
m_maxSocket = INVALID_SOCKET;
while (iter != m_vClient.end())
{
if (*iter == pClient)
{
m_vClient.erase(iter);
FD_CLR(pClient->m_nFD, &m_fdRead);
FD_CLR(pClient->m_nFD, &m_fdExp);
bRet = true;
break;
}
}
iter = m_vClient.begin();
while (iter != m_vClient.end())
{
//<2F><><EFBFBD><EFBFBD>ͳ<EFBFBD><CDB3><EFBFBD><EFBFBD><EFBFBD>ֵ
m_maxSocket = m_maxSocket > (*iter)->m_nFD ? m_maxSocket : (*iter)->m_nFD;
iter++;
}
return bRet;
}
///获取Task中客户端的数目
int CYServerTask::GetClientNum()
{
return (int)m_vClient.size();
}
void CYServerTask::_OnProcessEvent()
{
while (true)
{
if (!m_bWork)
{
m_eWorkStatus = WORK_WAITSINGAL;
std::unique_lock<std::mutex> lock(m_mutexWork);
m_cvWork.wait(lock);
if (WORK_CLOSE == m_eWorkStatus)
{
break;
}
else
{
m_eWorkStatus = WORK_RUNING;
}
}
else
{
if (m_vClient.empty())
{
std::chrono::milliseconds milTime(1);
std::this_thread::sleep_for(milTime);
continue;
}
///临时客户端vector
std::vector<TCPClient*> vTCPClient;
fd_set fdRead;
fd_set fdExp;
{
std::unique_lock<std::mutex> lock(m_mClient);
vTCPClient = m_vClient;
fdRead = m_fdRead;
fdExp = m_fdExp;
}
struct timeval sWaitTime = { 0, 1000 };
int nCount = select((int)m_maxSocket + 1, &fdRead, nullptr, &fdExp, &sWaitTime);
if (nCount <= 0)
{
continue;
}
for (int i = (int)vTCPClient.size() - 1; i >= 0; i--)
{
TCPClient* tmpClient = vTCPClient[i];
if (FD_ISSET(tmpClient->m_nFD, &fdRead))
{
//<2F><><EFBFBD>ܲ<EFBFBD><DCB2>ص<EFBFBD>
if (!_OnProcessData(tmpClient))
{
if (m_fException)
{
m_fException(tmpClient);
}
}
}
}
}
}
m_eWorkStatus = WORK_EXIT;
}
bool CYServerTask::_OnProcessData(TCPClient* pClient)
{
const int nRecvLen = RECV_DATA_LEN;
//<2F><>Э<EFBFBD><D0AD><EFBFBD><EFBFBD><EFBFBD>
if(!m_bUseProtocol)
{
int nCount = recv(pClient->m_nFD, m_pRecvBuf, nRecvLen, 0);
if (nCount > 0 && m_fRecv)
{
m_fRecv(pClient, m_pRecvBuf, nCount);
}
return nCount > 0;
}
//Э<><D0AD><EFBFBD><EFBFBD><EFBFBD>
int nAllDataLen = 0;
int recv_len = 0;
int nRet = 0;
int nDataAddr = 6 * sizeof(int);
//recv head
do
{
if ((recv_len = recv(pClient->m_nFD, (char *)(m_pProtocalHead) + nAllDataLen, nDataAddr - nAllDataLen, 0)) <= 0)
{
printf("read head failed \n");
return false;
}
nAllDataLen += recv_len;
} while (nAllDataLen < nDataAddr);
nAllDataLen = 0;
//recv data
while (nAllDataLen < m_pProtocalHead->nLen)
{
recv_len = recv(pClient->m_nFD, (char *)(m_pProtocalHead) + nDataAddr + nAllDataLen,
m_pProtocalHead->nLen - nAllDataLen, 0);
if (recv_len <= 0)
{
printf("read data len : %d failed [%d]\n", m_pProtocalHead->nLen - nAllDataLen, recv_len);
return false;
}
nAllDataLen += recv_len;
}
nAllDataLen = 0;
if (m_fRecv)
{
m_fRecv(pClient, (char *)m_pProtocalHead, m_pProtocalHead->nLen + nDataAddr);
}
//printf("cmd = %x len = %d \n", protocol.nCmd, protocol.nLen);
return 0 == nRet;
}