#include "CYServerTask.h"

#include <algorithm>

/// Creates an idle task: no worker thread, no clients, empty fd sets.
/// Allocates the raw receive buffer (RECV_DATA_LEN bytes) and the single
/// ProtocolHead object that framed messages are assembled into.
CYServerTask::CYServerTask()
    : m_maxSocket(INVALID_SOCKET)
    , m_tTask(nullptr)
    , m_bWork(false)
    , m_eWorkStatus(WORK_INIT)
    , m_fRecv(nullptr)
    , m_fException(nullptr)
    , m_bUseProtocol(false)
{
    m_vClient.clear();
    FD_ZERO(&m_fdRead);
    FD_ZERO(&m_fdExp);
    m_pRecvBuf = new char[RECV_DATA_LEN];
    m_pProtocalHead = new ProtocolHead;
}

/// Shuts the worker thread down and releases the buffers.
/// The worker is asked to close and is joined BEFORE the buffers are freed,
/// because the worker reads and writes m_pRecvBuf / m_pProtocalHead.
/// Must not be run on the worker thread itself (a thread cannot join itself).
CYServerTask::~CYServerTask()
{
    if (m_tTask)
    {
        {
            std::lock_guard<decltype(m_mutexWork)> lock(m_mutexWork);
            m_bWork = false;
            m_eWorkStatus = WORK_CLOSE;
        }
        // Wake the worker if it is parked so that it can observe WORK_CLOSE.
        m_cvWork.notify_all();
        if (m_tTask->joinable())
        {
            m_tTask->join();
        }
        delete m_tTask;
        m_tTask = nullptr;
    }

    m_vClient.clear();
    FD_ZERO(&m_fdRead);
    FD_ZERO(&m_fdExp);
    delete[] m_pRecvBuf;
    delete m_pProtocalHead;
}

/// Starts (or resumes) processing of client sockets.
/// @param fRecv             Callback invoked with (client, data, length) for received data.
/// @param bRecvSelfProtocol true: receive framed messages (fixed head + nLen body bytes);
///                          false: forward whatever a single recv() returns.
/// @return Always true.
bool CYServerTask::StartTask(FunTCPServerRecv fRecv, bool bRecvSelfProtocol)
{
    {
        // Publish the configuration before the work flag so a worker that sees
        // m_bWork == true under the same mutex also sees the callbacks.
        std::lock_guard<decltype(m_mutexWork)> lock(m_mutexWork);
        m_fRecv = fRecv;
        m_bUseProtocol = bRecvSelfProtocol;
        m_bWork = true;
    }

    if (!m_tTask)
    {
        // First start: create the worker. It stays joinable so the destructor
        // can wait for it instead of freeing memory underneath it.
        m_tTask = new std::thread(&CYServerTask::_OnProcessEvent, this);
    }
    else
    {
        // The worker already exists (parked by StopTask, or still running):
        // wake it; it re-checks m_bWork under the mutex.
        m_cvWork.notify_all();
    }
    return true;
}

/// Stores the callback invoked with the client whose receive failed
/// (see _OnProcessEvent).
// NOTE(review): the parameter type must match the declaration in CYServerTask.h;
// it is called as m_fException(TCPClient*) in this file - confirm the exact signature.
void CYServerTask::SetExceptionCallback(std::function<void(TCPClient*)> fException)
{
    m_fException = fException;
}

/// Pauses processing: clears the work flag and blocks until the worker thread
/// has parked itself, then clears the receive callback.
/// The worker thread is kept (parked on the condition variable) so that a later
/// StartTask() resumes it instead of creating another thread.
/// Must not be called from inside a callback running on the worker thread,
/// since it waits for that same thread to park.
/// @return Always true.
bool CYServerTask::StopTask()
{
    std::unique_lock<decltype(m_mutexWork)> lock(m_mutexWork);
    m_bWork = false;

    if (m_tTask)
    {
        // The worker sets WORK_WAITSINGAL and notifies once it has parked.
        m_cvWork.wait(lock, [this] {
            return WORK_WAITSINGAL == m_eWorkStatus || WORK_EXIT == m_eWorkStatus;
        });
        m_fRecv = nullptr;
    }
    return true;
}

