#include "TearingTcpProtocol.h"
#include "VrLog.h"

// NOTE(review): the reviewed copy of this file contained one further #include
// whose target was unreadable. The headers below cover every Qt / standard
// type this file names directly; confirm nothing else is required.
#include <QBuffer>
#include <QByteArray>
#include <QDateTime>
#include <QImage>
#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonValue>
#include <QList>
#include <QString>
#include <QTimer>

#include <vector>

// Creates the protocol object with both timers parented to it. The defaults
// are a 30 s heartbeat interval, a 90 s client timeout and port 0 (no server
// is created by start() until setTcpPort() supplies a positive port).
TearingTcpProtocol::TearingTcpProtocol(QObject *parent)
    : QObject(parent)
    , m_tcpServer(nullptr)
    , m_heartbeatTimer(new QTimer(this))
    , m_clientCheckTimer(new QTimer(this))
    , m_heartbeatInterval(30)
    , m_clientTimeout(90)
    , m_tcpPort(0)
{
    // Heartbeat timer -> onHeartbeatTimeout().
    connect(m_heartbeatTimer, &QTimer::timeout,
            this, &TearingTcpProtocol::onHeartbeatTimeout);

    // Client timeout check timer -> onClientTimeoutCheck().
    connect(m_clientCheckTimer, &QTimer::timeout,
            this, &TearingTcpProtocol::onClientTimeoutCheck);
}

// stop() already stops the timers, clears the per-client state and releases
// the TCP server (resetting m_tcpServer to nullptr), so nothing else is left
// to free here.
TearingTcpProtocol::~TearingTcpProtocol()
{
    stop();
}

// Starts the protocol.
//
// heartbeatInterval is stored as the heartbeat interval in seconds; the
// client timeout is set to three times that value. When no server exists yet
// and a positive port has been configured, the TCP server is created,
// initialised and started with handleReceivedData() as its receive callback.
// If any of those three steps fails, the error is logged, the server is
// released and the function returns without starting the timeout check timer.
void TearingTcpProtocol::start(int heartbeatInterval)
{
    m_heartbeatInterval = heartbeatInterval;
    m_clientTimeout = heartbeatInterval * 3;  // timeout is 3x the heartbeat interval

    // Create the TCP server.
    if (!m_tcpServer && m_tcpPort > 0) {
        if (!VrCreatYTCPServer(&m_tcpServer)) {
            LOG_ERROR("Failed to create TCP server\n");
            m_tcpServer = nullptr;
            return;
        }

        // Initialise the TCP server.
        if (!m_tcpServer->Init(m_tcpPort, true)) {
            LOG_ERROR("Failed to initialize TCP server on port %d\n", m_tcpPort);
            delete m_tcpServer;
            m_tcpServer = nullptr;
            return;
        }

        // Start the TCP server and register the data-received callback.
        const bool started = m_tcpServer->Start(
            [this](const TCPClient* pClient, const char* pData, const unsigned int nLen) {
                this->handleReceivedData(pClient, pData, nLen);
            },
            false);
        if (!started) {
            LOG_ERROR("Failed to start TCP server on port %d\n", m_tcpPort);
            m_tcpServer->Close();
            delete m_tcpServer;
            m_tcpServer = nullptr;
            return;
        }

        LOG_INFO("TCP server started successfully on port %d\n", m_tcpPort);
    }

    // The heartbeat timer is intentionally not started yet (to be reconsidered
    // once clients connect).
    // m_heartbeatTimer->start(m_heartbeatInterval * 1000);

    // Check for timed-out clients every 10 seconds.
    m_clientCheckTimer->start(10000);

    LOG_INFO("TearingTcpProtocol started, heartbeat interval: %d seconds\n", m_heartbeatInterval);
}

// Stops both timers, drops all per-client buffers and activity timestamps,
// and stops, closes and deletes the TCP server if one exists.
void TearingTcpProtocol::stop()
{
    m_heartbeatTimer->stop();
    m_clientCheckTimer->stop();

    m_clientBuffers.clear();
    m_clientLastActive.clear();

    // Shut down and release the TCP server.
    if (m_tcpServer) {
        m_tcpServer->Stop();
        m_tcpServer->Close();
        delete m_tcpServer;
        m_tcpServer = nullptr;
    }

    LOG_INFO("TearingTcpProtocol stopped\n");
}

