feat(iot):【协议改造】tcp 初步改造(50%)

This commit is contained in:
YunaiV
2026-02-01 02:52:58 +08:00
parent e89fc2bfbd
commit 44b1950e4a
20 changed files with 1039 additions and 907 deletions

View File

@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol;
@@ -12,10 +11,6 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscr
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
@@ -24,6 +19,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketDownst
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
@@ -84,44 +80,6 @@ public class IotGatewayConfiguration {
}
}
/**
* IoT 网关 TCP 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.tcp", name = "enabled", havingValue = "true")
@Slf4j
public static class TcpProtocolConfiguration {
@Bean(name = "tcpVertx", destroyMethod = "close")
public Vertx tcpVertx() {
return Vertx.vertx();
}
@Bean
public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotTcpConnectionManager connectionManager,
@Qualifier("tcpVertx") Vertx tcpVertx) {
return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(),
deviceService, messageService, connectionManager, tcpVertx);
}
@Bean
public IotTcpDownstreamHandler iotTcpDownstreamHandler(IotDeviceMessageService messageService,
IotTcpConnectionManager connectionManager) {
return new IotTcpDownstreamHandler(messageService, connectionManager);
}
@Bean
public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocolHandler,
IotTcpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotTcpDownstreamSubscriber(protocolHandler, downstreamHandler, messageBus);
}
}
/**
* IoT 网关 MQTT 协议配置类
*/

View File