/// Registers a client with this task.
/// @param pClient Client to watch; its m_nFD is added to the read and exception fd sets.
/// @return false if pClient is null or the task already holds FD_SETSIZE clients,
///         true otherwise.
bool CYServerTask::AddClient(TCPClient * pClient)
{
    if (nullptr == pClient)
    {
        return false;
    }

    // The capacity check is done under the lock so concurrent adds cannot
    // both pass it and exceed FD_SETSIZE.
    std::lock_guard<decltype(m_mClient)> lock(m_mClient);
    if (m_vClient.size() >= FD_SETSIZE)
    {
        return false;
    }

    // Remember the client and watch its descriptor.
    m_vClient.push_back(pClient);
    FD_SET(pClient->m_nFD, &m_fdRead);
    FD_SET(pClient->m_nFD, &m_fdExp);

    // Track the largest descriptor for select(). INVALID_SOCKET is treated as
    // "no descriptor yet" explicitly, since it may compare greater than every
    // real descriptor when the socket type is unsigned.
    if (INVALID_SOCKET == m_maxSocket || m_maxSocket < pClient->m_nFD)
    {
        m_maxSocket = pClient->m_nFD;
    }
    return true;
}

/// Unregisters a client from this task.
/// @param pClient Client to remove (compared by pointer).
/// @return true if the client was registered and has been removed, false otherwise.
bool CYServerTask::DelClient(const TCPClient * pClient)
{
    std::lock_guard<decltype(m_mClient)> lock(m_mClient);

    const auto iter = std::find(m_vClient.begin(), m_vClient.end(), pClient);
    const bool bFound = (iter != m_vClient.end());
    if (bFound)
    {
        FD_CLR(pClient->m_nFD, &m_fdRead);
        FD_CLR(pClient->m_nFD, &m_fdExp);
        m_vClient.erase(iter);
    }

    // Recompute the largest descriptor among the remaining clients.
    m_maxSocket = INVALID_SOCKET;
    for (const TCPClient* pRemaining : m_vClient)
    {
        if (INVALID_SOCKET == m_maxSocket || m_maxSocket < pRemaining->m_nFD)
        {
            m_maxSocket = pRemaining->m_nFD;
        }
    }
    return bFound;
}

/// Returns the number of clients currently registered with this task.
int CYServerTask::GetClientNum()
{
    std::lock_guard<decltype(m_mClient)> lock(m_mClient);
    return static_cast<int>(m_vClient.size());
}

/// Worker thread body.
/// Each iteration first synchronises with StartTask/StopTask/the destructor
/// through m_mutexWork + m_cvWork, then polls the registered sockets with
/// select() (1 ms timeout) and hands every readable client to _OnProcessData().
/// If _OnProcessData() fails, the exception callback (if set) is invoked.
void CYServerTask::_OnProcessEvent()
{
    while (true)
    {
        {
            std::unique_lock<decltype(m_mutexWork)> lock(m_mutexWork);
            if (!m_bWork && WORK_CLOSE != m_eWorkStatus)
            {
                // Park: tell StopTask() we are idle, then sleep until restarted
                // or closed. The predicate guards against spurious wake-ups.
                m_eWorkStatus = WORK_WAITSINGAL;
                m_cvWork.notify_all();
                m_cvWork.wait(lock, [this] {
                    return m_bWork || WORK_CLOSE == m_eWorkStatus;
                });
            }
            if (WORK_CLOSE == m_eWorkStatus)
            {
                m_eWorkStatus = WORK_EXIT;
                return;
            }
            m_eWorkStatus = WORK_RUNING;
        }

        // Snapshot the client list, fd sets and fd limit under the client lock
        // so AddClient/DelClient can run while we are inside select().
        std::vector<TCPClient*> vTCPClient;
        fd_set fdRead;
        fd_set fdExp;
        int nFdLimit = 0;
        {
            std::lock_guard<decltype(m_mClient)> lock(m_mClient);
            vTCPClient = m_vClient;
            fdRead = m_fdRead;
            fdExp = m_fdExp;
            nFdLimit = static_cast<int>(m_maxSocket) + 1;
        }

        if (vTCPClient.empty())
        {
            // Nothing to poll: back off briefly instead of spinning.
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            continue;
        }

        struct timeval sWaitTime = { 0, 1000 };
        const int nCount = select(nFdLimit, &fdRead, nullptr, &fdExp, &sWaitTime);
        if (nCount <= 0)
        {
            continue;
        }

        // NOTE(review): the snapshot holds raw pointers; this assumes a client
        // outlives any iteration in which it was still registered - confirm
        // with the owner of TCPClient. Descriptors reported only in fdExp are
        // not acted upon here.
        for (auto it = vTCPClient.rbegin(); it != vTCPClient.rend(); ++it)
        {
            TCPClient* pReadyClient = *it;
            if (!FD_ISSET(pReadyClient->m_nFD, &fdRead))
            {
                continue;
            }
            // Receive and dispatch; report the client on failure.
            if (!_OnProcessData(pReadyClient) && m_fException)
            {
                m_fException(pReadyClient);
            }
        }
    }
}

