mirror of
https://gitee.com/zhijiantianya/ruoyi-vue-pro.git
synced 2025-12-26 08:26:44 +08:00
feat:【IoT 物联网】优化 TCP 二进制协议的消息编码和解码逻辑
This commit is contained in:
parent
00bd4293f0
commit
8449ccbb7d
@ -4,10 +4,9 @@ import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ -19,9 +18,9 @@ import java.nio.charset.StandardCharsets;
|
||||
* 二进制协议格式(所有数值使用大端序):
|
||||
*
|
||||
* <pre>
|
||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
* | 魔术字 | 版本号 | 消息类型| 消息标志| 消息长度(4 字节) |
|
||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
* +--------+--------+--------+---------------------------+--------+--------+
|
||||
* | 魔术字 | 版本号 | 消息类型| 消息长度(4 字节) |
|
||||
* +--------+--------+--------+---------------------------+--------+--------+
|
||||
* | 消息 ID 长度(2 字节) | 消息 ID (变长字符串) |
|
||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
* | 方法名长度(2 字节) | 方法名(变长字符串) |
|
||||
@ -44,8 +43,6 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
public static final String TYPE = "TCP_BINARY";
|
||||
|
||||
// ==================== 协议常量 ====================
|
||||
|
||||
/**
|
||||
* 协议魔术字,用于协议识别
|
||||
*/
|
||||
@ -56,27 +53,20 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
*/
|
||||
private static final byte PROTOCOL_VERSION = (byte) 0x01;
|
||||
|
||||
// TODO @haohao:这个要不直接静态枚举,不用 MessageType
|
||||
/**
|
||||
* 消息类型常量
|
||||
* 请求消息类型
|
||||
*/
|
||||
public static class MessageType {
|
||||
|
||||
/**
|
||||
* 请求消息
|
||||
*/
|
||||
public static final byte REQUEST = 0x01;
|
||||
/**
|
||||
* 响应消息
|
||||
*/
|
||||
public static final byte RESPONSE = 0x02;
|
||||
|
||||
}
|
||||
private static final byte REQUEST = (byte) 0x01;
|
||||
|
||||
/**
|
||||
* 协议头部固定长度(魔术字 + 版本号 + 消息类型 + 消息标志 + 消息长度)
|
||||
* 响应消息类型
|
||||
*/
|
||||
private static final int HEADER_FIXED_LENGTH = 8;
|
||||
private static final byte RESPONSE = (byte) 0x02;
|
||||
|
||||
/**
|
||||
* 协议头部固定长度(魔术字 + 版本号 + 消息类型 + 消息长度)
|
||||
*/
|
||||
private static final int HEADER_FIXED_LENGTH = 7;
|
||||
|
||||
/**
|
||||
* 最小消息长度(头部 + 消息ID长度 + 方法名长度)
|
||||
@ -97,7 +87,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
byte messageType = determineMessageType(message);
|
||||
// 2. 构建消息体
|
||||
byte[] bodyData = buildMessageBody(message, messageType);
|
||||
// 3. 构建完整消息(不包含deviceId,由连接上下文管理)
|
||||
// 3. 构建完整消息
|
||||
return buildCompleteMessage(message, messageType, bodyData);
|
||||
} catch (Exception e) {
|
||||
log.error("[encode][TCP 二进制消息编码失败,消息: {}]", message, e);
|
||||
@ -111,30 +101,59 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
Assert.isTrue(bytes.length >= MIN_MESSAGE_LENGTH, "数据包长度不足");
|
||||
try {
|
||||
Buffer buffer = Buffer.buffer(bytes);
|
||||
// 1. 解析协议头部
|
||||
ProtocolHeader header = parseProtocolHeader(buffer);
|
||||
// 2. 解析消息内容(不包含deviceId,由上层连接管理器设置)
|
||||
return parseMessageContent(buffer, header);
|
||||
// 解析协议头部和消息内容
|
||||
int index = 0;
|
||||
// 1. 验证魔术字
|
||||
byte magic = buffer.getByte(index++);
|
||||
Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
|
||||
|
||||
// 2. 验证版本号
|
||||
byte version = buffer.getByte(index++);
|
||||
Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
|
||||
|
||||
// 3. 读取消息类型
|
||||
byte messageType = buffer.getByte(index++);
|
||||
// 直接验证消息类型,无需抽取方法
|
||||
Assert.isTrue(messageType == REQUEST || messageType == RESPONSE,
|
||||
"无效的消息类型: " + messageType);
|
||||
|
||||
// 4. 读取消息长度
|
||||
int messageLength = buffer.getInt(index);
|
||||
index += 4;
|
||||
Assert.isTrue(messageLength == buffer.length(),
|
||||
"消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
|
||||
|
||||
// 5. 读取消息 ID
|
||||
short messageIdLength = buffer.getShort(index);
|
||||
index += 2;
|
||||
String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
|
||||
index += messageIdLength;
|
||||
|
||||
// 6. 读取方法名
|
||||
short methodLength = buffer.getShort(index);
|
||||
index += 2;
|
||||
String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name());
|
||||
index += methodLength;
|
||||
|
||||
// 7. 解析消息体
|
||||
return parseMessageBody(buffer, index, messageType, messageId, method);
|
||||
} catch (Exception e) {
|
||||
log.error("[decode][TCP 二进制消息解码失败,数据长度: {}]", bytes.length, e);
|
||||
throw new RuntimeException("TCP 二进制消息解码失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 编码相关方法 ====================
|
||||
|
||||
/**
|
||||
* 确定消息类型
|
||||
* 优化后的判断逻辑:有响应字段就是响应消息,否则就是请求消息
|
||||
*/
|
||||
private byte determineMessageType(IotDeviceMessage message) {
|
||||
// 判断是否为响应消息:有响应码或响应消息时为响应
|
||||
// TODO @haohao:感觉只判断 code 更稳妥点?msg 有可能空。。。
|
||||
if (message.getCode() != null || StrUtil.isNotBlank(message.getMsg())) {
|
||||
return MessageType.RESPONSE;
|
||||
if (message.getCode() != null) {
|
||||
return RESPONSE;
|
||||
}
|
||||
// 默认为请求消息
|
||||
return MessageType.REQUEST;
|
||||
return REQUEST;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -142,12 +161,12 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
*/
|
||||
private byte[] buildMessageBody(IotDeviceMessage message, byte messageType) {
|
||||
Buffer bodyBuffer = Buffer.buffer();
|
||||
if (messageType == MessageType.RESPONSE) {
|
||||
if (messageType == RESPONSE) {
|
||||
// code
|
||||
bodyBuffer.appendInt(message.getCode() != null ? message.getCode() : 0);
|
||||
// msg
|
||||
String msg = message.getMsg() != null ? message.getMsg() : "";
|
||||
byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
|
||||
byte[] msgBytes = StrUtil.utf8Bytes(msg);
|
||||
bodyBuffer.appendShort((short) msgBytes.length);
|
||||
bodyBuffer.appendBytes(msgBytes);
|
||||
// data
|
||||
@ -155,11 +174,9 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getData()));
|
||||
}
|
||||
} else {
|
||||
// params
|
||||
// TODO @haohao:请求是不是只处理 message.getParams() 哈?
|
||||
Object payload = message.getParams() != null ? message.getParams() : message.getData();
|
||||
if (payload != null) {
|
||||
bodyBuffer.appendBytes(JsonUtils.toJsonByte(payload));
|
||||
// 请求消息只处理 params 参数
|
||||
if (message.getParams() != null) {
|
||||
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getParams()));
|
||||
}
|
||||
}
|
||||
return bodyBuffer.getBytes();
|
||||
@ -174,20 +191,17 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
buffer.appendByte(MAGIC_NUMBER);
|
||||
buffer.appendByte(PROTOCOL_VERSION);
|
||||
buffer.appendByte(messageType);
|
||||
buffer.appendByte((byte) 0x00); // 消息标志,预留字段 TODO @haohao:这个标识的作用是啥呀?
|
||||
// 2. 预留消息长度位置(在 6. 更新消息长度)
|
||||
// 2. 预留消息长度位置(在 5. 更新消息长度)
|
||||
int lengthPosition = buffer.length();
|
||||
buffer.appendInt(0);
|
||||
// 3. 写入消息 ID
|
||||
String messageId = StrUtil.isNotBlank(message.getRequestId()) ? message.getRequestId()
|
||||
// TODO @haohao:复用 IotDeviceMessageUtils 的 generateMessageId 哇?
|
||||
: generateMessageId(message.getMethod());
|
||||
// TODO @haohao:StrUtil.utf8Bytes()
|
||||
byte[] messageIdBytes = messageId.getBytes(StandardCharsets.UTF_8);
|
||||
: IotDeviceMessageUtils.generateMessageId();
|
||||
byte[] messageIdBytes = StrUtil.utf8Bytes(messageId);
|
||||
buffer.appendShort((short) messageIdBytes.length);
|
||||
buffer.appendBytes(messageIdBytes);
|
||||
// 4. 写入方法名
|
||||
byte[] methodBytes = message.getMethod().getBytes(StandardCharsets.UTF_8);
|
||||
byte[] methodBytes = StrUtil.utf8Bytes(message.getMethod());
|
||||
buffer.appendShort((short) methodBytes.length);
|
||||
buffer.appendBytes(methodBytes);
|
||||
// 5. 写入消息体
|
||||
@ -197,66 +211,6 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
return buffer.getBytes();
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成消息 ID
|
||||
*/
|
||||
private String generateMessageId(String method) {
|
||||
return method + "_" + System.currentTimeMillis() + "_" + (int) (Math.random() * 1000);
|
||||
}
|
||||
|
||||
// ==================== 解码相关方法 ====================
|
||||
|
||||
// TODO @haohao:是不是把 parseProtocolHeader、parseMessageContent 合并?
|
||||
/**
|
||||
* 解析协议头部
|
||||
*/
|
||||
private ProtocolHeader parseProtocolHeader(Buffer buffer) {
|
||||
int index = 0;
|
||||
// 1. 验证魔术字
|
||||
byte magic = buffer.getByte(index++);
|
||||
Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
|
||||
// 2. 验证版本号
|
||||
byte version = buffer.getByte(index++);
|
||||
Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
|
||||
|
||||
// 3. 读取消息类型
|
||||
byte messageType = buffer.getByte(index++);
|
||||
Assert.isTrue(isValidMessageType(messageType), "无效的消息类型: " + messageType);
|
||||
// 4. 读取消息标志(暂时跳过)
|
||||
byte messageFlags = buffer.getByte(index++);
|
||||
|
||||
// 5. 读取消息长度
|
||||
int messageLength = buffer.getInt(index);
|
||||
index += 4;
|
||||
|
||||
Assert.isTrue(messageLength == buffer.length(),
|
||||
"消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
|
||||
|
||||
return new ProtocolHeader(magic, version, messageType, messageFlags, messageLength, index);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析消息内容
|
||||
*/
|
||||
private IotDeviceMessage parseMessageContent(Buffer buffer, ProtocolHeader header) {
|
||||
int index = header.getNextIndex();
|
||||
|
||||
// 1. 读取消息 ID
|
||||
short messageIdLength = buffer.getShort(index);
|
||||
index += 2;
|
||||
String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
|
||||
index += messageIdLength;
|
||||
|
||||
// 2. 读取方法名
|
||||
short methodLength = buffer.getShort(index);
|
||||
index += 2;
|
||||
String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name());
|
||||
index += methodLength;
|
||||
|
||||
// 3. 解析消息体
|
||||
return parseMessageBody(buffer, index, header.getMessageType(), messageId, method);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析消息体
|
||||
*/
|
||||
@ -267,11 +221,11 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
return IotDeviceMessage.of(messageId, method, null, null, null, null);
|
||||
}
|
||||
|
||||
if (messageType == MessageType.RESPONSE) {
|
||||
if (messageType == RESPONSE) {
|
||||
// 响应消息:解析 code + msg + data
|
||||
return parseResponseMessage(buffer, startIndex, messageId, method);
|
||||
} else {
|
||||
// 请求消息:解析 payload(可能是 params 或 data)
|
||||
// 请求消息:解析 payload
|
||||
Object payload = parseJsonData(buffer, startIndex, buffer.length());
|
||||
return IotDeviceMessage.of(messageId, method, payload, null, null, null);
|
||||
}
|
||||
@ -303,7 +257,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析JSON数据
|
||||
* 解析 JSON 数据
|
||||
*/
|
||||
private Object parseJsonData(Buffer buffer, int startIndex, int endIndex) {
|
||||
if (startIndex >= endIndex) {
|
||||
@ -318,34 +272,14 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 辅助方法 ====================
|
||||
|
||||
// TODO @haohao:这个貌似只用一次,可以考虑不抽小方法哈;
|
||||
/**
|
||||
* 验证消息类型是否有效
|
||||
* 快速检测是否为二进制格式
|
||||
*
|
||||
* @param data 数据
|
||||
* @return 是否为二进制格式
|
||||
*/
|
||||
private boolean isValidMessageType(byte messageType) {
|
||||
return messageType == MessageType.REQUEST || messageType == MessageType.RESPONSE;
|
||||
public static boolean isBinaryFormatQuick(byte[] data) {
|
||||
return data != null && data.length >= 1 && data[0] == MAGIC_NUMBER;
|
||||
}
|
||||
|
||||
// ==================== 内部类 ====================
|
||||
|
||||
/**
|
||||
* 协议头部信息
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
private static class ProtocolHeader {
|
||||
|
||||
private byte magic;
|
||||
private byte version;
|
||||
private byte messageType;
|
||||
private byte messageFlags;
|
||||
private int messageLength;
|
||||
/**
|
||||
* 指向消息内容开始位置
|
||||
*/
|
||||
private int nextIndex;
|
||||
|
||||
}
|
||||
}
|
||||
@ -28,46 +28,33 @@ public class IotTcpConnectionManager {
|
||||
private final Map<NetSocket, ConnectionInfo> connectionMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 设备 ID -> NetSocket 的映射(用于快速查找)
|
||||
* 设备 ID -> NetSocket 的映射
|
||||
*/
|
||||
private final Map<Long, NetSocket> deviceSocketMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* NetSocket -> 设备 ID 的映射(用于连接断开时清理)
|
||||
*/
|
||||
private final Map<NetSocket, Long> socketDeviceMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 注册设备连接(包含认证信息)
|
||||
*
|
||||
* @param socket TCP 连接
|
||||
* @param deviceId 设备 ID
|
||||
* @param authInfo 认证信息
|
||||
* @param socket TCP 连接
|
||||
* @param deviceId 设备 ID
|
||||
* @param connectionInfo 连接信息
|
||||
*/
|
||||
public void registerConnection(NetSocket socket, Long deviceId, AuthInfo authInfo) {
|
||||
public void registerConnection(NetSocket socket, Long deviceId, ConnectionInfo connectionInfo) {
|
||||
// 如果设备已有其他连接,先清理旧连接
|
||||
NetSocket oldSocket = deviceSocketMap.get(deviceId);
|
||||
if (oldSocket != null && oldSocket != socket) {
|
||||
log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]",
|
||||
deviceId, oldSocket.remoteAddress());
|
||||
oldSocket.close();
|
||||
// 清理所有相关映射
|
||||
// 清理旧连接的映射
|
||||
connectionMap.remove(oldSocket);
|
||||
socketDeviceMap.remove(oldSocket);
|
||||
}
|
||||
|
||||
// 注册新连接 - 更新所有映射关系
|
||||
ConnectionInfo connectionInfo = new ConnectionInfo()
|
||||
.setDeviceId(deviceId)
|
||||
.setAuthInfo(authInfo)
|
||||
.setAuthenticated(true);
|
||||
connectionMap.put(socket, connectionInfo);
|
||||
deviceSocketMap.put(deviceId, socket);
|
||||
// TODO @haohao:socketDeviceMap 和 connectionMap 会重复哇?connectionMap.get(socket).getDeviceId
|
||||
socketDeviceMap.put(socket, deviceId);
|
||||
|
||||
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]",
|
||||
deviceId, socket.remoteAddress(), authInfo.getProductKey(), authInfo.getDeviceName());
|
||||
deviceId, socket.remoteAddress(), connectionInfo.getProductKey(), connectionInfo.getDeviceName());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -77,29 +64,14 @@ public class IotTcpConnectionManager {
|
||||
*/
|
||||
public void unregisterConnection(NetSocket socket) {
|
||||
ConnectionInfo connectionInfo = connectionMap.remove(socket);
|
||||
Long deviceId = socketDeviceMap.remove(socket);
|
||||
if (connectionInfo != null && deviceId != null) {
|
||||
if (connectionInfo != null) {
|
||||
Long deviceId = connectionInfo.getDeviceId();
|
||||
deviceSocketMap.remove(deviceId);
|
||||
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
|
||||
deviceId, socket.remoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
// TODO @haohao:用不到,要不暂时清理哈。
|
||||
/**
|
||||
* 注销设备连接(通过设备 ID)
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
*/
|
||||
public void unregisterConnection(Long deviceId) {
|
||||
NetSocket socket = deviceSocketMap.remove(deviceId);
|
||||
if (socket != null) {
|
||||
connectionMap.remove(socket);
|
||||
socketDeviceMap.remove(socket);
|
||||
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, socket.remoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查连接是否已认证
|
||||
*/
|
||||
@ -116,11 +88,10 @@ public class IotTcpConnectionManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取连接的认证信息
|
||||
* 获取连接信息
|
||||
*/
|
||||
public AuthInfo getAuthInfo(NetSocket socket) {
|
||||
ConnectionInfo info = connectionMap.get(socket);
|
||||
return info != null ? info.getAuthInfo() : null;
|
||||
public ConnectionInfo getConnectionInfo(NetSocket socket) {
|
||||
return connectionMap.get(socket);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -159,30 +130,34 @@ public class IotTcpConnectionManager {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO @haohao:ConnectionInfo 和 AuthInfo 是不是可以融合哈?
|
||||
|
||||
/**
|
||||
* 连接信息
|
||||
* 连接信息(包含认证信息)
|
||||
*/
|
||||
@Data
|
||||
public static class ConnectionInfo {
|
||||
|
||||
private Long deviceId;
|
||||
private AuthInfo authInfo;
|
||||
private boolean authenticated;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 认证信息
|
||||
*/
|
||||
@Data
|
||||
public static class AuthInfo {
|
||||
|
||||
/**
|
||||
* 设备 ID
|
||||
*/
|
||||
private Long deviceId;
|
||||
/**
|
||||
* 产品 Key
|
||||
*/
|
||||
private String productKey;
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
/**
|
||||
* 客户端 ID
|
||||
*/
|
||||
private String clientId;
|
||||
|
||||
/**
|
||||
* 消息编解码类型(认证后确定)
|
||||
*/
|
||||
private String codecType;
|
||||
/**
|
||||
* 是否已认证
|
||||
*/
|
||||
private boolean authenticated;
|
||||
}
|
||||
}
|
||||
@ -1,12 +1,12 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
@ -21,12 +21,8 @@ import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessa
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* TCP 上行消息处理器
|
||||
*
|
||||
@ -77,31 +73,55 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
});
|
||||
|
||||
// 设置消息处理器
|
||||
socket.handler(buffer -> processMessage(clientId, buffer, socket));
|
||||
socket.handler(buffer -> {
|
||||
try {
|
||||
processMessage(clientId, buffer, socket);
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
|
||||
clientId, socket.remoteAddress(), e.getMessage());
|
||||
cleanupConnection(socket);
|
||||
socket.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param buffer 消息
|
||||
* @param socket 网络连接
|
||||
* @throws Exception 消息解码失败时抛出异常
|
||||
*/
|
||||
private void processMessage(String clientId, Buffer buffer, NetSocket socket) {
|
||||
try {
|
||||
// 1.1 数据包基础检查
|
||||
if (buffer.length() == 0) {
|
||||
return;
|
||||
}
|
||||
// 1.2 解码消息
|
||||
MessageInfo messageInfo = decodeMessage(buffer);
|
||||
if (messageInfo == null) {
|
||||
return;
|
||||
}
|
||||
private void processMessage(String clientId, Buffer buffer, NetSocket socket) throws Exception {
|
||||
// 1. 基础检查
|
||||
if (buffer == null || buffer.length() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 根据消息类型路由处理
|
||||
if (isAuthRequest(messageInfo.message)) {
|
||||
// 2. 获取消息格式类型
|
||||
String codecType = getMessageCodecType(buffer, socket);
|
||||
|
||||
// 3. 解码消息
|
||||
IotDeviceMessage message;
|
||||
try {
|
||||
message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
|
||||
if (message == null) {
|
||||
throw new Exception("解码后消息为空");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 消息格式错误时抛出异常,由上层处理连接断开
|
||||
throw new Exception("消息解码失败: " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
// 4. 根据消息类型路由处理
|
||||
try {
|
||||
if (AUTH_METHOD.equals(message.getMethod())) {
|
||||
// 认证请求
|
||||
handleAuthenticationRequest(clientId, messageInfo, socket);
|
||||
handleAuthenticationRequest(clientId, message, codecType, socket);
|
||||
} else {
|
||||
// 业务消息
|
||||
handleBusinessRequest(clientId, messageInfo, socket);
|
||||
handleBusinessRequest(clientId, message, codecType, socket);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[processMessage][处理消息失败,客户端 ID: {}]", clientId, e);
|
||||
@ -110,226 +130,158 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
|
||||
/**
|
||||
* 处理认证请求
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param message 消息信息
|
||||
* @param codecType 消息编解码类型
|
||||
* @param socket 网络连接
|
||||
*/
|
||||
private void handleAuthenticationRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
|
||||
private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, String codecType,
|
||||
NetSocket socket) {
|
||||
try {
|
||||
// 1.1 解析认证参数
|
||||
IotDeviceMessage message = messageInfo.message;
|
||||
AuthParams authParams = parseAuthParams(message.getParams());
|
||||
IotDeviceAuthReqDTO authParams = JsonUtils.parseObject(message.getParams().toString(),
|
||||
IotDeviceAuthReqDTO.class);
|
||||
if (authParams == null) {
|
||||
sendError(socket, message.getRequestId(), "认证参数不完整", messageInfo.codecType);
|
||||
sendErrorResponse(socket, message.getRequestId(), "认证参数不完整", codecType);
|
||||
return;
|
||||
}
|
||||
// 1.2 执行认证
|
||||
if (!authenticateDevice(authParams)) {
|
||||
if (!validateDeviceAuth(authParams)) {
|
||||
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]",
|
||||
clientId, authParams.username);
|
||||
sendError(socket, message.getRequestId(), "认证失败", messageInfo.codecType);
|
||||
clientId, authParams.getUsername());
|
||||
sendErrorResponse(socket, message.getRequestId(), "认证失败", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2.1 解析设备信息
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.username);
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
|
||||
if (deviceInfo == null) {
|
||||
sendError(socket, message.getRequestId(), "解析设备信息失败", messageInfo.codecType);
|
||||
sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败", codecType);
|
||||
return;
|
||||
}
|
||||
// 2.2 获取设备信息
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName());
|
||||
if (device == null) {
|
||||
sendError(socket, message.getRequestId(), "设备不存在", messageInfo.codecType);
|
||||
sendErrorResponse(socket, message.getRequestId(), "设备不存在", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 注册连接并发送成功响应
|
||||
registerConnection(socket, device, deviceInfo, authParams.clientId);
|
||||
sendOnlineMessage(deviceInfo);
|
||||
sendSuccess(socket, message.getRequestId(), "认证成功", messageInfo.codecType);
|
||||
// 3.1 注册连接
|
||||
registerConnection(socket, device, clientId, codecType);
|
||||
// 3.2 发送上线消息
|
||||
sendOnlineMessage(device);
|
||||
// 3.3 发送成功响应
|
||||
sendSuccessResponse(socket, message.getRequestId(), "认证成功", codecType);
|
||||
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
|
||||
device.getId(), deviceInfo.getDeviceName());
|
||||
device.getId(), device.getDeviceName());
|
||||
} catch (Exception e) {
|
||||
log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
|
||||
sendError(socket, messageInfo.message.getRequestId(), "认证处理异常", messageInfo.codecType);
|
||||
sendErrorResponse(socket, message.getRequestId(), "认证处理异常", codecType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理业务请求
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param message 消息信息
|
||||
* @param codecType 消息编解码类型
|
||||
* @param socket 网络连接
|
||||
*/
|
||||
private void handleBusinessRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
|
||||
private void handleBusinessRequest(String clientId, IotDeviceMessage message, String codecType, NetSocket socket) {
|
||||
try {
|
||||
// 1. 检查认证状态
|
||||
if (connectionManager.isNotAuthenticated(socket)) {
|
||||
log.warn("[handleBusinessRequest][设备未认证,客户端 ID: {}]", clientId);
|
||||
sendError(socket, messageInfo.message.getRequestId(), "请先进行认证", messageInfo.codecType);
|
||||
sendErrorResponse(socket, message.getRequestId(), "请先进行认证", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 获取认证信息并处理业务消息
|
||||
IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
|
||||
processBusinessMessage(clientId, messageInfo.message, authInfo);
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
|
||||
// 3. 发送消息到消息总线
|
||||
deviceMessageService.sendDeviceMessage(message, connectionInfo.getProductKey(),
|
||||
connectionInfo.getDeviceName(), serverId);
|
||||
} catch (Exception e) {
|
||||
log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO @haohao:processBusinessMessage 这个小方法,直接融合到 handleBusinessRequest 里?读起来更聚集点
|
||||
/**
|
||||
* 处理业务消息
|
||||
*/
|
||||
private void processBusinessMessage(String clientId, IotDeviceMessage message,
|
||||
IotTcpConnectionManager.AuthInfo authInfo) {
|
||||
try {
|
||||
message.setDeviceId(authInfo.getDeviceId());
|
||||
message.setServerId(serverId);
|
||||
// 发送到消息总线
|
||||
deviceMessageService.sendDeviceMessage(message, authInfo.getProductKey(),
|
||||
authInfo.getDeviceName(), serverId);
|
||||
} catch (Exception e) {
|
||||
log.error("[processBusinessMessage][业务消息处理失败,客户端 ID: {},消息 ID: {}]",
|
||||
clientId, message.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解码消息
|
||||
* 获取消息编解码类型
|
||||
*
|
||||
* @param buffer 消息
|
||||
* @param socket 网络连接
|
||||
* @return 消息编解码类型
|
||||
*/
|
||||
private MessageInfo decodeMessage(Buffer buffer) {
|
||||
if (buffer == null || buffer.length() == 0) {
|
||||
return null;
|
||||
}
|
||||
// 1. 快速检测消息格式类型
|
||||
// TODO @haohao:是不是进一步优化?socket 建立认证后,那条消息已经定义了所有消息的格式哈?
|
||||
String codecType = detectMessageFormat(buffer);
|
||||
try {
|
||||
// 2. 使用检测到的格式进行解码
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
|
||||
if (message == null) {
|
||||
return null;
|
||||
}
|
||||
return new MessageInfo(message, codecType);
|
||||
} catch (Exception e) {
|
||||
log.warn("[decodeMessage][消息解码失败,格式: {},数据长度: {},错误: {}]",
|
||||
codecType, buffer.length(), e.getMessage());
|
||||
// TODO @haohao:一般消息格式不对,应该抛出异常,断开连接居多?
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检测消息格式类型
|
||||
* 优化性能:避免不必要的字符串转换
|
||||
*/
|
||||
private String detectMessageFormat(Buffer buffer) {
|
||||
// TODO @haohao:是不是 IotTcpBinaryDeviceMessageCodec 提供一个 isBinaryFormat 方法哈?
|
||||
// 默认使用 JSON
|
||||
if (buffer.length() == 0) {
|
||||
return CODEC_TYPE_JSON;
|
||||
private String getMessageCodecType(Buffer buffer, NetSocket socket) {
|
||||
// 1. 如果已认证,优先使用缓存的编解码类型
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
if (connectionInfo != null && connectionInfo.isAuthenticated() &&
|
||||
StrUtil.isNotBlank(connectionInfo.getCodecType())) {
|
||||
return connectionInfo.getCodecType();
|
||||
}
|
||||
|
||||
// 1. 优先检测二进制格式(检查魔术字节 0x7E)
|
||||
if (isBinaryFormat(buffer)) {
|
||||
return CODEC_TYPE_BINARY;
|
||||
}
|
||||
|
||||
// 2. 检测 JSON 格式(检查前几个有效字符)
|
||||
// TODO @haohao:这个检测去掉?直接 return CODEC_TYPE_JSON 更简洁一点。
|
||||
if (isJsonFormat(buffer)) {
|
||||
return CODEC_TYPE_JSON;
|
||||
}
|
||||
|
||||
// 3. 默认尝试 JSON 格式
|
||||
return CODEC_TYPE_JSON;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检测二进制格式
|
||||
* 通过检查魔术字节快速识别,避免完整字符串转换
|
||||
*/
|
||||
private boolean isBinaryFormat(Buffer buffer) {
|
||||
// 二进制协议最小长度检查
|
||||
if (buffer.length() < 8) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// 检查魔术字节 0x7E(二进制协议的第一个字节)
|
||||
byte firstByte = buffer.getByte(0);
|
||||
return firstByte == (byte) 0x7E;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检测 JSON 格式
|
||||
* 只检查前几个有效字符,避免完整字符串转换
|
||||
*/
|
||||
private boolean isJsonFormat(Buffer buffer) {
|
||||
try {
|
||||
// 检查前 64 个字节或整个缓冲区(取较小值)
|
||||
int checkLength = Math.min(buffer.length(), 64);
|
||||
String prefix = buffer.getString(0, checkLength, StandardCharsets.UTF_8.name());
|
||||
|
||||
if (StrUtil.isBlank(prefix)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String trimmed = prefix.trim();
|
||||
// JSON 格式必须以 { 或 [ 开头
|
||||
return trimmed.startsWith("{") || trimmed.startsWith("[");
|
||||
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
// 2. 未认证时检测消息格式类型
|
||||
return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY
|
||||
: CODEC_TYPE_JSON;
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册连接信息
|
||||
*
|
||||
* @param socket 网络连接
|
||||
* @param device 设备
|
||||
* @param clientId 客户端 ID
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo, String clientId) {
|
||||
// TODO @haohao:AuthInfo 的创建,放在 connectionManager 里构建貌似会更收敛一点?
|
||||
// 创建认证信息
|
||||
IotTcpConnectionManager.AuthInfo authInfo = new IotTcpConnectionManager.AuthInfo()
|
||||
String clientId, String codecType) {
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo()
|
||||
.setDeviceId(device.getId())
|
||||
.setProductKey(deviceInfo.getProductKey())
|
||||
.setDeviceName(deviceInfo.getDeviceName())
|
||||
.setClientId(clientId);
|
||||
.setProductKey(device.getProductKey())
|
||||
.setDeviceName(device.getDeviceName())
|
||||
.setClientId(clientId)
|
||||
.setCodecType(codecType)
|
||||
.setAuthenticated(true);
|
||||
// 注册连接
|
||||
connectionManager.registerConnection(socket, device.getId(), authInfo);
|
||||
connectionManager.registerConnection(socket, device.getId(), connectionInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送设备上线消息
|
||||
*
|
||||
* @param device 设备信息
|
||||
*/
|
||||
private void sendOnlineMessage(IotDeviceAuthUtils.DeviceInfo deviceInfo) {
|
||||
private void sendOnlineMessage(IotDeviceRespDTO device) {
|
||||
try {
|
||||
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
|
||||
deviceMessageService.sendDeviceMessage(onlineMessage, deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName(), serverId);
|
||||
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
|
||||
device.getDeviceName(), serverId);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", deviceInfo.getDeviceName(), e);
|
||||
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理连接
|
||||
*
|
||||
* @param socket 网络连接
|
||||
*/
|
||||
private void cleanupConnection(NetSocket socket) {
|
||||
try {
|
||||
// 发送离线消息(如果已认证)
|
||||
IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
|
||||
if (authInfo != null) {
|
||||
// 1. 发送离线消息(如果已认证)
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
if (connectionInfo != null) {
|
||||
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
|
||||
deviceMessageService.sendDeviceMessage(offlineMessage, authInfo.getProductKey(),
|
||||
authInfo.getDeviceName(), serverId);
|
||||
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
|
||||
connectionInfo.getDeviceName(), serverId);
|
||||
}
|
||||
|
||||
// 注销连接
|
||||
// 2. 注销连接
|
||||
connectionManager.unregisterConnection(socket);
|
||||
} catch (Exception e) {
|
||||
log.error("[cleanupConnection][清理连接失败]", e);
|
||||
@ -338,6 +290,12 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
|
||||
/**
|
||||
* 发送响应消息
|
||||
*
|
||||
* @param socket 网络连接
|
||||
* @param success 是否成功
|
||||
* @param message 消息
|
||||
* @param requestId 请求 ID
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
private void sendResponse(NetSocket socket, boolean success, String message, String requestId, String codecType) {
|
||||
try {
|
||||
@ -346,8 +304,9 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
.put("message", message)
|
||||
.build();
|
||||
|
||||
int code = success ? 0 : 401;
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData,
|
||||
success ? 0 : 401, message);
|
||||
code, message);
|
||||
|
||||
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
|
||||
socket.write(Buffer.buffer(encodedData));
|
||||
@ -357,94 +316,47 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 辅助方法 ====================
|
||||
|
||||
/**
|
||||
* 判断是否为认证请求
|
||||
* 验证设备认证信息
|
||||
*
|
||||
* @param authParams 认证参数
|
||||
* @return 是否认证成功
|
||||
*/
|
||||
private boolean isAuthRequest(IotDeviceMessage message) {
|
||||
return AUTH_METHOD.equals(message.getMethod());
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析认证参数
|
||||
*/
|
||||
private AuthParams parseAuthParams(Object params) {
|
||||
if (params == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
|
||||
: JSONUtil.parseObj(params.toString());
|
||||
String clientId = paramsJson.getStr("clientId");
|
||||
String username = paramsJson.getStr("username");
|
||||
String password = paramsJson.getStr("password");
|
||||
return StrUtil.hasBlank(clientId, username, password) ? null
|
||||
: new AuthParams(clientId, username, password);
|
||||
} catch (Exception e) {
|
||||
log.warn("[parseAuthParams][解析认证参数失败]", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 认证设备
|
||||
*/
|
||||
private boolean authenticateDevice(AuthParams authParams) {
|
||||
private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) {
|
||||
try {
|
||||
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
|
||||
.setClientId(authParams.clientId)
|
||||
.setUsername(authParams.username)
|
||||
.setPassword(authParams.password));
|
||||
return result.isSuccess() && Boolean.TRUE.equals(result.getData());
|
||||
.setClientId(authParams.getClientId()).setUsername(authParams.getUsername())
|
||||
.setPassword(authParams.getPassword()));
|
||||
result.checkError();
|
||||
return BooleanUtil.isTrue(result.getData());
|
||||
} catch (Exception e) {
|
||||
log.error("[authenticateDevice][设备认证异常,username: {}]", authParams.username, e);
|
||||
log.error("[validateDeviceAuth][设备认证异常,username: {}]", authParams.getUsername(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO @haohao:改成 sendErrorResponse sendSuccessResponse 更清晰点?
|
||||
|
||||
/**
|
||||
* 发送错误响应
|
||||
*
|
||||
* @param socket 网络连接
|
||||
* @param requestId 请求 ID
|
||||
* @param errorMessage 错误消息
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
private void sendError(NetSocket socket, String requestId, String errorMessage, String codecType) {
|
||||
private void sendErrorResponse(NetSocket socket, String requestId, String errorMessage, String codecType) {
|
||||
sendResponse(socket, false, errorMessage, requestId, codecType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送成功响应
|
||||
*
|
||||
* @param socket 网络连接
|
||||
* @param requestId 请求 ID
|
||||
* @param message 消息
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
private void sendSuccess(NetSocket socket, String requestId, String message, String codecType) {
|
||||
private void sendSuccessResponse(NetSocket socket, String requestId, String message, String codecType) {
|
||||
sendResponse(socket, true, message, requestId, codecType);
|
||||
}
|
||||
|
||||
// ==================== 内部类 ====================
|
||||
|
||||
// TODO @haohao:IotDeviceAuthReqDTO 复用这个?
|
||||
/**
|
||||
* 认证参数
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
private static class AuthParams {
|
||||
|
||||
private final String clientId;
|
||||
private final String username;
|
||||
private final String password;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息信息
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
private static class MessageInfo {
|
||||
|
||||
private final IotDeviceMessage message;
|
||||
|
||||
private final String codecType;
|
||||
|
||||
}
|
||||
}
|
||||
@ -9,7 +9,7 @@ TCP 二进制协议是一种高效的自定义协议格式,采用紧凑的二
|
||||
- **高效传输**:完全二进制格式,减少数据传输量
|
||||
- **版本控制**:内置协议版本号,支持协议升级
|
||||
- **类型安全**:明确的消息类型标识
|
||||
- **扩展性**:预留标志位,支持未来功能扩展
|
||||
- **简洁设计**:去除冗余字段,协议更加精简
|
||||
- **兼容性**:与现有 `IotDeviceMessage` 接口完全兼容
|
||||
|
||||
## 2. 协议格式
|
||||
@ -17,9 +17,9 @@ TCP 二进制协议是一种高效的自定义协议格式,采用紧凑的二
|
||||
### 2.1 整体结构
|
||||
|
||||
```
|
||||
+--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
| 魔术字 | 版本号 | 消息类型| 消息标志| 消息长度(4字节) |
|
||||
+--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
+--------+--------+--------+---------------------------+--------+--------+
|
||||
| 魔术字 | 版本号 | 消息类型| 消息长度(4字节) |
|
||||
+--------+--------+--------+---------------------------+--------+--------+
|
||||
| 消息 ID 长度(2字节) | 消息 ID (变长字符串) |
|
||||
+--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
| 方法名长度(2字节) | 方法名(变长字符串) |
|
||||
@ -35,7 +35,6 @@ TCP 二进制协议是一种高效的自定义协议格式,采用紧凑的二
|
||||
| 魔术字 | 1字节 | byte | `0x7E` - 协议识别标识,用于数据同步 |
|
||||
| 版本号 | 1字节 | byte | `0x01` - 协议版本号,支持版本控制 |
|
||||
| 消息类型 | 1字节 | byte | `0x01`=请求, `0x02`=响应 |
|
||||
| 消息标志 | 1字节 | byte | 预留字段,用于未来扩展 |
|
||||
| 消息长度 | 4字节 | int | 整个消息的总长度(包含头部) |
|
||||
| 消息 ID 长度 | 2字节 | short | 消息 ID 字符串的字节长度 |
|
||||
| 消息 ID | 变长 | string | 消息唯一标识符(UTF-8编码) |
|
||||
@ -53,14 +52,12 @@ private static final byte MAGIC_NUMBER = (byte) 0x7E;
|
||||
private static final byte PROTOCOL_VERSION = (byte) 0x01;
|
||||
|
||||
// 消息类型
|
||||
public static class MessageType {
|
||||
public static final byte REQUEST = 0x01; // 请求消息
|
||||
public static final byte RESPONSE = 0x02; // 响应消息
|
||||
}
|
||||
private static final byte REQUEST = (byte) 0x01; // 请求消息
|
||||
private static final byte RESPONSE = (byte) 0x02; // 响应消息
|
||||
|
||||
// 协议长度
|
||||
private static final int HEADER_FIXED_LENGTH = 8; // 固定头部长度
|
||||
private static final int MIN_MESSAGE_LENGTH = 12; // 最小消息长度
|
||||
private static final int HEADER_FIXED_LENGTH = 7; // 固定头部长度
|
||||
private static final int MIN_MESSAGE_LENGTH = 11; // 最小消息长度
|
||||
```
|
||||
|
||||
## 3. 消息类型和格式
|
||||
@ -86,8 +83,7 @@ private static final int MIN_MESSAGE_LENGTH = 12; // 最小消息长度
|
||||
7E // 魔术字 (0x7E)
|
||||
01 // 版本号 (0x01)
|
||||
01 // 消息类型 (REQUEST)
|
||||
00 // 消息标志 (预留)
|
||||
00 00 00 8A // 消息长度 (138字节)
|
||||
00 00 00 89 // 消息长度 (137字节)
|
||||
00 19 // 消息 ID 长度 (25字节)
|
||||
61 75 74 68 5F 31 37 30 34 30 // 消息 ID: "auth_1704067200000_123"
|
||||
36 37 32 30 30 30 30 30 5F 31
|
||||
@ -144,8 +140,7 @@ private static final int MIN_MESSAGE_LENGTH = 12; // 最小消息长度
|
||||
7E // 魔术字 (0x7E)
|
||||
01 // 版本号 (0x01)
|
||||
02 // 消息类型 (RESPONSE)
|
||||
00 // 消息标志 (预留)
|
||||
00 00 00 A5 // 消息长度 (165字节)
|
||||
00 00 00 A4 // 消息长度 (164字节)
|
||||
00 22 // 消息 ID 长度 (34字节)
|
||||
61 75 74 68 5F 72 65 73 70 6F // 消息 ID: "auth_response_1704067200000_123"
|
||||
6E 73 65 5F 31 37 30 34 30 36
|
||||
@ -175,19 +170,19 @@ public static final String TYPE = "TCP_BINARY";
|
||||
- **数据紧凑**:二进制格式,相比 JSON 减少 30-50% 的数据量
|
||||
- **解析高效**:直接二进制操作,减少字符串转换开销
|
||||
- **类型安全**:明确的消息类型和字段定义
|
||||
- **扩展性强**:预留标志位支持未来功能扩展
|
||||
- **设计简洁**:去除冗余字段,协议更加精简高效
|
||||
- **版本控制**:内置版本号支持协议升级
|
||||
|
||||
## 6. 与 JSON 协议对比
|
||||
|
||||
| 特性 | 二进制协议 | JSON协议 |
|
||||
|------|------------|----------|
|
||||
| 数据大小 | 小(节省30-50%) | 大 |
|
||||
| 解析性能 | 高 | 中等 |
|
||||
| 网络开销 | 低 | 高 |
|
||||
| 可读性 | 差 | 优秀 |
|
||||
| 调试难度 | 高 | 低 |
|
||||
| 扩展性 | 良好(有预留位) | 优秀 |
|
||||
| 特性 | 二进制协议 | JSON协议 |
|
||||
|------|-------------|--------|
|
||||
| 数据大小 | 小(节省30-50%) | 大 |
|
||||
| 解析性能 | 高 | 中等 |
|
||||
| 网络开销 | 低 | 高 |
|
||||
| 可读性 | 差 | 优秀 |
|
||||
| 调试难度 | 高 | 低 |
|
||||
| 扩展性 | 良好 | 优秀 |
|
||||
|
||||
**推荐场景**:
|
||||
- ✅ **高频数据传输**:传感器数据实时上报
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user