From 44b1950e4a4825bbe8488d257ee0cb99cd568ea2 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 1 Feb 2026 02:52:58 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88iot=EF=BC=89=EF=BC=9A=E3=80=90?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=94=B9=E9=80=A0=E3=80=91tcp=20=E5=88=9D?= =?UTF-8?q?=E6=AD=A5=E6=94=B9=E9=80=A0=EF=BC=8850%=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 44 +--- .../gateway/config/IotGatewayProperties.java | 160 +------------ .../gateway/protocol/IotProtocolManager.java | 19 +- .../protocol/http/IotHttpProtocol.java | 22 +- .../gateway/protocol/tcp/IotTcpConfig.java | 114 ++++++++++ .../gateway/protocol/tcp/IotTcpProtocol.java | 205 +++++++++++++++++ .../protocol/tcp/IotTcpUpstreamProtocol.java | 125 ----------- .../tcp/codec/IotTcpCodecTypeEnum.java | 77 +++++++ .../protocol/tcp/codec/IotTcpFrameCodec.java | 64 ++++++ .../delimiter/IotTcpDelimiterFrameCodec.java | 107 +++++++++ .../length/IotTcpFixedLengthFrameCodec.java | 58 +++++ .../length/IotTcpLengthFieldFrameCodec.java | 166 ++++++++++++++ .../downstream}/IotTcpDownstreamHandler.java | 35 ++- .../IotTcpDownstreamSubscriber.java | 6 +- .../upstream}/IotTcpUpstreamHandler.java | 210 +++++++++--------- .../tcp/manager/IotTcpConnectionManager.java | 5 +- .../src/main/resources/application.yaml | 29 ++- ...irectDeviceTcpProtocolIntegrationTest.java | 116 ++++++---- .../tcp/tcp-binary-packet-examples.md | 193 ---------------- .../protocol/tcp/tcp-json-packet-examples.md | 191 ---------------- 20 files changed, 1039 insertions(+), 907 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/IotTcpCodecTypeEnum.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/IotTcpFrameCodec.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/delimiter/IotTcpDelimiterFrameCodec.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpFixedLengthFrameCodec.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpLengthFieldFrameCodec.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/{router => handler/downstream}/IotTcpDownstreamHandler.java (60%) rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/{ => handler/downstream}/IotTcpDownstreamSubscriber.java (80%) rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/{router => handler/upstream}/IotTcpUpstreamHandler.java (77%) delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-binary-packet-examples.md delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-json-packet-examples.md diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index d300fd4a77..3aaf3b1d2a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager; -import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; @@ -12,10 +11,6 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscr import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager; @@ -24,6 +19,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketDownst import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; @@ -84,44 +80,6 @@ public class IotGatewayConfiguration { } } - /** - * IoT 网关 TCP 协议配置类 - */ - @Configuration - @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.tcp", name = "enabled", havingValue = "true") - @Slf4j - public static class TcpProtocolConfiguration { - - @Bean(name = "tcpVertx", destroyMethod = "close") - public Vertx tcpVertx() { - return Vertx.vertx(); - } - - @Bean - public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(IotGatewayProperties gatewayProperties, - IotDeviceService deviceService, - IotDeviceMessageService messageService, - IotTcpConnectionManager connectionManager, - @Qualifier("tcpVertx") Vertx tcpVertx) { - return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(), - deviceService, messageService, connectionManager, tcpVertx); - } - - @Bean - public IotTcpDownstreamHandler iotTcpDownstreamHandler(IotDeviceMessageService messageService, - IotTcpConnectionManager connectionManager) { - return new IotTcpDownstreamHandler(messageService, connectionManager); - } - - @Bean - public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocolHandler, - IotTcpDownstreamHandler downstreamHandler, - IotMessageBus messageBus) { - return new IotTcpDownstreamSubscriber(protocolHandler, downstreamHandler, messageBus); - } - - } - /** * IoT 网关 MQTT 协议配置类 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 907a0ae8d9..707c8e37e0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -2,6 +2,9 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; +import io.vertx.core.net.KeyCertOptions; +import io.vertx.core.net.TrustOptions; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -76,21 +79,11 @@ public class IotGatewayProperties { @Data public static class ProtocolProperties { - /** - * HTTP 组件配置 - */ - private HttpProperties http; - /** * EMQX 组件配置 */ private EmqxProperties emqx; - /** - * TCP 组件配置 - */ - private TcpProperties tcp; - /** * MQTT 组件配置 */ @@ -113,36 +106,6 @@ public class IotGatewayProperties { } - @Data - public static class HttpProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - /** - * 服务端口 - */ - private Integer serverPort; - - /** - * 是否开启 SSL - */ - @NotNull(message = "是否开启 SSL 不能为空") - private Boolean sslEnabled = false; - - /** - * SSL 证书路径 - */ - private String sslKeyPath; - /** - * SSL 证书路径 - */ - private String sslCertPath; - - } - @Data public static class EmqxProperties { @@ -312,47 +275,6 @@ public class IotGatewayProperties { } - @Data - public static class TcpProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * 服务器端口 - */ - private Integer port = 8091; - - /** - * 心跳超时时间(毫秒) - */ - private Long keepAliveTimeoutMs = 30000L; - - /** - * 最大连接数 - */ - private Integer maxConnections = 1000; - - /** - * 是否启用SSL - */ - private Boolean sslEnabled = false; - - /** - * SSL证书路径 - */ - private String sslCertPath; - - /** - * SSL私钥路径 - */ - private String sslKeyPath; - - } - @Data public static class MqttProperties { @@ -381,6 +303,7 @@ public class IotGatewayProperties { */ private Integer keepAliveTimeoutSeconds = 300; + // TODO @AI:所有跟 ssl 相关的参数,是不是可以统一?放到 protocol 层级?ProtocolInstanceProperties【优先级:低】暂时不用规划; /** * 是否启用 SSL */ @@ -399,11 +322,11 @@ public class IotGatewayProperties { /** * 密钥证书选项 */ - private io.vertx.core.net.KeyCertOptions keyCertOptions; + private KeyCertOptions keyCertOptions; /** * 信任选项 */ - private io.vertx.core.net.TrustOptions trustOptions; + private TrustOptions trustOptions; /** * SSL 证书路径 */ @@ -596,78 +519,11 @@ public class IotGatewayProperties { @Valid private IotHttpConfig http; - // TODO @AI:后续改下; /** - * TCP 协议配置(后续扩展) + * TCP 协议配置 */ @Valid - private TcpInstanceConfig tcp; - - } - - /** - * TCP 协议实例配置(后续扩展) - */ - @Data - public static class TcpInstanceConfig { - - /** - * 最大连接数 - */ - private Integer maxConnections = 1000; - - /** - * 心跳超时时间(毫秒) - */ - private Long keepAliveTimeoutMs = 30000L; - - /** - * 是否启用 SSL - */ - private Boolean sslEnabled = false; - - /** - * SSL 证书路径 - */ - private String sslCertPath; - - /** - * SSL 私钥路径 - */ - private String sslKeyPath; - - /** - * 拆包配置 - */ - private CodecConfig codec; - - /** - * TCP 拆包配置 - */ - @Data - public static class CodecConfig { - - /** - * 拆包类型:LENGTH_FIELD / DELIMITER - */ - private String type; - - /** - * LENGTH_FIELD: 偏移量 - */ - private Integer lengthFieldOffset; - - /** - * LENGTH_FIELD: 长度字段长度 - */ - private Integer lengthFieldLength; - - /** - * DELIMITER: 分隔符 - */ - private String delimiter; - - } + private IotTcpConfig tcp; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java index 2e339e4a73..c64c828f8d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java @@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol; import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; import lombok.extern.slf4j.Slf4j; import org.springframework.context.SmartLifecycle; @@ -14,9 +15,7 @@ import java.util.ArrayList; import java.util.List; /** - * IoT 协议管理器 - * - * 负责根据配置创建和管理协议实例 + * IoT 协议管理器:负责根据配置创建和管理协议实例 * * @author 芋道源码 */ @@ -96,7 +95,7 @@ public class IotProtocolManager implements SmartLifecycle { * @param config 协议实例配置 * @return 协议实例 */ - @SuppressWarnings({"SwitchStatementWithTooFewBranches", "EnhancedSwitchMigration"}) + @SuppressWarnings({"EnhancedSwitchMigration"}) private IotProtocol createProtocol(IotGatewayProperties.ProtocolInstanceProperties config) { IotProtocolTypeEnum protocolType = IotProtocolTypeEnum.of(config.getType()); if (protocolType == null) { @@ -106,6 +105,8 @@ public class IotProtocolManager implements SmartLifecycle { switch (protocolType) { case HTTP: return createHttpProtocol(config); + case TCP: + return createTcpProtocol(config); // TODO 后续添加其他协议类型 default: throw new IllegalArgumentException(String.format( @@ -123,4 +124,14 @@ public class IotProtocolManager implements SmartLifecycle { return new IotHttpProtocol(config, messageBus); } + /** + * 创建 TCP 协议实例 + * + * @param config 协议实例配置 + * @return TCP 协议实例 + */ + private IotTcpProtocol createTcpProtocol(IotGatewayProperties.ProtocolInstanceProperties config) { + return new IotTcpProtocol(config, messageBus, serializerManager); + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java index c65e8d87ab..b141afc5e4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java @@ -33,10 +33,6 @@ public class IotHttpProtocol implements IotProtocol { * 协议配置 */ private final ProtocolInstanceProperties properties; - /** - * 消息总线 - */ - private final IotMessageBus messageBus; /** * 服务器 ID(用于消息追踪,全局唯一) */ @@ -44,26 +40,26 @@ public class IotHttpProtocol implements IotProtocol { private final String serverId; /** - * Vert.x 实例(每个 Protocol 自己管理) + * 运行状态 + */ + private volatile boolean running = false; + + /** + * Vert.x 实例 */ private Vertx vertx; /** * HTTP 服务器 */ private HttpServer httpServer; + /** * 下行消息订阅者 */ private IotHttpDownstreamSubscriber downstreamSubscriber; - /** - * 运行状态 - */ - private volatile boolean running = false; - public IotHttpProtocol(ProtocolInstanceProperties properties, IotMessageBus messageBus) { this.properties = properties; - this.messageBus = messageBus; this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus); } @@ -121,7 +117,6 @@ public class IotHttpProtocol implements IotProtocol { getId(), properties.getPort(), serverId); // 2. 启动下行消息订阅者 - this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus); this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT HTTP 协议 {} 启动失败]", getId(), e); @@ -132,9 +127,6 @@ public class IotHttpProtocol implements IotProtocol { } throw e; } - - // 2. 启动下行消息订阅者 - this.downstreamSubscriber.start(); } @Override diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConfig.java new file mode 100644 index 0000000000..c967ce2764 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConfig.java @@ -0,0 +1,114 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum; +import jakarta.validation.Valid; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * IoT TCP 协议配置 + * + * @author 芋道源码 + */ +@Data +public class IotTcpConfig { + + /** + * 最大连接数 + */ + @NotNull(message = "最大连接数不能为空") + @Min(value = 1, message = "最大连接数必须大于 0") + private Integer maxConnections = 1000; + /** + * 心跳超时时间(毫秒) + */ + @NotNull(message = "心跳超时时间不能为空") + @Min(value = 1000, message = "心跳超时时间必须大于 1000 毫秒") + private Long keepAliveTimeoutMs = 30000L; + + /** + * 是否启用 SSL + */ + @NotNull(message = "是否启用 SSL 不能为空") + private Boolean sslEnabled = false; + /** + * SSL 证书路径 + */ + private String sslCertPath; + /** + * SSL 私钥路径 + */ + private String sslKeyPath; + + /** + * 拆包配置 + */ + @Valid + private CodecConfig codec; + + /** + * TCP 拆包配置 + */ + @Data + public static class CodecConfig { + + /** + * 拆包类型 + * + * @see IotTcpCodecTypeEnum + */ + @NotNull(message = "拆包类型不能为空") + private String type; + + /** + * LENGTH_FIELD: 长度字段偏移量 + *

