From 0072482af8faf113b5ed663d15eaae90b5d15dd5 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Tue, 27 Jan 2026 21:09:00 +0800 Subject: [PATCH] =?UTF-8?q?feat(iot):=20=E5=AE=8C=E5=96=84=20WebSocket=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E5=AE=9E=E7=8E=B0=EF=BC=8C=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B4=A8=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 配置属性校验完善 - CoAP 配置添加 @NotNull 校验注解,替换 TODO 注释 2. WebSocket 协议核心优化 - ConnectionInfo 新增 codecType 字段,支持动态编解码类型 - 上行/下行处理器根据连接的 codecType 进行消息编解码 - 使用 StrUtil.utf8Str/utf8Bytes 替换 StandardCharsets 硬编码 3. 包注释完善 - http/tcp package-info.java 添加规范的包级注释 4. 单元测试重构 - 使用 WebSocketClient.connect() 替换废弃的 HttpClient.webSocket() - 提取公共方法,简化测试代码结构 --- .../gateway/config/IotGatewayProperties.java | 8 +- .../gateway/protocol/http/package-info.java | 8 +- .../gateway/protocol/tcp/package-info.java | 8 +- .../IotWebSocketUpstreamProtocol.java | 10 +- .../IotWebSocketConnectionManager.java | 6 +- .../router/IotWebSocketDownstreamHandler.java | 11 +- .../router/IotWebSocketUpstreamHandler.java | 26 +- ...eviceWebSocketProtocolIntegrationTest.java | 407 +++++++-------- ...eviceWebSocketProtocolIntegrationTest.java | 478 +++++++++++------- ...eviceWebSocketProtocolIntegrationTest.java | 292 ++++++----- 10 files changed, 662 insertions(+), 592 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 0b9720ad12..9a86ee600d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -568,25 +568,25 @@ public class IotGatewayProperties { /** * 服务端口(CoAP 默认端口 5683) */ - // TODO @AI:默认不为空 + @NotNull(message = "服务端口不能为空") private Integer port = 5683; /** * 最大消息大小(字节) */ - // TODO @AI:默认不为空 + @NotNull(message = "最大消息大小不能为空") private Integer maxMessageSize = 1024; /** * ACK 超时时间(毫秒) */ - // TODO @AI:默认不为空 + @NotNull(message = "ACK 超时时间不能为空") private Integer ackTimeout = 2000; /** * 最大重传次数 */ - // TODO @AI:默认不为空 + @NotNull(message = "最大重传次数不能为空") private Integer maxRetransmit = 4; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/package-info.java index 5a027da02b..20124f8d07 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/package-info.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/package-info.java @@ -1,2 +1,6 @@ -// TODO @AI:参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java 完善注释; -package cn.iocoder.yudao.module.iot.gateway.protocol.http; \ No newline at end of file +/** + * HTTP 协议实现包 + *

+ * 提供基于 Vert.x HTTP Server 的 IoT 设备连接和消息处理功能 + */ +package cn.iocoder.yudao.module.iot.gateway.protocol.http; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java index e67eb497f4..1b59f5446e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java @@ -1,2 +1,6 @@ -// TODO @AI:参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; \ No newline at end of file +/** + * TCP 协议实现包 + *

+ * 提供基于 Vert.x TCP Server 的 IoT 设备连接和消息处理功能 + */ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java index 5e2a3e284b..9c612acec5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java @@ -53,6 +53,7 @@ public class IotWebSocketUpstreamProtocol { } @PostConstruct + @SuppressWarnings("deprecation") public void start() { // 1.1 创建服务器选项 HttpServerOptions options = new HttpServerOptions() @@ -73,15 +74,14 @@ public class IotWebSocketUpstreamProtocol { httpServer.webSocketHandler(socket -> { // 验证路径 if (ObjUtil.notEqual(wsProperties.getPath(), socket.path())) { - log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]", socket.path(), wsProperties.getPath()); - // TODO @AI:已经被废弃,看看换什么其他方法; + log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]", + socket.path(), wsProperties.getPath()); socket.reject(); return; } - // 创建上行处理器 - IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler( - this, messageService, deviceService, connectionManager); + IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(this, + messageService, deviceService, connectionManager); handler.handle(socket); }); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java index 406aa1443e..128b360086 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java @@ -139,8 +139,10 @@ public class IotWebSocketConnectionManager { * 客户端 ID */ private String clientId; - - // TODO @AI:增加有个 codecType 字段;后续可以使用,参考 tcp、udp;然后下行的时候,也基于这个 codeType 去获取; + /** + * 消息编解码类型(认证后确定) + */ + private String codecType; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java index 91310cd2a0..05e3c8c91f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java @@ -1,14 +1,12 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.nio.charset.StandardCharsets; - /** * IoT 网关 WebSocket 下行消息处理器 * @@ -18,9 +16,6 @@ import java.nio.charset.StandardCharsets; @RequiredArgsConstructor public class IotWebSocketDownstreamHandler { - // TODO @芋艿:codeType 的处理; - private static final String CODEC_TYPE = IotWebSocketJsonDeviceMessageCodec.TYPE; - private final IotDeviceMessageService deviceMessageService; private final IotWebSocketConnectionManager connectionManager; @@ -42,8 +37,8 @@ public class IotWebSocketDownstreamHandler { } // 2. 编码消息并发送到设备 - byte[] bytes = deviceMessageService.encodeDeviceMessage(message, CODEC_TYPE); - String jsonMessage = new String(bytes, StandardCharsets.UTF_8); + byte[] bytes = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getCodecType()); + String jsonMessage = StrUtil.utf8Str(bytes); boolean success = connectionManager.sendToDevice(message.getDeviceId(), jsonMessage); if (success) { log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]", diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java index e7deda3546..1615596a5b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java @@ -25,7 +25,6 @@ import io.vertx.core.Handler; import io.vertx.core.http.ServerWebSocket; import lombok.extern.slf4j.Slf4j; -import java.nio.charset.StandardCharsets; /** * WebSocket 上行消息处理器 @@ -35,7 +34,9 @@ import java.nio.charset.StandardCharsets; @Slf4j public class IotWebSocketUpstreamHandler implements Handler { - // TODO @芋艿:codeType 的处理; + /** + * 默认消息编解码类型 + */ private static final String CODEC_TYPE = IotWebSocketJsonDeviceMessageCodec.TYPE; private static final String AUTH_METHOD = "auth"; @@ -63,13 +64,10 @@ public class IotWebSocketUpstreamHandler implements Handler { @Override public void handle(ServerWebSocket socket) { - // 1. 接受 WebSocket 连接 String clientId = IdUtil.simpleUUID(); log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); - // TODO @AI:这个方法已经废弃,看看有没其他替换的 - socket.accept(); - // 2.1 设置异常和关闭处理器 + // 1. 设置异常和关闭处理器 socket.exceptionHandler(ex -> { log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); cleanupConnection(socket); @@ -79,7 +77,7 @@ public class IotWebSocketUpstreamHandler implements Handler { cleanupConnection(socket); }); - // 2.2 设置文本消息处理器 + // 2. 设置文本消息处理器 socket.textMessageHandler(message -> { try { processMessage(clientId, message, socket); @@ -105,12 +103,13 @@ public class IotWebSocketUpstreamHandler implements Handler { if (StrUtil.isBlank(message)) { return; } - // 1.2 解码消息 - // TODO @AI:应该只有初始使用 CODEC_TYPE 解析,后续基于 + // 1.2 解码消息(已认证连接使用其 codecType,未认证连接使用默认 CODEC_TYPE) + IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket); + String codecType = connectionInfo != null ? connectionInfo.getCodecType() : CODEC_TYPE; IotDeviceMessage deviceMessage; try { deviceMessage = deviceMessageService.decodeDeviceMessage( - message.getBytes(StandardCharsets.UTF_8), CODEC_TYPE); + StrUtil.utf8Bytes(message), codecType); if (deviceMessage == null) { throw new Exception("解码后消息为空"); } @@ -269,7 +268,8 @@ public class IotWebSocketUpstreamHandler implements Handler { .setDeviceId(device.getId()) .setProductKey(device.getProductKey()) .setDeviceName(device.getDeviceName()) - .setClientId(clientId); + .setClientId(clientId) + .setCodecType(CODEC_TYPE); // 注册连接 connectionManager.registerConnection(socket, device.getId(), connectionInfo); } @@ -330,7 +330,7 @@ public class IotWebSocketUpstreamHandler implements Handler { IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, code, message); byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE); - socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8)); + socket.writeTextMessage(StrUtil.utf8Str(encodedData)); } catch (Exception e) { log.error("[sendResponse][发送响应失败,requestId: {}]", requestId, e); } @@ -472,7 +472,7 @@ public class IotWebSocketUpstreamHandler implements Handler { IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null); // 2. 发送响应 byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE); - socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8)); + socket.writeTextMessage(StrUtil.utf8Str(encodedData)); } catch (Exception e) { log.error("[sendRegisterSuccessResponse][发送注册成功响应失败,requestId: {}]", requestId, e); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java index 1cca286c83..3a20668655 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -12,15 +13,14 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec; import io.vertx.core.Vertx; -import io.vertx.core.http.HttpClient; import io.vertx.core.http.WebSocket; +import io.vertx.core.http.WebSocketClient; import io.vertx.core.http.WebSocketConnectOptions; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -81,119 +81,83 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { // ===================== 认证测试 ===================== - // TODO @AI:参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java 或 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java 类,优化代码结构 - /** * 认证测试:获取设备 Token */ @Test public void testAuth() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference responseRef = new AtomicReference<>(); + // 1.1 构建认证消息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request); - // 1. 创建 WebSocket 连接 - HttpClient client = vertx.createHttpClient(); - WebSocketConnectOptions options = new WebSocketConnectOptions() - .setHost(SERVER_HOST) - .setPort(SERVER_PORT) - .setURI(WS_PATH); + // 2.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testAuth][WebSocket 连接成功]"); - // TODO @AI:这里有告警;Deprecate /instead use WebSocketClient.connect(WebSocketConnectOptions) - client.webSocket(options).onComplete(ar -> { - if (ar.succeeded()) { - WebSocket ws = ar.result(); - log.info("[testAuth][WebSocket 连接成功]"); + // 2.2 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); - // 设置消息处理器 - ws.textMessageHandler(message -> { - log.info("[testAuth][收到响应: {}]", message); - responseRef.set(message); - ws.close(); - latch.countDown(); - }); - - // 2. 构建认证消息 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - - // 3. 编码并发送 - byte[] payload = CODEC.encode(request); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[testAuth][发送认证请求: {}]", jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - log.error("[testAuth][WebSocket 连接失败]", ar.cause()); - latch.countDown(); - } - }); - - // 4. 等待响应 - boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (completed && responseRef.get() != null) { - IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[testAuth][解码响应: {}]", response); + // 3. 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testAuth][响应消息: {}]", responseMessage); } else { - log.warn("[testAuth][测试超时或未收到响应]"); + log.warn("[testAuth][未收到响应]"); } + + // 4. 关闭连接 + ws.close(); } // ===================== 动态注册测试 ===================== /** * 直连设备动态注册测试(一型一密) + *

+ * 使用产品密钥(productSecret)验证身份,成功后返回设备密钥(deviceSecret) + *

+ * 注意:此接口不需要认证 */ @Test public void testDeviceRegister() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference responseRef = new AtomicReference<>(); + // 1.1 构建注册消息 + IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO(); + registerReqDTO.setProductKey(PRODUCT_KEY); + registerReqDTO.setDeviceName("test-ws-" + System.currentTimeMillis()); + registerReqDTO.setProductSecret("test-product-secret"); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request); - HttpClient client = vertx.createHttpClient(); - WebSocketConnectOptions options = new WebSocketConnectOptions() - .setHost(SERVER_HOST) - .setPort(SERVER_PORT) - .setURI(WS_PATH); + // 2.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testDeviceRegister][WebSocket 连接成功]"); - client.webSocket(options).onComplete(ar -> { - if (ar.succeeded()) { - WebSocket ws = ar.result(); - log.info("[testDeviceRegister][WebSocket 连接成功]"); + // 2.2 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); - ws.textMessageHandler(message -> { - log.info("[testDeviceRegister][收到响应: {}]", message); - responseRef.set(message); - ws.close(); - latch.countDown(); - }); - - // 构建注册消息 - IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO(); - registerReqDTO.setProductKey(PRODUCT_KEY); - registerReqDTO.setDeviceName("test-ws-" + System.currentTimeMillis()); - registerReqDTO.setProductSecret("test-product-secret"); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null); - - byte[] payload = CODEC.encode(request); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[testDeviceRegister][发送注册请求: {}]", jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - log.error("[testDeviceRegister][WebSocket 连接失败]", ar.cause()); - latch.countDown(); - } - }); - - boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (completed && responseRef.get() != null) { - IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[testDeviceRegister][解码响应: {}]", response); + // 3. 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testDeviceRegister][响应消息: {}]", responseMessage); + log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); } else { - log.warn("[testDeviceRegister][测试超时或未收到响应]"); + log.warn("[testDeviceRegister][未收到响应]"); } + + // 4. 关闭连接 + ws.close(); } // ===================== 直连设备属性上报测试 ===================== @@ -203,82 +167,40 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { */ @Test public void testPropertyPost() throws Exception { - CountDownLatch latch = new CountDownLatch(2); // 认证 + 属性上报 - AtomicReference authResponseRef = new AtomicReference<>(); - AtomicReference propertyResponseRef = new AtomicReference<>(); + // 1.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testPropertyPost][WebSocket 连接成功]"); - HttpClient client = vertx.createHttpClient(); - WebSocketConnectOptions options = new WebSocketConnectOptions() - .setHost(SERVER_HOST) - .setPort(SERVER_PORT) - .setURI(WS_PATH); + // 1.2 先进行认证 + IotDeviceMessage authResponse = authenticate(ws); + log.info("[testPropertyPost][认证响应: {}]", authResponse); - client.webSocket(options).onComplete(ar -> { - if (ar.succeeded()) { - WebSocket ws = ar.result(); - log.info("[testPropertyPost][WebSocket 连接成功]"); + // 2.1 构建属性上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(MapUtil.builder() + .put("width", 1) + .put("height", "2") + .build()), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); - final boolean[] authenticated = {false}; - - ws.textMessageHandler(message -> { - log.info("[testPropertyPost][收到响应: {}]", message); - if (!authenticated[0]) { - authResponseRef.set(message); - authenticated[0] = true; - latch.countDown(); - - // 认证成功后发送属性上报 - IotDeviceMessage propertyRequest = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), - IotDevicePropertyPostReqDTO.of(MapUtil.builder() - .put("width", 1) - .put("height", "2") - .build()), - null, null, null); - byte[] payload = CODEC.encode(propertyRequest); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[testPropertyPost][发送属性上报请求: {}]", jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - propertyResponseRef.set(message); - ws.close(); - latch.countDown(); - } - }); - - // 先发送认证请求 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - - byte[] payload = CODEC.encode(authRequest); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[testPropertyPost][发送认证请求: {}]", jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - log.error("[testPropertyPost][WebSocket 连接失败]", ar.cause()); - latch.countDown(); - latch.countDown(); - } - }); - - boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); - if (completed) { - if (authResponseRef.get() != null) { - IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[testPropertyPost][认证响应: {}]", authResponse); - } - if (propertyResponseRef.get() != null) { - IotDeviceMessage propertyResponse = CODEC.decode(propertyResponseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[testPropertyPost][属性上报响应: {}]", propertyResponse); - } + // 3.1 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); + // 3.2 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testPropertyPost][响应消息: {}]", responseMessage); } else { - log.warn("[testPropertyPost][测试超时]"); + log.warn("[testPropertyPost][未收到响应]"); } + + // 4. 关闭连接 + ws.close(); } // ===================== 直连设备事件上报测试 ===================== @@ -288,82 +210,111 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { */ @Test public void testEventPost() throws Exception { - CountDownLatch latch = new CountDownLatch(2); // 认证 + 事件上报 - AtomicReference authResponseRef = new AtomicReference<>(); - AtomicReference eventResponseRef = new AtomicReference<>(); + // 1.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testEventPost][WebSocket 连接成功]"); - HttpClient client = vertx.createHttpClient(); + // 1.2 先进行认证 + IotDeviceMessage authResponse = authenticate(ws); + log.info("[testEventPost][认证响应: {}]", authResponse); + + // 2.1 构建事件上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of( + "eat", + MapUtil.builder().put("rice", 3).build(), + System.currentTimeMillis()), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); + // 3.2 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testEventPost][响应消息: {}]", responseMessage); + } else { + log.warn("[testEventPost][未收到响应]"); + } + + // 4. 关闭连接 + ws.close(); + } + + // ===================== 辅助方法 ===================== + + /** + * 创建 WebSocket 连接(同步) + * + * @return WebSocket 连接 + */ + private WebSocket createWebSocketConnection() throws Exception { + WebSocketClient wsClient = vertx.createWebSocketClient(); WebSocketConnectOptions options = new WebSocketConnectOptions() .setHost(SERVER_HOST) .setPort(SERVER_PORT) .setURI(WS_PATH); + return wsClient.connect(options).toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } - client.webSocket(options).onComplete(ar -> { - if (ar.succeeded()) { - WebSocket ws = ar.result(); - log.info("[testEventPost][WebSocket 连接成功]"); + /** + * 发送消息并等待响应(同步) + * + * @param ws WebSocket 连接 + * @param message 请求消息 + * @return 响应消息 + */ + public static String sendAndReceive(WebSocket ws, String message) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); - final boolean[] authenticated = {false}; - - ws.textMessageHandler(message -> { - log.info("[testEventPost][收到响应: {}]", message); - if (!authenticated[0]) { - authResponseRef.set(message); - authenticated[0] = true; - latch.countDown(); - - // 认证成功后发送事件上报 - IotDeviceMessage eventRequest = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), - IotDeviceEventPostReqDTO.of( - "eat", - MapUtil.builder().put("rice", 3).build(), - System.currentTimeMillis()), - null, null, null); - byte[] payload = CODEC.encode(eventRequest); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[testEventPost][发送事件上报请求: {}]", jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - eventResponseRef.set(message); - ws.close(); - latch.countDown(); - } - }); - - // 先发送认证请求 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - - byte[] payload = CODEC.encode(authRequest); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[testEventPost][发送认证请求: {}]", jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - log.error("[testEventPost][WebSocket 连接失败]", ar.cause()); - latch.countDown(); - latch.countDown(); - } + // 设置消息处理器 + ws.textMessageHandler(response -> { + log.info("[sendAndReceive][收到响应: {}]", response); + responseRef.set(response); + latch.countDown(); }); - boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); - if (completed) { - if (authResponseRef.get() != null) { - IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[testEventPost][认证响应: {}]", authResponse); - } - if (eventResponseRef.get() != null) { - IotDeviceMessage eventResponse = CODEC.decode(eventResponseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[testEventPost][事件上报响应: {}]", eventResponse); - } - } else { - log.warn("[testEventPost][测试超时]"); + // 发送请求 + log.info("[sendAndReceive][发送请求: {}]", message); + ws.writeTextMessage(message); + + // 等待响应 + boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!completed) { + log.warn("[sendAndReceive][等待响应超时]"); } + return responseRef.get(); + } + + /** + * 执行设备认证(同步) + * + * @param ws WebSocket 连接 + * @return 认证响应消息 + */ + private IotDeviceMessage authenticate(WebSocket ws) throws Exception { + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[authenticate][发送认证请求: {}]", jsonMessage); + + String response = sendAndReceive(ws, jsonMessage); + if (response != null) { + return CODEC.decode(StrUtil.utf8Bytes(response)); + } + return null; } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java index e0fcc7a044..5c3770d538 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -15,15 +16,14 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec; import io.vertx.core.Vertx; -import io.vertx.core.http.HttpClient; import io.vertx.core.http.WebSocket; +import io.vertx.core.http.WebSocketClient; import io.vertx.core.http.WebSocketConnectOptions; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Map; @@ -100,53 +100,36 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference responseRef = new AtomicReference<>(); + // 1.1 构建认证消息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request); - HttpClient client = vertx.createHttpClient(); - WebSocketConnectOptions options = new WebSocketConnectOptions() - .setHost(SERVER_HOST) - .setPort(SERVER_PORT) - .setURI(WS_PATH); + // 2.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testAuth][WebSocket 连接成功]"); - client.webSocket(options).onComplete(ar -> { - if (ar.succeeded()) { - WebSocket ws = ar.result(); - log.info("[testAuth][WebSocket 连接成功]"); + // 2.2 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); - ws.textMessageHandler(message -> { - log.info("[testAuth][收到响应: {}]", message); - responseRef.set(message); - ws.close(); - latch.countDown(); - }); - - // 构建认证消息 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - - byte[] payload = CODEC.encode(request); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[testAuth][发送认证请求: {}]", jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - log.error("[testAuth][WebSocket 连接失败]", ar.cause()); - latch.countDown(); - } - }); - - boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (completed && responseRef.get() != null) { - IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[testAuth][解码响应: {}]", response); + // 3. 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testAuth][响应消息: {}]", responseMessage); } else { - log.warn("[testAuth][测试超时或未收到响应]"); + log.warn("[testAuth][未收到响应]"); } + + // 4. 关闭连接 + ws.close(); } // ===================== 拓扑管理测试 ===================== @@ -156,23 +139,46 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { */ @Test public void testTopoAdd() throws Exception { - executeAuthenticatedRequest("testTopoAdd", ws -> { - // 构建子设备认证信息 - IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo( - SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET); - IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO() - .setClientId(subAuthInfo.getClientId()) - .setUsername(subAuthInfo.getUsername()) - .setPassword(subAuthInfo.getPassword()); - // 构建请求参数 - IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO(); - params.setSubDevices(Collections.singletonList(subDeviceAuth)); - return IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), - params, - null, null, null); - }); + // 1.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testTopoAdd][WebSocket 连接成功]"); + + // 1.2 先进行认证 + IotDeviceMessage authResponse = authenticate(ws); + log.info("[testTopoAdd][认证响应: {}]", authResponse); + + // 2.1 构建子设备认证信息 + IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo( + SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET); + IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO() + .setClientId(subAuthInfo.getClientId()) + .setUsername(subAuthInfo.getUsername()) + .setPassword(subAuthInfo.getPassword()); + // 2.2 构建请求参数 + IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO(); + params.setSubDevices(Collections.singletonList(subDeviceAuth)); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), + params, + null, null, null); + // 2.3 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); + // 3.2 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testTopoAdd][响应消息: {}]", responseMessage); + } else { + log.warn("[testTopoAdd][未收到响应]"); + } + + // 4. 关闭连接 + ws.close(); } /** @@ -180,16 +186,40 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { */ @Test public void testTopoDelete() throws Exception { - executeAuthenticatedRequest("testTopoDelete", ws -> { - IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO(); - params.setSubDevices(Collections.singletonList( - new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); - return IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), - params, - null, null, null); - }); + // 1.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testTopoDelete][WebSocket 连接成功]"); + + // 1.2 先进行认证 + IotDeviceMessage authResponse = authenticate(ws); + log.info("[testTopoDelete][认证响应: {}]", authResponse); + + // 2.1 构建请求参数 + IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO(); + params.setSubDevices(Collections.singletonList( + new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), + params, + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); + // 3.2 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testTopoDelete][响应消息: {}]", responseMessage); + } else { + log.warn("[testTopoDelete][未收到响应]"); + } + + // 4. 关闭连接 + ws.close(); } /** @@ -197,14 +227,38 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { */ @Test public void testTopoGet() throws Exception { - executeAuthenticatedRequest("testTopoGet", ws -> { - IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); - return IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), - params, - null, null, null); - }); + // 1.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testTopoGet][WebSocket 连接成功]"); + + // 1.2 先进行认证 + IotDeviceMessage authResponse = authenticate(ws); + log.info("[testTopoGet][认证响应: {}]", authResponse); + + // 2.1 构建请求参数 + IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), + params, + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); + // 3.2 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testTopoGet][响应消息: {}]", responseMessage); + } else { + log.warn("[testTopoGet][未收到响应]"); + } + + // 4. 关闭连接 + ws.close(); } // ===================== 子设备注册测试 ===================== @@ -214,16 +268,40 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { */ @Test public void testSubDeviceRegister() throws Exception { - executeAuthenticatedRequest("testSubDeviceRegister", ws -> { - IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO(); - subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY); - subDevice.setDeviceName("mougezishebei-ws"); - return IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), - Collections.singletonList(subDevice), - null, null, null); - }); + // 1.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testSubDeviceRegister][WebSocket 连接成功]"); + + // 1.2 先进行认证 + IotDeviceMessage authResponse = authenticate(ws); + log.info("[testSubDeviceRegister][认证响应: {}]", authResponse); + + // 2.1 构建请求参数 + IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO(); + subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY); + subDevice.setDeviceName("mougezishebei-ws"); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), + Collections.singletonList(subDevice), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); + // 3.2 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testSubDeviceRegister][响应消息: {}]", responseMessage); + } else { + log.warn("[testSubDeviceRegister][未收到响应]"); + } + + // 4. 关闭连接 + ws.close(); } // ===================== 批量上报测试 ===================== @@ -233,126 +311,140 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest { */ @Test public void testPropertyPackPost() throws Exception { - executeAuthenticatedRequest("testPropertyPackPost", ws -> { - // 构建【网关设备】自身属性 - Map gatewayProperties = MapUtil.builder() - .put("temperature", 25.5) - .build(); - // 构建【网关设备】自身事件 - IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); - gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build()); - gatewayEvent.setTime(System.currentTimeMillis()); - Map gatewayEvents = MapUtil.builder() - .put("statusReport", gatewayEvent) - .build(); - // 构建【网关子设备】属性 - Map subDeviceProperties = MapUtil.builder() - .put("power", 100) - .build(); - // 构建【网关子设备】事件 - IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); - subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build()); - subDeviceEvent.setTime(System.currentTimeMillis()); - Map subDeviceEvents = MapUtil.builder() - .put("healthCheck", subDeviceEvent) - .build(); - // 构建子设备数据 - IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData(); - subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)); - subDeviceData.setProperties(subDeviceProperties); - subDeviceData.setEvents(subDeviceEvents); - // 构建请求参数 - IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO(); - params.setProperties(gatewayProperties); - params.setEvents(gatewayEvents); - params.setSubDevices(List.of(subDeviceData)); - return IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), - params, - null, null, null); - }); + // 1.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testPropertyPackPost][WebSocket 连接成功]"); + + // 1.2 先进行认证 + IotDeviceMessage authResponse = authenticate(ws); + log.info("[testPropertyPackPost][认证响应: {}]", authResponse); + + // 2.1 构建【网关设备】自身属性 + Map gatewayProperties = MapUtil.builder() + .put("temperature", 25.5) + .build(); + // 2.2 构建【网关设备】自身事件 + IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); + gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build()); + gatewayEvent.setTime(System.currentTimeMillis()); + Map gatewayEvents = MapUtil.builder() + .put("statusReport", gatewayEvent) + .build(); + // 2.3 构建【网关子设备】属性 + Map subDeviceProperties = MapUtil.builder() + .put("power", 100) + .build(); + // 2.4 构建【网关子设备】事件 + IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); + subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build()); + subDeviceEvent.setTime(System.currentTimeMillis()); + Map subDeviceEvents = MapUtil.builder() + .put("healthCheck", subDeviceEvent) + .build(); + // 2.5 构建子设备数据 + IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData(); + subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)); + subDeviceData.setProperties(subDeviceProperties); + subDeviceData.setEvents(subDeviceEvents); + // 2.6 构建请求参数 + IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO(); + params.setProperties(gatewayProperties); + params.setEvents(gatewayEvents); + params.setSubDevices(List.of(subDeviceData)); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), + params, + null, null, null); + // 2.7 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); + // 3.2 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testPropertyPackPost][响应消息: {}]", responseMessage); + } else { + log.warn("[testPropertyPackPost][未收到响应]"); + } + + // 4. 关闭连接 + ws.close(); } // ===================== 辅助方法 ===================== /** - * 执行需要认证的请求 + * 创建 WebSocket 连接(同步) * - * @param testName 测试名称 - * @param requestSupplier 请求消息提供者 + * @return WebSocket 连接 */ - private void executeAuthenticatedRequest(String testName, java.util.function.Function requestSupplier) throws Exception { - CountDownLatch latch = new CountDownLatch(2); - AtomicReference authResponseRef = new AtomicReference<>(); - AtomicReference businessResponseRef = new AtomicReference<>(); - - HttpClient client = vertx.createHttpClient(); + private WebSocket createWebSocketConnection() throws Exception { + WebSocketClient wsClient = vertx.createWebSocketClient(); WebSocketConnectOptions options = new WebSocketConnectOptions() .setHost(SERVER_HOST) .setPort(SERVER_PORT) .setURI(WS_PATH); + return wsClient.connect(options).toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } - client.webSocket(options).onComplete(ar -> { - if (ar.succeeded()) { - WebSocket ws = ar.result(); - log.info("[{}][WebSocket 连接成功]", testName); + /** + * 发送消息并等待响应(同步) + * + * @param ws WebSocket 连接 + * @param message 请求消息 + * @return 响应消息 + */ + private String sendAndReceive(WebSocket ws, String message) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); - final boolean[] authenticated = {false}; - - ws.textMessageHandler(message -> { - log.info("[{}][收到响应: {}]", testName, message); - if (!authenticated[0]) { - authResponseRef.set(message); - authenticated[0] = true; - latch.countDown(); - - // 认证成功后发送业务请求 - IotDeviceMessage businessRequest = requestSupplier.apply(ws); - byte[] payload = CODEC.encode(businessRequest); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[{}][发送业务请求: {}]", testName, jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - businessResponseRef.set(message); - ws.close(); - latch.countDown(); - } - }); - - // 先发送认证请求 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - - byte[] payload = CODEC.encode(authRequest); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[{}][发送认证请求: {}]", testName, jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - log.error("[{}][WebSocket 连接失败]", testName, ar.cause()); - latch.countDown(); - latch.countDown(); - } + // 设置消息处理器 + ws.textMessageHandler(response -> { + log.info("[sendAndReceive][收到响应: {}]", response); + responseRef.set(response); + latch.countDown(); }); - boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); - if (completed) { - if (authResponseRef.get() != null) { - IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[{}][认证响应: {}]", testName, authResponse); - } - if (businessResponseRef.get() != null) { - IotDeviceMessage businessResponse = CODEC.decode(businessResponseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[{}][业务响应: {}]", testName, businessResponse); - } - } else { - log.warn("[{}][测试超时]", testName); + // 发送请求 + log.info("[sendAndReceive][发送请求: {}]", message); + ws.writeTextMessage(message); + + // 等待响应 + boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!completed) { + log.warn("[sendAndReceive][等待响应超时]"); } + return responseRef.get(); + } + + /** + * 执行网关设备认证(同步) + * + * @param ws WebSocket 连接 + * @return 认证响应消息 + */ + private IotDeviceMessage authenticate(WebSocket ws) throws Exception { + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[authenticate][发送认证请求: {}]", jsonMessage); + + String response = sendAndReceive(ws, jsonMessage); + if (response != null) { + return CODEC.decode(StrUtil.utf8Bytes(response)); + } + return null; } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java index 8368940a6d..111736b5b7 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -11,15 +12,14 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec; import io.vertx.core.Vertx; -import io.vertx.core.http.HttpClient; import io.vertx.core.http.WebSocket; +import io.vertx.core.http.WebSocketClient; import io.vertx.core.http.WebSocketConnectOptions; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -87,52 +87,35 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference responseRef = new AtomicReference<>(); + // 1.1 构建认证消息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request); - HttpClient client = vertx.createHttpClient(); - WebSocketConnectOptions options = new WebSocketConnectOptions() - .setHost(SERVER_HOST) - .setPort(SERVER_PORT) - .setURI(WS_PATH); + // 2.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testAuth][WebSocket 连接成功]"); - client.webSocket(options).onComplete(ar -> { - if (ar.succeeded()) { - WebSocket ws = ar.result(); - log.info("[testAuth][WebSocket 连接成功]"); + // 2.2 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); - ws.textMessageHandler(message -> { - log.info("[testAuth][收到响应: {}]", message); - responseRef.set(message); - ws.close(); - latch.countDown(); - }); - - // 构建认证消息 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - - byte[] payload = CODEC.encode(request); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[testAuth][发送认证请求: {}]", jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - log.error("[testAuth][WebSocket 连接失败]", ar.cause()); - latch.countDown(); - } - }); - - boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (completed && responseRef.get() != null) { - IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[testAuth][解码响应: {}]", response); + // 3. 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testAuth][响应消息: {}]", responseMessage); } else { - log.warn("[testAuth][测试超时或未收到响应]"); + log.warn("[testAuth][未收到响应]"); } + + // 4. 关闭连接 + ws.close(); } // ===================== 子设备属性上报测试 ===================== @@ -142,18 +125,42 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { */ @Test public void testPropertyPost() throws Exception { - executeAuthenticatedRequest("testPropertyPost", ws -> { - log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]"); - return IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), - IotDevicePropertyPostReqDTO.of(MapUtil.builder() - .put("power", 100) - .put("status", "online") - .put("temperature", 36.5) - .build()), - null, null, null); - }); + // 1.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testPropertyPost][WebSocket 连接成功]"); + + // 1.2 先进行认证 + IotDeviceMessage authResponse = authenticate(ws); + log.info("[testPropertyPost][认证响应: {}]", authResponse); + log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]"); + + // 2.1 构建属性上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(MapUtil.builder() + .put("power", 100) + .put("status", "online") + .put("temperature", 36.5) + .build()), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); + // 3.2 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testPropertyPost][响应消息: {}]", responseMessage); + } else { + log.warn("[testPropertyPost][未收到响应]"); + } + + // 4. 关闭连接 + ws.close(); } // ===================== 子设备事件上报测试 ===================== @@ -163,102 +170,117 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { */ @Test public void testEventPost() throws Exception { - executeAuthenticatedRequest("testEventPost", ws -> { - log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]"); - return IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), - IotDeviceEventPostReqDTO.of( - "alarm", - MapUtil.builder() - .put("level", "warning") - .put("message", "temperature too high") - .put("threshold", 40) - .put("current", 42) - .build(), - System.currentTimeMillis()), - null, null, null); - }); + // 1.1 创建 WebSocket 连接(同步) + WebSocket ws = createWebSocketConnection(); + log.info("[testEventPost][WebSocket 连接成功]"); + + // 1.2 先进行认证 + IotDeviceMessage authResponse = authenticate(ws); + log.info("[testEventPost][认证响应: {}]", authResponse); + log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]"); + + // 2.1 构建事件上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of( + "alarm", + MapUtil.builder() + .put("level", "warning") + .put("message", "temperature too high") + .put("threshold", 40) + .put("current", 42) + .build(), + System.currentTimeMillis()), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送并等待响应 + String response = sendAndReceive(ws, jsonMessage); + // 3.2 解码响应 + if (response != null) { + IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response)); + log.info("[testEventPost][响应消息: {}]", responseMessage); + } else { + log.warn("[testEventPost][未收到响应]"); + } + + // 4. 关闭连接 + ws.close(); } // ===================== 辅助方法 ===================== /** - * 执行需要认证的请求 + * 创建 WebSocket 连接(同步) * - * @param testName 测试名称 - * @param requestSupplier 请求消息提供者 + * @return WebSocket 连接 */ - private void executeAuthenticatedRequest(String testName, java.util.function.Function requestSupplier) throws Exception { - CountDownLatch latch = new CountDownLatch(2); - AtomicReference authResponseRef = new AtomicReference<>(); - AtomicReference businessResponseRef = new AtomicReference<>(); - - HttpClient client = vertx.createHttpClient(); + private WebSocket createWebSocketConnection() throws Exception { + WebSocketClient wsClient = vertx.createWebSocketClient(); WebSocketConnectOptions options = new WebSocketConnectOptions() .setHost(SERVER_HOST) .setPort(SERVER_PORT) .setURI(WS_PATH); + return wsClient.connect(options).toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } - client.webSocket(options).onComplete(ar -> { - if (ar.succeeded()) { - WebSocket ws = ar.result(); - log.info("[{}][WebSocket 连接成功]", testName); + /** + * 发送消息并等待响应(同步) + * + * @param ws WebSocket 连接 + * @param message 请求消息 + * @return 响应消息 + */ + private String sendAndReceive(WebSocket ws, String message) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); - final boolean[] authenticated = {false}; - - ws.textMessageHandler(message -> { - log.info("[{}][收到响应: {}]", testName, message); - if (!authenticated[0]) { - authResponseRef.set(message); - authenticated[0] = true; - latch.countDown(); - - // 认证成功后发送业务请求 - IotDeviceMessage businessRequest = requestSupplier.apply(ws); - byte[] payload = CODEC.encode(businessRequest); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[{}][发送业务请求: {}]", testName, jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - businessResponseRef.set(message); - ws.close(); - latch.countDown(); - } - }); - - // 先发送认证请求 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - - byte[] payload = CODEC.encode(authRequest); - String jsonMessage = new String(payload, StandardCharsets.UTF_8); - log.info("[{}][发送认证请求: {}]", testName, jsonMessage); - ws.writeTextMessage(jsonMessage); - } else { - log.error("[{}][WebSocket 连接失败]", testName, ar.cause()); - latch.countDown(); - latch.countDown(); - } + // 设置消息处理器 + ws.textMessageHandler(response -> { + log.info("[sendAndReceive][收到响应: {}]", response); + responseRef.set(response); + latch.countDown(); }); - boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); - if (completed) { - if (authResponseRef.get() != null) { - IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[{}][认证响应: {}]", testName, authResponse); - } - if (businessResponseRef.get() != null) { - IotDeviceMessage businessResponse = CODEC.decode(businessResponseRef.get().getBytes(StandardCharsets.UTF_8)); - log.info("[{}][业务响应: {}]", testName, businessResponse); - } - } else { - log.warn("[{}][测试超时]", testName); + // 发送请求 + log.info("[sendAndReceive][发送请求: {}]", message); + ws.writeTextMessage(message); + + // 等待响应 + boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!completed) { + log.warn("[sendAndReceive][等待响应超时]"); } + return responseRef.get(); + } + + /** + * 执行子设备认证(同步) + * + * @param ws WebSocket 连接 + * @return 认证响应消息 + */ + private IotDeviceMessage authenticate(WebSocket ws) throws Exception { + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + byte[] payload = CODEC.encode(request); + String jsonMessage = StrUtil.utf8Str(payload); + log.info("[authenticate][发送认证请求: {}]", jsonMessage); + + String response = sendAndReceive(ws, jsonMessage); + if (response != null) { + return CODEC.decode(StrUtil.utf8Bytes(response)); + } + return null; } }