From d9a08094d9f9640f94f7b2e3cb94961019b8efef Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 1 Feb 2026 18:43:50 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88iot=EF=BC=89=EF=BC=9A=E3=80=90?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=94=B9=E9=80=A0=E3=80=91websocket=20?= =?UTF-8?q?=E5=88=9D=E6=AD=A5=E6=94=B9=E9=80=A0=EF=BC=8850%=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gateway/protocol/IotProtocolManager.java | 6 +- .../gateway/protocol/tcp/IotTcpProtocol.java | 10 +- .../length/IotTcpLengthFieldFrameCodec.java | 6 + .../upstream/IotTcpUpstreamHandler.java | 28 +-- .../gateway/protocol/udp/IotUdpProtocol.java | 9 +- .../websocket/IotWebSocketConfig.java | 54 +++++ .../websocket/IotWebSocketProtocol.java | 218 ++++++++++++++++++ .../IotWebSocketUpstreamProtocol.java | 135 ----------- .../IotWebSocketDownstreamHandler.java | 21 +- .../IotWebSocketDownstreamSubscriber.java | 6 +- .../IotWebSocketUpstreamHandler.java | 142 ++++++++---- .../IotWebSocketConnectionManager.java | 42 +++- .../src/main/resources/application.yaml | 3 +- ...irectDeviceTcpProtocolIntegrationTest.java | 2 +- ...tewayDeviceTcpProtocolIntegrationTest.java | 2 +- ...aySubDeviceTcpProtocolIntegrationTest.java | 2 +- ...eviceWebSocketProtocolIntegrationTest.java | 42 ++-- ...eviceWebSocketProtocolIntegrationTest.java | 60 ++--- ...eviceWebSocketProtocolIntegrationTest.java | 36 +-- 19 files changed, 522 insertions(+), 302 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketProtocol.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/{router => handler/downstream}/IotWebSocketDownstreamHandler.java (70%) rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/{ => handler/downstream}/IotWebSocketDownstreamSubscriber.java (78%) rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/{router => handler/upstream}/IotWebSocketUpstreamHandler.java (80%) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java index ed60897e55..45b6789041 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java @@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.context.SmartLifecycle; @@ -158,8 +158,8 @@ public class IotProtocolManager implements SmartLifecycle { * @param config 协议实例配置 * @return WebSocket 协议实例 */ - private IotWebSocketUpstreamProtocol createWebSocketProtocol(IotGatewayProperties.ProtocolInstanceProperties config) { - return new IotWebSocketUpstreamProtocol(config); + private IotWebSocketProtocol createWebSocketProtocol(IotGatewayProperties.ProtocolInstanceProperties config) { + return new IotWebSocketProtocol(config); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java index 937745c584..3a31f505b5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java @@ -57,6 +57,10 @@ public class IotTcpProtocol implements IotProtocol { * TCP 服务器 */ private NetServer tcpServer; + /** + * TCP 连接管理器 + */ + private final IotTcpConnectionManager connectionManager; /** * 下行消息订阅者 @@ -67,17 +71,11 @@ public class IotTcpProtocol implements IotProtocol { * 消息序列化器 */ private final IotMessageSerializer serializer; - /** * TCP 帧编解码器 */ private final IotTcpFrameCodec frameCodec; - /** - * TCP 连接管理器 - */ - private final IotTcpConnectionManager connectionManager; - public IotTcpProtocol(ProtocolInstanceProperties properties) { IotTcpConfig tcpConfig = properties.getTcp(); Assert.notNull(tcpConfig, "TCP 协议配置(tcp)不能为空"); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpLengthFieldFrameCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpLengthFieldFrameCodec.java index 0fbe42d7c9..4200b6b1fb 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpLengthFieldFrameCodec.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec/length/IotTcpLengthFieldFrameCodec.java @@ -29,6 +29,11 @@ import java.util.concurrent.atomic.AtomicReference; @Slf4j public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec { + /** + * 最大帧长度(64KB),防止 DoS 攻击 + */ + private static final int MAX_FRAME_LENGTH = 65536; + private final int lengthFieldOffset; private final int lengthFieldLength; private final int lengthAdjustment; @@ -60,6 +65,7 @@ public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec { public RecordParser createDecodeParser(Handler handler) { // 创建状态机:先读取头部,再读取消息体 RecordParser parser = RecordParser.newFixed(headerLength); + parser.maxRecordSize(MAX_FRAME_LENGTH); // 设置最大记录大小,防止 DoS 攻击 final AtomicReference bodyLength = new AtomicReference<>(null); // 消息体长度,null 表示读取头部阶段 final AtomicReference headerBuffer = new AtomicReference<>(null); // 头部消息 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java index 45cc3e3ffa..9542b504c2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java @@ -77,7 +77,9 @@ public class IotTcpUpstreamHandler implements Handler { } @Override + @SuppressWarnings("DuplicatedCode") public void handle(NetSocket socket) { + // TODO @AI:clientId 去掉;其它模块也看看,怎么去掉下看看; String clientId = IdUtil.simpleUUID(); log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); @@ -92,9 +94,10 @@ public class IotTcpUpstreamHandler implements Handler { }); // 2.1 设置消息处理器 + // TODO @AI:去掉 clientId; Handler messageHandler = buffer -> { try { - processMessage(clientId, buffer, socket); + processMessage(buffer, socket); } catch (Exception e) { log.error("[handle][消息处理失败,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress(), e); @@ -110,11 +113,10 @@ public class IotTcpUpstreamHandler implements Handler { /** * 处理消息 * - * @param clientId 客户端 ID * @param buffer 消息 * @param socket 网络连接 */ - private void processMessage(String clientId, Buffer buffer, NetSocket socket) { + private void processMessage(Buffer buffer, NetSocket socket) { IotDeviceMessage message = null; try { // 1. 反序列化消息 @@ -127,29 +129,29 @@ public class IotTcpUpstreamHandler implements Handler { // 2. 根据消息类型路由处理 if (AUTH_METHOD.equals(message.getMethod())) { // 认证请求 - handleAuthenticationRequest(clientId, message, socket); + handleAuthenticationRequest(message, socket); } else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) { // 设备动态注册请求 - handleRegisterRequest(clientId, message, socket); + handleRegisterRequest(message, socket); } else { // 业务消息 - handleBusinessRequest(clientId, message, socket); + handleBusinessRequest(null, message, socket); } } catch (ServiceException e) { // 业务异常,返回对应的错误码和错误信息 - log.warn("[processMessage][业务异常,客户端 ID: {},错误: {}]", clientId, e.getMessage()); + log.warn("[processMessage][业务异常,客户端 ID: {},错误: {}]", null, e.getMessage()); String requestId = message != null ? message.getRequestId() : null; String method = message != null ? message.getMethod() : null; sendErrorResponse(socket, requestId, method, e.getCode(), e.getMessage()); } catch (IllegalArgumentException e) { // 参数校验失败,返回 400 - log.warn("[processMessage][参数校验失败,客户端 ID: {},错误: {}]", clientId, e.getMessage()); + log.warn("[processMessage][参数校验失败,客户端 ID: {},错误: {}]", null, e.getMessage()); String requestId = message != null ? message.getRequestId() : null; String method = message != null ? message.getMethod() : null; sendErrorResponse(socket, requestId, method, BAD_REQUEST.getCode(), e.getMessage()); } catch (Exception e) { // 其他异常,返回 500,并重新抛出让上层关闭连接 - log.error("[processMessage][处理消息失败,客户端 ID: {}]", clientId, e); + log.error("[processMessage][处理消息失败,客户端 ID: {}]", null, e); String requestId = message != null ? message.getRequestId() : null; String method = message != null ? message.getMethod() : null; sendErrorResponse(socket, requestId, method, @@ -161,12 +163,11 @@ public class IotTcpUpstreamHandler implements Handler { /** * 处理认证请求 * - * @param clientId 客户端 ID * @param message 消息信息 * @param socket 网络连接 */ @SuppressWarnings("DuplicatedCode") - private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, NetSocket socket) { + private void handleAuthenticationRequest(IotDeviceMessage message, NetSocket socket) { // 1. 解析认证参数 IotDeviceAuthReqDTO authParams = JsonUtils.convertObject(message.getParams(), IotDeviceAuthReqDTO.class); Assert.notNull(authParams, "认证参数不能为空"); @@ -198,13 +199,12 @@ public class IotTcpUpstreamHandler implements Handler { /** * 处理设备动态注册请求(一型一密,不需要认证) * - * @param clientId 客户端 ID * @param message 消息信息 * @param socket 网络连接 * @see 阿里云 - 一型一密 */ @SuppressWarnings("DuplicatedCode") - private void handleRegisterRequest(String clientId, IotDeviceMessage message, NetSocket socket) { + private void handleRegisterRequest(IotDeviceMessage message, NetSocket socket) { // 1. 解析注册参数 IotDeviceRegisterReqDTO params = JsonUtils.convertObject(message.getParams(), IotDeviceRegisterReqDTO.class); Assert.notNull(params, "注册参数不能为空"); @@ -218,7 +218,7 @@ public class IotTcpUpstreamHandler implements Handler { // 3. 发送成功响应 sendSuccessResponse(socket, message.getRequestId(), IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), result.getData()); - log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]", clientId, params.getDeviceName()); + log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]", null, params.getDeviceName()); } /** diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocol.java index 96f523dfd8..647a713b55 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocol.java @@ -55,6 +55,10 @@ public class IotUdpProtocol implements IotProtocol { */ @Getter private DatagramSocket udpSocket; + /** + * UDP 会话管理器 + */ + private final IotUdpSessionManager sessionManager; /** * 下行消息订阅者 @@ -66,11 +70,6 @@ public class IotUdpProtocol implements IotProtocol { */ private final IotMessageSerializer serializer; - /** - * UDP 会话管理器 - */ - private final IotUdpSessionManager sessionManager; - public IotUdpProtocol(ProtocolInstanceProperties properties) { IotUdpConfig udpConfig = properties.getUdp(); Assert.notNull(udpConfig, "UDP 协议配置(udp)不能为空"); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketConfig.java new file mode 100644 index 0000000000..e64e11dc51 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketConfig.java @@ -0,0 +1,54 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; + +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * IoT WebSocket 协议配置 + * + * @author 芋道源码 + */ +@Data +public class IotWebSocketConfig { + + /** + * WebSocket 路径(默认:/ws) + */ + @NotEmpty(message = "WebSocket 路径不能为空") + private String path = "/ws"; + + /** + * 最大消息大小(字节,默认 64KB) + */ + @NotNull(message = "最大消息大小不能为空") + private Integer maxMessageSize = 65536; + /** + * 最大帧大小(字节,默认 64KB) + */ + @NotNull(message = "最大帧大小不能为空") + private Integer maxFrameSize = 65536; + + /** + * 空闲超时时间(秒,默认 60) + */ + @NotNull(message = "空闲超时时间不能为空") + private Integer idleTimeoutSeconds = 60; + + /** + * 是否启用 SSL(wss://) + */ + @NotNull(message = "是否启用 SSL 不能为空") + private Boolean sslEnabled = false; + + /** + * SSL 证书路径 + */ + private String sslCertPath; + + /** + * SSL 私钥路径 + */ + private String sslKeyPath; + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketProtocol.java new file mode 100644 index 0000000000..112c5acf5c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketProtocol.java @@ -0,0 +1,218 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; + +import cn.hutool.core.util.ObjUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.upstream.IotWebSocketUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.net.PemKeyCertOptions; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.Assert; + +// TODO @AI:注释调整下,参考 IotTcpProtocol +/** + * IoT 网关 WebSocket 协议:接收设备上行消息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotWebSocketProtocol implements IotProtocol { + + /** + * 协议配置 + */ + private final ProtocolInstanceProperties properties; + /** + * 服务器 ID(用于消息追踪,全局唯一) + */ + @Getter + private final String serverId; + + /** + * 运行状态 + */ + @Getter + private volatile boolean running = false; + + /** + * Vert.x 实例 + */ + private Vertx vertx; + /** + * WebSocket 服务器 + */ + private HttpServer httpServer; + /** + * WebSocket 连接管理器 + */ + private final IotWebSocketConnectionManager connectionManager; + + // TODO @AI:可以不用这个变量,从 properties 里面获取 + /** + * WebSocket 配置 + */ + private final IotWebSocketConfig wsConfig; + + /** + * 下行消息订阅者 + */ + private final IotWebSocketDownstreamSubscriber downstreamSubscriber; + + /** + * 消息序列化器 + */ + private final IotMessageSerializer serializer; + + private final IotDeviceService deviceService; + private final IotDeviceMessageService messageService; + + public IotWebSocketProtocol(ProtocolInstanceProperties properties) { + Assert.notNull(properties, "协议实例配置不能为空"); + Assert.notNull(properties.getWebsocket(), "WebSocket 协议配置(websocket)不能为空"); + this.properties = properties; + this.wsConfig = properties.getWebsocket(); + this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); + + // 初始化序列化器 + IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(properties.getSerialize()); + Assert.notNull(serializeType, "不支持的序列化类型:" + properties.getSerialize()); + IotMessageSerializerManager serializerManager = SpringUtil.getBean(IotMessageSerializerManager.class); + this.serializer = serializerManager.get(serializeType); + + // 初始化基础依赖 + this.deviceService = SpringUtil.getBean(IotDeviceService.class); + this.messageService = SpringUtil.getBean(IotDeviceMessageService.class); + this.connectionManager = new IotWebSocketConnectionManager(); + + // 初始化下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + IotWebSocketDownstreamHandler downstreamHandler = new IotWebSocketDownstreamHandler(serializer, connectionManager); + this.downstreamSubscriber = new IotWebSocketDownstreamSubscriber(this, downstreamHandler, messageBus); + } + + @Override + public String getId() { + return properties.getId(); + } + + @Override + public IotProtocolTypeEnum getType() { + return IotProtocolTypeEnum.WEBSOCKET; + } + + @Override + @SuppressWarnings("deprecation") + public void start() { + if (running) { + log.warn("[start][IoT WebSocket 协议 {} 已经在运行中]", getId()); + return; + } + + // 1.1 创建 Vertx 实例 + this.vertx = Vertx.vertx(); + + // 1.2 创建服务器选项 + HttpServerOptions options = new HttpServerOptions() + .setPort(properties.getPort()) + .setIdleTimeout(wsConfig.getIdleTimeoutSeconds()) + .setMaxWebSocketFrameSize(wsConfig.getMaxFrameSize()) + .setMaxWebSocketMessageSize(wsConfig.getMaxMessageSize()); + if (Boolean.TRUE.equals(wsConfig.getSslEnabled())) { + PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() + .setKeyPath(wsConfig.getSslKeyPath()) + .setCertPath(wsConfig.getSslCertPath()); + options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); + } + + // 1.3 创建服务器并设置 WebSocket 处理器 + httpServer = vertx.createHttpServer(options); + httpServer.webSocketHandler(socket -> { + // 验证路径 + if (ObjUtil.notEqual(wsConfig.getPath(), socket.path())) { + log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]", + socket.path(), wsConfig.getPath()); + socket.reject(); + return; + } + // 创建上行处理器 + IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(this, + messageService, deviceService, connectionManager, serializer); + handler.handle(socket); + }); + + // 1.4 启动服务器 + try { + httpServer.listen().result(); + running = true; + log.info("[start][IoT WebSocket 协议 {} 启动成功,端口:{},路径:{},serverId:{}]", + getId(), properties.getPort(), wsConfig.getPath(), serverId); + + // 2. 启动下行消息订阅者 + downstreamSubscriber.start(); + } catch (Exception e) { + log.error("[start][IoT WebSocket 协议 {} 启动失败]", getId(), e); + if (httpServer != null) { + httpServer.close(); + httpServer = null; + } + if (vertx != null) { + vertx.close(); + vertx = null; + } + throw e; + } + } + + @Override + public void stop() { + if (!running) { + return; + } + // 1. 停止下行消息订阅者 + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT WebSocket 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT WebSocket 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + + // 2.1 关闭 WebSocket 服务器 + if (httpServer != null) { + try { + httpServer.close().result(); + log.info("[stop][IoT WebSocket 协议 {} 服务器已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT WebSocket 协议 {} 服务器停止失败]", getId(), e); + } + httpServer = null; + } + // 2.2 关闭 Vertx 实例 + if (vertx != null) { + try { + vertx.close().result(); + log.info("[stop][IoT WebSocket 协议 {} Vertx 已关闭]", getId()); + } catch (Exception e) { + log.error("[stop][IoT WebSocket 协议 {} Vertx 关闭失败]", getId(), e); + } + vertx = null; + } + running = false; + log.info("[stop][IoT WebSocket 协议 {} 已停止]", getId()); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java deleted file mode 100644 index 75465954da..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java +++ /dev/null @@ -1,135 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; - -import cn.hutool.core.util.ObjUtil; -import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketUpstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpServer; -import io.vertx.core.http.HttpServerOptions; -import io.vertx.core.net.PemKeyCertOptions; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 WebSocket 协议:接收设备上行消息 - * - * @author 芋道源码 - */ -@Slf4j -public class IotWebSocketUpstreamProtocol implements IotProtocol { - - private static final String ID = "websocket"; - - private final IotGatewayProperties.WebSocketProperties wsProperties; - - private final IotDeviceService deviceService; - - private final IotDeviceMessageService messageService; - - private final IotWebSocketConnectionManager connectionManager; - - private final Vertx vertx; - - @Getter - private final String serverId; - - private HttpServer httpServer; - - private volatile boolean running = false; - - public IotWebSocketUpstreamProtocol(IotGatewayProperties.WebSocketProperties wsProperties, - IotDeviceService deviceService, - IotDeviceMessageService messageService, - IotWebSocketConnectionManager connectionManager, - Vertx vertx) { - this.wsProperties = wsProperties; - this.deviceService = deviceService; - this.messageService = messageService; - this.connectionManager = connectionManager; - this.vertx = vertx; - this.serverId = IotDeviceMessageUtils.generateServerId(wsProperties.getPort()); - } - - @Override - public String getId() { - return ID; - } - - @Override - public IotProtocolTypeEnum getType() { - return IotProtocolTypeEnum.WEBSOCKET; - } - - @Override - @PostConstruct - @SuppressWarnings("deprecation") - public void start() { - // 1.1 创建服务器选项 - HttpServerOptions options = new HttpServerOptions() - .setPort(wsProperties.getPort()) - .setIdleTimeout(wsProperties.getIdleTimeoutSeconds()) - .setMaxWebSocketFrameSize(wsProperties.getMaxFrameSize()) - .setMaxWebSocketMessageSize(wsProperties.getMaxMessageSize()); - // 1.2 配置 SSL(如果启用) - if (Boolean.TRUE.equals(wsProperties.getSslEnabled())) { - PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() - .setKeyPath(wsProperties.getSslKeyPath()) - .setCertPath(wsProperties.getSslCertPath()); - options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); - } - - // 2. 创建服务器并设置 WebSocket 处理器 - httpServer = vertx.createHttpServer(options); - httpServer.webSocketHandler(socket -> { - // 验证路径 - if (ObjUtil.notEqual(wsProperties.getPath(), socket.path())) { - log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]", - socket.path(), wsProperties.getPath()); - socket.reject(); - return; - } - // 创建上行处理器 - IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(this, - messageService, deviceService, connectionManager); - handler.handle(socket); - }); - - // 3. 启动服务器 - try { - httpServer.listen().result(); - running = true; - log.info("[start][IoT 网关 WebSocket 协议启动成功,端口:{},路径:{}]", wsProperties.getPort(), wsProperties.getPath()); - } catch (Exception e) { - log.error("[start][IoT 网关 WebSocket 协议启动失败]", e); - throw e; - } - } - - @Override - @PreDestroy - public void stop() { - if (httpServer != null) { - try { - httpServer.close().result(); - running = false; - log.info("[stop][IoT 网关 WebSocket 协议已停止]"); - } catch (Exception e) { - log.error("[stop][IoT 网关 WebSocket 协议停止失败]", e); - } - } - } - - @Override - public boolean isRunning() { - return running; - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamHandler.java similarity index 70% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamHandler.java index 05e3c8c91f..096435eacb 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamHandler.java @@ -1,9 +1,10 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream; -import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -16,7 +17,7 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class IotWebSocketDownstreamHandler { - private final IotDeviceMessageService deviceMessageService; + private final IotMessageSerializer serializer; private final IotWebSocketConnectionManager connectionManager; @@ -37,9 +38,15 @@ public class IotWebSocketDownstreamHandler { } // 2. 编码消息并发送到设备 - byte[] bytes = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getCodecType()); - String jsonMessage = StrUtil.utf8Str(bytes); - boolean success = connectionManager.sendToDevice(message.getDeviceId(), jsonMessage); + byte[] bytes = serializer.serialize(message); + // TODO @AI:参考别的模块的做法,直接发?类似 tcp 这种; + boolean success; + if (serializer.getType() == IotSerializeTypeEnum.BINARY) { + success = connectionManager.sendToDevice(message.getDeviceId(), bytes); + } else { + String jsonMessage = StrUtil.utf8Str(bytes); + success = connectionManager.sendToDevice(message.getDeviceId(), jsonMessage); + } if (success) { log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]", message.getDeviceId(), message.getMethod(), message.getId(), bytes.length); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamSubscriber.java similarity index 78% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketDownstreamSubscriber.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamSubscriber.java index 4b11bb02be..efe5f437e8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamSubscriber.java @@ -1,9 +1,9 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol; import lombok.extern.slf4j.Slf4j; /** @@ -16,7 +16,7 @@ public class IotWebSocketDownstreamSubscriber extends IotProtocolDownstreamSubsc private final IotWebSocketDownstreamHandler downstreamHandler; - public IotWebSocketDownstreamSubscriber(IotWebSocketUpstreamProtocol protocol, + public IotWebSocketDownstreamSubscriber(IotWebSocketProtocol protocol, IotWebSocketDownstreamHandler downstreamHandler, IotMessageBus messageBus) { super(protocol, messageBus); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/upstream/IotWebSocketUpstreamHandler.java similarity index 80% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/upstream/IotWebSocketUpstreamHandler.java index 630246afa3..2b6b5e4fbe 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/upstream/IotWebSocketUpstreamHandler.java @@ -1,6 +1,7 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.upstream; import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; @@ -11,17 +12,19 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.ServerWebSocket; import lombok.extern.slf4j.Slf4j; @@ -36,82 +39,120 @@ import java.util.Map; @Slf4j public class IotWebSocketUpstreamHandler implements Handler { - /** - * 默认消息编解码类型 - */ - private static final String CODEC_TYPE = IotAlinkDeviceMessageCodec.TYPE; - private static final String AUTH_METHOD = "auth"; - private final IotDeviceMessageService deviceMessageService; - - private final IotDeviceService deviceService; - - private final IotWebSocketConnectionManager connectionManager; - - private final IotDeviceCommonApi deviceApi; - private final String serverId; - public IotWebSocketUpstreamHandler(IotWebSocketUpstreamProtocol protocol, + /** + * 消息序列化器(处理业务消息序列化/反序列化) + */ + private final IotMessageSerializer serializer; + /** + * 连接管理器 + */ + private final IotWebSocketConnectionManager connectionManager; + + // TODO @AI:是不是可以去掉? + private final boolean binaryPayload; + + private final IotDeviceMessageService deviceMessageService; + private final IotDeviceService deviceService; + private final IotDeviceCommonApi deviceApi; + + // TODO @AI:参数、顺序参考 IotTcpUpstreamHandler + public IotWebSocketUpstreamHandler(IotWebSocketProtocol protocol, IotDeviceMessageService deviceMessageService, IotDeviceService deviceService, - IotWebSocketConnectionManager connectionManager) { + IotWebSocketConnectionManager connectionManager, + IotMessageSerializer serializer) { this.deviceMessageService = deviceMessageService; this.deviceService = deviceService; this.connectionManager = connectionManager; + this.serializer = serializer; + this.binaryPayload = serializer.getType() == IotSerializeTypeEnum.BINARY; this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); this.serverId = protocol.getServerId(); + // TODO @AI:通过 springutil;deviceService、deviceMessageService; } @Override + @SuppressWarnings("DuplicatedCode") public void handle(ServerWebSocket socket) { String clientId = IdUtil.simpleUUID(); log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); // 1. 设置异常和关闭处理器 + // TODO @AI:clientId 去掉; socket.exceptionHandler(ex -> { log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); - cleanupConnection(socket); + socket.close(); }); socket.closeHandler(v -> { log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); cleanupConnection(socket); }); - // 2. 设置文本消息处理器 - socket.textMessageHandler(message -> { - try { - processMessage(clientId, message, socket); - } catch (Exception e) { - log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", - clientId, socket.remoteAddress(), e.getMessage()); + // 2. 设置消息处理器(JSON 使用文本,BINARY 使用二进制) + // TODO @AI:是不是 text、binary 保持统一?用一个 mesagehandler? + if (binaryPayload) { + socket.binaryMessageHandler(buffer -> { + try { + processMessage(clientId, buffer.getBytes(), socket); + } catch (Exception e) { + log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", + clientId, socket.remoteAddress(), e.getMessage()); + cleanupConnection(socket); + socket.close(); + } + }); + socket.textMessageHandler(message -> { + log.warn("[handle][收到文本帧但当前序列化为 BINARY,断开连接,客户端 ID: {},地址: {}]", + clientId, socket.remoteAddress()); cleanupConnection(socket); socket.close(); - } - }); + }); + } else { + socket.textMessageHandler(message -> { + try { + processMessage(clientId, StrUtil.utf8Bytes(message), socket); + } catch (Exception e) { + log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", + clientId, socket.remoteAddress(), e.getMessage()); + // TODO @AI:是不是不用 cleanupConnection?closehandler 本身就吹了了; + cleanupConnection(socket); + socket.close(); + } + }); + socket.binaryMessageHandler(buffer -> { + try { + processMessage(clientId, buffer.getBytes(), socket); + } catch (Exception e) { + log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", + clientId, socket.remoteAddress(), e.getMessage()); + cleanupConnection(socket); + socket.close(); + } + }); + } } /** * 处理消息 * * @param clientId 客户端 ID - * @param message 消息(JSON 字符串) + * @param payload 消息负载 * @param socket WebSocket 连接 * @throws Exception 消息解码失败时抛出异常 */ - private void processMessage(String clientId, String message, ServerWebSocket socket) throws Exception { + private void processMessage(String clientId, byte[] payload, ServerWebSocket socket) throws Exception { // 1.1 基础检查 - if (StrUtil.isBlank(message)) { + if (ArrayUtil.isEmpty(payload)) { return; } - // 1.2 解码消息(已认证连接使用其 codecType,未认证连接使用默认 CODEC_TYPE) - IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket); - String codecType = connectionInfo != null ? connectionInfo.getCodecType() : CODEC_TYPE; + // 1.2 解码消息 IotDeviceMessage deviceMessage; try { - deviceMessage = deviceMessageService.decodeDeviceMessage( - StrUtil.utf8Bytes(message), codecType); + deviceMessage = serializer.deserialize(payload); if (deviceMessage == null) { throw new Exception("解码后消息为空"); } @@ -132,6 +173,7 @@ public class IotWebSocketUpstreamHandler implements Handler { handleBusinessRequest(clientId, deviceMessage, socket); } } catch (Exception e) { + // TODO @AI:参考 IotTcpUpstreamHandler 处理;业务、参数、其它 log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]", clientId, deviceMessage.getMethod(), e); // 发送错误响应,避免客户端一直等待 @@ -153,6 +195,7 @@ public class IotWebSocketUpstreamHandler implements Handler { private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) { try { // 1.1 解析认证参数 + // TODO @AI:参数解析;参考 tcp 对应的 handleAuthenticationRequest IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams()); if (authParams == null) { log.warn("[handleAuthenticationRequest][认证参数解析失败,客户端 ID: {}]", clientId); @@ -204,6 +247,7 @@ public class IotWebSocketUpstreamHandler implements Handler { * @see 阿里云 - 一型一密 */ private void handleRegisterRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) { + // TODO @AI:参数解析;参考 tcp 对应的 handleRegisterRequest try { // 1. 解析注册参数 IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams()); @@ -232,6 +276,7 @@ public class IotWebSocketUpstreamHandler implements Handler { } } + // TODO @AI:参考对应的 tcp 的 handleBusinessRequest /** * 处理业务请求 * @@ -270,10 +315,7 @@ public class IotWebSocketUpstreamHandler implements Handler { IotWebSocketConnectionManager.ConnectionInfo connectionInfo = new IotWebSocketConnectionManager.ConnectionInfo() .setDeviceId(device.getId()) .setProductKey(device.getProductKey()) - .setDeviceName(device.getDeviceName()) - .setClientId(clientId) - .setCodecType(CODEC_TYPE); - // 注册连接 + .setDeviceName(device.getDeviceName()); connectionManager.registerConnection(socket, device.getId(), connectionInfo); } @@ -314,6 +356,8 @@ public class IotWebSocketUpstreamHandler implements Handler { } } + // ===================== 发送响应消息 ===================== + /** * 发送响应消息 * @@ -332,8 +376,7 @@ public class IotWebSocketUpstreamHandler implements Handler { int code = success ? 0 : 401; IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, code, message); - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE); - socket.writeTextMessage(StrUtil.utf8Str(encodedData)); + writeResponse(socket, responseMessage); } catch (Exception e) { log.error("[sendResponse][发送响应失败,requestId: {}]", requestId, e); } @@ -461,11 +504,22 @@ public class IotWebSocketUpstreamHandler implements Handler { IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null); // 2. 发送响应 - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE); - socket.writeTextMessage(StrUtil.utf8Str(encodedData)); + writeResponse(socket, responseMessage); } catch (Exception e) { log.error("[sendRegisterSuccessResponse][发送注册成功响应失败,requestId: {}]", requestId, e); } } + /** + * 写入响应消息 + */ + private void writeResponse(ServerWebSocket socket, IotDeviceMessage responseMessage) { + byte[] payload = serializer.serialize(responseMessage); + if (binaryPayload) { + socket.writeBinaryMessage(Buffer.buffer(payload)); + } else { + socket.writeTextMessage(StrUtil.utf8Str(payload)); + } + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java index 128b360086..1477406450 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java @@ -1,10 +1,10 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.ServerWebSocket; import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentHashMap; * @author 芋道源码 */ @Slf4j -@Component public class IotWebSocketConnectionManager { /** @@ -69,7 +68,8 @@ public class IotWebSocketConnectionManager { return; } Long deviceId = connectionInfo.getDeviceId(); - deviceSocketMap.remove(deviceId); + // 仅当 deviceSocketMap 中的 socket 是当前 socket 时才移除,避免误删新连接 + deviceSocketMap.remove(deviceId, socket); log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, socket.remoteAddress()); } @@ -115,6 +115,33 @@ public class IotWebSocketConnectionManager { } } + // TODO @AI:没必要这里加一个; + /** + * 发送消息到设备(二进制消息) + * + * @param deviceId 设备 ID + * @param payload 二进制消息 + * @return 是否发送成功 + */ + public boolean sendToDevice(Long deviceId, byte[] payload) { + ServerWebSocket socket = deviceSocketMap.get(deviceId); + if (socket == null) { + log.warn("[sendToDevice][设备未连接,设备 ID: {}]", deviceId); + return false; + } + + try { + socket.writeBinaryMessage(Buffer.buffer(payload)); + log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, payload.length); + return true; + } catch (Exception e) { + log.error("[sendToDevice][发送消息失败,设备 ID: {}]", deviceId, e); + // 发送失败时清理连接 + unregisterConnection(socket); + return false; + } + } + /** * 连接信息(包含认证信息) */ @@ -135,15 +162,6 @@ public class IotWebSocketConnectionManager { */ private String deviceName; - /** - * 客户端 ID - */ - private String clientId; - /** - * 消息编解码类型(认证后确定) - */ - private String codecType; - } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index ddc353f399..add4dce6a8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -94,7 +94,7 @@ yudao: - id: websocket-json type: websocket port: 8094 - enabled: false + enabled: true serialize: json websocket: path: /ws @@ -159,6 +159,7 @@ yudao: max-message-size: 8192 connect-timeout-seconds: 60 ssl-enabled: false + --- #################### 日志相关配置 #################### # 基础日志配置 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java index 4f3fe2daf2..192dce359c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java @@ -270,7 +270,7 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { }); socket.handler(parser); - // 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑) + // 2.1 序列化 + 帧编码 byte[] serializedData = SERIALIZER.serialize(request); Buffer frameData = FRAME_CODEC.encode(serializedData); log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length()); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java index 2efbd4d677..5bb113b919 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java @@ -373,7 +373,7 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { }); socket.handler(parser); - // 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑) + // 2.1 序列化 + 帧编码 byte[] serializedData = SERIALIZER.serialize(request); Buffer frameData = FRAME_CODEC.encode(serializedData); log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length()); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java index 1980d0a08c..22b654a869 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java @@ -249,7 +249,7 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { }); socket.handler(parser); - // 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑) + // 2.1 序列化 + 帧编码 byte[] serializedData = SERIALIZER.serialize(request); Buffer frameData = FRAME_CODEC.encode(serializedData); log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length()); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java index ca79c4220c..15eed61e2a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java @@ -10,8 +10,8 @@ import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; import io.vertx.core.Vertx; import io.vertx.core.http.WebSocket; import io.vertx.core.http.WebSocketClient; @@ -61,7 +61,7 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { // ===================== 编解码器选择 ===================== - private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec(); + private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) ===================== @@ -95,10 +95,10 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - // 1.2 编码 - byte[] payload = CODEC.encode(request); + // 1.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testAuth][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 2.1 创建 WebSocket 连接(同步) WebSocket ws = createWebSocketConnection(); @@ -109,7 +109,7 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { // 3. 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testAuth][响应消息: {}]", responseMessage); } else { log.warn("[testAuth][未收到响应]"); @@ -137,10 +137,10 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { registerReqDTO.setProductSecret("test-product-secret"); IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null); - // 1.2 编码 - byte[] payload = CODEC.encode(request); + // 1.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testDeviceRegister][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 2.1 创建 WebSocket 连接(同步) WebSocket ws = createWebSocketConnection(); @@ -151,7 +151,7 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { // 3. 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testDeviceRegister][响应消息: {}]", responseMessage); log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); } else { @@ -186,16 +186,16 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { .put("height", "2") .build()), null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); + // 2.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testPropertyPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送并等待响应 String response = sendAndReceive(ws, jsonMessage); // 3.2 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testPropertyPost][响应消息: {}]", responseMessage); } else { log.warn("[testPropertyPost][未收到响应]"); @@ -229,16 +229,16 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { MapUtil.builder().put("rice", 3).build(), System.currentTimeMillis()), null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); + // 2.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testEventPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送并等待响应 String response = sendAndReceive(ws, jsonMessage); // 3.2 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testEventPost][响应消息: {}]", responseMessage); } else { log.warn("[testEventPost][未收到响应]"); @@ -308,13 +308,13 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { .setPassword(authInfo.getPassword()); IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - byte[] payload = CODEC.encode(request); + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); log.info("[authenticate][发送认证请求: {}]", jsonMessage); String response = sendAndReceive(ws, jsonMessage); if (response != null) { - return CODEC.decode(StrUtil.utf8Bytes(response)); + return SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); } return null; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java index a44f2f6dd5..20d66fa0a7 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java @@ -14,8 +14,8 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO; import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO; import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; import io.vertx.core.Vertx; import io.vertx.core.http.WebSocket; import io.vertx.core.http.WebSocketClient; @@ -67,9 +67,9 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { private static Vertx vertx; - // ===================== 编解码器选择 ===================== + // ===================== 序列化器选择 ===================== - private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec(); + private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); // ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) ===================== @@ -110,10 +110,10 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - // 1.2 编码 - byte[] payload = CODEC.encode(request); + // 1.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testAuth][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 2.1 创建 WebSocket 连接(同步) WebSocket ws = createWebSocketConnection(); @@ -124,7 +124,7 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { // 3. 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testAuth][响应消息: {}]", responseMessage); } else { log.warn("[testAuth][未收到响应]"); @@ -164,16 +164,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), params, null, null, null); - // 2.3 编码 - byte[] payload = CODEC.encode(request); + // 2.3 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testTopoAdd][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送并等待响应 String response = sendAndReceive(ws, jsonMessage); // 3.2 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testTopoAdd][响应消息: {}]", responseMessage); } else { log.warn("[testTopoAdd][未收到响应]"); @@ -205,16 +205,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), params, null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); + // 2.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testTopoDelete][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送并等待响应 String response = sendAndReceive(ws, jsonMessage); // 3.2 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testTopoDelete][响应消息: {}]", responseMessage); } else { log.warn("[testTopoDelete][未收到响应]"); @@ -244,16 +244,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), params, null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); + // 2.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testTopoGet][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送并等待响应 String response = sendAndReceive(ws, jsonMessage); // 3.2 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testTopoGet][响应消息: {}]", responseMessage); } else { log.warn("[testTopoGet][未收到响应]"); @@ -287,16 +287,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), Collections.singletonList(subDevice), null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); + // 2.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testSubDeviceRegister][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送并等待响应 String response = sendAndReceive(ws, jsonMessage); // 3.2 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testSubDeviceRegister][响应消息: {}]", responseMessage); } else { log.warn("[testSubDeviceRegister][未收到响应]"); @@ -358,16 +358,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), params, null, null, null); - // 2.7 编码 - byte[] payload = CODEC.encode(request); + // 2.7 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testPropertyPackPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送并等待响应 String response = sendAndReceive(ws, jsonMessage); // 3.2 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testPropertyPackPost][响应消息: {}]", responseMessage); } else { log.warn("[testPropertyPackPost][未收到响应]"); @@ -438,13 +438,13 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { .setPassword(authInfo.getPassword()); IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - byte[] payload = CODEC.encode(request); + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); log.info("[authenticate][发送认证请求: {}]", jsonMessage); String response = sendAndReceive(ws, jsonMessage); if (response != null) { - return CODEC.decode(StrUtil.utf8Bytes(response)); + return SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); } return null; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java index 04bf3d5632..f792288fe3 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java @@ -9,8 +9,8 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; import io.vertx.core.Vertx; import io.vertx.core.http.WebSocket; import io.vertx.core.http.WebSocketClient; @@ -60,9 +60,9 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { private static Vertx vertx; - // ===================== 编解码器选择 ===================== + // ===================== 序列化器选择 ===================== - private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec(); + private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); // ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== @@ -96,10 +96,10 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - // 1.2 编码 - byte[] payload = CODEC.encode(request); + // 1.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testAuth][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 2.1 创建 WebSocket 连接(同步) WebSocket ws = createWebSocketConnection(); @@ -110,7 +110,7 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { // 3. 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testAuth][响应消息: {}]", responseMessage); } else { log.warn("[testAuth][未收到响应]"); @@ -146,16 +146,16 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { .put("temperature", 36.5) .build()), null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); + // 2.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testPropertyPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送并等待响应 String response = sendAndReceive(ws, jsonMessage); // 3.2 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testPropertyPost][响应消息: {}]", responseMessage); } else { log.warn("[testPropertyPost][未收到响应]"); @@ -195,16 +195,16 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { .build(), System.currentTimeMillis()), null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); + // 2.2 序列化 + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); - log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + log.info("[testEventPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request); // 3.1 发送并等待响应 String response = sendAndReceive(ws, jsonMessage); // 3.2 解码响应 if (response != null) { - IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); log.info("[testEventPost][响应消息: {}]", responseMessage); } else { log.warn("[testEventPost][未收到响应]"); @@ -274,13 +274,13 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { .setPassword(authInfo.getPassword()); IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - byte[] payload = CODEC.encode(request); + byte[] payload = SERIALIZER.serialize(request); String jsonMessage = StrUtil.utf8Str(payload); log.info("[authenticate][发送认证请求: {}]", jsonMessage); String response = sendAndReceive(ws, jsonMessage); if (response != null) { - return CODEC.decode(StrUtil.utf8Bytes(response)); + return SERIALIZER.deserialize(StrUtil.utf8Bytes(response)); } return null; }