+ * 表示长度字段在消息中的起始位置(从0开始) + */ + private Integer lengthFieldOffset; + /** + * LENGTH_FIELD: 长度字段长度(字节数) + *

+ * 常见值:1(最大255)、2(最大65535)、4(最大2GB) + */ + private Integer lengthFieldLength; + /** + * LENGTH_FIELD: 长度调整值 + *

+ * 用于调整长度字段的值,例如长度字段包含头部长度时需要减去头部长度 + */ + private Integer lengthAdjustment = 0; + /** + * LENGTH_FIELD: 跳过的初始字节数 + *

+ * 解码后跳过的字节数,通常等于 lengthFieldOffset + lengthFieldLength + */ + private Integer initialBytesToStrip = 0; + + /** + * DELIMITER: 分隔符 + *

+ * 支持转义字符:\n(换行)、\r(回车)、\r\n(回车换行) + */ + private String delimiter; + + /** + * FIXED_LENGTH: 固定消息长度(字节) + *

+ * 每条消息的固定长度 + */ + private Integer fixedLength; + + /** + * 最大帧长度(字节) + *

+ * 防止内存溢出,默认 1MB + */ + @NotNull(message = "最大帧长度不能为空") + @Min(value = 1, message = "最大帧长度必须大于 0") + private Integer maxFrameLength = 1048576; + + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java new file mode 100644 index 0000000000..c03383a224 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java @@ -0,0 +1,205 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream.IotTcpDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream.IotTcpDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.upstream.IotTcpUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Vertx; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.PemKeyCertOptions; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT TCP 协议实现 + *