/// Receives data from one readable client and forwards it to the receive callback.
///
/// Without the self protocol: one recv() of up to RECV_DATA_LEN bytes is forwarded.
/// With the self protocol: a head of 6 * sizeof(int) bytes is read into
/// m_pProtocalHead, then nLen body bytes are read directly behind the head, and
/// the whole frame (head + body) is forwarded.
///
/// @param pClient Client whose socket (m_nFD) is read.
/// @return true if data was received; false if recv() failed or returned 0,
///         or if the announced body length is negative or does not fit into
///         the ProtocolHead object.
bool CYServerTask::_OnProcessData(TCPClient* pClient)
{
    // Raw mode: forward whatever one recv() delivers.
    if (!m_bUseProtocol)
    {
        const int nRecvLen = RECV_DATA_LEN;
        const int nCount = static_cast<int>(recv(pClient->m_nFD, m_pRecvBuf, nRecvLen, 0));
        if (nCount > 0 && m_fRecv)
        {
            m_fRecv(pClient, m_pRecvBuf, nCount);
        }
        return nCount > 0;
    }

    // Framed mode.
    const int nHeadLen = static_cast<int>(6 * sizeof(int));
    const int nFrameCapacity = static_cast<int>(sizeof(ProtocolHead));
    char* const pFrame = reinterpret_cast<char*>(m_pProtocalHead);

    if (nFrameCapacity < nHeadLen)
    {
        // The head alone would not fit into the destination object.
        printf("protocol head larger than frame buffer \n");
        return false;
    }

    // Receive the fixed-size head; recv() may deliver it in several pieces.
    int nReceived = 0;
    while (nReceived < nHeadLen)
    {
        const int nChunk = static_cast<int>(
            recv(pClient->m_nFD, pFrame + nReceived, nHeadLen - nReceived, 0));
        if (nChunk <= 0)
        {
            printf("read head failed \n");
            return false;
        }
        nReceived += nChunk;
    }

    // nLen comes from the peer: it must be validated before it is used as a
    // write length, otherwise the body recv() below writes past the end of
    // the ProtocolHead allocation.
    const int nBodyLen = static_cast<int>(m_pProtocalHead->nLen);
    if (nBodyLen < 0 || nBodyLen > nFrameCapacity - nHeadLen)
    {
        printf("invalid data len : %d \n", nBodyLen);
        return false;
    }

    // Receive the body directly behind the head.
    nReceived = 0;
    while (nReceived < nBodyLen)
    {
        const int nChunk = static_cast<int>(
            recv(pClient->m_nFD, pFrame + nHeadLen + nReceived, nBodyLen - nReceived, 0));
        if (nChunk <= 0)
        {
            printf("read data len : %d failed [%d]\n", nBodyLen - nReceived, nChunk);
            return false;
        }
        nReceived += nChunk;
    }

    if (m_fRecv)
    {
        m_fRecv(pClient, pFrame, nBodyLen + nHeadLen);
    }
    return true;
}