// GrabBag/App/BeltTearing/BeltTearingServer/TearingTcpProtocol.cpp
//
// 504 lines
// 16 KiB
// C++
// Raw Permalink Normal View History

#include "TearingTcpProtocol.h"
#include "VrLog.h"
#include <QDataStream>
// 2025-11-26 22:44:38 +08:00
/**
 * @brief Constructs the protocol handler and wires up its timers.
 *
 * No TCP server is created here; that happens in start(). Both timers are
 * parented to this object. The defaults set here are a 30 second heartbeat
 * interval, a 90 second client timeout and port 0.
 *
 * @param parent Optional QObject parent.
 */
TearingTcpProtocol::TearingTcpProtocol(QObject *parent)
    : QObject(parent)
    , m_tcpServer(nullptr)
    , m_heartbeatTimer(new QTimer(this))
    , m_clientCheckTimer(new QTimer(this))
    , m_heartbeatInterval(30)
    , m_clientTimeout(90)
    , m_tcpPort(0)
{
    // Heartbeat tick.
    connect(m_heartbeatTimer, &QTimer::timeout, this, &TearingTcpProtocol::onHeartbeatTimeout);
    // Periodic sweep for clients that have gone quiet.
    connect(m_clientCheckTimer, &QTimer::timeout, this, &TearingTcpProtocol::onClientTimeoutCheck);
}
/**
 * @brief Shuts the protocol down on destruction.
 *
 * stop() halts both timers, drops per-client state and, when a server
 * exists, stops, closes, deletes and nulls m_tcpServer. No further cleanup
 * of the server pointer is needed afterwards.
 */