@@ -2,6 +2,9 @@ package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.TrustOptions;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
@@ -76,21 +79,11 @@ public class IotGatewayProperties {
@Data
public static class ProtocolProperties {
/**
* HTTP 组件配置
*/
private HttpProperties http;
/**
* EMQX 组件配置
*/
private EmqxProperties emqx;
/**
* TCP 组件配置
*/
private TcpProperties tcp;
/**
* MQTT 组件配置
*/
@@ -113,36 +106,6 @@ public class IotGatewayProperties {
}
@Data
public static class HttpProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 服务端口
*/
private Integer serverPort;
/**
* 是否开启 SSL
*/
@NotNull(message = "是否开启 SSL 不能为空")
private Boolean sslEnabled = false;
/**
* SSL 证书路径
*/
private String sslKeyPath;
/**
* SSL 证书路径
*/
private String sslCertPath;
}
@Data
public static class EmqxProperties {
@@ -312,47 +275,6 @@ public class IotGatewayProperties {
}
@Data
public static class TcpProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 服务器端口
*/
private Integer port = 8091;
/**
* 心跳超时时间(毫秒)
*/
private Long keepAliveTimeoutMs = 30000L;
/**
* 最大连接数
*/
private Integer maxConnections = 1000;
/**
* 是否启用SSL
*/
private Boolean sslEnabled = false;
/**
* SSL证书路径
*/
private String sslCertPath;
/**
* SSL私钥路径
*/
private String sslKeyPath;
}
@Data
public static class MqttProperties {
@@ -381,6 +303,7 @@ public class IotGatewayProperties {
*/
private Integer keepAliveTimeoutSeconds = 300;
// TODO @AI所有跟 ssl 相关的参数,是不是可以统一?放到 protocol 层级ProtocolInstanceProperties【优先级低】暂时不用规划
/**
* 是否启用 SSL
*/
@@ -399,11 +322,11 @@ public class IotGatewayProperties {
/**
* 密钥证书选项
*/
private io.vertx.core.net.KeyCertOptions keyCertOptions;
private KeyCertOptions keyCertOptions;
/**
* 信任选项
*/
private io.vertx.core.net.TrustOptions trustOptions;
private TrustOptions trustOptions;
/**
* SSL 证书路径
*/
@@ -596,78 +519,11 @@ public class IotGatewayProperties {
@Valid
private IotHttpConfig http;
// TODO @AI后续改下
/**
* TCP 协议配置(后续扩展)
* TCP 协议配置
*/
@Valid
private TcpInstanceConfig tcp;
}
/**
* TCP 协议实例配置(后续扩展)
*/
@Data
public static class TcpInstanceConfig {
/**
* 最大连接数
*/
private Integer maxConnections = 1000;
/**
* 心跳超时时间(毫秒)
*/
private Long keepAliveTimeoutMs = 30000L;
/**
* 是否启用 SSL
*/
private Boolean sslEnabled = false;
/**
* SSL 证书路径
*/
private String sslCertPath;
/**
* SSL 私钥路径
*/
private String sslKeyPath;
/**
* 拆包配置
*/
private CodecConfig codec;
/**
* TCP 拆包配置
*/
@Data
public static class CodecConfig {
/**
* 拆包类型LENGTH_FIELD / DELIMITER
*/
private String type;
/**
* LENGTH_FIELD: 偏移量
*/
private Integer lengthFieldOffset;
/**
* LENGTH_FIELD: 长度字段长度
*/
private Integer lengthFieldLength;
/**
* DELIMITER: 分隔符
*/
private String delimiter;
}
private IotTcpConfig tcp;
}

View File

@@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.SmartLifecycle;
@@ -14,9 +15,7 @@ import java.util.ArrayList;
import java.util.List;
/**
* IoT 协议管理器
*
* 负责根据配置创建和管理协议实例
* IoT 协议管理器:负责根据配置创建和管理协议实例
*
* @author 芋道源码
*/
@@ -96,7 +95,7 @@ public class IotProtocolManager implements SmartLifecycle {
* @param config 协议实例配置
* @return 协议实例
*/
@SuppressWarnings({"SwitchStatementWithTooFewBranches", "EnhancedSwitchMigration"})
@SuppressWarnings({"EnhancedSwitchMigration"})
private IotProtocol createProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
IotProtocolTypeEnum protocolType = IotProtocolTypeEnum.of(config.getType());
if (protocolType == null) {
@@ -106,6 +105,8 @@ public class IotProtocolManager implements SmartLifecycle {
switch (protocolType) {
case HTTP:
return createHttpProtocol(config);
case TCP:
return createTcpProtocol(config);
// TODO 后续添加其他协议类型
default:
throw new IllegalArgumentException(String.format(
@@ -123,4 +124,14 @@ public class IotProtocolManager implements SmartLifecycle {
return new IotHttpProtocol(config, messageBus);
}
/**
* 创建 TCP 协议实例
*
* @param config 协议实例配置
* @return TCP 协议实例
*/
private IotTcpProtocol createTcpProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
return new IotTcpProtocol(config, messageBus, serializerManager);
}
}

View File

@@ -33,10 +33,6 @@ public class IotHttpProtocol implements IotProtocol {
* 协议配置
*/
private final ProtocolInstanceProperties properties;
/**
* 消息总线
*/
private final IotMessageBus messageBus;
/**
* 服务器 ID用于消息追踪全局唯一
*/
@@ -44,26 +40,26 @@ public class IotHttpProtocol implements IotProtocol {
private final String serverId;
/**
* Vert.x 实例(每个 Protocol 自己管理)
* 运行状态
*/
private volatile boolean running = false;
/**
* Vert.x 实例
*/
private Vertx vertx;
/**
* HTTP 服务器
*/
private HttpServer httpServer;
/**
* 下行消息订阅者
*/
private IotHttpDownstreamSubscriber downstreamSubscriber;
/**
* 运行状态
*/
private volatile boolean running = false;
public IotHttpProtocol(ProtocolInstanceProperties properties, IotMessageBus messageBus) {
this.properties = properties;
this.messageBus = messageBus;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus);
}
@@ -121,7 +117,6 @@ public class IotHttpProtocol implements IotProtocol {
getId(), properties.getPort(), serverId);
// 2. 启动下行消息订阅者
this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus);
this.downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT HTTP 协议 {} 启动失败]", getId(), e);
@@ -132,9 +127,6 @@ public class IotHttpProtocol implements IotProtocol {
}
throw e;
}
// 2. 启动下行消息订阅者
this.downstreamSubscriber.start();
}
@Override

View File

@@ -0,0 +1,114 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* IoT TCP 协议配置
*
* @author 芋道源码
*/
@Data
public class IotTcpConfig {
/**
* 最大连接数
*/
@NotNull(message = "最大连接数不能为空")
@Min(value = 1, message = "最大连接数必须大于 0")
private Integer maxConnections = 1000;
/**
* 心跳超时时间(毫秒)
*/
@NotNull(message = "心跳超时时间不能为空")
@Min(value = 1000, message = "心跳超时时间必须大于 1000 毫秒")
private Long keepAliveTimeoutMs = 30000L;
/**
* 是否启用 SSL
*/
@NotNull(message = "是否启用 SSL 不能为空")
private Boolean sslEnabled = false;
/**
* SSL 证书路径
*/
private String sslCertPath;
/**
* SSL 私钥路径
*/
private String sslKeyPath;
/**
* 拆包配置
*/
@Valid
private CodecConfig codec;
/**
* TCP 拆包配置
*/
@Data
public static class CodecConfig {
/**
* 拆包类型
*
* @see IotTcpCodecTypeEnum
*/
@NotNull(message = "拆包类型不能为空")
private String type;
/**
* LENGTH_FIELD: 长度字段偏移量
* <p>
* 表示长度字段在消息中的起始位置从0开始
*/
private Integer lengthFieldOffset;
/**
* LENGTH_FIELD: 长度字段长度(字节数)
* <p>
* 常见值1最大255、2最大65535、4最大2GB
*/
private Integer lengthFieldLength;
/**
* LENGTH_FIELD: 长度调整值
* <p>
* 用于调整长度字段的值,例如长度字段包含头部长度时需要减去头部长度
*/
private Integer lengthAdjustment = 0;
/**
* LENGTH_FIELD: 跳过的初始字节数
* <p>
* 解码后跳过的字节数,通常等于 lengthFieldOffset + lengthFieldLength
*/
private Integer initialBytesToStrip = 0;
/**
* DELIMITER: 分隔符
* <p>
* 支持转义字符:\n换行、\r回车、\r\n回车换行
*/
private String delimiter;
/**
* FIXED_LENGTH: 固定消息长度(字节)
* <p>
* 每条消息的固定长度
*/
private Integer fixedLength;
/**
* 最大帧长度(字节)
* <p>
* 防止内存溢出,默认 1MB
*/
@NotNull(message = "最大帧长度不能为空")
@Min(value = 1, message = "最大帧长度必须大于 0")
private Integer maxFrameLength = 1048576;
}
}

View File

@@ -0,0 +1,205 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream.IotTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.upstream.IotTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.PemKeyCertOptions;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT TCP 协议实现
* <p>
* 基于 Vert.x 实现 TCP 服务器,接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotTcpProtocol implements IotProtocol {
/**
* 协议配置
*/
private final ProtocolInstanceProperties properties;
/**
* 服务器 ID用于消息追踪全局唯一
*/
@Getter
private final String serverId;
/**
* 运行状态
*/
private volatile boolean running = false;
/**
* Vert.x 实例
*/
private Vertx vertx;
/**
* TCP 服务器
*/
private NetServer tcpServer;
/**
* 下行消息订阅者
*/
private final IotTcpDownstreamSubscriber downstreamSubscriber;
/**
* 消息序列化器
*/
private final IotMessageSerializer serializer;
/**
* TCP 帧编解码器
*/
private final IotTcpFrameCodec frameCodec;
public IotTcpProtocol(ProtocolInstanceProperties properties, IotMessageBus messageBus,
IotMessageSerializerManager serializerManager) {
this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// 初始化序列化器
IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(properties.getSerialize());
if (serializeType == null) {
serializeType = IotSerializeTypeEnum.JSON; // 默认 JSON
}
this.serializer = serializerManager.get(serializeType);
// 初始化帧编解码器
IotTcpConfig tcpConfig = properties.getTcp();
IotTcpConfig.CodecConfig codecConfig = tcpConfig != null ? tcpConfig.getCodec() : null;
this.frameCodec = IotTcpFrameCodec.create(codecConfig);
// 初始化下行消息订阅者
IotTcpConnectionManager connectionManager = SpringUtil.getBean(IotTcpConnectionManager.class);
IotTcpDownstreamHandler downstreamHandler = new IotTcpDownstreamHandler(connectionManager, frameCodec, serializer);
this.downstreamSubscriber = new IotTcpDownstreamSubscriber(this, downstreamHandler, messageBus);
}
@Override
public String getId() {
return properties.getId();
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.TCP;
}
@Override
public void start() {
if (running) {
log.warn("[start][IoT TCP 协议 {} 已经在运行中]", getId());
return;
}
// 1.1 创建 Vertx 实例(每个 Protocol 独立管理)
this.vertx = Vertx.vertx();
// 1.2 创建服务器选项
IotTcpConfig tcpConfig = properties.getTcp();
NetServerOptions options = new NetServerOptions()
.setPort(properties.getPort())
.setTcpKeepAlive(true)
.setTcpNoDelay(true)
.setReuseAddress(true);
if (tcpConfig != null && Boolean.TRUE.equals(tcpConfig.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
.setKeyPath(tcpConfig.getSslKeyPath())
.setCertPath(tcpConfig.getSslCertPath());
options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
}
// 1.3 创建服务器并设置连接处理器
tcpServer = vertx.createNetServer(options);
IotDeviceService deviceService = SpringUtil.getBean(IotDeviceService.class);
IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotTcpConnectionManager connectionManager = SpringUtil.getBean(IotTcpConnectionManager.class);
tcpServer.connectHandler(socket -> {
IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, deviceService,
connectionManager, frameCodec, serializer);
handler.handle(socket);
});
// 1.4 启动 TCP 服务器
try {
tcpServer.listen().result();
running = true;
log.info("[start][IoT TCP 协议 {} 启动成功,端口:{}serverId{}]",
getId(), properties.getPort(), serverId);
// 2. 启动下行消息订阅者
this.downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT TCP 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭 Vertx
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e;
}
}
@Override
public void stop() {
if (!running) {
return;
}
// 1. 停止下行消息订阅者
try {
downstreamSubscriber.stop();
log.info("[stop][IoT TCP 协议 {} 下行消息订阅者已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT TCP 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
// 2.1 关闭 TCP 服务器
if (tcpServer != null) {
try {
tcpServer.close().result();
log.info("[stop][IoT TCP 协议 {} 服务器已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT TCP 协议 {} 服务器停止失败]", getId(), e);
}
tcpServer = null;
}
// 2.2 关闭 Vertx 实例
if (vertx != null) {
try {
vertx.close().result();
log.info("[stop][IoT TCP 协议 {} Vertx 已关闭]", getId());
} catch (Exception e) {
log.error("[stop][IoT TCP 协议 {} Vertx 关闭失败]", getId(), e);
}
vertx = null;
}
running = false;
log.info("[stop][IoT TCP 协议 {} 已停止]", getId());
}
@Override
public boolean isRunning() {
return running;
}
}

View File

@@ -1,125 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.PemKeyCertOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 TCP 协议:接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotTcpUpstreamProtocol implements IotProtocol {
private static final String ID = "tcp";
private final IotGatewayProperties.TcpProperties tcpProperties;
private final IotDeviceService deviceService;
private final IotDeviceMessageService messageService;
private final IotTcpConnectionManager connectionManager;
private final Vertx vertx;
@Getter
private final String serverId;
private NetServer tcpServer;
private volatile boolean running = false;
public IotTcpUpstreamProtocol(IotGatewayProperties.TcpProperties tcpProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotTcpConnectionManager connectionManager,
Vertx vertx) {
this.tcpProperties = tcpProperties;
this.deviceService = deviceService;
this.messageService = messageService;
this.connectionManager = connectionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getPort());
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.TCP;
}
@Override
@PostConstruct
public void start() {
// 创建服务器选项
NetServerOptions options = new NetServerOptions()
.setPort(tcpProperties.getPort())
.setTcpKeepAlive(true)
.setTcpNoDelay(true)
.setReuseAddress(true);
// 配置 SSL如果启用
if (Boolean.TRUE.equals(tcpProperties.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
.setKeyPath(tcpProperties.getSslKeyPath())
.setCertPath(tcpProperties.getSslCertPath());
options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
}
// 创建服务器并设置连接处理器
tcpServer = vertx.createNetServer(options);
tcpServer.connectHandler(socket -> {
IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, deviceService,
connectionManager);
handler.handle(socket);
});
// 启动服务器
try {
tcpServer.listen().result();
running = true;
log.info("[start][IoT 网关 TCP 协议启动成功,端口:{}]", tcpProperties.getPort());
} catch (Exception e) {
log.error("[start][IoT 网关 TCP 协议启动失败]", e);
throw e;
}
}
@Override
@PreDestroy
public void stop() {
if (tcpServer != null) {
try {
tcpServer.close().result();
running = false;
log.info("[stop][IoT 网关 TCP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 TCP 协议停止失败]", e);
}
}
}
@Override
public boolean isRunning() {
return running;
}
}

View File

@@ -0,0 +1,77 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec;
import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.delimiter.IotTcpDelimiterFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length.IotTcpFixedLengthFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length.IotTcpLengthFieldFrameCodec;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.function.Function;
/**
* IoT TCP 拆包类型枚举
*
* @author 芋道源码
*/
@AllArgsConstructor
@Getter
public enum IotTcpCodecTypeEnum {
/**
* 基于固定长度的拆包
* <p>
* 消息格式:固定长度的消息体
* 需要配置fixedLength固定长度
*/
FIXED_LENGTH("fixed_length", IotTcpFixedLengthFrameCodec::new),
/**
* 基于分隔符的拆包
* <p>
* 消息格式:消息内容 + 分隔符
* 需要配置delimiter分隔符
*/
DELIMITER("delimiter", IotTcpDelimiterFrameCodec::new),
/**
* 基于长度字段的拆包
* <p>
* 消息格式:[长度字段][消息体]
* 需要配置lengthFieldOffset长度字段偏移量、lengthFieldLength长度字段长度
*/
LENGTH_FIELD("length_field", IotTcpLengthFieldFrameCodec::new),
;
/**
* 类型标识
*/
private final String type;
/**
* Codec 创建工厂
*/
private final Function<IotTcpConfig.CodecConfig, IotTcpFrameCodec> codecFactory;
/**
* 根据类型获取枚举
*
* @param type 类型标识
* @return 枚举值
*/
public static IotTcpCodecTypeEnum of(String type) {
return ArrayUtil.firstMatch(e -> e.getType().equalsIgnoreCase(type), values());
}
/**
* 创建 Codec 实例
*
* @param config 拆包配置
* @return Codec 实例
*/
public IotTcpFrameCodec createCodec(IotTcpConfig.CodecConfig config) {
return codecFactory.apply(config);
}
}

View File

@@ -0,0 +1,64 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
/**
* IoT TCP 帧编解码器接口
* <p>
* 用于解决 TCP 粘包/拆包问题,提供解码(拆包)和编码(加帧)能力
*
* @author 芋道源码
*/
public interface IotTcpFrameCodec {
/**
* 获取编解码器类型
*
* @return 编解码器类型
*/
IotTcpCodecTypeEnum getType();
/**
* 创建解码器RecordParser
* <p>
* 每个连接调用一次,返回的 parser 需绑定到 socket.handler()
*
* @param handler 消息处理器,当收到完整的消息帧后回调
* @return RecordParser 实例
*/
RecordParser createDecodeParser(Handler<Buffer> handler);
/**
* 编码消息(加帧)
* <p>
* 根据不同的编解码类型添加帧头/分隔符
*
* @param data 原始数据
* @return 编码后的数据(带帧头/分隔符)
*/
Buffer encode(byte[] data);
// TODO @AI还是搞个 facory 类 ,更好理解;
// ========== 静态工厂方法 ==========
/**
* 根据配置创建编解码器
*
* @param config 拆包配置
* @return 编解码器实例,如果配置为空则返回 null
*/
static IotTcpFrameCodec create(IotTcpConfig.CodecConfig config) {
if (config == null) {
return null;
}
IotTcpCodecTypeEnum type = IotTcpCodecTypeEnum.of(config.getType());
if (type == null) {
return null;
}
return type.createCodec(config);
}
}

View File

@@ -0,0 +1,107 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.delimiter;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
/**
* IoT TCP 分隔符帧编解码器
* <p>
* 基于分隔符的拆包策略,消息格式:消息内容 + 分隔符
* <p>
* 支持的分隔符:
* <ul>
* <li>\n - 换行符</li>
* <li>\r - 回车符</li>
* <li>\r\n - 回车换行</li>
* <li>自定义字符串</li>
* </ul>
*
* @author 芋道源码
*/
@Slf4j
public class IotTcpDelimiterFrameCodec implements IotTcpFrameCodec {
private final IotTcpConfig.CodecConfig config;
/**
* 解析后的分隔符字节数组
*/
private final byte[] delimiterBytes;
/**
* 最大帧长度
*/
// TODO @AI最大帧数要不去掉简洁一点包括其他地方的配置项
private final int maxFrameLength;
public IotTcpDelimiterFrameCodec(IotTcpConfig.CodecConfig config) {
this.config = config;
// TODO @AI禁止为空
this.delimiterBytes = parseDelimiter(config.getDelimiter());
this.maxFrameLength = config.getMaxFrameLength() != null ? config.getMaxFrameLength() : 1048576;
}
@Override
public IotTcpCodecTypeEnum getType() {
return IotTcpCodecTypeEnum.DELIMITER;
}
@Override
public RecordParser createDecodeParser(Handler<Buffer> handler) {
RecordParser parser = RecordParser.newDelimited(Buffer.buffer(delimiterBytes));
parser.handler(buffer -> {
// 检查帧长度是否超过限制
if (buffer.length() > maxFrameLength) {
log.warn("[createDecodeParser][帧长度超过限制length: {}, maxFrameLength: {}]",
buffer.length(), maxFrameLength);
return;
}
// 处理完整消息(不包含分隔符)
handler.handle(buffer);
});
// TODO @AI异常处理
parser.exceptionHandler(ex -> log.error("[createDecodeParser][解析异常]", ex));
return parser;
}
@Override
public Buffer encode(byte[] data) {
Buffer buffer = Buffer.buffer();
buffer.appendBytes(data);
buffer.appendBytes(delimiterBytes);
return buffer;
}
/**
* 解析分隔符字符串为字节数组
* <p>
* 支持转义字符:\n、\r、\r\n、\t
*
* @param delimiter 分隔符字符串
* @return 分隔符字节数组
*/
private byte[] parseDelimiter(String delimiter) {
if (StrUtil.isBlank(delimiter)) {
// 默认使用换行符
return new byte[]{'\n'};
}
// 处理转义字符
// TODO @AI是否必要不调整感觉也没问题用户自己写对就 ok 了是哇?
String parsed = delimiter
.replace("\\r\\n", "\r\n")
.replace("\\r", "\r")
.replace("\\n", "\n")
.replace("\\t", "\t");
return parsed.getBytes();
}
}

View File

@@ -0,0 +1,58 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
/**
* IoT TCP 定长帧编解码器
* <p>
* 基于固定长度的拆包策略,每条消息固定字节数
*
* @author 芋道源码
*/
@Slf4j
public class IotTcpFixedLengthFrameCodec implements IotTcpFrameCodec {
/**
* 固定消息长度
*/
private final int fixedLength;
public IotTcpFixedLengthFrameCodec(IotTcpConfig.CodecConfig config) {
// TODO @AIconfig.getFixedLength() 禁止为空;
this.fixedLength = config.getFixedLength() != null ? config.getFixedLength() : 1024;
}
@Override
public IotTcpCodecTypeEnum getType() {
return IotTcpCodecTypeEnum.FIXED_LENGTH;
}
@Override
public RecordParser createDecodeParser(Handler<Buffer> handler) {
RecordParser parser = RecordParser.newFixed(fixedLength);
parser.handler(handler);
// TODO @AI解析失败是不是要抛出异常因为要 close 掉连接;
parser.exceptionHandler(ex -> log.error("[createDecodeParser][解析异常]", ex));
return parser;
}
@Override
public Buffer encode(byte[] data) {
Buffer buffer = Buffer.buffer(fixedLength);
buffer.appendBytes(data);
// 如果数据不足固定长度,填充 0
// TODO @AI这里的填充是合理的么RecordParser.newFixed(fixedLength) 有填充的逻辑么?
if (data.length < fixedLength) {
byte[] padding = new byte[fixedLength - data.length];
buffer.appendBytes(padding);
}
return buffer;
}
}

View File

@@ -0,0 +1,166 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
/**
* IoT TCP 长度字段帧编解码器
* <p>
* 基于长度字段的拆包策略,消息格式:[长度字段][消息体]
* <p>
* 参数说明:
* <ul>
* <li>lengthFieldOffset: 长度字段在消息中的偏移量</li>
* <li>lengthFieldLength: 长度字段的字节数1/2/4</li>
* <li>lengthAdjustment: 长度调整值,用于调整长度字段的实际含义</li>
* <li>initialBytesToStrip: 解码后跳过的字节数</li>
* </ul>
*
* @author 芋道源码
*/
@Slf4j
public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec {
private final int lengthFieldOffset;
private final int lengthFieldLength;
private final int lengthAdjustment;
private final int initialBytesToStrip;
// TODO @AI去掉 maxFrameLength 相关字段;
private final int maxFrameLength;
/**
* 头部长度 = 长度字段偏移量 + 长度字段长度
*/
private final int headerLength;
public IotTcpLengthFieldFrameCodec(IotTcpConfig.CodecConfig config) {
// TODO @AI 增加参数校验;不要 default 逻辑;
this.lengthFieldOffset = config.getLengthFieldOffset() != null ? config.getLengthFieldOffset() : 0;
this.lengthFieldLength = config.getLengthFieldLength() != null ? config.getLengthFieldLength() : 4;
this.lengthAdjustment = config.getLengthAdjustment() != null ? config.getLengthAdjustment() : 0;
this.initialBytesToStrip = config.getInitialBytesToStrip() != null ? config.getInitialBytesToStrip() : 0;
this.maxFrameLength = config.getMaxFrameLength() != null ? config.getMaxFrameLength() : 1048576;
this.headerLength = lengthFieldOffset + lengthFieldLength;
}
@Override
public IotTcpCodecTypeEnum getType() {
return IotTcpCodecTypeEnum.LENGTH_FIELD;
}
@Override
public RecordParser createDecodeParser(Handler<Buffer> handler) {
// 创建状态机:先读取头部,再读取消息体
RecordParser parser = RecordParser.newFixed(headerLength);
// 使用数组保存状态和头部数据
// TODO @AIbodyLength 只使用第 0 位,是不是 atomicInteger 更合适?
final int[] bodyLength = {-1};
final Buffer[] headerBuffer = {null};
// 处理读取到的数据
parser.handler(buffer -> {
if (bodyLength[0] == -1) {
// 阶段 1: 读取头部,解析长度字段
headerBuffer[0] = buffer.copy();
int length = readLength(buffer, lengthFieldOffset, lengthFieldLength);
int frameBodyLength = length + lengthAdjustment;
// 检查帧长度是否超过限制
if (frameBodyLength < 0 || frameBodyLength > maxFrameLength - headerLength) {
log.warn("[createDecodeParser][帧长度异常length: {}, frameBodyLength: {}, maxFrameLength: {}]",
length, frameBodyLength, maxFrameLength);
return;
}
if (frameBodyLength == 0) {
// 消息体为空,直接处理
// TODO @AI消息体为空是不是不合理哈应该抛出异常
Buffer frame = processFrame(headerBuffer[0], null);
handler.handle(frame);
} else {
// 切换到读取消息体模式
bodyLength[0] = frameBodyLength;
parser.fixedSizeMode(frameBodyLength);
}
} else {
// 阶段 2: 读取消息体,组装完整帧
Buffer frame = processFrame(headerBuffer[0], buffer);
// 重置状态,准备读取下一帧
bodyLength[0] = -1;
headerBuffer[0] = null;
parser.fixedSizeMode(headerLength);
// 处理完整消息
handler.handle(frame);
}
});
parser.exceptionHandler(ex -> log.error("[createDecodeParser][解析异常]", ex));
return parser;
}
@Override
public Buffer encode(byte[] data) {
Buffer buffer = Buffer.buffer();
// 计算要写入的长度值
int lengthValue = data.length - lengthAdjustment;
// 写入偏移量前的填充字节(如果有)
for (int i = 0; i < lengthFieldOffset; i++) {
buffer.appendByte((byte) 0);
}
// 写入长度字段
writeLength(buffer, lengthValue, lengthFieldLength);
// 写入消息体
buffer.appendBytes(data);
return buffer;
}
/**
* 从 Buffer 中读取长度字段
*/
// TODO @AI兼容 JDK8
private int readLength(Buffer buffer, int offset, int length) {
return switch (length) {
case 1 -> buffer.getUnsignedByte(offset);
case 2 -> buffer.getUnsignedShort(offset);
case 4 -> buffer.getInt(offset);
default -> throw new IllegalArgumentException("不支持的长度字段长度: " + length);
};
}
/**
* 向 Buffer 中写入长度字段
*/
// TODO @AI兼容 JDK8
private void writeLength(Buffer buffer, int length, int fieldLength) {
switch (fieldLength) {
case 1 -> buffer.appendByte((byte) length);
case 2 -> buffer.appendShort((short) length);
case 4 -> buffer.appendInt(length);
default -> throw new IllegalArgumentException("不支持的长度字段长度: " + fieldLength);
}
}
/**
* 处理帧数据(根据 initialBytesToStrip 跳过指定字节)
*/
private Buffer processFrame(Buffer header, Buffer body) {
Buffer fullFrame = Buffer.buffer();
if (header != null) {
fullFrame.appendBuffer(header);
}
if (body != null) {
fullFrame.appendBuffer(body);
}
// 根据 initialBytesToStrip 跳过指定字节
if (initialBytesToStrip > 0 && initialBytesToStrip < fullFrame.length()) {
return fullFrame.slice(initialBytesToStrip, fullFrame.length());
}
return fullFrame;
}
}

View File

@@ -1,8 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import io.vertx.core.buffer.Buffer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -15,10 +17,17 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class IotTcpDownstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotTcpConnectionManager connectionManager;
/**
* TCP 帧编解码器处理粘包/拆包
*/
private final IotTcpFrameCodec codec;
/**
* 消息序列化器处理业务消息序列化/反序列化
*/
private final IotMessageSerializer serializer;
/**
* 处理下行消息
*/
@@ -26,21 +35,25 @@ public class IotTcpDownstreamHandler {
try {
log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
// 1. 获取连接信息包含 codecType
// 1. 检查设备连接
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId(
message.getDeviceId());
if (connectionInfo == null) {
log.error("[handle][连接信息不存在,设备 ID: {}]", message.getDeviceId());
// TODO @AI是不是把消息 id 也打印进去类似上面的日志
log.warn("[handle][连接信息不存在,设备 ID: {}]", message.getDeviceId());
return;
}
// 2. 使用连接时的 codecType 编码消息并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getCodecType());
boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes);
// 2. 序列化 + 编码
byte[] serializedData = serializer.serialize(message);
Buffer frameData = codec.encode(serializedData);
// 3. 发送到设备
boolean success = connectionManager.sendToDevice(message.getDeviceId(), frameData.getBytes());
// TODO @AI不成功直接抛出异常反正下面的日志也会打印失败的
if (success) {
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
message.getDeviceId(), message.getMethod(), message.getId(), frameData.length());
} else {
log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());

View File

@@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler;
import lombok.extern.slf4j.Slf4j;
/**
@@ -16,7 +16,7 @@ public class IotTcpDownstreamSubscriber extends IotProtocolDownstreamSubscriber
private final IotTcpDownstreamHandler downstreamHandler;
public IotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocol,
public IotTcpDownstreamSubscriber(IotProtocol protocol,
IotTcpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);

View File

@@ -1,9 +1,8 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.upstream;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
@@ -16,15 +15,16 @@ import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@@ -37,9 +37,6 @@ import java.util.Map;
@Slf4j
public class IotTcpUpstreamHandler implements Handler<NetSocket> {
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
private final IotDeviceMessageService deviceMessageService;
@@ -52,15 +49,29 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
private final String serverId;
public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol,
/**
* TCP 帧编解码器处理粘包/拆包
*/
private final IotTcpFrameCodec codec;
/**
* 消息序列化器处理业务消息序列化/反序列化
*/
private final IotMessageSerializer serializer;
public IotTcpUpstreamHandler(IotProtocol protocol,
IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotTcpConnectionManager connectionManager) {
IotTcpConnectionManager connectionManager,
IotTcpFrameCodec codec,
IotMessageSerializer serializer) {
this.serverId = protocol.getServerId();
this.codec = codec;
this.serializer = serializer;
this.connectionManager = connectionManager;
// TODO @AI都通过 springutil 获取下
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.connectionManager = connectionManager;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.serverId = protocol.getServerId();
}
@Override
@@ -78,18 +89,32 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
cleanupConnection(socket);
});
// 设置消息处理器
socket.handler(buffer -> {
// TODO @AITODO @芋艿这里应该有拆粘包的问题
// 设置消息处理器带拆包支持
Handler<Buffer> messageHandler = buffer -> {
// TODO @AI需要跟 AI 讨论哪些情况关闭哪些情况发送异常消息
try {
processMessage(clientId, buffer, socket);
} catch (Exception e) {
// TODO @AI这里能合并到 exceptionHandler 还是怎么搞好点
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
cleanupConnection(socket);
socket.close();
}
});
};
// 根据是否配置了 FrameCodec 来决定是否使用拆包器
// TODO @AI必须配置
if (codec != null) {
// 使用拆包器处理粘包/拆包
RecordParser parser = codec.createDecodeParser(messageHandler);
socket.handler(parser);
log.debug("[handle][启用 {} 拆包器,客户端 ID: {}]", codec.getType(), clientId);
} else {
// 未配置拆包器直接处理原始数据可能存在粘包问题
socket.handler(messageHandler);
log.debug("[handle][未配置拆包器,客户端 ID: {}]", clientId);
}
}
/**
@@ -102,43 +127,42 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
*/
private void processMessage(String clientId, Buffer buffer, NetSocket socket) throws Exception {
// 1. 基础检查
// TODO @AI不太应该为空
if (buffer == null || buffer.length() == 0) {
return;
}
// 2. 获取消息格式类型
String codecType = getMessageCodecType(buffer, socket);
// 3. 解码消息
// 2. 反序列化消息
IotDeviceMessage message;
try {
message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
message = serializer.deserialize(buffer.getBytes());
if (message == null) {
throw new Exception("解码后消息为空");
throw new IllegalArgumentException("反序列化后消息为空");
}
} catch (Exception e) {
// 消息格式错误时抛出异常由上层处理连接断开
throw new Exception("消息解码失败: " + e.getMessage(), e);
// TODO @AI是不是不用 try catch
throw new Exception("消息反序列化失败: " + e.getMessage(), e);
}
// 4. 根据消息类型路由处理
// 3. 根据消息类型路由处理
try {
if (AUTH_METHOD.equals(message.getMethod())) {
// 认证请求
handleAuthenticationRequest(clientId, message, codecType, socket);
handleAuthenticationRequest(clientId, message, socket);
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) {
// 设备动态注册请求
handleRegisterRequest(clientId, message, codecType, socket);
handleRegisterRequest(clientId, message, socket);
} else {
// 业务消息
handleBusinessRequest(clientId, message, codecType, socket);
handleBusinessRequest(clientId, message, socket);
}
} catch (Exception e) {
log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]",
clientId, message.getMethod(), e);
// TODO @AI如果参数不正确不断开连接
log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]", clientId, message.getMethod(), e);
// 发送错误响应避免客户端一直等待
// TODO @AI发送失败是不是不用 try catch
try {
sendErrorResponse(socket, message.getRequestId(), "消息处理失败", codecType);
sendErrorResponse(socket, message.getRequestId(), "消息处理失败");
} catch (Exception responseEx) {
log.error("[processMessage][发送错误响应失败,客户端 ID: {}]", clientId, responseEx);
}
@@ -148,74 +172,73 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
/**
* 处理认证请求
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param codecType 消息编解码类型
* @param socket 网络连接
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket 网络连接
*/
private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, String codecType,
NetSocket socket) {
private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, NetSocket socket) {
try {
// 1.1 解析认证参数
// TODO @AI直接 JsonUtils.convertObject(params, IotDeviceAuthReqDTO.class)然后校验参数不正确抛出 invalid exception http 那一样
IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams());
if (authParams == null) {
log.warn("[handleAuthenticationRequest][认证参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "认证参数不完整", codecType);
sendErrorResponse(socket, message.getRequestId(), "认证参数不完整");
return;
}
// 1.2 执行认证
if (!validateDeviceAuth(authParams)) {
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {}username: {}]",
clientId, authParams.getUsername());
sendErrorResponse(socket, message.getRequestId(), "认证失败", codecType);
sendErrorResponse(socket, message.getRequestId(), "认证失败");
return;
}
// 2.1 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
// TODO @AI这里就断言 deviceInfo 不为空了
if (deviceInfo == null) {
sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败", codecType);
sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败");
return;
}
// 2.2 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
// TODO @AI这里就断言 device 不为空了
if (device == null) {
sendErrorResponse(socket, message.getRequestId(), "设备不存在", codecType);
sendErrorResponse(socket, message.getRequestId(), "设备不存在");
return;
}
// 3.1 注册连接
registerConnection(socket, device, clientId, codecType);
registerConnection(socket, device, clientId);
// 3.2 发送上线消息
sendOnlineMessage(device);
// 3.3 发送成功响应
sendSuccessResponse(socket, message.getRequestId(), "认证成功", codecType);
sendSuccessResponse(socket, message.getRequestId(), "认证成功");
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
device.getId(), device.getDeviceName());
} catch (Exception e) {
// TODO @AI最大化去掉 try catch这个方法里的
log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(socket, message.getRequestId(), "认证处理异常", codecType);
sendErrorResponse(socket, message.getRequestId(), "认证处理异常");
}
}
/**
* 处理设备动态注册请求一型一密不需要认证
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param codecType 消息编解码类型
* @param socket 网络连接
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket 网络连接
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void handleRegisterRequest(String clientId, IotDeviceMessage message, String codecType,
NetSocket socket) {
private void handleRegisterRequest(String clientId, IotDeviceMessage message, NetSocket socket) {
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams());
if (params == null) {
log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "注册参数不完整", codecType);
sendErrorResponse(socket, message.getRequestId(), "注册参数不完整");
return;
}
@@ -223,34 +246,33 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
if (result.isError()) {
log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg());
sendErrorResponse(socket, message.getRequestId(), result.getMsg(), codecType);
sendErrorResponse(socket, message.getRequestId(), result.getMsg());
return;
}
// 3. 发送成功响应包含 deviceSecret
sendRegisterSuccessResponse(socket, message.getRequestId(), result.getData(), codecType);
sendRegisterSuccessResponse(socket, message.getRequestId(), result.getData());
log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]",
clientId, params.getDeviceName());
} catch (Exception e) {
log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(socket, message.getRequestId(), "注册处理异常", codecType);
sendErrorResponse(socket, message.getRequestId(), "注册处理异常");
}
}
/**
* 处理业务请求
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param codecType 消息编解码类型
* @param socket 网络连接
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket 网络连接
*/
private void handleBusinessRequest(String clientId, IotDeviceMessage message, String codecType, NetSocket socket) {
private void handleBusinessRequest(String clientId, IotDeviceMessage message, NetSocket socket) {
try {
// 1. 检查认证状态
if (connectionManager.isNotAuthenticated(socket)) {
log.warn("[handleBusinessRequest][设备未认证,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "请先进行认证", codecType);
sendErrorResponse(socket, message.getRequestId(), "请先进行认证");
return;
}
@@ -267,42 +289,19 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
}
}
/**
* 获取消息编解码类型
*
* @param buffer 消息
* @param socket 网络连接
* @return 消息编解码类型
*/
private String getMessageCodecType(Buffer buffer, NetSocket socket) {
// 1. 如果已认证优先使用缓存的编解码类型
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
if (connectionInfo != null
&& StrUtil.isNotBlank(connectionInfo.getCodecType())) {
return connectionInfo.getCodecType();
}
// 2. 未认证时检测消息格式类型
return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY
: CODEC_TYPE_JSON;
}
/**
* 注册连接信息
*
* @param socket 网络连接
* @param device 设备
* @param clientId 客户端 ID
* @param codecType 消息编解码类型
* @param socket 网络连接
* @param device 设备
* @param clientId 客户端 ID
*/
private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
String clientId, String codecType) {
private void registerConnection(NetSocket socket, IotDeviceRespDTO device, String clientId) {
IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId)
.setCodecType(codecType);
.setClientId(clientId);
// 注册连接
connectionManager.registerConnection(socket, device.getId(), connectionInfo);
}
@@ -351,10 +350,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* @param success 是否成功
* @param message 消息
* @param requestId 请求 ID
* @param codecType 消息编解码类型
*/
private void sendResponse(NetSocket socket, boolean success, String message, String requestId, String codecType) {
private void sendResponse(NetSocket socket, boolean success, String message, String requestId) {
try {
// TODO @AI是不是不用
Object responseData = MapUtil.builder()
.put("success", success)
.put("message", message)
@@ -364,14 +363,17 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData,
code, message);
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
// 序列化 + 帧编码
byte[] serializedData = serializer.serialize(responseMessage);
Buffer frameData = codec.encode(serializedData);
socket.write(frameData);
} catch (Exception e) {
log.error("[sendResponse][发送响应失败requestId: {}]", requestId, e);
}
}
// TODO @AI合并到 handleAuthenticationRequest
/**
* 验证设备认证信息
*
@@ -397,10 +399,9 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* @param socket 网络连接
* @param requestId 请求 ID
* @param errorMessage 错误消息
* @param codecType 消息编解码类型
*/
private void sendErrorResponse(NetSocket socket, String requestId, String errorMessage, String codecType) {
sendResponse(socket, false, errorMessage, requestId, codecType);
private void sendErrorResponse(NetSocket socket, String requestId, String errorMessage) {
sendResponse(socket, false, errorMessage, requestId);
}
/**
@@ -409,11 +410,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* @param socket 网络连接
* @param requestId 请求 ID
* @param message 消息
* @param codecType 消息编解码类型
*/
@SuppressWarnings("SameParameterValue")
private void sendSuccessResponse(NetSocket socket, String requestId, String message, String codecType) {
sendResponse(socket, true, message, requestId, codecType);
private void sendSuccessResponse(NetSocket socket, String requestId, String message) {
sendResponse(socket, true, message, requestId);
}
/**
@@ -489,17 +489,17 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* @param socket 网络连接
* @param requestId 请求 ID
* @param registerResp 注册响应
* @param codecType 消息编解码类型
*/
private void sendRegisterSuccessResponse(NetSocket socket, String requestId,
IotDeviceRegisterRespDTO registerResp, String codecType) {
IotDeviceRegisterRespDTO registerResp) {
try {
// 1. 构建响应消息参考 HTTP 返回格式直接返回 IotDeviceRegisterRespDTO
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 发送响应
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
// 2. 序列化 + 帧编码
byte[] serializedData = serializer.serialize(responseMessage);
Buffer frameData = codec.encode(serializedData);
socket.write(frameData);
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应失败requestId: {}]", requestId, e);
}

View File

@@ -81,6 +81,7 @@ public class IotTcpConnectionManager {
return info != null;
}
// TODO @AI是不是可以去掉因为现在只有认证成功的才会注册连接
/**
* 检查连接是否未认证
*/
@@ -148,10 +149,6 @@ public class IotTcpConnectionManager {
* 客户端 ID
*/
private String clientId;
/**
* 消息编解码类型(认证后确定)
*/
private String codecType;
}

View File

@@ -42,7 +42,7 @@ yudao:
secret: yudaoIotGatewayTokenSecret123456789 # Token 密钥至少32位
expiration: 7d
# 协议实例列表(新版配置方式)
# 协议实例列表
protocols:
# ====================================
# 针对引入的 HTTP 组件的配置
@@ -53,6 +53,22 @@ yudao:
enabled: true
http:
ssl-enabled: false
# ====================================
# 针对引入的 TCP 组件的配置
# ====================================
- id: tcp-json
type: tcp
port: 8091
enabled: true
serialize: json
tcp:
max-connections: 1000
keep-alive-timeout-ms: 30000
ssl-enabled: false
codec:
type: delimiter # 拆包类型length_field / delimiter / fixed_length
delimiter: "\\n" # 分隔符(支持转义:\\n=换行, \\r=回车, \\t=制表符)
max-frame-length: 1048576 # 最大帧长度(字节)
# 协议配置(旧版,保持兼容)
protocol:
@@ -91,17 +107,6 @@ yudao:
trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径
trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码
# ====================================
# 针对引入的 TCP 组件的配置
# ====================================
tcp:
enabled: false
port: 8091
keep-alive-timeout-ms: 30000
max-connections: 1000
ssl-enabled: false
ssl-cert-path: "classpath:certs/client.jks"
ssl-key-path: "classpath:certs/client.jks"
# ====================================
# 针对引入的 UDP 组件的配置
# ====================================
udp:

View File

@@ -10,32 +10,40 @@ import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.binary.IotBinarySerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
/**
* IoT 直连设备 TCP 协议集成测试(手动测试)
*
* <p>测试场景直连设备IotProductDeviceTypeEnum 的 DIRECT 类型)通过 TCP 协议直接连接平台
*
* <p>支持两种编解码格式:
* <p>支持两种序列化格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* <li>{@link IotJsonSerializer} - JSON 格式</li>
* <li>{@link IotBinarySerializer} - 二进制格式</li>
* </ul>
*
* <p>TCP 拆包配置(需与 application.yaml 中的 codec 配置一致):
* <ul>
* <li>type: delimiter - 基于分隔符拆包</li>
* <li>delimiter: \n - 换行符作为分隔符</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务TCP 端口 8091</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>修改 {@link #SERIALIZER} 选择测试的序列化格式Delimiter 模式只支持 JSON</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testAuth()} - 设备认证</li>
@@ -58,10 +66,16 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
private static final int SERVER_PORT = 8091;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
// TODO @AI这里可以通过 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec 么?例如说:使用 vertx vertx tcp client从而更好的复用解码逻辑
/**
* 分隔符(需与 application.yaml 中的 delimiter 配置一致)
*/
private static final String DELIMITER = "\n";
// private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
// ===================== 序列化器选择Delimiter 模式推荐使用 JSON =====================
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// private static final IotMessageSerializer SERIALIZER = new IotBinarySerializer();
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) =====================
@@ -82,18 +96,19 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO);
// 1.2 序列化
// TODO @AI是不是把 SERIALIZER 放到 sendAndReceive 里;
byte[] payload = SERIALIZER.serialize(request);
log.info("[testAuth][Serializer: {}, 请求消息: {}, 数据包长度: {} 字节]", SERIALIZER.getType(), request, payload.length);
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
// 2.2 反序列化响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
IotDeviceMessage response = SERIALIZER.deserialize(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
} else {
log.warn("[testAuth][未收到响应]");
@@ -119,17 +134,17 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
// 1.2 序列化
byte[] payload = SERIALIZER.serialize(request);
log.info("[testDeviceRegister][Serializer: {}, 请求消息: {}, 数据包长度: {} 字节]", SERIALIZER.getType(), request, payload.length);
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
// 2.2 反序列化响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
IotDeviceMessage response = SERIALIZER.deserialize(responseBytes);
log.info("[testDeviceRegister][响应消息: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
} else {
@@ -161,15 +176,15 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
.put("height", "2")
.build()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
log.info("[testPropertyPost][Serializer: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
// 3.2 反序列化响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
IotDeviceMessage response = SERIALIZER.deserialize(responseBytes);
log.info("[testPropertyPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPost][未收到响应]");
@@ -200,15 +215,15 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
log.info("[testEventPost][Serializer: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
// 3.2 反序列化响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
IotDeviceMessage response = SERIALIZER.deserialize(responseBytes);
log.info("[testEventPost][响应消息: {}]", response);
} else {
log.warn("[testEventPost][未收到响应]");
@@ -231,41 +246,44 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
byte[] payload = SERIALIZER.serialize(request);
byte[] responseBytes = sendAndReceive(socket, payload);
if (responseBytes != null) {
log.info("[authenticate][响应数据长度: {} 字节,首字节: 0x{}, HEX: {}]",
responseBytes.length,
String.format("%02X", responseBytes[0]),
HexUtil.encodeHexStr(responseBytes));
return CODEC.decode(responseBytes);
return SERIALIZER.deserialize(responseBytes);
}
return null;
}
/**
* 发送 TCP 请求并接收响应
* 发送 TCP 请求并接收响应(支持 Delimiter 分隔符协议)
* <p>
* 发送格式:[消息体][分隔符]
* 接收格式:[消息体][分隔符]
*
* @param socket TCP Socket
* @param payload 请求数据
* @return 响应数据
* @param payload 请求数据(消息体,不含分隔符)
* @return 响应数据(消息体,不含分隔符)
*/
private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception {
// 1. 发送请求
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
out.write(payload);
out.flush();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// 2.1 等待一小段时间让服务器处理
Thread.sleep(100);
// 2.2 接收响应
byte[] buffer = new byte[4096];
// 1. 发送请求(添加分隔符后缀)
out.write(payload);
out.write(DELIMITER.getBytes(StandardCharsets.UTF_8));
out.flush();
log.info("[sendAndReceive][发送数据: {} 字节(不含分隔符)]", payload.length);
// 2. 接收响应(读取到分隔符为止)
try {
int length = in.read(buffer);
if (length > 0) {
byte[] response = new byte[length];
System.arraycopy(buffer, 0, response, 0, length);
String responseLine = in.readLine();
if (responseLine != null) {
byte[] response = responseLine.getBytes(StandardCharsets.UTF_8);
log.info("[sendAndReceive][接收数据: {} 字节]", response.length);
return response;
}
return null;

View File

@@ -1,193 +0,0 @@
# TCP 二进制协议数据包格式说明
## 1. 协议概述
TCP 二进制协议是一种高效的自定义协议格式,采用紧凑的二进制格式传输数据,适用于对带宽和性能要求较高的 IoT 场景。
### 1.1 协议特点
- **高效传输**:完全二进制格式,减少数据传输量
- **版本控制**:内置协议版本号,支持协议升级
- **类型安全**:明确的消息类型标识
- **简洁设计**:去除冗余字段,协议更加精简
- **兼容性**:与现有 `IotDeviceMessage` 接口完全兼容
## 2. 协议格式
### 2.1 整体结构
```
+--------+--------+--------+---------------------------+--------+--------+
| 魔术字 | 版本号 | 消息类型| 消息长度(4字节) |
+--------+--------+--------+---------------------------+--------+--------+
| 消息 ID 长度(2字节) | 消息 ID (变长字符串) |
+--------+--------+--------+--------+--------+--------+--------+--------+
| 方法名长度(2字节) | 方法名(变长字符串) |
+--------+--------+--------+--------+--------+--------+--------+--------+
| 消息体数据(变长) |
+--------+--------+--------+--------+--------+--------+--------+--------+
```
### 2.2 字段详细说明
| 字段 | 长度 | 类型 | 说明 |
|------|------|------|------|
| 魔术字 | 1字节 | byte | `0x7E` - 协议识别标识,用于数据同步 |
| 版本号 | 1字节 | byte | `0x01` - 协议版本号,支持版本控制 |
| 消息类型 | 1字节 | byte | `0x01`=请求, `0x02`=响应 |
| 消息长度 | 4字节 | int | 整个消息的总长度(包含头部) |
| 消息 ID 长度 | 2字节 | short | 消息 ID 字符串的字节长度 |
| 消息 ID | 变长 | string | 消息唯一标识符UTF-8编码 |
| 方法名长度 | 2字节 | short | 方法名字符串的字节长度 |
| 方法名 | 变长 | string | 消息方法名UTF-8编码 |
| 消息体 | 变长 | binary | 根据消息类型的不同数据格式 |
**⚠️ 重要说明**deviceId 不包含在协议中,由服务器根据连接上下文自动设置
### 2.3 协议常量定义
```java
// 协议标识
private static final byte MAGIC_NUMBER = (byte) 0x7E;
private static final byte PROTOCOL_VERSION = (byte) 0x01;
// 消息类型
private static final byte REQUEST = (byte) 0x01; // 请求消息
private static final byte RESPONSE = (byte) 0x02; // 响应消息
// 协议长度
private static final int HEADER_FIXED_LENGTH = 7; // 固定头部长度
private static final int MIN_MESSAGE_LENGTH = 11; // 最小消息长度
```
## 3. 消息类型和格式
### 3.1 请求消息 (REQUEST - 0x01)
请求消息用于设备向服务器发送数据或请求。
#### 3.1.1 消息体格式
```
消息体 = params 数据(JSON格式)
```
#### 3.1.2 示例:设备认证请求
**消息内容:**
- 消息 ID: `auth_1704067200000_123`
- 方法名: `auth`
- 参数: `{"clientId":"device_001","username":"productKey_deviceName","password":"device_password"}`
**二进制数据包结构:**
```
7E // 魔术字 (0x7E)
01 // 版本号 (0x01)
01 // 消息类型 (REQUEST)
00 00 00 89 // 消息长度 (137字节)
00 19 // 消息 ID 长度 (25字节)
61 75 74 68 5F 31 37 30 34 30 // 消息 ID: "auth_1704067200000_123"
36 37 32 30 30 30 30 30 5F 31
32 33
00 04 // 方法名长度 (4字节)
61 75 74 68 // 方法名: "auth"
7B 22 63 6C 69 65 6E 74 49 64 // JSON参数数据
22 3A 22 64 65 76 69 63 65 5F // {"clientId":"device_001",
30 30 31 22 2C 22 75 73 65 72 // "username":"productKey_deviceName",
6E 61 6D 65 22 3A 22 70 72 6F // "password":"device_password"}
64 75 63 74 4B 65 79 5F 64 65
76 69 63 65 4E 61 6D 65 22 2C
22 70 61 73 73 77 6F 72 64 22
3A 22 64 65 76 69 63 65 5F 70
61 73 73 77 6F 72 64 22 7D
```
#### 3.1.3 示例:属性数据上报
**消息内容:**
- 消息 ID: `property_1704067200000_456`
- 方法名: `thing.property.post`
- 参数: `{"temperature":25.5,"humidity":60.2,"pressure":1013.25}`
### 3.2 响应消息 (RESPONSE - 0x02)
响应消息用于服务器向设备回复请求结果。
#### 3.2.1 消息体格式
```
消息体 = 响应码(4字节) + 响应消息长度(2字节) + 响应消息(UTF-8) + 响应数据(JSON)
```
#### 3.2.2 字段说明
| 字段 | 长度 | 类型 | 说明 |
|------|------|------|------|
| 响应码 | 4字节 | int | HTTP状态码风格0=成功,其他=错误 |
| 响应消息长度 | 2字节 | short | 响应消息字符串的字节长度 |
| 响应消息 | 变长 | string | 响应提示信息UTF-8编码 |
| 响应数据 | 变长 | binary | JSON格式的响应数据可选 |
#### 3.2.3 示例:认证成功响应
**消息内容:**
- 消息 ID: `auth_response_1704067200000_123`
- 方法名: `auth`
- 响应码: `0`
- 响应消息: `认证成功`
- 响应数据: `{"success":true,"message":"认证成功"}`
**二进制数据包结构:**
```
7E // 魔术字 (0x7E)
01 // 版本号 (0x01)
02 // 消息类型 (RESPONSE)
00 00 00 A4 // 消息长度 (164字节)
00 22 // 消息 ID 长度 (34字节)
61 75 74 68 5F 72 65 73 70 6F // 消息 ID: "auth_response_1704067200000_123"
6E 73 65 5F 31 37 30 34 30 36
37 32 30 30 30 30 30 5F 31 32
33
00 04 // 方法名长度 (4字节)
61 75 74 68 // 方法名: "auth"
00 00 00 00 // 响应码 (0 = 成功)
00 0C // 响应消息长度 (12字节)
E8 AE A4 E8 AF 81 E6 88 90 E5 // 响应消息: "认证成功" (UTF-8)
8A 9F
7B 22 73 75 63 63 65 73 73 22 // JSON响应数据
3A 74 72 75 65 2C 22 6D 65 73 // {"success":true,"message":"认证成功"}
73 61 67 65 22 3A 22 E8 AE A4
E8 AF 81 E6 88 90 E5 8A 9F 22
7D
```
## 4. 编解码器标识
```java
public static final String TYPE = "TCP_BINARY";
```
## 5. 协议优势
- **数据紧凑**:二进制格式,相比 JSON 减少 30-50% 的数据量
- **解析高效**:直接二进制操作,减少字符串转换开销
- **类型安全**:明确的消息类型和字段定义
- **设计简洁**:去除冗余字段,协议更加精简高效
- **版本控制**:内置版本号支持协议升级
## 6. 与 JSON 协议对比
| 特性 | 二进制协议 | JSON协议 |
|------|-------------|--------|
| 数据大小 | 小节省30-50% | 大 |
| 解析性能 | 高 | 中等 |
| 网络开销 | 低 | 高 |
| 可读性 | 差 | 优秀 |
| 调试难度 | 高 | 低 |
| 扩展性 | 良好 | 优秀 |
**推荐场景**
-**高频数据传输**:传感器数据实时上报
-**带宽受限环境**:移动网络、卫星通信
-**性能要求高**:需要低延迟、高吞吐的场景
-**设备资源有限**:嵌入式设备、低功耗设备
-**开发调试阶段**:调试困难,建议使用 JSON 协议
-**快速原型开发**:开发效率低

View File

@@ -1,191 +0,0 @@
# TCP JSON 格式协议说明
## 1. 协议概述
TCP JSON 格式协议采用纯 JSON 格式进行数据传输,具有以下特点:
- **标准化**:使用标准 JSON 格式,易于解析和处理
- **可读性**:人类可读,便于调试和维护
- **扩展性**:可以轻松添加新字段,向后兼容
- **跨平台**JSON 格式支持所有主流编程语言
- **安全优化**:移除冗余的 deviceId 字段,提高安全性
## 2. 消息格式
### 2.1 基础消息结构
```json
{
"id": "消息唯一标识",
"method": "消息方法",
"params": {
// 请求参数
},
"data": {
// 响应数据
},
"code": 响应码,
"msg": "响应消息",
"timestamp": 时间戳
}
```
**⚠️ 重要说明**
- **不包含 deviceId 字段**:由服务器通过 TCP 连接上下文自动确定设备 ID
- **避免伪造攻击**:防止设备伪造其他设备的 ID 发送消息
### 2.2 字段详细说明
| 字段名 | 类型 | 必填 | 用途 | 说明 |
|--------|------|------|------|------|
| id | String | 是 | 所有消息 | 消息唯一标识 |
| method | String | 是 | 所有消息 | 消息方法,如 `auth``thing.property.post` |
| params | Object | 否 | 请求消息 | 请求参数具体内容根据method而定 |
| data | Object | 否 | 响应消息 | 响应数据,服务器返回的结果数据 |
| code | Integer | 否 | 响应消息 | 响应码0=成功,其他=错误 |
| msg | String | 否 | 响应消息 | 响应提示信息 |
| timestamp | Long | 是 | 所有消息 | 时间戳(毫秒),编码时自动生成 |
### 2.3 消息分类
#### 2.3.1 请求消息(上行)
- **特征**:包含 `params` 字段,不包含 `code``msg` 字段
- **方向**:设备 → 服务器
- **用途**:设备认证、数据上报、状态更新等
#### 2.3.2 响应消息(下行)
- **特征**:包含 `code``msg` 字段,可能包含 `data` 字段
- **方向**:服务器 → 设备
- **用途**:认证结果、指令响应、错误提示等
## 3. 消息示例
### 3.1 设备认证 (auth)
#### 认证请求格式
**消息方向**:设备 → 服务器
```json
{
"id": "auth_1704067200000_123",
"method": "auth",
"params": {
"clientId": "device_001",
"username": "productKey_deviceName",
"password": "设备密码"
},
"timestamp": 1704067200000
}
```
**认证参数说明:**
| 字段名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| clientId | String | 是 | 客户端唯一标识,用于连接管理 |
| username | String | 是 | 设备用户名,格式为 `productKey_deviceName` |
| password | String | 是 | 设备密码,在设备管理平台配置 |
#### 认证响应格式
**消息方向**:服务器 → 设备
**认证成功响应:**
```json
{
"id": "response_auth_1704067200000_123",
"method": "auth",
"data": {
"success": true,
"message": "认证成功"
},
"code": 0,
"msg": "认证成功",
"timestamp": 1704067200001
}
```
**认证失败响应:**
```json
{
"id": "response_auth_1704067200000_123",
"method": "auth",
"data": {
"success": false,
"message": "认证失败:用户名或密码错误"
},
"code": 401,
"msg": "认证失败",
"timestamp": 1704067200001
}
```
### 3.2 属性数据上报 (thing.property.post)
**消息方向**:设备 → 服务器
**示例:温度传感器数据上报**
```json
{
"id": "property_1704067200000_456",
"method": "thing.property.post",
"params": {
"temperature": 25.5,
"humidity": 60.2,
"pressure": 1013.25,
"battery": 85,
"signal_strength": -65
},
"timestamp": 1704067200000
}
```
### 3.3 设备状态更新 (thing.state.update)
**消息方向**:设备 → 服务器
**示例:心跳请求**
```json
{
"id": "heartbeat_1704067200000_321",
"method": "thing.state.update",
"params": {
"state": "online",
"uptime": 86400,
"memory_usage": 65.2,
"cpu_usage": 12.8
},
"timestamp": 1704067200000
}
```
## 4. 编解码器标识
```java
public static final String TYPE = "TCP_JSON";
```
## 5. 协议优势
- **开发效率高**JSON 格式,开发和调试简单
- **跨语言支持**:所有主流语言都支持 JSON
- **可读性优秀**:可以直接查看消息内容
- **扩展性强**:可以轻松添加新字段
- **安全性高**:移除 deviceId 字段,防止伪造攻击
## 6. 与二进制协议对比
| 特性 | JSON协议 | 二进制协议 |
|------|----------|------------|
| 开发难度 | 低 | 高 |
| 调试难度 | 低 | 高 |
| 可读性 | 优秀 | 差 |
| 数据大小 | 中等 | 小节省30-50% |
| 解析性能 | 中等 | 高 |
| 学习成本 | 低 | 高 |
**推荐场景**
-**开发调试阶段**:调试友好,开发效率高
-**快速原型开发**:实现简单,快速迭代
-**多语言集成**:广泛的语言支持
-**高频数据传输**:建议使用二进制协议
- ❌ **带宽受限环境**:建议使用二进制协议