GrabBag/App/BeltTearing/BeltTearingServer/TearingTcpProtocol.cpp

446 lines
14 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "TearingTcpProtocol.h"
#include "VrLog.h"
#include <QDataStream>
TearingTcpProtocol::TearingTcpProtocol(IYTCPServer* tcpServer, QObject *parent)
: QObject(parent)
, m_tcpServer(tcpServer)
, m_heartbeatTimer(new QTimer(this))
, m_clientCheckTimer(new QTimer(this))
, m_heartbeatInterval(30)
, m_clientTimeout(90)
{
// 连接心跳定时器
connect(m_heartbeatTimer, &QTimer::timeout, this, &TearingTcpProtocol::onHeartbeatTimeout);
// 连接客户端超时检查定时器
connect(m_clientCheckTimer, &QTimer::timeout, this, &TearingTcpProtocol::onClientTimeoutCheck);
}
TearingTcpProtocol::~TearingTcpProtocol()
{
stop();
}
void TearingTcpProtocol::start(int heartbeatInterval)
{
m_heartbeatInterval = heartbeatInterval;
m_clientTimeout = heartbeatInterval * 3; // 超时时间为3倍心跳间隔
// 启动心跳定时器(暂不启动,等有客户端连接后再考虑)
// m_heartbeatTimer->start(m_heartbeatInterval * 1000);
// 启动客户端超时检查定时器
m_clientCheckTimer->start(10000); // 每10秒检查一次
LOG_INFO("TearingTcpProtocol started, heartbeat interval: %d seconds\n", m_heartbeatInterval);
}
void TearingTcpProtocol::stop()
{
m_heartbeatTimer->stop();
m_clientCheckTimer->stop();
m_clientBuffers.clear();
m_clientLastActive.clear();
LOG_INFO("TearingTcpProtocol stopped\n");
}
void TearingTcpProtocol::sendDetectResult(const std::vector<SSG_beltTearingInfo>& results, const QImage& visImage)
{
if (!m_tcpServer) {
return;
}
// 计算撕裂个数和最大撕裂长度
int count = static_cast<int>(results.size());
int maxLength = 0;
for (const auto& result : results) {
double width = result.roi.right - result.roi.left;
double length = result.roi.bottom - result.roi.top;
int tearLength = static_cast<int>(width > length ? width : length);
if (tearLength > maxLength) {
maxLength = tearLength;
}
}
// 构造JSON消息
QJsonObject jsonObj;
jsonObj["msgType"] = "DETECT_RESULT";
jsonObj["timestamp"] = QDateTime::currentMSecsSinceEpoch();
jsonObj["count"] = count;
jsonObj["max"] = maxLength;
// 将图像转换为Base64
if (!visImage.isNull()) {
jsonObj["visimg"] = imageToBase64(visImage);
} else {
jsonObj["visimg"] = "";
}
// 转换为JSON字符串
QJsonDocument doc(jsonObj);
QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
// 构造数据帧
QByteArray frame = buildFrame(jsonData);
// 发送给所有客户端
bool success = m_tcpServer->SendAllData(frame.constData(), frame.size());
if (success) {
LOG_DEBUG("Sent DETECT_RESULT to all clients, count=%d, max=%d\n", count, maxLength);
} else {
LOG_WARNING("Failed to send DETECT_RESULT to clients\n");
}
}
void TearingTcpProtocol::handleReceivedData(const TCPClient* pClient, const char* pData, unsigned int nLen)
{
if (!pClient || !pData || nLen == 0) {
return;
}
QString clientId = generateClientId(pClient);
// 更新客户端最后活跃时间
m_clientLastActive[clientId] = QDateTime::currentMSecsSinceEpoch();
// 将数据添加到客户端缓冲区
QByteArray& buffer = m_clientBuffers[clientId];
buffer.append(pData, nLen);
// 解析数据帧
QList<QByteArray> jsonDataList;
int parsedCount = parseFrames(buffer, jsonDataList);
if (parsedCount > 0) {
LOG_DEBUG("Parsed %d frames from client %s\n", parsedCount, clientId.toStdString().c_str());
// 处理每个JSON消息
for (const QByteArray& jsonData : jsonDataList) {
handleJsonMessage(pClient, jsonData);
}
}
}
void TearingTcpProtocol::setSpeedCallback(const SpeedCallback& callback)
{
m_speedCallback = callback;
}
void TearingTcpProtocol::setControlCallback(const ControlCallback& callback)
{
m_controlCallback = callback;
}
void TearingTcpProtocol::onHeartbeatTimeout()
{
// 可以在这里主动发送心跳给客户端(如果需要)
// 目前协议设计是客户端发送心跳,服务器应答
}
void TearingTcpProtocol::onClientTimeoutCheck()
{
qint64 currentTime = QDateTime::currentMSecsSinceEpoch();
qint64 timeoutMs = m_clientTimeout * 1000;
QList<QString> timeoutClients;
for (auto it = m_clientLastActive.begin(); it != m_clientLastActive.end(); ++it) {
if (currentTime - it.value() > timeoutMs) {
timeoutClients.append(it.key());
}
}
// 清理超时客户端的数据
for (const QString& clientId : timeoutClients) {
LOG_WARNING("Client %s timeout, cleaning up\n", clientId.toStdString().c_str());
m_clientBuffers.remove(clientId);
m_clientLastActive.remove(clientId);
}
}
QByteArray TearingTcpProtocol::buildFrame(const QByteArray& jsonData)
{
QByteArray frame;
quint32 dataLength = static_cast<quint32>(jsonData.size());
// 写入帧头
frame.append(FRAME_HEADER, FRAME_HEADER_SIZE);
// 写入数据长度8位字符串格式
char lengthStr[9]; // 8位数字 + '\0'
#ifdef _WIN32
sprintf_s(lengthStr, "%08u", dataLength);
#else
sprintf(lengthStr, "%08u", dataLength);
#endif
frame.append(lengthStr, FRAME_LENGTH_SIZE);
// 写入JSON数据
frame.append(jsonData);
// 写入帧尾
frame.append(FRAME_TAIL, FRAME_TAIL_SIZE);
return frame;
}
int TearingTcpProtocol::parseFrames(const QByteArray& data, QList<QByteArray>& outJsonData)
{
int parsedCount = 0;
QByteArray& buffer = const_cast<QByteArray&>(data);
while (buffer.size() >= FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE) {
// 查找帧头
int headerPos = buffer.indexOf(FRAME_HEADER);
if (headerPos == -1) {
// 没有找到帧头,清空缓冲区
buffer.clear();
break;
}
// 如果帧头不在开始位置,丢弃之前的数据
if (headerPos > 0) {
buffer.remove(0, headerPos);
}
// 检查是否有足够的数据读取长度字段
if (buffer.size() < FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE) {
break;
}
// 读取数据长度8位字符串格式
QByteArray lengthBytes = buffer.mid(FRAME_HEADER_SIZE, FRAME_LENGTH_SIZE);
QString lengthStr = QString::fromLatin1(lengthBytes);
bool ok;
quint32 dataLength = lengthStr.toUInt(&ok);
if (!ok) {
LOG_ERROR("Failed to parse frame data length: %s, discarding buffer\n", lengthStr.toStdString().c_str());
buffer.remove(0, FRAME_HEADER_SIZE);
continue;
}
// 检查数据长度是否合理(防止错误数据)
if (dataLength > 10 * 1024 * 1024) { // 限制最大10MB
LOG_ERROR("Invalid frame data length: %u, discarding buffer\n", dataLength);
buffer.remove(0, FRAME_HEADER_SIZE);
continue;
}
// 计算完整帧的大小
int frameSize = FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE + dataLength + FRAME_TAIL_SIZE;
// 检查是否接收到完整的帧
if (buffer.size() < frameSize) {
// 数据不完整,等待更多数据
break;
}
// 验证帧尾
int tailPos = FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE + dataLength;
QByteArray tail = buffer.mid(tailPos, FRAME_TAIL_SIZE);
if (tail != QByteArray(FRAME_TAIL, FRAME_TAIL_SIZE)) {
LOG_ERROR("Invalid frame tail, discarding frame\n");
buffer.remove(0, FRAME_HEADER_SIZE);
continue;
}
// 提取JSON数据
QByteArray jsonData = buffer.mid(FRAME_HEADER_SIZE + FRAME_LENGTH_SIZE, dataLength);
outJsonData.append(jsonData);
parsedCount++;
// 移除已处理的帧
buffer.remove(0, frameSize);
}
return parsedCount;
}
TcpMessageType TearingTcpProtocol::parseMessageType(const QString& msgTypeStr)
{
if (msgTypeStr == "DETECT_RESULT") return TcpMessageType::DETECT_RESULT;
if (msgTypeStr == "SET_SPEED") return TcpMessageType::SET_SPEED;
if (msgTypeStr == "SET_CONTROL") return TcpMessageType::SET_CONTROL;
if (msgTypeStr == "CMD_RESPONSE") return TcpMessageType::CMD_RESPONSE;
if (msgTypeStr == "HEARTBEAT") return TcpMessageType::HEARTBEAT;
if (msgTypeStr == "HEARTBEAT_ACK") return TcpMessageType::HEARTBEAT_ACK;
return TcpMessageType::UNKNOWN;
}
void TearingTcpProtocol::handleJsonMessage(const TCPClient* pClient, const QByteArray& jsonData)
{
// 解析JSON
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(jsonData, &parseError);
if (parseError.error != QJsonParseError::NoError) {
LOG_ERROR("Failed to parse JSON: %s\n", parseError.errorString().toStdString().c_str());
return;
}
if (!doc.isObject()) {
LOG_ERROR("JSON is not an object\n");
return;
}
QJsonObject jsonObj = doc.object();
// 获取消息类型
QString msgTypeStr = jsonObj["msgType"].toString();
TcpMessageType msgType = parseMessageType(msgTypeStr);
// 根据消息类型处理
switch (msgType) {
case TcpMessageType::SET_SPEED:
handleSetSpeed(pClient, jsonObj);
break;
case TcpMessageType::SET_CONTROL:
handleSetControl(pClient, jsonObj);
break;
case TcpMessageType::HEARTBEAT:
handleHeartbeat(pClient, jsonObj);
break;
default:
LOG_WARNING("Unknown message type: %s\n", msgTypeStr.toStdString().c_str());
break;
}
}
void TearingTcpProtocol::handleSetSpeed(const TCPClient* pClient, const QJsonObject& jsonObj)
{
int speed = jsonObj["speed"].toInt();
LOG_INFO("Received SET_SPEED command: speed=%d mm/s\n", speed);
// 参数验证
bool result = true;
int errorCode = 0;
QString errorMsg;
if (speed < 0 || speed > 5000) {
result = false;
errorCode = 1;
errorMsg = "Speed out of range (0-5000 mm/s)";
LOG_ERROR("Invalid speed value: %d\n", speed);
} else {
// 调用速度设置回调
if (m_speedCallback) {
m_speedCallback(speed);
}
}
// 发送应答
sendCommandResponse(pClient, "SET_SPEED", result, errorCode, errorMsg);
}
void TearingTcpProtocol::handleSetControl(const TCPClient* pClient, const QJsonObject& jsonObj)
{
bool control = jsonObj["control"].toBool();
LOG_INFO("Received SET_CONTROL command: control=%s\n", control ? "start" : "stop");
// 调用控制回调
bool result = true;
int errorCode = 0;
QString errorMsg;
if (m_controlCallback) {
m_controlCallback(control);
}
// 发送应答
sendCommandResponse(pClient, "SET_CONTROL", result, errorCode, errorMsg);
}
void TearingTcpProtocol::handleHeartbeat(const TCPClient* pClient, const QJsonObject& jsonObj)
{
LOG_DEBUG("Received HEARTBEAT from client\n");
// 发送心跳应答
sendHeartbeatAck(pClient);
}
void TearingTcpProtocol::sendCommandResponse(const TCPClient* pClient, const QString& cmdType,
bool result, int errorCode, const QString& errorMsg)
{
if (!m_tcpServer || !pClient) {
return;
}
// 构造应答JSON
QJsonObject jsonObj;
jsonObj["msgType"] = "CMD_RESPONSE";
jsonObj["timestamp"] = QDateTime::currentMSecsSinceEpoch();
jsonObj["cmdType"] = cmdType;
jsonObj["result"] = result;
jsonObj["errorCode"] = errorCode;
jsonObj["errorMsg"] = errorMsg;
// 转换为JSON字符串
QJsonDocument doc(jsonObj);
QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
// 构造数据帧
QByteArray frame = buildFrame(jsonData);
// 发送应答
bool success = m_tcpServer->SendData(pClient, frame.constData(), frame.size());
if (success) {
LOG_DEBUG("Sent CMD_RESPONSE for %s, result=%s\n",
cmdType.toStdString().c_str(), result ? "success" : "failure");
} else {
LOG_ERROR("Failed to send CMD_RESPONSE for %s\n", cmdType.toStdString().c_str());
}
}
void TearingTcpProtocol::sendHeartbeatAck(const TCPClient* pClient)
{
if (!m_tcpServer || !pClient) {
return;
}
// 构造心跳应答JSON
QJsonObject jsonObj;
jsonObj["msgType"] = "HEARTBEAT_ACK";
jsonObj["timestamp"] = QDateTime::currentMSecsSinceEpoch();
// 转换为JSON字符串
QJsonDocument doc(jsonObj);
QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
// 构造数据帧
QByteArray frame = buildFrame(jsonData);
// 发送应答
m_tcpServer->SendData(pClient, frame.constData(), frame.size());
}
QString TearingTcpProtocol::imageToBase64(const QImage& image)
{
if (image.isNull()) {
return "";
}
QByteArray byteArray;
QBuffer buffer(&byteArray);
buffer.open(QIODevice::WriteOnly);
// 保存为JPEG格式质量50
image.save(&buffer, "JPEG", 50);
// 转换为Base64并添加Data URI头部
QString base64Data = QString(byteArray.toBase64());
return QString("data:image/jpeg;base64,%1").arg(base64Data);
}
QString TearingTcpProtocol::generateClientId(const TCPClient* pClient)
{
return QString("Client_%1").arg(pClient->m_nFD);
}