TearingTcpProtocol::~TearingTcpProtocol()
{
    stop();
}
void TearingTcpProtocol::start(int heartbeatInterval)
{
m_heartbeatInterval = heartbeatInterval;
m_clientTimeout = heartbeatInterval * 3; // 超时时间为3倍心跳间隔
2025-11-26 22:44:38 +08:00
// 创建TCP服务器
if (!m_tcpServer && m_tcpPort > 0) {
if (!VrCreatYTCPServer(&m_tcpServer)) {
LOG_ERROR("Failed to create TCP server\n");
m_tcpServer = nullptr;
return;
}
// 初始化TCP服务器
if (!m_tcpServer->Init(m_tcpPort, true)) {
LOG_ERROR("Failed to initialize TCP server on port %d\n", m_tcpPort);
delete m_tcpServer;
m_tcpServer = nullptr;
return;
}
// 启动TCP服务器设置数据接收回调函数
if (!m_tcpServer->Start([this](const TCPClient* pClient, const char* pData, const unsigned int nLen) {
this->handleReceivedData(pClient, pData, nLen);
}, false)) {
LOG_ERROR("Failed to start TCP server on port %d\n", m_tcpPort);
m_tcpServer->Close();
delete m_tcpServer;
m_tcpServer = nullptr;
return;
}
LOG_INFO("TCP server started successfully on port %d\n", m_tcpPort);
}
// 启动心跳定时器(暂不启动,等有客户端连接后再考虑)
// m_heartbeatTimer->start(m_heartbeatInterval * 1000);
// 启动客户端超时检查定时器
m_clientCheckTimer->start(10000); // 每10秒检查一次
LOG_INFO("TearingTcpProtocol started, heartbeat interval: %d seconds\n", m_heartbeatInterval);
}
void TearingTcpProtocol::stop()
{
m_heartbeatTimer->stop();
m_clientCheckTimer->stop();
m_clientBuffers.clear();
m_clientLastActive.clear();
2025-11-26 22:44:38 +08:00
// 关闭并释放TCP服务器
if (m_tcpServer) {
m_tcpServer->Stop();
m_tcpServer->Close();
delete m_tcpServer;
m_tcpServer = nullptr;
}
LOG_INFO("TearingTcpProtocol stopped\n");
}
void TearingTcpProtocol::sendDetectResult(const std::vector<SSG_beltTearingInfo>& results, const QImage& visImage)
{
if (!m_tcpServer) {
return;
}
// 计算撕裂个数和最大撕裂长度
int count = static_cast<int>(results.size());
int maxLength = 0;
for (const auto& result : results) {
double width = result.roi.right - result.roi.left;
double length = result.roi.bottom - result.roi.top;
int tearLength = static_cast<int>(width > length ? width : length);
if (tearLength > maxLength) {
maxLength = tearLength;
}
}
// 构造JSON消息
QJsonObject jsonObj;
jsonObj["msgType"] = "DETECT_RESULT";
jsonObj["timestamp"] = QDateTime::currentMSecsSinceEpoch();
jsonObj["count"] = count;
jsonObj["max"] = maxLength;
// 将图像转换为Base64
if (!visImage.isNull()) {
jsonObj["visimg"] = imageToBase64(visImage);
} else {
jsonObj["visimg"] = "";
}
// 转换为JSON字符串
QJsonDocument doc(jsonObj);
QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
// 构造数据帧
QByteArray frame = buildFrame(jsonData);
// 发送给所有客户端
bool success = m_tcpServer->SendAllData(frame.constData(), frame.size());
if (success) {
LOG_DEBUG("Sent DETECT_RESULT to all clients, count=%d, max=%d\n", count, maxLength);
} else {
LOG_WARNING("Failed to send DETECT_RESULT to clients\n");
}
}
void TearingTcpProtocol::handleReceivedData(const TCPClient* pClient, const char* pData, unsigned int nLen)
{
if (!pClient || !pData || nLen == 0) {
return;
}
QString clientId = generateClientId(pClient);
// 更新客户端最后活跃时间
m_clientLastActive[clientId] = QDateTime::currentMSecsSinceEpoch();
// 将数据添加到客户端缓冲区
QByteArray& buffer = m_clientBuffers[clientId];
buffer.append(pData, nLen);
// 解析数据帧
QList<QByteArray> jsonDataList;
int parsedCount = parseFrames(buffer, jsonDataList);
if (parsedCount > 0) {
LOG_DEBUG("Parsed %d frames from client %s\n", parsedCount, clientId.toStdString().c_str());
// 处理每个JSON消息
for (const QByteArray& jsonData : jsonDataList) {
handleJsonMessage(pClient, jsonData);
}
}
}
void TearingTcpProtocol::setSpeedCallback(const SpeedCallback& callback)
{
m_speedCallback = callback;
}
void TearingTcpProtocol::setControlCallback(const ControlCallback& callback)
{
m_controlCallback = callback;
}
// 2025-11-26 22:44:38 +08:00
/**
 * @brief Sets the TCP port used when start() creates the server.
 *
 * @param port Listening port; start() only creates a server when this is > 0.
 */
void TearingTcpProtocol::setTcpPort(quint16 port)
{
    this->m_tcpPort = port;
}
// Slot connected to m_heartbeatTimer's timeout signal. Currently a no-op.
void TearingTcpProtocol::onHeartbeatTimeout()
{
// A server-initiated heartbeat could be sent to clients here if needed.
// In the current protocol design the client sends heartbeats and the server acknowledges them.
}
void TearingTcpProtocol::onClientTimeoutCheck()
{
qint64 currentTime = QDateTime::currentMSecsSinceEpoch();
qint64 timeoutMs = m_clientTimeout * 1000;
QList<QString> timeoutClients;
for (auto it = m_clientLastActive.begin(); it != m_clientLastActive.end(); ++it) {
if (currentTime - it.value() > timeoutMs) {
timeoutClients.append(it.key());
}
}
// 清理超时客户端的数据
for (const QString& clientId : timeoutClients) {
LOG_WARNING("Client %s timeout, cleaning up\n", clientId.toStdString().c_str());
m_clientBuffers.remove(clientId);
m_clientLastActive.remove(clientId);
}
}
/**
 * @brief Wraps a JSON payload in a protocol frame.
 *
 * Layout: FRAME_HEADER, the payload length as a zero-padded decimal string of
 * FRAME_LENGTH_SIZE characters, the payload, then FRAME_TAIL.
 *
 * @param jsonData Serialized JSON payload.
 * @return The complete frame, or an empty QByteArray when the payload length
 *         does not fit into the fixed-width length field. Previously such a
 *         length was formatted with sprintf into a 9-byte stack buffer and
 *         overran it.
 */