// Broadcasts a DETECT_RESULT message to all clients.
//
// The message carries the number of results ("count"), the largest ROI side
// length over all results truncated to int ("max"), a millisecond timestamp
// and the visualisation image as a JPEG data URI ("visimg", empty when the
// image is null). Does nothing when no server exists.
//
// NOTE(review): the element type of `results` was unreadable in the reviewed
// copy of this file; `SSG_beltTearingInfo` is an assumption and MUST be
// checked against the declaration in TearingTcpProtocol.h. The body only
// relies on `roi.left/right/top/bottom`.
void TearingTcpProtocol::sendDetectResult(const std::vector<SSG_beltTearingInfo>& results,
                                          const QImage& visImage)
{
    if (!m_tcpServer) {
        return;
    }

    // Tear count and maximum tear length (the longer side of each ROI).
    const int count = static_cast<int>(results.size());
    int maxLength = 0;
    for (const auto& result : results) {
        const double width = result.roi.right - result.roi.left;
        const double length = result.roi.bottom - result.roi.top;
        const int tearLength = static_cast<int>(width > length ? width : length);
        if (tearLength > maxLength) {
            maxLength = tearLength;
        }
    }

    // Build the JSON message.
    QJsonObject jsonObj;
    jsonObj["msgType"] = "DETECT_RESULT";
    jsonObj["timestamp"] = QDateTime::currentMSecsSinceEpoch();
    jsonObj["count"] = count;
    jsonObj["max"] = maxLength;

    // Attach the image as Base64 (empty string when there is no image).
    if (!visImage.isNull()) {
        jsonObj["visimg"] = imageToBase64(visImage);
    } else {
        jsonObj["visimg"] = "";
    }

    // Serialise and wrap in a frame.
    const QByteArray jsonData = QJsonDocument(jsonObj).toJson(QJsonDocument::Compact);
    const QByteArray frame = buildFrame(jsonData);
    if (frame.isEmpty()) {
        // buildFrame() already logged why the payload could not be framed.
        LOG_WARNING("Failed to send DETECT_RESULT to clients\n");
        return;
    }

    // Send to every client.
    const bool success = m_tcpServer->SendAllData(frame.constData(), frame.size());
    if (success) {
        LOG_DEBUG("Sent DETECT_RESULT to all clients, count=%d, max=%d\n", count, maxLength);
    } else {
        LOG_WARNING("Failed to send DETECT_RESULT to clients\n");
    }
}

// Receive callback registered with the TCP server in start().
//
// Appends the received bytes to the client's buffer, refreshes the client's
// last-active timestamp, extracts every complete frame and dispatches each
// JSON payload to handleJsonMessage(). Null/empty input is ignored.
//
// NOTE(review): this is presumably invoked on the TCP server's own thread -
// confirm. If so, m_clientBuffers / m_clientLastActive are also touched by
// onClientTimeoutCheck() and stop() without synchronisation visible here.
// NOTE(review): client state is keyed by socket descriptor (see
// generateClientId()); no disconnect handling is visible in this file, so a
// reused descriptor would inherit the previous client's buffered bytes until
// the timeout check removes them.
void TearingTcpProtocol::handleReceivedData(const TCPClient* pClient, const char* pData, unsigned int nLen)
{
    if (!pClient || !pData || nLen == 0) {
        return;
    }

    const QString clientId = generateClientId(pClient);

    // Refresh the client's last-active time.
    m_clientLastActive[clientId] = QDateTime::currentMSecsSinceEpoch();

    // Append to the client's receive buffer.
    QByteArray& buffer = m_clientBuffers[clientId];
    buffer.append(pData, nLen);

    // Extract complete frames; parseFrames() consumes them from `buffer`.
    QList<QByteArray> jsonDataList;
    const int parsedCount = parseFrames(buffer, jsonDataList);

    if (parsedCount > 0) {
        LOG_DEBUG("Parsed %d frames from client %s\n", parsedCount, clientId.toStdString().c_str());

        // Handle each JSON message in arrival order.
        for (const QByteArray& jsonData : jsonDataList) {
            handleJsonMessage(pClient, jsonData);
        }
    }
}

