GrabBag/VrNets/tcpServer/Src/CYServerTask.cpp

264 lines
5.1 KiB
C++
Raw Normal View History

#include "CYServerTask.h"
CYServerTask::CYServerTask()
: m_maxSocket(INVALID_SOCKET)
, m_tTask(nullptr)
, m_bWork(false)
, m_eWorkStatus(WORK_INIT)
, m_fRecv(nullptr)
, m_fException(nullptr)
, m_bUseProtocol(false)
{
m_vClient.clear();
FD_ZERO(&m_fdRead);
FD_ZERO(&m_fdExp);
m_pRecvBuf = new char[RECV_DATA_LEN];
m_pProtocalHead = new ProtocolHead;
}
CYServerTask::~CYServerTask()
{
delete m_tTask;
m_tTask = nullptr;
m_vClient.clear();
FD_ZERO(&m_fdRead);
FD_ZERO(&m_fdExp);
delete[] m_pRecvBuf;
delete m_pProtocalHead;
}
bool CYServerTask::StartTask(FunTCPServerRecv fRecv, bool bRecvSelfProtocol)
{
//1初始化线程
m_bWork = true;
//2赋值回调函数
m_fRecv = fRecv;
m_bUseProtocol = bRecvSelfProtocol;
if (!m_tTask)
{
m_tTask = new std::thread(std::mem_fn(&CYServerTask::_OnProcessEvent), this);
m_tTask->detach();
}
else
{
//发送信号进行初始化
while (WORK_RUNING != m_eWorkStatus)
{
std::unique_lock<std::mutex> lock(m_mutexWork);
m_cvWork.notify_one();
}
}
return true;
}
void CYServerTask::SetExceptionCallback(std::function<void(const TCPClient*)> fException)
{
m_fException = fException;
}
bool CYServerTask::StopTask()
{
m_bWork = false;
///等待线程开始信号等待处理
if (m_tTask)
{
while (WORK_WAITSINGAL != m_eWorkStatus)
{
std::chrono::milliseconds milTime(10);
std::this_thread::sleep_for(milTime);
}
m_fRecv = nullptr;
delete m_tTask;
m_tTask = nullptr;
}
return true;
}
///添加客户端
bool CYServerTask::AddClient(TCPClient * pClient)
{
if(nullptr != pClient && m_vClient.size() < FD_SETSIZE)
{
std::lock_guard<std::mutex> mLck(m_mClient);
//<2F><>¼Task<73>еĿͻ<C4BF><CDBB><EFBFBD>
m_vClient.push_back(pClient);
//<2F><><EFBFBD>ӵ<EFBFBD>select<63><74>fd<66><64><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
FD_SET(pClient->m_nFD, &m_fdRead);
FD_SET(pClient->m_nFD, &m_fdExp);
//<2F>ҵ<EFBFBD><D2B5><EFBFBD><EFBFBD>FD
m_maxSocket = m_maxSocket > pClient->m_nFD ? m_maxSocket : pClient->m_nFD;
return true;
}
else
{
return false;
}
}
///移除客户端
bool CYServerTask::DelClient(const TCPClient * pClient)
{
bool bRet = false;
std::lock_guard<std::mutex> mLck(m_mClient);
std::vector<TCPClient*>::iterator iter = m_vClient.begin();
m_maxSocket = INVALID_SOCKET;
while (iter != m_vClient.end())
{
if (*iter == pClient)
{
m_vClient.erase(iter);
FD_CLR(pClient->m_nFD, &m_fdRead);
FD_CLR(pClient->m_nFD, &m_fdExp);
bRet = true;
break;
}
}
iter = m_vClient.begin();
while (iter != m_vClient.end())
{
//<2F><><EFBFBD><EFBFBD>ͳ<EFBFBD><CDB3><EFBFBD><EFBFBD><EFBFBD>ֵ
m_maxSocket = m_maxSocket > (*iter)->m_nFD ? m_maxSocket : (*iter)->m_nFD;
iter++;
}
return bRet;
}
///获取Task中客户端的数目
int CYServerTask::GetClientNum()
{
return (int)m_vClient.size();
}
void CYServerTask::_OnProcessEvent()
{
while (true)
{
if (!m_bWork)
{
m_eWorkStatus = WORK_WAITSINGAL;
std::unique_lock<std::mutex> lock(m_mutexWork);
m_cvWork.wait(lock);
if (WORK_CLOSE == m_eWorkStatus)
{
break;
}
else
{
m_eWorkStatus = WORK_RUNING;
}
}
else
{
if (m_vClient.empty())
{
std::chrono::milliseconds milTime(1);
std::this_thread::sleep_for(milTime);
continue;
}
///临时客户端vector
std::vector<TCPClient*> vTCPClient;
fd_set fdRead;
fd_set fdExp;
{
std::unique_lock<std::mutex> lock(m_mClient);
vTCPClient = m_vClient;
fdRead = m_fdRead;
fdExp = m_fdExp;
}
struct timeval sWaitTime = { 0, 1000 };
int nCount = select((int)m_maxSocket + 1, &fdRead, nullptr, &fdExp, &sWaitTime);
if (nCount <= 0)
{
continue;
}
for (int i = (int)vTCPClient.size() - 1; i >= 0; i--)
{
TCPClient* tmpClient = vTCPClient[i];
if (FD_ISSET(tmpClient->m_nFD, &fdRead))
{
//<2F><><EFBFBD>ܲ<EFBFBD><DCB2>ص<EFBFBD>
if (!_OnProcessData(tmpClient))
{
if (m_fException)
{
m_fException(tmpClient);
}
}
}
}
}
}
m_eWorkStatus = WORK_EXIT;
}
bool CYServerTask::_OnProcessData(TCPClient* pClient)
{
const int nRecvLen = RECV_DATA_LEN;
//<2F><>Э<EFBFBD><D0AD><EFBFBD><EFBFBD><EFBFBD>
if(!m_bUseProtocol)
{
int nCount = recv(pClient->m_nFD, m_pRecvBuf, nRecvLen, 0);
if (nCount > 0 && m_fRecv)
{
m_fRecv(pClient, m_pRecvBuf, nCount);
}
return nCount > 0;
}
//Э<><D0AD><EFBFBD><EFBFBD><EFBFBD>
int nAllDataLen = 0;
int recv_len = 0;
int nRet = 0;
int nDataAddr = 6 * sizeof(int);
//recv head
do
{
if ((recv_len = recv(pClient->m_nFD, (char *)(m_pProtocalHead) + nAllDataLen, nDataAddr - nAllDataLen, 0)) <= 0)
{
printf("read head failed \n");
return false;
}
nAllDataLen += recv_len;
} while (nAllDataLen < nDataAddr);
nAllDataLen = 0;
//recv data
while (nAllDataLen < m_pProtocalHead->nLen)
{
recv_len = recv(pClient->m_nFD, (char *)(m_pProtocalHead) + nDataAddr + nAllDataLen,
m_pProtocalHead->nLen - nAllDataLen, 0);
if (recv_len <= 0)
{
printf("read data len : %d failed [%d]\n", m_pProtocalHead->nLen - nAllDataLen, recv_len);
return false;
}
nAllDataLen += recv_len;
}
nAllDataLen = 0;
if (m_fRecv)
{
m_fRecv(pClient, (char *)m_pProtocalHead, m_pProtocalHead->nLen + nDataAddr);
}
//printf("cmd = %x len = %d \n", protocol.nCmd, protocol.nLen);
return 0 == nRet;
}