QByteArray TearingTcpProtocol::buildFrame(const QByteArray& jsonData)
{
    const int lengthFieldSize = static_cast<int>(FRAME_LENGTH_SIZE);
    const quint32 dataLength = static_cast<quint32>(jsonData.size());

    // Zero-padded decimal length; rightJustified() never truncates by default,
    // so an over-long number is detected by the size check below.
    const QByteArray lengthField = QByteArray::number(dataLength).rightJustified(lengthFieldSize, '0');
    if (lengthField.size() != lengthFieldSize) {
        LOG_ERROR("Payload of %u bytes does not fit the %d-digit frame length field\n",
                  dataLength, lengthFieldSize);
        return QByteArray();
    }

    QByteArray frame;
    // Frame header.
    frame.append(FRAME_HEADER, FRAME_HEADER_SIZE);
    // Payload length as a fixed-width string.
    frame.append(lengthField);
    // JSON payload.
    frame.append(jsonData);
    // Frame tail.
    frame.append(FRAME_TAIL, FRAME_TAIL_SIZE);
    return frame;
}
int TearingTcpProtocol::parseFrames(const QByteArray& data, QList<QByteArray>& outJsonData)
{
int parsedCount = 0;
QByteArray& buffer = const_cast<QByteArray&>(data);
while (buffer.size() >= FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE) {
// 查找帧头
int headerPos = buffer.indexOf(FRAME_HEADER);
if (headerPos == -1) {
// 没有找到帧头,清空缓冲区
buffer.clear();
break;
}
// 如果帧头不在开始位置,丢弃之前的数据
if (headerPos > 0) {
buffer.remove(0, headerPos);
}
// 检查是否有足够的数据读取长度字段
if (buffer.size() < FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE) {
break;
}
// 读取数据长度8位字符串格式
QByteArray lengthBytes = buffer.mid(FRAME_HEADER_SIZE, FRAME_LENGTH_SIZE);
QString lengthStr = QString::fromLatin1(lengthBytes);
bool ok;
quint32 dataLength = lengthStr.toUInt(&ok);
if (!ok) {
LOG_ERROR("Failed to parse frame data length: %s, discarding buffer\n", lengthStr.toStdString().c_str());
buffer.remove(0, FRAME_HEADER_SIZE);
continue;
}
// 检查数据长度是否合理(防止错误数据)
if (dataLength > 10 * 1024 * 1024) { // 限制最大10MB
LOG_ERROR("Invalid frame data length: %u, discarding buffer\n", dataLength);
buffer.remove(0, FRAME_HEADER_SIZE);
continue;
}
// 计算完整帧的大小
int frameSize = FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE + dataLength + FRAME_TAIL_SIZE;
// 检查是否接收到完整的帧
if (buffer.size() < frameSize) {
// 数据不完整,等待更多数据
break;
}
// 验证帧尾
int tailPos = FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE + dataLength;
QByteArray tail = buffer.mid(tailPos, FRAME_TAIL_SIZE);
if (tail != QByteArray(FRAME_TAIL, FRAME_TAIL_SIZE)) {
LOG_ERROR("Invalid frame tail, discarding frame\n");
buffer.remove(0, FRAME_HEADER_SIZE);
continue;
}
// 提取JSON数据
QByteArray jsonData = buffer.mid(FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE, dataLength);
outJsonData.append(jsonData);
parsedCount++;
// 移除已处理的帧
buffer.remove(0, frameSize);
}
return parsedCount;
}
/**
 * @brief Maps a "msgType" string to its TcpMessageType value.
 *
 * @param msgTypeStr Message type name as received (exact, case-sensitive match).
 * @return The matching enum value, or TcpMessageType::UNKNOWN when the name
 *         is not one of the known types.
 */