// Stores the callback invoked by handleSetSpeed() for valid SET_SPEED commands.
void TearingTcpProtocol::setSpeedCallback(const SpeedCallback& callback)
{
    m_speedCallback = callback;
}

// Stores the callback invoked by handleSetControl() for SET_CONTROL commands.
void TearingTcpProtocol::setControlCallback(const ControlCallback& callback)
{
    m_controlCallback = callback;
}

// Sets the port used by the next start() call that has to create the server.
void TearingTcpProtocol::setTcpPort(quint16 port)
{
    m_tcpPort = port;
}

// Slot for the heartbeat timer. Intentionally empty: the server could send
// heartbeats to clients from here if needed, but in the current protocol the
// client sends the heartbeat and the server only acknowledges it.
void TearingTcpProtocol::onHeartbeatTimeout()
{
}

// Slot for the timeout check timer: drops the receive buffer and activity
// record of every client whose last activity is older than m_clientTimeout
// seconds.
void TearingTcpProtocol::onClientTimeoutCheck()
{
    const qint64 currentTime = QDateTime::currentMSecsSinceEpoch();
    // Widen before multiplying so large timeouts cannot overflow int.
    const qint64 timeoutMs = static_cast<qint64>(m_clientTimeout) * 1000;

    // Collect first, remove afterwards, so the container is not modified
    // while it is being iterated.
    QList<QString> timeoutClients;
    for (auto it = m_clientLastActive.begin(); it != m_clientLastActive.end(); ++it) {
        if (currentTime - it.value() > timeoutMs) {
            timeoutClients.append(it.key());
        }
    }

    // Drop the state of the timed-out clients.
    for (const QString& clientId : timeoutClients) {
        LOG_WARNING("Client %s timeout, cleaning up\n", clientId.toStdString().c_str());
        m_clientBuffers.remove(clientId);
        m_clientLastActive.remove(clientId);
    }
}

// Wraps a JSON payload in a frame:
//   FRAME_HEADER | payload length as zero-padded decimal text | payload | FRAME_TAIL
//
// Returns an empty QByteArray (and logs an error) when the payload length
// does not fit into the FRAME_LENGTH_SIZE-character length field; callers
// must not send an empty frame.
QByteArray TearingTcpProtocol::buildFrame(const QByteArray& jsonData)
{
    // Zero-padded decimal length. rightJustified() never truncates, so an
    // over-long number shows up as a field wider than FRAME_LENGTH_SIZE and
    // is rejected instead of overflowing a fixed-size buffer.
    const QByteArray lengthField =
        QByteArray::number(static_cast<qulonglong>(jsonData.size()))
            .rightJustified(FRAME_LENGTH_SIZE, '0');
    if (lengthField.size() > FRAME_LENGTH_SIZE) {
        LOG_ERROR("Payload of %lld bytes does not fit the frame length field\n",
                  static_cast<long long>(jsonData.size()));
        return QByteArray();
    }

    QByteArray frame;
    frame.append(FRAME_HEADER, FRAME_HEADER_SIZE);  // frame header
    frame.append(lengthField);                      // payload length
    frame.append(jsonData);                         // JSON payload
    frame.append(FRAME_TAIL, FRAME_TAIL_SIZE);      // frame tail
    return frame;
}