+ * 基于 Vert.x 实现 TCP 服务器,接收设备上行消息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotTcpProtocol implements IotProtocol { + + /** + * 协议配置 + */ + private final ProtocolInstanceProperties properties; + /** + * 服务器 ID(用于消息追踪,全局唯一) + */ + @Getter + private final String serverId; + + /** + * 运行状态 + */ + private volatile boolean running = false; + + /** + * Vert.x 实例 + */ + private Vertx vertx; + /** + * TCP 服务器 + */ + private NetServer tcpServer; + + /** + * 下行消息订阅者 + */ + private final IotTcpDownstreamSubscriber downstreamSubscriber; + + /** + * 消息序列化器 + */ + private final IotMessageSerializer serializer; + + /** + * TCP 帧编解码器 + */ + private final IotTcpFrameCodec frameCodec; + + public IotTcpProtocol(ProtocolInstanceProperties properties, IotMessageBus messageBus, + IotMessageSerializerManager serializerManager) { + this.properties = properties; + this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); + + // 初始化序列化器 + IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(properties.getSerialize()); + if (serializeType == null) { + serializeType = IotSerializeTypeEnum.JSON; // 默认 JSON + } + this.serializer = serializerManager.get(serializeType); + + // 初始化帧编解码器 + IotTcpConfig tcpConfig = properties.getTcp(); + IotTcpConfig.CodecConfig codecConfig = tcpConfig != null ? tcpConfig.getCodec() : null; + this.frameCodec = IotTcpFrameCodec.create(codecConfig); + + // 初始化下行消息订阅者 + IotTcpConnectionManager connectionManager = SpringUtil.getBean(IotTcpConnectionManager.class); + IotTcpDownstreamHandler downstreamHandler = new IotTcpDownstreamHandler(connectionManager, frameCodec, serializer); + this.downstreamSubscriber = new IotTcpDownstreamSubscriber(this, downstreamHandler, messageBus); + } + + @Override + public String getId() { + return properties.getId(); + } + + @Override + public IotProtocolTypeEnum getType() { + return IotProtocolTypeEnum.TCP; + } + + @Override + public void start() { + if (running) { + log.warn("[start][IoT TCP 协议 {} 已经在运行中]", getId()); + return; + } + + // 1.1 创建 Vertx 实例(每个 Protocol 独立管理) + this.vertx = Vertx.vertx(); + + // 1.2 创建服务器选项 + IotTcpConfig tcpConfig = properties.getTcp(); + NetServerOptions options = new NetServerOptions() + .setPort(properties.getPort()) + .setTcpKeepAlive(true) + .setTcpNoDelay(true) + .setReuseAddress(true); + if (tcpConfig != null && Boolean.TRUE.equals(tcpConfig.getSslEnabled())) { + PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() + .setKeyPath(tcpConfig.getSslKeyPath()) + .setCertPath(tcpConfig.getSslCertPath()); + options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); + } + + // 1.3 创建服务器并设置连接处理器 + tcpServer = vertx.createNetServer(options); + IotDeviceService deviceService = SpringUtil.getBean(IotDeviceService.class); + IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class); + IotTcpConnectionManager connectionManager = SpringUtil.getBean(IotTcpConnectionManager.class); + tcpServer.connectHandler(socket -> { + IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, deviceService, + connectionManager, frameCodec, serializer); + handler.handle(socket); + }); + + // 1.4 启动 TCP 服务器 + try { + tcpServer.listen().result(); + running = true; + log.info("[start][IoT TCP 协议 {} 启动成功,端口:{},serverId:{}]", + getId(), properties.getPort(), serverId); + + // 2. 启动下行消息订阅者 + this.downstreamSubscriber.start(); + } catch (Exception e) { + log.error("[start][IoT TCP 协议 {} 启动失败]", getId(), e); + // 启动失败时关闭 Vertx + if (vertx != null) { + vertx.close(); + vertx = null; + } + throw e; + } + } + + @Override + public void stop() { + if (!running) { + return; + } + // 1. 停止下行消息订阅者 + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT TCP 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT TCP 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + + // 2.1 关闭 TCP 服务器 + if (tcpServer != null) { + try { + tcpServer.close().result(); + log.info("[stop][IoT TCP 协议 {} 服务器已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT TCP 协议 {} 服务器停止失败]", getId(), e); + } + tcpServer = null; + } + // 2.2 关闭 Vertx 实例 + if (vertx != null) { + try { + vertx.close().result(); + log.info("[stop][IoT TCP 协议 {} Vertx 已关闭]", getId()); + } catch (Exception e) { + log.error("[stop][IoT TCP 协议 {} Vertx 关闭失败]", getId(), e); + } + vertx = null; + } + running = false; + log.info("[stop][IoT TCP 协议 {} 已停止]", getId()); + } + + @Override + public boolean isRunning() { + return running; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java deleted file mode 100644 index e5aeb78c08..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java +++ /dev/null @@ -1,125 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; - -import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpUpstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.Vertx; -import io.vertx.core.net.NetServer; -import io.vertx.core.net.NetServerOptions; -import io.vertx.core.net.PemKeyCertOptions; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 TCP 协议:接收设备上行消息 - * - * @author 芋道源码 - */ -@Slf4j -public class IotTcpUpstreamProtocol implements IotProtocol { - - private static final String ID = "tcp"; - - private final IotGatewayProperties.TcpProperties tcpProperties; - - private final IotDeviceService deviceService; - - private final IotDeviceMessageService messageService; - - private final IotTcpConnectionManager connectionManager; - - private final Vertx vertx; - - @Getter - private final String serverId; - - private NetServer tcpServer; - - private volatile boolean running = false; - - public IotTcpUpstreamProtocol(IotGatewayProperties.TcpProperties tcpProperties, - IotDeviceService deviceService, - IotDeviceMessageService messageService, - IotTcpConnectionManager connectionManager, - Vertx vertx) { - this.tcpProperties = tcpProperties; - this.deviceService = deviceService; - this.messageService = messageService; - this.connectionManager = connectionManager; - this.vertx = vertx; - this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getPort()); - } - - @Override - public String getId() { - return ID; - } - - @Override - public IotProtocolTypeEnum getType() { - return IotProtocolTypeEnum.TCP; - } - - @Override - @PostConstruct - public void start() { - // 创建服务器选项 - NetServerOptions options = new NetServerOptions() - .setPort(tcpProperties.getPort()) - .setTcpKeepAlive(true) - .setTcpNoDelay(true) - .setReuseAddress(true); - // 配置 SSL(如果启用) - if (Boolean.TRUE.equals(tcpProperties.getSslEnabled())) { - PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() - .setKeyPath(tcpProperties.getSslKeyPath()) - .setCertPath(tcpProperties.getSslCertPath()); - options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); - } - - // 创建服务器并设置连接处理器 - tcpServer = vertx.createNetServer(options); - tcpServer.connectHandler(socket -> { - IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, deviceService, - connectionManager); - handler.handle(socket); - }); - - // 启动服务器 - try { - tcpServer.listen().result(); - running = true; - log.info("[start][IoT 网关 TCP 协议启动成功,端口:{}]", tcpProperties.getPort()); - } catch (Exception e) { - log.error("[start][IoT 网关 TCP 协议启动失败]", e); - throw e; - } - } - - @Override - @PreDestroy - public void stop() { - if (tcpServer != null) { - try { - tcpServer.close().result(); - running = false; - log.info("[stop][IoT 网关 TCP 协议已停止]"); - } catch (Exception e) { - log.error("[stop][IoT 网关 TCP 协议停止失败]", e); - } - } - } - - @Override - public boolean isRunning() { - return running; - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/IotTcpCodecTypeEnum.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/IotTcpCodecTypeEnum.java new file mode 100644 index 0000000000..7b4b669112 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/IotTcpCodecTypeEnum.java @@ -0,0 +1,77 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec; + +import cn.hutool.core.util.ArrayUtil; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.delimiter.IotTcpDelimiterFrameCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length.IotTcpFixedLengthFrameCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length.IotTcpLengthFieldFrameCodec; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.function.Function; + +/** + * IoT TCP 拆包类型枚举 + * + * @author 芋道源码 + */ +@AllArgsConstructor +@Getter +public enum IotTcpCodecTypeEnum { + + /** + * 基于固定长度的拆包 + *

+ * 消息格式:固定长度的消息体 + * 需要配置:fixedLength(固定长度) + */ + FIXED_LENGTH("fixed_length", IotTcpFixedLengthFrameCodec::new), + + /** + * 基于分隔符的拆包 + *

+ * 消息格式:消息内容 + 分隔符 + * 需要配置:delimiter(分隔符) + */ + DELIMITER("delimiter", IotTcpDelimiterFrameCodec::new), + + /** + * 基于长度字段的拆包 + *

+ * 消息格式:[长度字段][消息体] + * 需要配置:lengthFieldOffset(长度字段偏移量)、lengthFieldLength(长度字段长度) + */ + LENGTH_FIELD("length_field", IotTcpLengthFieldFrameCodec::new), + ; + + /** + * 类型标识 + */ + private final String type; + + /** + * Codec 创建工厂 + */ + private final Function codecFactory; + + /** + * 根据类型获取枚举 + * + * @param type 类型标识 + * @return 枚举值 + */ + public static IotTcpCodecTypeEnum of(String type) { + return ArrayUtil.firstMatch(e -> e.getType().equalsIgnoreCase(type), values()); + } + + /** + * 创建 Codec 实例 + * + * @param config 拆包配置 + * @return Codec 实例 + */ + public IotTcpFrameCodec createCodec(IotTcpConfig.CodecConfig config) { + return codecFactory.apply(config); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/IotTcpFrameCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/IotTcpFrameCodec.java new file mode 100644 index 0000000000..7ee16d3a0d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/IotTcpFrameCodec.java @@ -0,0 +1,64 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec; + +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.parsetools.RecordParser; + +/** + * IoT TCP 帧编解码器接口 + *

+ * 用于解决 TCP 粘包/拆包问题,提供解码(拆包)和编码(加帧)能力 + * + * @author 芋道源码 + */ +public interface IotTcpFrameCodec { + + /** + * 获取编解码器类型 + * + * @return 编解码器类型 + */ + IotTcpCodecTypeEnum getType(); + + /** + * 创建解码器(RecordParser) + *

+ * 每个连接调用一次,返回的 parser 需绑定到 socket.handler() + * + * @param handler 消息处理器,当收到完整的消息帧后回调 + * @return RecordParser 实例 + */ + RecordParser createDecodeParser(Handler handler); + + /** + * 编码消息(加帧) + *

+ * 根据不同的编解码类型添加帧头/分隔符 + * + * @param data 原始数据 + * @return 编码后的数据(带帧头/分隔符) + */ + Buffer encode(byte[] data); + + // TODO @AI:还是搞个 facory 类 ,更好理解; + // ========== 静态工厂方法 ========== + + /** + * 根据配置创建编解码器 + * + * @param config 拆包配置 + * @return 编解码器实例,如果配置为空则返回 null + */ + static IotTcpFrameCodec create(IotTcpConfig.CodecConfig config) { + if (config == null) { + return null; + } + IotTcpCodecTypeEnum type = IotTcpCodecTypeEnum.of(config.getType()); + if (type == null) { + return null; + } + return type.createCodec(config); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/delimiter/IotTcpDelimiterFrameCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/delimiter/IotTcpDelimiterFrameCodec.java new file mode 100644 index 0000000000..6d4b8b009a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/delimiter/IotTcpDelimiterFrameCodec.java @@ -0,0 +1,107 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.delimiter; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.parsetools.RecordParser; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT TCP 分隔符帧编解码器 + *

+ * 基于分隔符的拆包策略,消息格式:消息内容 + 分隔符 + *

+ * 支持的分隔符: + *

+ * + * @author 芋道源码 + */ +@Slf4j +public class IotTcpDelimiterFrameCodec implements IotTcpFrameCodec { + + private final IotTcpConfig.CodecConfig config; + + /** + * 解析后的分隔符字节数组 + */ + private final byte[] delimiterBytes; + + /** + * 最大帧长度 + */ + // TODO @AI:最大帧数要不去掉;简洁一点;包括其他地方的配置项; + private final int maxFrameLength; + + public IotTcpDelimiterFrameCodec(IotTcpConfig.CodecConfig config) { + this.config = config; + // TODO @AI:禁止为空; + this.delimiterBytes = parseDelimiter(config.getDelimiter()); + this.maxFrameLength = config.getMaxFrameLength() != null ? config.getMaxFrameLength() : 1048576; + } + + @Override + public IotTcpCodecTypeEnum getType() { + return IotTcpCodecTypeEnum.DELIMITER; + } + + @Override + public RecordParser createDecodeParser(Handler handler) { + RecordParser parser = RecordParser.newDelimited(Buffer.buffer(delimiterBytes)); + + parser.handler(buffer -> { + // 检查帧长度是否超过限制 + if (buffer.length() > maxFrameLength) { + log.warn("[createDecodeParser][帧长度超过限制,length: {}, maxFrameLength: {}]", + buffer.length(), maxFrameLength); + return; + } + // 处理完整消息(不包含分隔符) + handler.handle(buffer); + }); + + // TODO @AI:异常处理; + parser.exceptionHandler(ex -> log.error("[createDecodeParser][解析异常]", ex)); + return parser; + } + + @Override + public Buffer encode(byte[] data) { + Buffer buffer = Buffer.buffer(); + buffer.appendBytes(data); + buffer.appendBytes(delimiterBytes); + return buffer; + } + + /** + * 解析分隔符字符串为字节数组 + *

+ * 支持转义字符:\n、\r、\r\n、\t + * + * @param delimiter 分隔符字符串 + * @return 分隔符字节数组 + */ + private byte[] parseDelimiter(String delimiter) { + if (StrUtil.isBlank(delimiter)) { + // 默认使用换行符 + return new byte[]{'\n'}; + } + + // 处理转义字符 + // TODO @AI:是否必要?不调整感觉也没问题?用户自己写对就 ok 了是哇? + String parsed = delimiter + .replace("\\r\\n", "\r\n") + .replace("\\r", "\r") + .replace("\\n", "\n") + .replace("\\t", "\t"); + return parsed.getBytes(); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpFixedLengthFrameCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpFixedLengthFrameCodec.java new file mode 100644 index 0000000000..c9269fc383 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpFixedLengthFrameCodec.java @@ -0,0 +1,58 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length; + +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.parsetools.RecordParser; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT TCP 定长帧编解码器 + *

+ * 基于固定长度的拆包策略,每条消息固定字节数 + * + * @author 芋道源码 + */ +@Slf4j +public class IotTcpFixedLengthFrameCodec implements IotTcpFrameCodec { + + /** + * 固定消息长度 + */ + private final int fixedLength; + + public IotTcpFixedLengthFrameCodec(IotTcpConfig.CodecConfig config) { + // TODO @AI:config.getFixedLength() 禁止为空; + this.fixedLength = config.getFixedLength() != null ? config.getFixedLength() : 1024; + } + + @Override + public IotTcpCodecTypeEnum getType() { + return IotTcpCodecTypeEnum.FIXED_LENGTH; + } + + @Override + public RecordParser createDecodeParser(Handler handler) { + RecordParser parser = RecordParser.newFixed(fixedLength); + parser.handler(handler); + // TODO @AI:解析失败,是不是要抛出异常?因为要 close 掉连接; + parser.exceptionHandler(ex -> log.error("[createDecodeParser][解析异常]", ex)); + return parser; + } + + @Override + public Buffer encode(byte[] data) { + Buffer buffer = Buffer.buffer(fixedLength); + buffer.appendBytes(data); + // 如果数据不足固定长度,填充 0 + // TODO @AI:这里的填充是合理的么?RecordParser.newFixed(fixedLength) 有填充的逻辑么? + if (data.length < fixedLength) { + byte[] padding = new byte[fixedLength - data.length]; + buffer.appendBytes(padding); + } + return buffer; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpLengthFieldFrameCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpLengthFieldFrameCodec.java new file mode 100644 index 0000000000..12a0b680eb --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpLengthFieldFrameCodec.java @@ -0,0 +1,166 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length; + +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.parsetools.RecordParser; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT TCP 长度字段帧编解码器 + *

+ * 基于长度字段的拆包策略,消息格式:[长度字段][消息体] + *

+ * 参数说明: + *

+ * + * @author 芋道源码 + */ +@Slf4j +public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec { + + private final int lengthFieldOffset; + private final int lengthFieldLength; + private final int lengthAdjustment; + private final int initialBytesToStrip; + // TODO @AI:去掉 maxFrameLength 相关字段; + private final int maxFrameLength; + + /** + * 头部长度 = 长度字段偏移量 + 长度字段长度 + */ + private final int headerLength; + + public IotTcpLengthFieldFrameCodec(IotTcpConfig.CodecConfig config) { + // TODO @AI: 增加参数校验;不要 default 逻辑; + this.lengthFieldOffset = config.getLengthFieldOffset() != null ? config.getLengthFieldOffset() : 0; + this.lengthFieldLength = config.getLengthFieldLength() != null ? config.getLengthFieldLength() : 4; + this.lengthAdjustment = config.getLengthAdjustment() != null ? config.getLengthAdjustment() : 0; + this.initialBytesToStrip = config.getInitialBytesToStrip() != null ? config.getInitialBytesToStrip() : 0; + this.maxFrameLength = config.getMaxFrameLength() != null ? config.getMaxFrameLength() : 1048576; + this.headerLength = lengthFieldOffset + lengthFieldLength; + } + + @Override + public IotTcpCodecTypeEnum getType() { + return IotTcpCodecTypeEnum.LENGTH_FIELD; + } + + @Override + public RecordParser createDecodeParser(Handler handler) { + // 创建状态机:先读取头部,再读取消息体 + RecordParser parser = RecordParser.newFixed(headerLength); + // 使用数组保存状态和头部数据 + // TODO @AI:bodyLength 只使用第 0 位,是不是 atomicInteger 更合适? + final int[] bodyLength = {-1}; + final Buffer[] headerBuffer = {null}; + + // 处理读取到的数据 + parser.handler(buffer -> { + if (bodyLength[0] == -1) { + // 阶段 1: 读取头部,解析长度字段 + headerBuffer[0] = buffer.copy(); + int length = readLength(buffer, lengthFieldOffset, lengthFieldLength); + int frameBodyLength = length + lengthAdjustment; + // 检查帧长度是否超过限制 + if (frameBodyLength < 0 || frameBodyLength > maxFrameLength - headerLength) { + log.warn("[createDecodeParser][帧长度异常,length: {}, frameBodyLength: {}, maxFrameLength: {}]", + length, frameBodyLength, maxFrameLength); + return; + } + + if (frameBodyLength == 0) { + // 消息体为空,直接处理 + // TODO @AI:消息体为空,是不是不合理哈?应该抛出异常? + Buffer frame = processFrame(headerBuffer[0], null); + handler.handle(frame); + } else { + // 切换到读取消息体模式 + bodyLength[0] = frameBodyLength; + parser.fixedSizeMode(frameBodyLength); + } + } else { + // 阶段 2: 读取消息体,组装完整帧 + Buffer frame = processFrame(headerBuffer[0], buffer); + // 重置状态,准备读取下一帧 + bodyLength[0] = -1; + headerBuffer[0] = null; + parser.fixedSizeMode(headerLength); + + // 处理完整消息 + handler.handle(frame); + } + }); + + parser.exceptionHandler(ex -> log.error("[createDecodeParser][解析异常]", ex)); + return parser; + } + + @Override + public Buffer encode(byte[] data) { + Buffer buffer = Buffer.buffer(); + // 计算要写入的长度值 + int lengthValue = data.length - lengthAdjustment; + // 写入偏移量前的填充字节(如果有) + for (int i = 0; i < lengthFieldOffset; i++) { + buffer.appendByte((byte) 0); + } + // 写入长度字段 + writeLength(buffer, lengthValue, lengthFieldLength); + // 写入消息体 + buffer.appendBytes(data); + return buffer; + } + + /** + * 从 Buffer 中读取长度字段 + */ + // TODO @AI:兼容 JDK8 + private int readLength(Buffer buffer, int offset, int length) { + return switch (length) { + case 1 -> buffer.getUnsignedByte(offset); + case 2 -> buffer.getUnsignedShort(offset); + case 4 -> buffer.getInt(offset); + default -> throw new IllegalArgumentException("不支持的长度字段长度: " + length); + }; + } + + /** + * 向 Buffer 中写入长度字段 + */ + // TODO @AI:兼容 JDK8 + private void writeLength(Buffer buffer, int length, int fieldLength) { + switch (fieldLength) { + case 1 -> buffer.appendByte((byte) length); + case 2 -> buffer.appendShort((short) length); + case 4 -> buffer.appendInt(length); + default -> throw new IllegalArgumentException("不支持的长度字段长度: " + fieldLength); + } + } + + /** + * 处理帧数据(根据 initialBytesToStrip 跳过指定字节) + */ + private Buffer processFrame(Buffer header, Buffer body) { + Buffer fullFrame = Buffer.buffer(); + if (header != null) { + fullFrame.appendBuffer(header); + } + if (body != null) { + fullFrame.appendBuffer(body); + } + // 根据 initialBytesToStrip 跳过指定字节 + if (initialBytesToStrip > 0 && initialBytesToStrip < fullFrame.length()) { + return fullFrame.slice(initialBytesToStrip, fullFrame.length()); + } + return fullFrame; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamHandler.java similarity index 60% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamHandler.java index 374e75287b..c87eebb39c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamHandler.java @@ -1,8 +1,10 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import io.vertx.core.buffer.Buffer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -15,10 +17,17 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class IotTcpDownstreamHandler { - private final IotDeviceMessageService deviceMessageService; - private final IotTcpConnectionManager connectionManager; + /** + * TCP 帧编解码器(处理粘包/拆包) + */ + private final IotTcpFrameCodec codec; + /** + * 消息序列化器(处理业务消息序列化/反序列化) + */ + private final IotMessageSerializer serializer; + /** * 处理下行消息 */ @@ -26,21 +35,25 @@ public class IotTcpDownstreamHandler { try { log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]", message.getDeviceId(), message.getMethod(), message.getId()); - - // 1. 获取连接信息(包含 codecType) + // 1. 检查设备连接 IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId( message.getDeviceId()); if (connectionInfo == null) { - log.error("[handle][连接信息不存在,设备 ID: {}]", message.getDeviceId()); + // TODO @AI:是不是把消息 id 也打印进去?类似上面的日志 + log.warn("[handle][连接信息不存在,设备 ID: {}]", message.getDeviceId()); return; } - // 2. 使用连接时的 codecType 编码消息,并发送到设备 - byte[] bytes = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getCodecType()); - boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes); + // 2. 序列化 + 帧编码 + byte[] serializedData = serializer.serialize(message); + Buffer frameData = codec.encode(serializedData); + + // 3. 发送到设备 + boolean success = connectionManager.sendToDevice(message.getDeviceId(), frameData.getBytes()); + // TODO @AI:不成功,直接抛出异常;反正下面的日志也会打印失败的 if (success) { log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]", - message.getDeviceId(), message.getMethod(), message.getId(), bytes.length); + message.getDeviceId(), message.getMethod(), message.getId(), frameData.length()); } else { log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]", message.getDeviceId(), message.getMethod(), message.getId()); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamSubscriber.java similarity index 80% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamSubscriber.java index f04d2b2381..7a29e6c00c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamSubscriber.java @@ -1,9 +1,9 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler; import lombok.extern.slf4j.Slf4j; /** @@ -16,7 +16,7 @@ public class IotTcpDownstreamSubscriber extends IotProtocolDownstreamSubscriber private final IotTcpDownstreamHandler downstreamHandler; - public IotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocol, + public IotTcpDownstreamSubscriber(IotProtocol protocol, IotTcpDownstreamHandler downstreamHandler, IotMessageBus messageBus) { super(protocol, messageBus); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java similarity index 77% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java index 4a20f46af2..c5b6267eba 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java @@ -1,9 +1,8 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.upstream; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.IdUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; @@ -16,15 +15,16 @@ import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -37,9 +37,6 @@ import java.util.Map; @Slf4j public class IotTcpUpstreamHandler implements Handler { - private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE; - private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE; - private static final String AUTH_METHOD = "auth"; private final IotDeviceMessageService deviceMessageService; @@ -52,15 +49,29 @@ public class IotTcpUpstreamHandler implements Handler { private final String serverId; - public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol, + /** + * TCP 帧编解码器(处理粘包/拆包) + */ + private final IotTcpFrameCodec codec; + /** + * 消息序列化器(处理业务消息序列化/反序列化) + */ + private final IotMessageSerializer serializer; + + public IotTcpUpstreamHandler(IotProtocol protocol, IotDeviceMessageService deviceMessageService, IotDeviceService deviceService, - IotTcpConnectionManager connectionManager) { + IotTcpConnectionManager connectionManager, + IotTcpFrameCodec codec, + IotMessageSerializer serializer) { + this.serverId = protocol.getServerId(); + this.codec = codec; + this.serializer = serializer; + this.connectionManager = connectionManager; + // TODO @AI:都通过 springutil 获取下; this.deviceMessageService = deviceMessageService; this.deviceService = deviceService; - this.connectionManager = connectionManager; this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); - this.serverId = protocol.getServerId(); } @Override @@ -78,18 +89,32 @@ public class IotTcpUpstreamHandler implements Handler { cleanupConnection(socket); }); - // 设置消息处理器 - socket.handler(buffer -> { - // TODO @AI:TODO @芋艿:这里应该有拆粘包的问题; + // 设置消息处理器(带拆包支持) + Handler messageHandler = buffer -> { + // TODO @AI:需要跟 AI 讨论。哪些情况关闭;哪些情况,发送异常消息; try { processMessage(clientId, buffer, socket); } catch (Exception e) { + // TODO @AI:这里能合并到 exceptionHandler 么?还是怎么搞好点; log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", clientId, socket.remoteAddress(), e.getMessage()); cleanupConnection(socket); socket.close(); } - }); + }; + + // 根据是否配置了 FrameCodec 来决定是否使用拆包器 + // TODO @AI:必须配置! + if (codec != null) { + // 使用拆包器处理粘包/拆包 + RecordParser parser = codec.createDecodeParser(messageHandler); + socket.handler(parser); + log.debug("[handle][启用 {} 拆包器,客户端 ID: {}]", codec.getType(), clientId); + } else { + // 未配置拆包器,直接处理原始数据(可能存在粘包问题) + socket.handler(messageHandler); + log.debug("[handle][未配置拆包器,客户端 ID: {}]", clientId); + } } /** @@ -102,43 +127,42 @@ public class IotTcpUpstreamHandler implements Handler { */ private void processMessage(String clientId, Buffer buffer, NetSocket socket) throws Exception { // 1. 基础检查 + // TODO @AI:不太应该为空?! if (buffer == null || buffer.length() == 0) { return; } - // 2. 获取消息格式类型 - String codecType = getMessageCodecType(buffer, socket); - - // 3. 解码消息 + // 2. 反序列化消息 IotDeviceMessage message; try { - message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType); + message = serializer.deserialize(buffer.getBytes()); if (message == null) { - throw new Exception("解码后消息为空"); + throw new IllegalArgumentException("反序列化后消息为空"); } } catch (Exception e) { - // 消息格式错误时抛出异常,由上层处理连接断开 - throw new Exception("消息解码失败: " + e.getMessage(), e); + // TODO @AI:是不是不用 try catch? + throw new Exception("消息反序列化失败: " + e.getMessage(), e); } - // 4. 根据消息类型路由处理 + // 3. 根据消息类型路由处理 try { if (AUTH_METHOD.equals(message.getMethod())) { // 认证请求 - handleAuthenticationRequest(clientId, message, codecType, socket); + handleAuthenticationRequest(clientId, message, socket); } else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) { // 设备动态注册请求 - handleRegisterRequest(clientId, message, codecType, socket); + handleRegisterRequest(clientId, message, socket); } else { // 业务消息 - handleBusinessRequest(clientId, message, codecType, socket); + handleBusinessRequest(clientId, message, socket); } } catch (Exception e) { - log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]", - clientId, message.getMethod(), e); + // TODO @AI:如果参数不正确,不断开连接; + log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]", clientId, message.getMethod(), e); // 发送错误响应,避免客户端一直等待 + // TODO @AI:发送失败,是不是不用 try catch? try { - sendErrorResponse(socket, message.getRequestId(), "消息处理失败", codecType); + sendErrorResponse(socket, message.getRequestId(), "消息处理失败"); } catch (Exception responseEx) { log.error("[processMessage][发送错误响应失败,客户端 ID: {}]", clientId, responseEx); } @@ -148,74 +172,73 @@ public class IotTcpUpstreamHandler implements Handler { /** * 处理认证请求 * - * @param clientId 客户端 ID - * @param message 消息信息 - * @param codecType 消息编解码类型 - * @param socket 网络连接 + * @param clientId 客户端 ID + * @param message 消息信息 + * @param socket 网络连接 */ - private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, String codecType, - NetSocket socket) { + private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, NetSocket socket) { try { // 1.1 解析认证参数 + // TODO @AI:直接 JsonUtils.convertObject(params, IotDeviceAuthReqDTO.class);然后,校验参数,不正确抛出 invalid exception;和 http 那一样; IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams()); if (authParams == null) { log.warn("[handleAuthenticationRequest][认证参数解析失败,客户端 ID: {}]", clientId); - sendErrorResponse(socket, message.getRequestId(), "认证参数不完整", codecType); + sendErrorResponse(socket, message.getRequestId(), "认证参数不完整"); return; } // 1.2 执行认证 if (!validateDeviceAuth(authParams)) { log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]", clientId, authParams.getUsername()); - sendErrorResponse(socket, message.getRequestId(), "认证失败", codecType); + sendErrorResponse(socket, message.getRequestId(), "认证失败"); return; } // 2.1 解析设备信息 IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername()); + // TODO @AI:这里就断言 deviceInfo 不为空了?! if (deviceInfo == null) { - sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败", codecType); + sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败"); return; } // 2.2 获取设备信息 - IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), - deviceInfo.getDeviceName()); + IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + // TODO @AI:这里就断言 device 不为空了?! if (device == null) { - sendErrorResponse(socket, message.getRequestId(), "设备不存在", codecType); + sendErrorResponse(socket, message.getRequestId(), "设备不存在"); return; } // 3.1 注册连接 - registerConnection(socket, device, clientId, codecType); + registerConnection(socket, device, clientId); // 3.2 发送上线消息 sendOnlineMessage(device); // 3.3 发送成功响应 - sendSuccessResponse(socket, message.getRequestId(), "认证成功", codecType); + sendSuccessResponse(socket, message.getRequestId(), "认证成功"); log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]", device.getId(), device.getDeviceName()); } catch (Exception e) { + // TODO @AI:最大化去掉 try catch;(这个方法里的) log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e); - sendErrorResponse(socket, message.getRequestId(), "认证处理异常", codecType); + sendErrorResponse(socket, message.getRequestId(), "认证处理异常"); } } /** * 处理设备动态注册请求(一型一密,不需要认证) * - * @param clientId 客户端 ID - * @param message 消息信息 - * @param codecType 消息编解码类型 - * @param socket 网络连接 + * @param clientId 客户端 ID + * @param message 消息信息 + * @param socket 网络连接 * @see 阿里云 - 一型一密 */ - private void handleRegisterRequest(String clientId, IotDeviceMessage message, String codecType, - NetSocket socket) { + private void handleRegisterRequest(String clientId, IotDeviceMessage message, NetSocket socket) { try { // 1. 解析注册参数 IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams()); if (params == null) { log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId); - sendErrorResponse(socket, message.getRequestId(), "注册参数不完整", codecType); + sendErrorResponse(socket, message.getRequestId(), "注册参数不完整"); return; } @@ -223,34 +246,33 @@ public class IotTcpUpstreamHandler implements Handler { CommonResult result = deviceApi.registerDevice(params); if (result.isError()) { log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg()); - sendErrorResponse(socket, message.getRequestId(), result.getMsg(), codecType); + sendErrorResponse(socket, message.getRequestId(), result.getMsg()); return; } // 3. 发送成功响应(包含 deviceSecret) - sendRegisterSuccessResponse(socket, message.getRequestId(), result.getData(), codecType); + sendRegisterSuccessResponse(socket, message.getRequestId(), result.getData()); log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]", clientId, params.getDeviceName()); } catch (Exception e) { log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e); - sendErrorResponse(socket, message.getRequestId(), "注册处理异常", codecType); + sendErrorResponse(socket, message.getRequestId(), "注册处理异常"); } } /** * 处理业务请求 * - * @param clientId 客户端 ID - * @param message 消息信息 - * @param codecType 消息编解码类型 - * @param socket 网络连接 + * @param clientId 客户端 ID + * @param message 消息信息 + * @param socket 网络连接 */ - private void handleBusinessRequest(String clientId, IotDeviceMessage message, String codecType, NetSocket socket) { + private void handleBusinessRequest(String clientId, IotDeviceMessage message, NetSocket socket) { try { // 1. 检查认证状态 if (connectionManager.isNotAuthenticated(socket)) { log.warn("[handleBusinessRequest][设备未认证,客户端 ID: {}]", clientId); - sendErrorResponse(socket, message.getRequestId(), "请先进行认证", codecType); + sendErrorResponse(socket, message.getRequestId(), "请先进行认证"); return; } @@ -267,42 +289,19 @@ public class IotTcpUpstreamHandler implements Handler { } } - /** - * 获取消息编解码类型 - * - * @param buffer 消息 - * @param socket 网络连接 - * @return 消息编解码类型 - */ - private String getMessageCodecType(Buffer buffer, NetSocket socket) { - // 1. 如果已认证,优先使用缓存的编解码类型 - IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket); - if (connectionInfo != null - && StrUtil.isNotBlank(connectionInfo.getCodecType())) { - return connectionInfo.getCodecType(); - } - - // 2. 未认证时检测消息格式类型 - return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY - : CODEC_TYPE_JSON; - } - /** * 注册连接信息 * - * @param socket 网络连接 - * @param device 设备 - * @param clientId 客户端 ID - * @param codecType 消息编解码类型 + * @param socket 网络连接 + * @param device 设备 + * @param clientId 客户端 ID */ - private void registerConnection(NetSocket socket, IotDeviceRespDTO device, - String clientId, String codecType) { + private void registerConnection(NetSocket socket, IotDeviceRespDTO device, String clientId) { IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo() .setDeviceId(device.getId()) .setProductKey(device.getProductKey()) .setDeviceName(device.getDeviceName()) - .setClientId(clientId) - .setCodecType(codecType); + .setClientId(clientId); // 注册连接 connectionManager.registerConnection(socket, device.getId(), connectionInfo); } @@ -351,10 +350,10 @@ public class IotTcpUpstreamHandler implements Handler { * @param success 是否成功 * @param message 消息 * @param requestId 请求 ID - * @param codecType 消息编解码类型 */ - private void sendResponse(NetSocket socket, boolean success, String message, String requestId, String codecType) { + private void sendResponse(NetSocket socket, boolean success, String message, String requestId) { try { + // TODO @AI:是不是不用 Object responseData = MapUtil.builder() .put("success", success) .put("message", message) @@ -364,14 +363,17 @@ public class IotTcpUpstreamHandler implements Handler { IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, code, message); - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType); - socket.write(Buffer.buffer(encodedData)); + // 序列化 + 帧编码 + byte[] serializedData = serializer.serialize(responseMessage); + Buffer frameData = codec.encode(serializedData); + socket.write(frameData); } catch (Exception e) { log.error("[sendResponse][发送响应失败,requestId: {}]", requestId, e); } } + // TODO @AI:合并到 handleAuthenticationRequest 里; /** * 验证设备认证信息 * @@ -397,10 +399,9 @@ public class IotTcpUpstreamHandler implements Handler { * @param socket 网络连接 * @param requestId 请求 ID * @param errorMessage 错误消息 - * @param codecType 消息编解码类型 */ - private void sendErrorResponse(NetSocket socket, String requestId, String errorMessage, String codecType) { - sendResponse(socket, false, errorMessage, requestId, codecType); + private void sendErrorResponse(NetSocket socket, String requestId, String errorMessage) { + sendResponse(socket, false, errorMessage, requestId); } /** @@ -409,11 +410,10 @@ public class IotTcpUpstreamHandler implements Handler { * @param socket 网络连接 * @param requestId 请求 ID * @param message 消息 - * @param codecType 消息编解码类型 */ @SuppressWarnings("SameParameterValue") - private void sendSuccessResponse(NetSocket socket, String requestId, String message, String codecType) { - sendResponse(socket, true, message, requestId, codecType); + private void sendSuccessResponse(NetSocket socket, String requestId, String message) { + sendResponse(socket, true, message, requestId); } /** @@ -489,17 +489,17 @@ public class IotTcpUpstreamHandler implements Handler { * @param socket 网络连接 * @param requestId 请求 ID * @param registerResp 注册响应 - * @param codecType 消息编解码类型 */ private void sendRegisterSuccessResponse(NetSocket socket, String requestId, - IotDeviceRegisterRespDTO registerResp, String codecType) { + IotDeviceRegisterRespDTO registerResp) { try { // 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO) IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null); - // 2. 发送响应 - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType); - socket.write(Buffer.buffer(encodedData)); + // 2. 序列化 + 帧编码 + byte[] serializedData = serializer.serialize(responseMessage); + Buffer frameData = codec.encode(serializedData); + socket.write(frameData); } catch (Exception e) { log.error("[sendRegisterSuccessResponse][发送注册成功响应失败,requestId: {}]", requestId, e); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java index c0f2cf7aaa..e236a6db9f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java @@ -81,6 +81,7 @@ public class IotTcpConnectionManager { return info != null; } + // TODO @AI:是不是可以去掉;因为现在只有认证成功的,才会注册连接; /** * 检查连接是否未认证 */ @@ -148,10 +149,6 @@ public class IotTcpConnectionManager { * 客户端 ID */ private String clientId; - /** - * 消息编解码类型(认证后确定) - */ - private String codecType; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index 7ec5c2a463..16ae6298d3 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -42,7 +42,7 @@ yudao: secret: yudaoIotGatewayTokenSecret123456789 # Token 密钥,至少32位 expiration: 7d - # 协议实例列表(新版配置方式) + # 协议实例列表 protocols: # ==================================== # 针对引入的 HTTP 组件的配置 @@ -53,6 +53,22 @@ yudao: enabled: true http: ssl-enabled: false + # ==================================== + # 针对引入的 TCP 组件的配置 + # ==================================== + - id: tcp-json + type: tcp + port: 8091 + enabled: true + serialize: json + tcp: + max-connections: 1000 + keep-alive-timeout-ms: 30000 + ssl-enabled: false + codec: + type: delimiter # 拆包类型:length_field / delimiter / fixed_length + delimiter: "\\n" # 分隔符(支持转义:\\n=换行, \\r=回车, \\t=制表符) + max-frame-length: 1048576 # 最大帧长度(字节) # 协议配置(旧版,保持兼容) protocol: @@ -91,17 +107,6 @@ yudao: trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径 trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码 # ==================================== - # 针对引入的 TCP 组件的配置 - # ==================================== - tcp: - enabled: false - port: 8091 - keep-alive-timeout-ms: 30000 - max-connections: 1000 - ssl-enabled: false - ssl-cert-path: "classpath:certs/client.jks" - ssl-key-path: "classpath:certs/client.jks" - # ==================================== # 针对引入的 UDP 组件的配置 # ==================================== udp: diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java index 4b6936c63c..b386cd1455 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java @@ -10,32 +10,40 @@ import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.binary.IotBinarySerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import java.io.InputStream; +import java.io.BufferedReader; +import java.io.InputStreamReader; import java.io.OutputStream; import java.net.Socket; +import java.nio.charset.StandardCharsets; /** * IoT 直连设备 TCP 协议集成测试(手动测试) * *

测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 TCP 协议直接连接平台 * - *

支持两种编解码格式: + *

支持两种序列化格式: *

    - *
  • {@link IotTcpJsonDeviceMessageCodec} - JSON 格式
  • - *
  • {@link IotTcpBinaryDeviceMessageCodec} - 二进制格式
  • + *
  • {@link IotJsonSerializer} - JSON 格式
  • + *
  • {@link IotBinarySerializer} - 二进制格式
  • + *
+ * + *

TCP 拆包配置(需与 application.yaml 中的 codec 配置一致): + *

    + *
  • type: delimiter - 基于分隔符拆包
  • + *
  • delimiter: \n - 换行符作为分隔符
  • *
* *

使用步骤: *

    *
  1. 启动 yudao-module-iot-gateway 服务(TCP 端口 8091)
  2. - *
  3. 修改 {@link #CODEC} 选择测试的编解码格式
  4. + *
  5. 修改 {@link #SERIALIZER} 选择测试的序列化格式(Delimiter 模式只支持 JSON)
  6. *
  7. 运行以下测试方法: *
      *
    • {@link #testAuth()} - 设备认证
    • @@ -58,10 +66,16 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { private static final int SERVER_PORT = 8091; private static final int TIMEOUT_MS = 5000; - // ===================== 编解码器选择(修改此处切换 JSON / Binary) ===================== + // TODO @AI:这里可以通过 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec 么?例如说:使用 vertx vertx tcp client???从而更好的复用解码逻辑; + /** + * 分隔符(需与 application.yaml 中的 delimiter 配置一致) + */ + private static final String DELIMITER = "\n"; -// private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec(); - private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec(); + // ===================== 序列化器选择(Delimiter 模式推荐使用 JSON) ===================== + + private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); +// private static final IotMessageSerializer SERIALIZER = new IotBinarySerializer(); // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) ===================== @@ -82,18 +96,19 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - // 1.2 编码 - byte[] payload = CODEC.encode(request); - log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO); + // 1.2 序列化 + // TODO @AI:是不是把 SERIALIZER 放到 sendAndReceive 里; + byte[] payload = SERIALIZER.serialize(request); + log.info("[testAuth][Serializer: {}, 请求消息: {}, 数据包长度: {} 字节]", SERIALIZER.getType(), request, payload.length); // 2.1 发送请求 try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { socket.setSoTimeout(TIMEOUT_MS); byte[] responseBytes = sendAndReceive(socket, payload); - // 2.2 解码响应 + // 2.2 反序列化响应 if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); + IotDeviceMessage response = SERIALIZER.deserialize(responseBytes); log.info("[testAuth][响应消息: {}]", response); } else { log.warn("[testAuth][未收到响应]"); @@ -119,17 +134,17 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { registerReqDTO.setProductSecret("test-product-secret"); IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null); - // 1.2 编码 - byte[] payload = CODEC.encode(request); - log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + // 1.2 序列化 + byte[] payload = SERIALIZER.serialize(request); + log.info("[testDeviceRegister][Serializer: {}, 请求消息: {}, 数据包长度: {} 字节]", SERIALIZER.getType(), request, payload.length); // 2.1 发送请求 try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { socket.setSoTimeout(TIMEOUT_MS); byte[] responseBytes = sendAndReceive(socket, payload); - // 2.2 解码响应 + // 2.2 反序列化响应 if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); + IotDeviceMessage response = SERIALIZER.deserialize(responseBytes); log.info("[testDeviceRegister][响应消息: {}]", response); log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); } else { @@ -161,15 +176,15 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { .put("height", "2") .build()), null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); - log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + // 2.2 序列化 + byte[] payload = SERIALIZER.serialize(request); + log.info("[testPropertyPost][Serializer: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送请求 byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 解码响应 + // 3.2 反序列化响应 if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); + IotDeviceMessage response = SERIALIZER.deserialize(responseBytes); log.info("[testPropertyPost][响应消息: {}]", response); } else { log.warn("[testPropertyPost][未收到响应]"); @@ -200,15 +215,15 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { MapUtil.builder().put("rice", 3).build(), System.currentTimeMillis()), null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); - log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + // 2.2 序列化 + byte[] payload = SERIALIZER.serialize(request); + log.info("[testEventPost][Serializer: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送请求 byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 解码响应 + // 3.2 反序列化响应 if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); + IotDeviceMessage response = SERIALIZER.deserialize(responseBytes); log.info("[testEventPost][响应消息: {}]", response); } else { log.warn("[testEventPost][未收到响应]"); @@ -231,41 +246,44 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - byte[] payload = CODEC.encode(request); + byte[] payload = SERIALIZER.serialize(request); byte[] responseBytes = sendAndReceive(socket, payload); if (responseBytes != null) { log.info("[authenticate][响应数据长度: {} 字节,首字节: 0x{}, HEX: {}]", responseBytes.length, String.format("%02X", responseBytes[0]), HexUtil.encodeHexStr(responseBytes)); - return CODEC.decode(responseBytes); + return SERIALIZER.deserialize(responseBytes); } return null; } /** - * 发送 TCP 请求并接收响应 + * 发送 TCP 请求并接收响应(支持 Delimiter 分隔符协议) + *

      + * 发送格式:[消息体][分隔符] + * 接收格式:[消息体][分隔符] * * @param socket TCP Socket - * @param payload 请求数据 - * @return 响应数据 + * @param payload 请求数据(消息体,不含分隔符) + * @return 响应数据(消息体,不含分隔符) */ private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception { - // 1. 发送请求 OutputStream out = socket.getOutputStream(); - InputStream in = socket.getInputStream(); - out.write(payload); - out.flush(); + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); - // 2.1 等待一小段时间让服务器处理 - Thread.sleep(100); - // 2.2 接收响应 - byte[] buffer = new byte[4096]; + // 1. 发送请求(添加分隔符后缀) + out.write(payload); + out.write(DELIMITER.getBytes(StandardCharsets.UTF_8)); + out.flush(); + log.info("[sendAndReceive][发送数据: {} 字节(不含分隔符)]", payload.length); + + // 2. 接收响应(读取到分隔符为止) try { - int length = in.read(buffer); - if (length > 0) { - byte[] response = new byte[length]; - System.arraycopy(buffer, 0, response, 0, length); + String responseLine = in.readLine(); + if (responseLine != null) { + byte[] response = responseLine.getBytes(StandardCharsets.UTF_8); + log.info("[sendAndReceive][接收数据: {} 字节]", response.length); return response; } return null; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-binary-packet-examples.md b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-binary-packet-examples.md deleted file mode 100644 index d6b2b3fdb5..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-binary-packet-examples.md +++ /dev/null @@ -1,193 +0,0 @@ -# TCP 二进制协议数据包格式说明 - -## 1. 协议概述 - -TCP 二进制协议是一种高效的自定义协议格式,采用紧凑的二进制格式传输数据,适用于对带宽和性能要求较高的 IoT 场景。 - -### 1.1 协议特点 - -- **高效传输**:完全二进制格式,减少数据传输量 -- **版本控制**:内置协议版本号,支持协议升级 -- **类型安全**:明确的消息类型标识 -- **简洁设计**:去除冗余字段,协议更加精简 -- **兼容性**:与现有 `IotDeviceMessage` 接口完全兼容 - -## 2. 协议格式 - -### 2.1 整体结构 - -``` -+--------+--------+--------+---------------------------+--------+--------+ -| 魔术字 | 版本号 | 消息类型| 消息长度(4字节) | -+--------+--------+--------+---------------------------+--------+--------+ -| 消息 ID 长度(2字节) | 消息 ID (变长字符串) | -+--------+--------+--------+--------+--------+--------+--------+--------+ -| 方法名长度(2字节) | 方法名(变长字符串) | -+--------+--------+--------+--------+--------+--------+--------+--------+ -| 消息体数据(变长) | -+--------+--------+--------+--------+--------+--------+--------+--------+ -``` - -### 2.2 字段详细说明 - -| 字段 | 长度 | 类型 | 说明 | -|------|------|------|------| -| 魔术字 | 1字节 | byte | `0x7E` - 协议识别标识,用于数据同步 | -| 版本号 | 1字节 | byte | `0x01` - 协议版本号,支持版本控制 | -| 消息类型 | 1字节 | byte | `0x01`=请求, `0x02`=响应 | -| 消息长度 | 4字节 | int | 整个消息的总长度(包含头部) | -| 消息 ID 长度 | 2字节 | short | 消息 ID 字符串的字节长度 | -| 消息 ID | 变长 | string | 消息唯一标识符(UTF-8编码) | -| 方法名长度 | 2字节 | short | 方法名字符串的字节长度 | -| 方法名 | 变长 | string | 消息方法名(UTF-8编码) | -| 消息体 | 变长 | binary | 根据消息类型的不同数据格式 | - -**⚠️ 重要说明**:deviceId 不包含在协议中,由服务器根据连接上下文自动设置 - -### 2.3 协议常量定义 - -```java -// 协议标识 -private static final byte MAGIC_NUMBER = (byte) 0x7E; -private static final byte PROTOCOL_VERSION = (byte) 0x01; - -// 消息类型 -private static final byte REQUEST = (byte) 0x01; // 请求消息 -private static final byte RESPONSE = (byte) 0x02; // 响应消息 - -// 协议长度 -private static final int HEADER_FIXED_LENGTH = 7; // 固定头部长度 -private static final int MIN_MESSAGE_LENGTH = 11; // 最小消息长度 -``` - -## 3. 消息类型和格式 - -### 3.1 请求消息 (REQUEST - 0x01) - -请求消息用于设备向服务器发送数据或请求。 - -#### 3.1.1 消息体格式 -``` -消息体 = params 数据(JSON格式) -``` - -#### 3.1.2 示例:设备认证请求 - -**消息内容:** -- 消息 ID: `auth_1704067200000_123` -- 方法名: `auth` -- 参数: `{"clientId":"device_001","username":"productKey_deviceName","password":"device_password"}` - -**二进制数据包结构:** -``` -7E // 魔术字 (0x7E) -01 // 版本号 (0x01) -01 // 消息类型 (REQUEST) -00 00 00 89 // 消息长度 (137字节) -00 19 // 消息 ID 长度 (25字节) -61 75 74 68 5F 31 37 30 34 30 // 消息 ID: "auth_1704067200000_123" -36 37 32 30 30 30 30 30 5F 31 -32 33 -00 04 // 方法名长度 (4字节) -61 75 74 68 // 方法名: "auth" -7B 22 63 6C 69 65 6E 74 49 64 // JSON参数数据 -22 3A 22 64 65 76 69 63 65 5F // {"clientId":"device_001", -30 30 31 22 2C 22 75 73 65 72 // "username":"productKey_deviceName", -6E 61 6D 65 22 3A 22 70 72 6F // "password":"device_password"} -64 75 63 74 4B 65 79 5F 64 65 -76 69 63 65 4E 61 6D 65 22 2C -22 70 61 73 73 77 6F 72 64 22 -3A 22 64 65 76 69 63 65 5F 70 -61 73 73 77 6F 72 64 22 7D -``` - -#### 3.1.3 示例:属性数据上报 - -**消息内容:** -- 消息 ID: `property_1704067200000_456` -- 方法名: `thing.property.post` -- 参数: `{"temperature":25.5,"humidity":60.2,"pressure":1013.25}` - -### 3.2 响应消息 (RESPONSE - 0x02) - -响应消息用于服务器向设备回复请求结果。 - -#### 3.2.1 消息体格式 -``` -消息体 = 响应码(4字节) + 响应消息长度(2字节) + 响应消息(UTF-8) + 响应数据(JSON) -``` - -#### 3.2.2 字段说明 - -| 字段 | 长度 | 类型 | 说明 | -|------|------|------|------| -| 响应码 | 4字节 | int | HTTP状态码风格,0=成功,其他=错误 | -| 响应消息长度 | 2字节 | short | 响应消息字符串的字节长度 | -| 响应消息 | 变长 | string | 响应提示信息(UTF-8编码) | -| 响应数据 | 变长 | binary | JSON格式的响应数据(可选) | - -#### 3.2.3 示例:认证成功响应 - -**消息内容:** -- 消息 ID: `auth_response_1704067200000_123` -- 方法名: `auth` -- 响应码: `0` -- 响应消息: `认证成功` -- 响应数据: `{"success":true,"message":"认证成功"}` - -**二进制数据包结构:** -``` -7E // 魔术字 (0x7E) -01 // 版本号 (0x01) -02 // 消息类型 (RESPONSE) -00 00 00 A4 // 消息长度 (164字节) -00 22 // 消息 ID 长度 (34字节) -61 75 74 68 5F 72 65 73 70 6F // 消息 ID: "auth_response_1704067200000_123" -6E 73 65 5F 31 37 30 34 30 36 -37 32 30 30 30 30 30 5F 31 32 -33 -00 04 // 方法名长度 (4字节) -61 75 74 68 // 方法名: "auth" -00 00 00 00 // 响应码 (0 = 成功) -00 0C // 响应消息长度 (12字节) -E8 AE A4 E8 AF 81 E6 88 90 E5 // 响应消息: "认证成功" (UTF-8) -8A 9F -7B 22 73 75 63 63 65 73 73 22 // JSON响应数据 -3A 74 72 75 65 2C 22 6D 65 73 // {"success":true,"message":"认证成功"} -73 61 67 65 22 3A 22 E8 AE A4 -E8 AF 81 E6 88 90 E5 8A 9F 22 -7D -``` - -## 4. 编解码器标识 - -```java -public static final String TYPE = "TCP_BINARY"; -``` - -## 5. 协议优势 - -- **数据紧凑**:二进制格式,相比 JSON 减少 30-50% 的数据量 -- **解析高效**:直接二进制操作,减少字符串转换开销 -- **类型安全**:明确的消息类型和字段定义 -- **设计简洁**:去除冗余字段,协议更加精简高效 -- **版本控制**:内置版本号支持协议升级 - -## 6. 与 JSON 协议对比 - -| 特性 | 二进制协议 | JSON协议 | -|------|-------------|--------| -| 数据大小 | 小(节省30-50%) | 大 | -| 解析性能 | 高 | 中等 | -| 网络开销 | 低 | 高 | -| 可读性 | 差 | 优秀 | -| 调试难度 | 高 | 低 | -| 扩展性 | 良好 | 优秀 | - -**推荐场景**: -- ✅ **高频数据传输**:传感器数据实时上报 -- ✅ **带宽受限环境**:移动网络、卫星通信 -- ✅ **性能要求高**:需要低延迟、高吞吐的场景 -- ✅ **设备资源有限**:嵌入式设备、低功耗设备 -- ❌ **开发调试阶段**:调试困难,建议使用 JSON 协议 -- ❌ **快速原型开发**:开发效率低 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-json-packet-examples.md b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-json-packet-examples.md deleted file mode 100644 index 09ef50cfe5..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-json-packet-examples.md +++ /dev/null @@ -1,191 +0,0 @@ -# TCP JSON 格式协议说明 - -## 1. 协议概述 - -TCP JSON 格式协议采用纯 JSON 格式进行数据传输,具有以下特点: - -- **标准化**:使用标准 JSON 格式,易于解析和处理 -- **可读性**:人类可读,便于调试和维护 -- **扩展性**:可以轻松添加新字段,向后兼容 -- **跨平台**:JSON 格式支持所有主流编程语言 -- **安全优化**:移除冗余的 deviceId 字段,提高安全性 - -## 2. 消息格式 - -### 2.1 基础消息结构 - -```json -{ - "id": "消息唯一标识", - "method": "消息方法", - "params": { - // 请求参数 - }, - "data": { - // 响应数据 - }, - "code": 响应码, - "msg": "响应消息", - "timestamp": 时间戳 -} -``` - -**⚠️ 重要说明**: -- **不包含 deviceId 字段**:由服务器通过 TCP 连接上下文自动确定设备 ID -- **避免伪造攻击**:防止设备伪造其他设备的 ID 发送消息 - -### 2.2 字段详细说明 - -| 字段名 | 类型 | 必填 | 用途 | 说明 | -|--------|------|------|------|------| -| id | String | 是 | 所有消息 | 消息唯一标识 | -| method | String | 是 | 所有消息 | 消息方法,如 `auth`、`thing.property.post` | -| params | Object | 否 | 请求消息 | 请求参数,具体内容根据method而定 | -| data | Object | 否 | 响应消息 | 响应数据,服务器返回的结果数据 | -| code | Integer | 否 | 响应消息 | 响应码,0=成功,其他=错误 | -| msg | String | 否 | 响应消息 | 响应提示信息 | -| timestamp | Long | 是 | 所有消息 | 时间戳(毫秒),编码时自动生成 | - -### 2.3 消息分类 - -#### 2.3.1 请求消息(上行) -- **特征**:包含 `params` 字段,不包含 `code`、`msg` 字段 -- **方向**:设备 → 服务器 -- **用途**:设备认证、数据上报、状态更新等 - -#### 2.3.2 响应消息(下行) -- **特征**:包含 `code`、`msg` 字段,可能包含 `data` 字段 -- **方向**:服务器 → 设备 -- **用途**:认证结果、指令响应、错误提示等 - -## 3. 消息示例 - -### 3.1 设备认证 (auth) - -#### 认证请求格式 -**消息方向**:设备 → 服务器 - -```json -{ - "id": "auth_1704067200000_123", - "method": "auth", - "params": { - "clientId": "device_001", - "username": "productKey_deviceName", - "password": "设备密码" - }, - "timestamp": 1704067200000 -} -``` - -**认证参数说明:** - -| 字段名 | 类型 | 必填 | 说明 | -|--------|------|------|------| -| clientId | String | 是 | 客户端唯一标识,用于连接管理 | -| username | String | 是 | 设备用户名,格式为 `productKey_deviceName` | -| password | String | 是 | 设备密码,在设备管理平台配置 | - -#### 认证响应格式 -**消息方向**:服务器 → 设备 - -**认证成功响应:** -```json -{ - "id": "response_auth_1704067200000_123", - "method": "auth", - "data": { - "success": true, - "message": "认证成功" - }, - "code": 0, - "msg": "认证成功", - "timestamp": 1704067200001 -} -``` - -**认证失败响应:** -```json -{ - "id": "response_auth_1704067200000_123", - "method": "auth", - "data": { - "success": false, - "message": "认证失败:用户名或密码错误" - }, - "code": 401, - "msg": "认证失败", - "timestamp": 1704067200001 -} -``` - -### 3.2 属性数据上报 (thing.property.post) - -**消息方向**:设备 → 服务器 - -**示例:温度传感器数据上报** -```json -{ - "id": "property_1704067200000_456", - "method": "thing.property.post", - "params": { - "temperature": 25.5, - "humidity": 60.2, - "pressure": 1013.25, - "battery": 85, - "signal_strength": -65 - }, - "timestamp": 1704067200000 -} -``` - -### 3.3 设备状态更新 (thing.state.update) - -**消息方向**:设备 → 服务器 - -**示例:心跳请求** -```json -{ - "id": "heartbeat_1704067200000_321", - "method": "thing.state.update", - "params": { - "state": "online", - "uptime": 86400, - "memory_usage": 65.2, - "cpu_usage": 12.8 - }, - "timestamp": 1704067200000 -} -``` - -## 4. 编解码器标识 - -```java -public static final String TYPE = "TCP_JSON"; -``` - -## 5. 协议优势 - -- **开发效率高**:JSON 格式,开发和调试简单 -- **跨语言支持**:所有主流语言都支持 JSON -- **可读性优秀**:可以直接查看消息内容 -- **扩展性强**:可以轻松添加新字段 -- **安全性高**:移除 deviceId 字段,防止伪造攻击 - -## 6. 与二进制协议对比 - -| 特性 | JSON协议 | 二进制协议 | -|------|----------|------------| -| 开发难度 | 低 | 高 | -| 调试难度 | 低 | 高 | -| 可读性 | 优秀 | 差 | -| 数据大小 | 中等 | 小(节省30-50%) | -| 解析性能 | 中等 | 高 | -| 学习成本 | 低 | 高 | - -**推荐场景**: -- ✅ **开发调试阶段**:调试友好,开发效率高 -- ✅ **快速原型开发**:实现简单,快速迭代 -- ✅ **多语言集成**:广泛的语言支持 -- ❌ **高频数据传输**:建议使用二进制协议 -- ❌ **带宽受限环境**:建议使用二进制协议 \ No newline at end of file