TcpMessageType TearingTcpProtocol::parseMessageType(const QString& msgTypeStr)
{
    struct NamedType {
        const char* name;
        TcpMessageType type;
    };
    static const NamedType kKnownTypes[] = {
        {"DETECT_RESULT", TcpMessageType::DETECT_RESULT},
        {"SET_SPEED", TcpMessageType::SET_SPEED},
        {"SET_CONTROL", TcpMessageType::SET_CONTROL},
        {"CMD_RESPONSE", TcpMessageType::CMD_RESPONSE},
        {"HEARTBEAT", TcpMessageType::HEARTBEAT},
        {"HEARTBEAT_ACK", TcpMessageType::HEARTBEAT_ACK},
    };

    for (const NamedType& candidate : kKnownTypes) {
        if (msgTypeStr == candidate.name) {
            return candidate.type;
        }
    }
    return TcpMessageType::UNKNOWN;
}
void TearingTcpProtocol::handleJsonMessage(const TCPClient* pClient, const QByteArray& jsonData)
{
// 解析JSON
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(jsonData, &parseError);
if (parseError.error != QJsonParseError::NoError) {
LOG_ERROR("Failed to parse JSON: %s\n", parseError.errorString().toStdString().c_str());
return;
}
if (!doc.isObject()) {
LOG_ERROR("JSON is not an object\n");
return;
}
QJsonObject jsonObj = doc.object();
// 获取消息类型
QString msgTypeStr = jsonObj["msgType"].toString();
TcpMessageType msgType = parseMessageType(msgTypeStr);
// 根据消息类型处理
switch (msgType) {
case TcpMessageType::SET_SPEED:
handleSetSpeed(pClient, jsonObj);
break;
case TcpMessageType::SET_CONTROL:
handleSetControl(pClient, jsonObj);
break;
case TcpMessageType::HEARTBEAT:
handleHeartbeat(pClient, jsonObj);
break;
default:
LOG_WARNING("Unknown message type: %s\n", msgTypeStr.toStdString().c_str());
break;
}
}
void TearingTcpProtocol::handleSetSpeed(const TCPClient* pClient, const QJsonObject& jsonObj)
{
int speed = jsonObj["speed"].toInt();
LOG_INFO("Received SET_SPEED command: speed=%d mm/s\n", speed);
// 参数验证
bool result = true;
int errorCode = 0;
QString errorMsg;
if (speed < 0 || speed > 5000) {
result = false;
errorCode = 1;
errorMsg = "Speed out of range (0-5000 mm/s)";
LOG_ERROR("Invalid speed value: %d\n", speed);
} else {
// 调用速度设置回调
if (m_speedCallback) {
2025-11-26 22:44:38 +08:00
errorCode = m_speedCallback(speed);
result = (errorCode == 0);
if (!result) {
errorMsg = "Speed callback execution failed";
LOG_ERROR("Speed callback execution failed for speed: %d, error code: %d\n", speed, errorCode);
}
}
}
// 发送应答
sendCommandResponse(pClient, "SET_SPEED", result, errorCode, errorMsg);
}
void TearingTcpProtocol::handleSetControl(const TCPClient* pClient, const QJsonObject& jsonObj)
{
bool control = jsonObj["control"].toBool();
LOG_INFO("Received SET_CONTROL command: control=%s\n", control ? "start" : "stop");
// 调用控制回调
bool result = true;
int errorCode = 0;
QString errorMsg;
if (m_controlCallback) {
2025-11-26 22:44:38 +08:00
errorCode = m_controlCallback(control);
result = (errorCode == 0);
if (!result) {
errorMsg = "Control callback execution failed";
LOG_ERROR("Control callback execution failed for control: %s, error code: %d\n", control ? "start" : "stop", errorCode);
}
}
// 发送应答
sendCommandResponse(pClient, "SET_CONTROL", result, errorCode, errorMsg);
}
void TearingTcpProtocol::handleHeartbeat(const TCPClient* pClient, const QJsonObject& jsonObj)
{
LOG_DEBUG("Received HEARTBEAT from client\n");
// 发送心跳应答
sendHeartbeatAck(pClient);
}
void TearingTcpProtocol::sendCommandResponse(const TCPClient* pClient, const QString& cmdType,
bool result, int errorCode, const QString& errorMsg)
{
if (!m_tcpServer || !pClient) {
return;
}
// 构造应答JSON
QJsonObject jsonObj;
jsonObj["msgType"] = "CMD_RESPONSE";
jsonObj["timestamp"] = QDateTime::currentMSecsSinceEpoch();
jsonObj["cmdType"] = cmdType;
jsonObj["result"] = result;
jsonObj["errorCode"] = errorCode;
jsonObj["errorMsg"] = errorMsg;
// 转换为JSON字符串
QJsonDocument doc(jsonObj);
QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
// 构造数据帧
QByteArray frame = buildFrame(jsonData);
// 发送应答
bool success = m_tcpServer->SendData(pClient, frame.constData(), frame.size());
if (success) {
LOG_DEBUG("Sent CMD_RESPONSE for %s, result=%s\n",
cmdType.toStdString().c_str(), result ? "success" : "failure");
} else {
LOG_ERROR("Failed to send CMD_RESPONSE for %s\n", cmdType.toStdString().c_str());
}
}
void TearingTcpProtocol::sendHeartbeatAck(const TCPClient* pClient)
{
if (!m_tcpServer || !pClient) {
return;
}
// 构造心跳应答JSON
QJsonObject jsonObj;
jsonObj["msgType"] = "HEARTBEAT_ACK";
jsonObj["timestamp"] = QDateTime::currentMSecsSinceEpoch();
// 转换为JSON字符串
QJsonDocument doc(jsonObj);
QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
// 构造数据帧
QByteArray frame = buildFrame(jsonData);
// 发送应答
m_tcpServer->SendData(pClient, frame.constData(), frame.size());
}
/**
 * @brief Encodes an image as a JPEG (quality 50) Base64 data URI.
 *
 * @param image Image to encode.
 * @return "data:image/jpeg;base64,<payload>", or an empty string when the
 *         image is null or JPEG encoding fails. Previously a failed encode
 *         returned a data URI with an empty payload.
 */
QString TearingTcpProtocol::imageToBase64(const QImage& image)
{
    if (image.isNull()) {
        return "";
    }

    QByteArray jpegBytes;
    QBuffer sink(&jpegBytes);
    // Encode as JPEG at quality 50 into the in-memory buffer.
    if (!sink.open(QIODevice::WriteOnly) || !image.save(&sink, "JPEG", 50)) {
        LOG_ERROR("Failed to encode image as JPEG\n");
        return "";
    }

    // Base64 output is plain ASCII; prepend the data URI header.
    const QString base64Data = QString::fromLatin1(jpegBytes.toBase64());
    return QString("data:image/jpeg;base64,%1").arg(base64Data);
}
/**
 * @brief Builds the key used for per-client bookkeeping.
 *
 * @param pClient Client whose m_nFD value identifies it; must not be null.
 * @return "Client_<m_nFD>".
 */
QString TearingTcpProtocol::generateClientId(const TCPClient* pClient)
{
    const QString pattern("Client_%1");
    return pattern.arg(pClient->m_nFD);
}