// Extracts every complete frame from the front of `data`, appends each JSON
// payload to `outJsonData` and returns the number of frames extracted.
//
// IMPORTANT: although `data` is declared const, this function consumes the
// parsed (and discarded) bytes from it through a const_cast. Callers must
// pass a buffer that is not actually a const object (handleReceivedData()
// passes the per-client receive buffer).
//
// Resynchronisation: bytes in front of a header are discarded; a header with
// an unparsable or oversized (> 10 MB) length, or a frame with a wrong tail,
// has only its header removed so that the search resumes just behind it.
int TearingTcpProtocol::parseFrames(const QByteArray& data, QList<QByteArray>& outJsonData)
{
    int parsedCount = 0;
    QByteArray& buffer = const_cast<QByteArray&>(data);

    while (buffer.size() >= FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE) {
        // Locate the frame header.
        const auto headerPos = buffer.indexOf(FRAME_HEADER);
        if (headerPos == -1) {
            // No complete header in the buffer. The last FRAME_HEADER_SIZE - 1
            // bytes may still be the beginning of a header whose remainder
            // arrives with the next read, so keep those and drop the rest.
            const int keep = static_cast<int>(FRAME_HEADER_SIZE) - 1;
            if (keep > 0) {
                buffer.remove(0, buffer.size() - keep);
            } else {
                buffer.clear();
            }
            break;
        }

        // Discard anything in front of the header.
        if (headerPos > 0) {
            buffer.remove(0, headerPos);
        }

        // Need the complete length field before going on.
        if (buffer.size() < FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE) {
            break;
        }

        // Read the payload length (decimal text).
        const QByteArray lengthBytes = buffer.mid(FRAME_HEADER_SIZE, FRAME_LENGTH_SIZE);
        const QString lengthStr = QString::fromLatin1(lengthBytes);
        bool ok = false;
        const quint32 dataLength = lengthStr.toUInt(&ok);
        if (!ok) {
            LOG_ERROR("Failed to parse frame data length: %s, discarding buffer\n", lengthStr.toStdString().c_str());
            buffer.remove(0, FRAME_HEADER_SIZE);
            continue;
        }

        // Sanity limit against corrupt length fields: at most 10 MB.
        if (dataLength > 10 * 1024 * 1024) {
            LOG_ERROR("Invalid frame data length: %u, discarding buffer\n", dataLength);
            buffer.remove(0, FRAME_HEADER_SIZE);
            continue;
        }

        // dataLength is at most 10 MB here, so these sums fit into int.
        const int payloadPos = FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE;
        const int tailPos = payloadPos + static_cast<int>(dataLength);
        const int frameSize = tailPos + FRAME_TAIL_SIZE;

        // Frame not fully received yet: wait for more data.
        if (buffer.size() < frameSize) {
            break;
        }

        // Verify the frame tail.
        const QByteArray tail = buffer.mid(tailPos, FRAME_TAIL_SIZE);
        if (tail != QByteArray(FRAME_TAIL, FRAME_TAIL_SIZE)) {
            LOG_ERROR("Invalid frame tail, discarding frame\n");
            buffer.remove(0, FRAME_HEADER_SIZE);
            continue;
        }

        // Hand out the JSON payload and consume the frame.
        outJsonData.append(buffer.mid(payloadPos, static_cast<int>(dataLength)));
        parsedCount++;
        buffer.remove(0, frameSize);
    }

    return parsedCount;
}

// Maps the "msgType" string of a message to its TcpMessageType value;
// anything unrecognised yields TcpMessageType::UNKNOWN.
TcpMessageType TearingTcpProtocol::parseMessageType(const QString& msgTypeStr)
{
    if (msgTypeStr == "DETECT_RESULT") return TcpMessageType::DETECT_RESULT;
    if (msgTypeStr == "SET_SPEED") return TcpMessageType::SET_SPEED;
    if (msgTypeStr == "SET_CONTROL") return TcpMessageType::SET_CONTROL;
    if (msgTypeStr == "CMD_RESPONSE") return TcpMessageType::CMD_RESPONSE;
    if (msgTypeStr == "HEARTBEAT") return TcpMessageType::HEARTBEAT;
    if (msgTypeStr == "HEARTBEAT_ACK") return TcpMessageType::HEARTBEAT_ACK;
    return TcpMessageType::UNKNOWN;
}

