From 1b4ac9fb2429ce6194a98d41356cfe77a7300126 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Tue, 27 Jan 2026 00:05:07 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91MQTT=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=EF=BC=9A1=EF=BC=89=E5=A2=9E=E5=8A=A0=20gatew?= =?UTF-8?q?ay=20=E7=9B=B8=E5=85=B3=E7=9A=84=E5=8D=95=E6=B5=8B=20feat?= =?UTF-8?q?=EF=BC=9A=E3=80=90iot=E3=80=91=E7=BB=9F=E4=B8=80=E5=87=A0?= =?UTF-8?q?=E4=B8=AA=E5=8D=8F=E8=AE=AE=E7=9A=84=E5=8D=95=E6=B5=8B=E9=A3=8E?= =?UTF-8?q?=E6=A0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ewayDeviceCoapProtocolIntegrationTest.java | 1 - ...ySubDeviceCoapProtocolIntegrationTest.java | 1 - ...rectDeviceMqttProtocolIntegrationTest.java | 12 +- ...ewayDeviceMqttProtocolIntegrationTest.java | 494 ++++++++++++++++++ ...ySubDeviceMqttProtocolIntegrationTest.java | 329 ++++++++++++ ...irectDeviceTcpProtocolIntegrationTest.java | 2 +- ...tewayDeviceTcpProtocolIntegrationTest.java | 9 +- ...aySubDeviceTcpProtocolIntegrationTest.java | 9 +- ...irectDeviceUdpProtocolIntegrationTest.java | 41 ++ 9 files changed, 887 insertions(+), 11 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotGatewayDeviceCoapProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotGatewayDeviceCoapProtocolIntegrationTest.java index 76c853cfa2..10bcca74fb 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotGatewayDeviceCoapProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotGatewayDeviceCoapProtocolIntegrationTest.java @@ -87,7 +87,6 @@ public class IotGatewayDeviceCoapProtocolIntegrationTest { * 网关设备认证测试:获取网关设备 Token */ @Test - @SuppressWarnings("deprecation") public void testAuth() throws Exception { // 1.1 构建请求 String uri = String.format("coap://%s:%d/auth", SERVER_HOST, SERVER_PORT); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotGatewaySubDeviceCoapProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotGatewaySubDeviceCoapProtocolIntegrationTest.java index f6d474059d..c10bb772c6 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotGatewaySubDeviceCoapProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotGatewaySubDeviceCoapProtocolIntegrationTest.java @@ -75,7 +75,6 @@ public class IotGatewaySubDeviceCoapProtocolIntegrationTest { * 子设备认证测试:获取子设备 Token */ @Test - @SuppressWarnings("deprecation") public void testAuth() throws Exception { // 1.1 构建请求 String uri = String.format("coap://%s:%d/auth", SERVER_HOST, SERVER_PORT); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java index d1a3b4d5f3..640572eceb 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java @@ -89,7 +89,7 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { // 1. 构建认证信息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - log.info("[testConnect][认证信息: clientId={}, username={}, password={}]", + log.info("[testAuth][认证信息: clientId={}, username={}, password={}]", authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); // 2. 创建客户端并连接 @@ -97,19 +97,19 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { client.connect(SERVER_PORT, SERVER_HOST) .onComplete(ar -> { if (ar.succeeded()) { - log.info("[testConnect][连接成功,客户端 ID: {}]", client.clientId()); + log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId()); // 断开连接 client.disconnect() .onComplete(disconnectAr -> { if (disconnectAr.succeeded()) { - log.info("[testConnect][断开连接成功]"); + log.info("[testAuth][断开连接成功]"); } else { - log.error("[testConnect][断开连接失败]", disconnectAr.cause()); + log.error("[testAuth][断开连接失败]", disconnectAr.cause()); } latch.countDown(); }); } else { - log.error("[testConnect][连接失败]", ar.cause()); + log.error("[testAuth][连接失败]", ar.cause()); latch.countDown(); } }); @@ -117,7 +117,7 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { // 3. 等待测试完成 boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); if (!completed) { - log.warn("[testConnect][测试超时]"); + log.warn("[testAuth][测试超时]"); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java new file mode 100644 index 0000000000..57ed6a7779 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java @@ -0,0 +1,494 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * IoT 网关设备 MQTT 协议集成测试(手动测试) + * + *

测试场景:网关设备(IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 MQTT 协议管理子设备拓扑关系 + * + *

使用步骤: + *

    + *
  1. 启动 yudao-module-iot-gateway 服务(MQTT 端口 1883)
  2. + *
  3. 运行以下测试方法: + *
      + *
    • {@link #testAuth()} - 网关设备连接认证
    • + *
    • {@link #testTopoAdd()} - 添加子设备拓扑关系
    • + *
    • {@link #testTopoDelete()} - 删除子设备拓扑关系
    • + *
    • {@link #testTopoGet()} - 获取子设备拓扑关系
    • + *
    • {@link #testSubDeviceRegister()} - 子设备动态注册
    • + *
    • {@link #testPropertyPackPost()} - 批量上报属性(网关 + 子设备)
    • + *
    + *
  4. + *
+ * + *

注意:MQTT 协议是有状态的长连接,认证在连接时通过 username/password 完成, + * 认证成功后同一连接上的后续请求无需再携带认证信息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotGatewayDeviceMqttProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 1883; + private static final int TIMEOUT_SECONDS = 10; + + // ===================== 编解码器(MQTT 使用 Alink 协议) ===================== + private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec(); + + // ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) ===================== + private static final String GATEWAY_PRODUCT_KEY = "m6XcS1ZJ3TW8eC0v"; + private static final String GATEWAY_DEVICE_NAME = "sub-ddd"; + private static final String GATEWAY_DEVICE_SECRET = "b3d62c70f8a4495487ed1d35d61ac2b3"; + + // ===================== 子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== + private static final String SUB_DEVICE_PRODUCT_KEY = "jAufEMTF1W6wnPhn"; + private static final String SUB_DEVICE_NAME = "chazuo-it"; + private static final String SUB_DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af"; + + // ===================== 全局共享 Vertx 实例 ===================== + private static Vertx vertx; + + @BeforeAll + public static void setUp() { + vertx = Vertx.vertx(); + } + + @AfterAll + public static void tearDown() { + if (vertx != null) { + vertx.close(); + } + } + + // ===================== 连接认证测试 ===================== + + /** + * 网关设备认证测试:获取网关设备 Token + */ + @Test + public void testAuth() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + // 1. 构建认证信息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); + log.info("[testAuth][认证信息: clientId={}, username={}, password={}]", + authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); + + // 2. 创建客户端并连接 + MqttClient client = connect(authInfo); + client.connect(SERVER_PORT, SERVER_HOST) + .onComplete(ar -> { + if (ar.succeeded()) { + log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId()); + // 断开连接 + client.disconnect() + .onComplete(disconnectAr -> { + if (disconnectAr.succeeded()) { + log.info("[testAuth][断开连接成功]"); + } else { + log.error("[testAuth][断开连接失败]", disconnectAr.cause()); + } + latch.countDown(); + }); + } else { + log.error("[testAuth][连接失败]", ar.cause()); + latch.countDown(); + } + }); + + // 3. 等待测试完成 + boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!completed) { + log.warn("[testAuth][测试超时]"); + } + } + + // ===================== 拓扑管理测试 ===================== + + /** + * 添加子设备拓扑关系测试 + *

+ * 网关设备向平台上报需要绑定的子设备信息 + */ + @Test + public void testTopoAdd() throws Exception { + // 1. 连接并认证 + MqttClient client = connectAndAuth(); + log.info("[testTopoAdd][连接认证成功]"); + + // 2.1 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/topo/add_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribeReply(client, replyTopic); + + // 2.2 构建子设备认证信息 + IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo( + SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET); + IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO() + .setClientId(subAuthInfo.getClientId()) + .setUsername(subAuthInfo.getUsername()) + .setPassword(subAuthInfo.getPassword()); + + // 2.3 构建请求消息 + IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO(); + params.setSubDevices(Collections.singletonList(subDeviceAuth)); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), + params, + null, null, null); + + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/topo/add", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testTopoAdd][响应消息: {}]", response); + + // 4. 断开连接 + disconnect(client); + } + + /** + * 删除子设备拓扑关系测试 + *

+ * 网关设备向平台上报需要解绑的子设备信息 + */ + @Test + public void testTopoDelete() throws Exception { + // 1. 连接并认证 + MqttClient client = connectAndAuth(); + log.info("[testTopoDelete][连接认证成功]"); + + // 2.1 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/topo/delete_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribeReply(client, replyTopic); + + // 2.2 构建请求消息 + IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO(); + params.setSubDevices(Collections.singletonList( + new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), + params, + null, null, null); + + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/topo/delete", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testTopoDelete][响应消息: {}]", response); + + // 4. 断开连接 + disconnect(client); + } + + /** + * 获取子设备拓扑关系测试 + *

+ * 网关设备向平台查询已绑定的子设备列表 + */ + @Test + public void testTopoGet() throws Exception { + // 1. 连接并认证 + MqttClient client = connectAndAuth(); + log.info("[testTopoGet][连接认证成功]"); + + // 2.1 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/topo/get_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribeReply(client, replyTopic); + + // 2.2 构建请求消息 + IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), + params, + null, null, null); + + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/topo/get", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testTopoGet][响应消息: {}]", response); + + // 4. 断开连接 + disconnect(client); + } + + // ===================== 子设备注册测试 ===================== + + /** + * 子设备动态注册测试 + *

+ * 网关设备代理子设备进行动态注册,平台返回子设备的 deviceSecret + *

+ * 注意:此接口需要网关认证 + */ + @Test + public void testSubDeviceRegister() throws Exception { + // 1. 连接并认证 + MqttClient client = connectAndAuth(); + log.info("[testSubDeviceRegister][连接认证成功]"); + + // 2.1 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/auth/sub-device/register_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribeReply(client, replyTopic); + + // 2.2 构建请求消息 + IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO(); + subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY); + subDevice.setDeviceName("mougezishebei-mqtt"); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), + Collections.singletonList(subDevice), + null, null, null); + + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/auth/sub-device/register", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testSubDeviceRegister][响应消息: {}]", response); + + // 4. 断开连接 + disconnect(client); + } + + // ===================== 批量上报测试 ===================== + + /** + * 批量上报属性测试(网关 + 子设备) + *

+ * 网关设备批量上报自身属性、事件,以及子设备的属性、事件 + */ + @Test + public void testPropertyPackPost() throws Exception { + // 1. 连接并认证 + MqttClient client = connectAndAuth(); + log.info("[testPropertyPackPost][连接认证成功]"); + + // 2.1 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/event/property/pack/post_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribeReply(client, replyTopic); + + // 2.2 构建【网关设备】自身属性 + Map gatewayProperties = MapUtil.builder() + .put("temperature", 25.5) + .build(); + + // 2.3 构建【网关设备】自身事件 + IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); + gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build()); + gatewayEvent.setTime(System.currentTimeMillis()); + Map gatewayEvents = MapUtil + .builder() + .put("statusReport", gatewayEvent) + .build(); + + // 2.4 构建【网关子设备】属性 + Map subDeviceProperties = MapUtil.builder() + .put("power", 100) + .build(); + + // 2.5 构建【网关子设备】事件 + IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); + subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build()); + subDeviceEvent.setTime(System.currentTimeMillis()); + Map subDeviceEvents = MapUtil + .builder() + .put("healthCheck", subDeviceEvent) + .build(); + + // 2.6 构建子设备数据 + IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData(); + subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)); + subDeviceData.setProperties(subDeviceProperties); + subDeviceData.setEvents(subDeviceEvents); + + // 2.7 构建请求消息 + IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO(); + params.setProperties(gatewayProperties); + params.setEvents(gatewayEvents); + params.setSubDevices(List.of(subDeviceData)); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), + params, + null, null, null); + + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/event/property/pack/post", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testPropertyPackPost][响应消息: {}]", response); + + // 4. 断开连接 + disconnect(client); + } + + // ===================== 辅助方法 ===================== + + /** + * 创建 MQTT 客户端 + * + * @param authInfo 认证信息 + * @return MQTT 客户端 + */ + private MqttClient connect(IotDeviceAuthReqDTO authInfo) { + MqttClientOptions options = new MqttClientOptions() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()) + .setCleanSession(true) + .setKeepAliveInterval(60); + return MqttClient.create(vertx, options); + } + + /** + * 连接并认证网关设备 + * + * @return 已认证的 MQTT 客户端 + */ + private MqttClient connectAndAuth() throws Exception { + // 1. 创建客户端并连接 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); + MqttClient client = connect(authInfo); + + // 2.1 连接 + CompletableFuture future = new CompletableFuture<>(); + client.connect(SERVER_PORT, SERVER_HOST) + .onComplete(ar -> { + if (ar.succeeded()) { + future.complete(client); + } else { + future.completeExceptionally(ar.cause()); + } + }); + // 2.2 等待连接结果 + return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + /** + * 订阅响应主题 + * + * @param client MQTT 客户端 + * @param replyTopic 响应主题 + */ + private void subscribeReply(MqttClient client, String replyTopic) throws Exception { + // 1. 订阅响应主题 + CompletableFuture future = new CompletableFuture<>(); + client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value()) + .onComplete(ar -> { + if (ar.succeeded()) { + log.info("[subscribeReply][订阅响应主题成功: {}]", replyTopic); + future.complete(null); + } else { + future.completeExceptionally(ar.cause()); + } + }); + // 2. 等待订阅结果 + future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + /** + * 发布消息并等待响应 + * + * @param client MQTT 客户端 + * @param topic 发布主题 + * @param request 请求消息 + * @return 响应消息 + */ + private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) { + // 1. 设置消息处理器,接收响应 + CompletableFuture future = new CompletableFuture<>(); + client.publishHandler(message -> { + log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]", + message.topicName(), message.payload().toString()); + IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); + future.complete(response); + }); + + // 2. 编码并发布消息 + byte[] payload = CODEC.encode(request); + log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]", + CODEC.type(), topic, new String(payload)); + + client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false) + .onComplete(ar -> { + if (ar.succeeded()) { + log.info("[publishAndWaitReply][消息发布成功,messageId={}]", ar.result()); + } else { + log.error("[publishAndWaitReply][消息发布失败]", ar.cause()); + future.completeExceptionally(ar.cause()); + } + }); + + // 3. 等待响应(超时返回 null) + try { + return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("[publishAndWaitReply][等待响应超时或失败]"); + return null; + } + } + + /** + * 断开连接 + * + * @param client MQTT 客户端 + */ + private void disconnect(MqttClient client) throws Exception { + // 1. 断开连接 + CompletableFuture future = new CompletableFuture<>(); + client.disconnect() + .onComplete(ar -> { + if (ar.succeeded()) { + log.info("[disconnect][断开连接成功]"); + future.complete(null); + } else { + future.completeExceptionally(ar.cause()); + } + }); + // 2. 等待断开结果 + future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java new file mode 100644 index 0000000000..f16fa6c4ba --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java @@ -0,0 +1,329 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * IoT 网关子设备 MQTT 协议集成测试(手动测试) + * + *

测试场景:子设备(IotProductDeviceTypeEnum 的 SUB 类型)通过网关设备代理上报数据 + * + *

重要说明:子设备无法直接连接平台,所有请求均由网关设备(Gateway)代为转发。 + *

网关设备转发子设备请求时,使用子设备自己的认证信息连接。 + * + *

使用步骤: + *

    + *
  1. 启动 yudao-module-iot-gateway 服务(MQTT 端口 1883)
  2. + *
  3. 确保子设备已通过 {@link IotGatewayDeviceMqttProtocolIntegrationTest#testTopoAdd()} 绑定到网关
  4. + *
  5. 运行以下测试方法: + *
      + *
    • {@link #testAuth()} - 子设备连接认证
    • + *
    • {@link #testPropertyPost()} - 子设备属性上报(由网关代理转发)
    • + *
    • {@link #testEventPost()} - 子设备事件上报(由网关代理转发)
    • + *
    + *
  6. + *
+ * + *

注意:MQTT 协议是有状态的长连接,认证在连接时通过 username/password 完成, + * 认证成功后同一连接上的后续请求无需再携带认证信息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotGatewaySubDeviceMqttProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 1883; + private static final int TIMEOUT_SECONDS = 10; + + // ===================== 编解码器(MQTT 使用 Alink 协议) ===================== + private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec(); + + // ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== + private static final String PRODUCT_KEY = "jAufEMTF1W6wnPhn"; + private static final String DEVICE_NAME = "chazuo-it"; + private static final String DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af"; + + // ===================== 全局共享 Vertx 实例 ===================== + private static Vertx vertx; + + @BeforeAll + public static void setUp() { + vertx = Vertx.vertx(); + } + + @AfterAll + public static void tearDown() { + if (vertx != null) { + vertx.close(); + } + } + + // ===================== 连接认证测试 ===================== + + /** + * 子设备认证测试:获取子设备 Token + */ + @Test + public void testAuth() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + // 1. 构建认证信息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + log.info("[testAuth][认证信息: clientId={}, username={}, password={}]", + authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); + + // 2. 创建客户端并连接 + MqttClient client = connect(authInfo); + client.connect(SERVER_PORT, SERVER_HOST) + .onComplete(ar -> { + if (ar.succeeded()) { + log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId()); + // 断开连接 + client.disconnect() + .onComplete(disconnectAr -> { + if (disconnectAr.succeeded()) { + log.info("[testAuth][断开连接成功]"); + } else { + log.error("[testAuth][断开连接失败]", disconnectAr.cause()); + } + latch.countDown(); + }); + } else { + log.error("[testAuth][连接失败]", ar.cause()); + latch.countDown(); + } + }); + + // 3. 等待测试完成 + boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!completed) { + log.warn("[testAuth][测试超时]"); + } + } + + // ===================== 子设备属性上报测试 ===================== + + /** + * 子设备属性上报测试 + */ + @Test + public void testPropertyPost() throws Exception { + // 1. 连接并认证 + MqttClient client = connectAndAuth(); + log.info("[testPropertyPost][连接认证成功]"); + log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]"); + + // 2. 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME); + subscribeReply(client, replyTopic); + + // 3. 构建属性上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(MapUtil.builder() + .put("power", 100) + .put("status", "online") + .put("temperature", 36.5) + .build()), + null, null, null); + + // 4. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testPropertyPost][响应消息: {}]", response); + + // 5. 断开连接 + disconnect(client); + } + + // ===================== 子设备事件上报测试 ===================== + + /** + * 子设备事件上报测试 + */ + @Test + public void testEventPost() throws Exception { + // 1. 连接并认证 + MqttClient client = connectAndAuth(); + log.info("[testEventPost][连接认证成功]"); + log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]"); + + // 2. 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME); + subscribeReply(client, replyTopic); + + // 3. 构建事件上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of( + "alarm", + MapUtil.builder() + .put("level", "warning") + .put("message", "temperature too high") + .put("threshold", 40) + .put("current", 42) + .build(), + System.currentTimeMillis()), + null, null, null); + + // 4. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testEventPost][响应消息: {}]", response); + + // 5. 断开连接 + disconnect(client); + } + + // ===================== 辅助方法 ===================== + + /** + * 创建 MQTT 客户端 + * + * @param authInfo 认证信息 + * @return MQTT 客户端 + */ + private MqttClient connect(IotDeviceAuthReqDTO authInfo) { + MqttClientOptions options = new MqttClientOptions() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()) + .setCleanSession(true) + .setKeepAliveInterval(60); + return MqttClient.create(vertx, options); + } + + /** + * 连接并认证子设备 + * + * @return 已认证的 MQTT 客户端 + */ + private MqttClient connectAndAuth() throws Exception { + // 1. 创建客户端并连接 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + MqttClient client = connect(authInfo); + + // 2.1 连接 + CompletableFuture future = new CompletableFuture<>(); + client.connect(SERVER_PORT, SERVER_HOST) + .onComplete(ar -> { + if (ar.succeeded()) { + future.complete(client); + } else { + future.completeExceptionally(ar.cause()); + } + }); + // 2.2 等待连接结果 + return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + /** + * 订阅响应主题 + * + * @param client MQTT 客户端 + * @param replyTopic 响应主题 + */ + private void subscribeReply(MqttClient client, String replyTopic) throws Exception { + // 1. 订阅响应主题 + CompletableFuture future = new CompletableFuture<>(); + client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value()) + .onComplete(ar -> { + if (ar.succeeded()) { + log.info("[subscribeReply][订阅响应主题成功: {}]", replyTopic); + future.complete(null); + } else { + future.completeExceptionally(ar.cause()); + } + }); + // 2. 等待订阅结果 + future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + /** + * 发布消息并等待响应 + * + * @param client MQTT 客户端 + * @param topic 发布主题 + * @param request 请求消息 + * @return 响应消息 + */ + private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) { + // 1. 设置消息处理器,接收响应 + CompletableFuture future = new CompletableFuture<>(); + client.publishHandler(message -> { + log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]", + message.topicName(), message.payload().toString()); + IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); + future.complete(response); + }); + + // 2. 编码并发布消息 + byte[] payload = CODEC.encode(request); + log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]", + CODEC.type(), topic, new String(payload)); + + client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false) + .onComplete(ar -> { + if (ar.succeeded()) { + log.info("[publishAndWaitReply][消息发布成功,messageId={}]", ar.result()); + } else { + log.error("[publishAndWaitReply][消息发布失败]", ar.cause()); + future.completeExceptionally(ar.cause()); + } + }); + + // 3. 等待响应(超时返回 null) + try { + return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("[publishAndWaitReply][等待响应超时或失败]"); + return null; + } + } + + /** + * 断开连接 + * + * @param client MQTT 客户端 + */ + private void disconnect(MqttClient client) throws Exception { + // 1. 断开连接 + CompletableFuture future = new CompletableFuture<>(); + client.disconnect() + .onComplete(ar -> { + if (ar.succeeded()) { + log.info("[disconnect][断开连接成功]"); + future.complete(null); + } else { + future.completeExceptionally(ar.cause()); + } + }); + // 2. 等待断开结果 + future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java index 5ac3cf2b52..98060d3c61 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java @@ -68,7 +68,7 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { // ===================== 认证测试 ===================== /** - * 认证测试:设备认证 + * 认证测试:获取设备 Token */ @Test public void testAuth() throws Exception { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java index 22b2cb9f44..6c21de749e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java @@ -81,7 +81,7 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { // ===================== 认证测试 ===================== /** - * 网关设备认证测试 + * 网关设备认证测试:获取网关设备 Token */ @Test public void testAuth() throws Exception { @@ -341,6 +341,9 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { /** * 执行网关设备认证 + * + * @param socket TCP 连接 + * @return 认证响应消息 */ private IotDeviceMessage authenticate(Socket socket) throws Exception { IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( @@ -360,6 +363,10 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { /** * 发送 TCP 请求并接收响应 + * + * @param socket TCP Socket + * @param payload 请求数据 + * @return 响应数据 */ private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception { // 1. 发送请求 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java index eb0cbb092d..a7fcc654a6 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java @@ -69,7 +69,7 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { // ===================== 认证测试 ===================== /** - * 子设备认证测试 + * 子设备认证测试:获取子设备 Token */ @Test public void testAuth() throws Exception { @@ -191,6 +191,9 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { /** * 执行子设备认证 + * + * @param socket TCP 连接 + * @return 认证响应消息 */ private IotDeviceMessage authenticate(Socket socket) throws Exception { IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); @@ -209,6 +212,10 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { /** * 发送 TCP 请求并接收响应 + * + * @param socket TCP Socket + * @param payload 请求数据 + * @return 响应数据 */ private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception { // 1. 发送请求 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java index e15212f54a..680512ad36 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java @@ -6,6 +6,7 @@ import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; @@ -106,6 +107,46 @@ public class IotDirectDeviceUdpProtocolIntegrationTest { } } + // ===================== 动态注册测试 ===================== + + /** + * 直连设备动态注册测试(一型一密) + *

+ * 使用产品密钥(productSecret)验证身份,成功后返回设备密钥(deviceSecret) + *

+ * 注意:此接口不需要认证 + */ + @Test + public void testDeviceRegister() throws Exception { + // 1.1 构建注册消息 + IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO(); + registerReqDTO.setProductKey(PRODUCT_KEY); + registerReqDTO.setDeviceName("test-udp-" + System.currentTimeMillis()); + registerReqDTO.setProductSecret("test-product-secret"); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { + log.info("[testDeviceRegister][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); + } + + // 2.1 发送请求 + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(TIMEOUT_MS); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testDeviceRegister][响应消息: {}]", response); + log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); + } else { + log.warn("[testDeviceRegister][未收到响应]"); + } + } + } + // ===================== 直连设备属性上报测试 ===================== /**