// Parses one JSON payload and dispatches it by its "msgType" field.
// Payloads that are not valid JSON objects are logged and dropped. Only
// SET_SPEED, SET_CONTROL and HEARTBEAT are handled; every other type is
// logged as unknown.
void TearingTcpProtocol::handleJsonMessage(const TCPClient* pClient, const QByteArray& jsonData)
{
    // Parse the JSON.
    QJsonParseError parseError;
    const QJsonDocument doc = QJsonDocument::fromJson(jsonData, &parseError);

    if (parseError.error != QJsonParseError::NoError) {
        LOG_ERROR("Failed to parse JSON: %s\n", parseError.errorString().toStdString().c_str());
        return;
    }

    if (!doc.isObject()) {
        LOG_ERROR("JSON is not an object\n");
        return;
    }

    const QJsonObject jsonObj = doc.object();

    // Determine the message type.
    const QString msgTypeStr = jsonObj["msgType"].toString();
    const TcpMessageType msgType = parseMessageType(msgTypeStr);

    // Dispatch by message type.
    switch (msgType) {
        case TcpMessageType::SET_SPEED:
            handleSetSpeed(pClient, jsonObj);
            break;

        case TcpMessageType::SET_CONTROL:
            handleSetControl(pClient, jsonObj);
            break;

        case TcpMessageType::HEARTBEAT:
            handleHeartbeat(pClient, jsonObj);
            break;

        default:
            LOG_WARNING("Unknown message type: %s\n", msgTypeStr.toStdString().c_str());
            break;
    }
}

// Handles a SET_SPEED command and always answers with a CMD_RESPONSE.
//
// The "speed" field must be a number; a missing or non-numeric field is
// rejected with error code 1 instead of being silently treated as speed 0.
// Values outside 0-5000 are rejected with error code 1. Otherwise the speed
// callback (if set) is invoked; a non-zero return value is reported as the
// error code of a failed command.
void TearingTcpProtocol::handleSetSpeed(const TCPClient* pClient, const QJsonObject& jsonObj)
{
    const QJsonValue speedValue = jsonObj.value("speed");
    if (!speedValue.isDouble()) {
        LOG_ERROR("SET_SPEED command without a numeric 'speed' field\n");
        sendCommandResponse(pClient, "SET_SPEED", false, 1, "Missing or invalid 'speed' field");
        return;
    }

    // toInt() falls back to the given default for non-integral numbers; -1
    // routes those into the out-of-range rejection below.
    const int speed = speedValue.toInt(-1);

    LOG_INFO("Received SET_SPEED command: speed=%d mm/s\n", speed);

    // Parameter validation.
    bool result = true;
    int errorCode = 0;
    QString errorMsg;

    if (speed < 0 || speed > 5000) {
        result = false;
        errorCode = 1;
        errorMsg = "Speed out of range (0-5000 mm/s)";
        LOG_ERROR("Invalid speed value: %d\n", speed);
    } else if (m_speedCallback) {
        // Invoke the speed callback.
        errorCode = m_speedCallback(speed);
        result = (errorCode == 0);
        if (!result) {
            errorMsg = "Speed callback execution failed";
            LOG_ERROR("Speed callback execution failed for speed: %d, error code: %d\n", speed, errorCode);
        }
    }

    // Send the response.
    sendCommandResponse(pClient, "SET_SPEED", result, errorCode, errorMsg);
}

// Handles a SET_CONTROL command and always answers with a CMD_RESPONSE.
//
// The "control" field must be a JSON boolean (true = start, false = stop); a
// missing or non-boolean field is rejected with error code 1 instead of being
// silently treated as "stop". Otherwise the control callback (if set) is
// invoked; a non-zero return value is reported as the error code of a failed
// command.
void TearingTcpProtocol::handleSetControl(const TCPClient* pClient, const QJsonObject& jsonObj)
{
    const QJsonValue controlValue = jsonObj.value("control");
    if (!controlValue.isBool()) {
        LOG_ERROR("SET_CONTROL command without a boolean 'control' field\n");
        sendCommandResponse(pClient, "SET_CONTROL", false, 1, "Missing or invalid 'control' field");
        return;
    }

    const bool control = controlValue.toBool();

    LOG_INFO("Received SET_CONTROL command: control=%s\n", control ? "start" : "stop");

    // Invoke the control callback.
    bool result = true;
    int errorCode = 0;
    QString errorMsg;

    if (m_controlCallback) {
        errorCode = m_controlCallback(control);
        result = (errorCode == 0);
        if (!result) {
            errorMsg = "Control callback execution failed";
            LOG_ERROR("Control callback execution failed for control: %s, error code: %d\n", control ? "start" : "stop", errorCode);
        }
    }

    // Send the response.
    sendCommandResponse(pClient, "SET_CONTROL", result, errorCode, errorMsg);
}

// Answers a client HEARTBEAT with a HEARTBEAT_ACK. The message content is
// not inspected.
void TearingTcpProtocol::handleHeartbeat(const TCPClient* pClient, const QJsonObject& jsonObj)
{
    Q_UNUSED(jsonObj);

    LOG_DEBUG("Received HEARTBEAT from client\n");

    // Send the heartbeat acknowledgement.
    sendHeartbeatAck(pClient);
}

// Sends a CMD_RESPONSE for `cmdType` to one client, carrying the result flag,
// error code, error message and a millisecond timestamp. Does nothing when
// there is no server or no client.
void TearingTcpProtocol::sendCommandResponse(const TCPClient* pClient, const QString& cmdType,
                                             bool result, int errorCode, const QString& errorMsg)
{
    if (!m_tcpServer || !pClient) {
        return;
    }

    // Build the response JSON.
    QJsonObject jsonObj;
    jsonObj["msgType"] = "CMD_RESPONSE";
    jsonObj["timestamp"] = QDateTime::currentMSecsSinceEpoch();
    jsonObj["cmdType"] = cmdType;
    jsonObj["result"] = result;
    jsonObj["errorCode"] = errorCode;
    jsonObj["errorMsg"] = errorMsg;

    // Serialise and wrap in a frame.
    const QByteArray jsonData = QJsonDocument(jsonObj).toJson(QJsonDocument::Compact);
    const QByteArray frame = buildFrame(jsonData);
    if (frame.isEmpty()) {
        // buildFrame() already logged why the payload could not be framed.
        LOG_ERROR("Failed to send CMD_RESPONSE for %s\n", cmdType.toStdString().c_str());
        return;
    }

    // Send the response.
    const bool success = m_tcpServer->SendData(pClient, frame.constData(), frame.size());
    if (success) {
        LOG_DEBUG("Sent CMD_RESPONSE for %s, result=%s\n", cmdType.toStdString().c_str(), result ? "success" : "failure");
    } else {
        LOG_ERROR("Failed to send CMD_RESPONSE for %s\n", cmdType.toStdString().c_str());
    }
}

// Sends a HEARTBEAT_ACK with a millisecond timestamp to one client. Does
// nothing when there is no server or no client; the send result is not
// checked.
void TearingTcpProtocol::sendHeartbeatAck(const TCPClient* pClient)
{
    if (!m_tcpServer || !pClient) {
        return;
    }

    // Build the acknowledgement JSON.
    QJsonObject jsonObj;
    jsonObj["msgType"] = "HEARTBEAT_ACK";
    jsonObj["timestamp"] = QDateTime::currentMSecsSinceEpoch();

    // Serialise and wrap in a frame.
    const QByteArray jsonData = QJsonDocument(jsonObj).toJson(QJsonDocument::Compact);
    const QByteArray frame = buildFrame(jsonData);
    if (frame.isEmpty()) {
        return;
    }

    // Send the acknowledgement.
    m_tcpServer->SendData(pClient, frame.constData(), frame.size());
}

// Encodes an image as a JPEG (quality 50) data URI:
// "data:image/jpeg;base64,<payload>". Returns an empty string for a null
// image or when the JPEG encoding fails, so callers never receive a data URI
// without a payload.
QString TearingTcpProtocol::imageToBase64(const QImage& image)
{
    if (image.isNull()) {
        return "";
    }

    QByteArray encoded;
    QBuffer buffer(&encoded);

    // Save as JPEG, quality 50.
    if (!buffer.open(QIODevice::WriteOnly) || !image.save(&buffer, "JPEG", 50)) {
        LOG_ERROR("Failed to encode image as JPEG\n");
        return "";
    }

    // Base64 output is plain ASCII, so the Latin-1 conversion is lossless.
    const QString base64Data = QString::fromLatin1(encoded.toBase64());
    return QString("data:image/jpeg;base64,%1").arg(base64Data);
}

// Builds the key used for the per-client maps from the client's m_nFD value:
// "Client_<m_nFD>".
QString TearingTcpProtocol::generateClientId(const TCPClient* pClient)
{
    return QString("Client_%1").arg(pClient->m_nFD);
}