diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java new file mode 100644 index 0000000000..3e5e100549 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java @@ -0,0 +1,370 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * IoT 直连设备 MQTT 协议集成测试(手动测试) + * + *
测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 MQTT 协议直接连接平台 + * + *
使用步骤: + *
注意:MQTT 协议是有状态的长连接,认证在连接时通过 username/password 完成,
+ * 认证成功后同一连接上的后续请求无需再携带认证信息
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class IotDirectDeviceMqttProtocolIntegrationTest {
+
+ private static final String SERVER_HOST = "127.0.0.1";
+ private static final int SERVER_PORT = 1883;
+ private static final int TIMEOUT_SECONDS = 10;
+
+ // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) =====================
+ private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
+ private static final String DEVICE_NAME = "small";
+ private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3";
+
+ // ===================== 全局共享 Vertx 实例 =====================
+ private static Vertx vertx;
+
+ @BeforeAll
+ public static void setUp() {
+ vertx = Vertx.vertx();
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ if (vertx != null) {
+ vertx.close();
+ }
+ }
+
+ // ===================== 连接认证测试 =====================
+
+ /**
+ * 连接认证测试:设备通过 MQTT 协议连接平台
+ */
+ @Test
+ public void testConnect() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // 1. 构建认证信息
+ IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
+ log.info("[testConnect][认证信息: clientId={}, username={}, password={}]",
+ authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword());
+
+ // 2. 创建 MQTT 客户端配置
+ MqttClientOptions options = new MqttClientOptions()
+ .setClientId(authInfo.getClientId())
+ .setUsername(authInfo.getUsername())
+ .setPassword(authInfo.getPassword())
+ .setCleanSession(true)
+ .setKeepAliveInterval(60);
+
+ // 3. 创建 MQTT 客户端并连接
+ MqttClient client = MqttClient.create(vertx, options);
+ client.connect(SERVER_PORT, SERVER_HOST)
+ .onComplete(ar -> {
+ if (ar.succeeded()) {
+ log.info("[testConnect][连接成功,客户端 ID: {}]", client.clientId());
+ // 断开连接
+ client.disconnect()
+ .onComplete(disconnectAr -> {
+ if (disconnectAr.succeeded()) {
+ log.info("[testConnect][断开连接成功]");
+ } else {
+ log.error("[testConnect][断开连接失败]", disconnectAr.cause());
+ }
+ latch.countDown();
+ });
+ } else {
+ log.error("[testConnect][连接失败]", ar.cause());
+ latch.countDown();
+ }
+ });
+
+ // 4. 等待测试完成
+ boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ if (!completed) {
+ log.warn("[testConnect][测试超时]");
+ }
+ }
+
+ // ===================== 直连设备属性上报测试 =====================
+
+ /**
+ * 属性上报测试
+ */
+ @Test
+ public void testPropertyPost() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // 1. 构建认证信息
+ IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
+
+ // 2. 创建 MQTT 客户端配置
+ MqttClientOptions options = new MqttClientOptions()
+ .setClientId(authInfo.getClientId())
+ .setUsername(authInfo.getUsername())
+ .setPassword(authInfo.getPassword())
+ .setCleanSession(true)
+ .setKeepAliveInterval(60);
+
+ // 3. 创建 MQTT 客户端并连接
+ MqttClient client = MqttClient.create(vertx, options);
+ client.connect(SERVER_PORT, SERVER_HOST)
+ .onComplete(connectAr -> {
+ if (connectAr.succeeded()) {
+ log.info("[testPropertyPost][连接成功]");
+
+ // 4.1 设置消息处理器,接收 _reply 响应
+ client.publishHandler(message -> {
+ log.info("[testPropertyPost][收到响应: topic={}, payload={}]",
+ message.topicName(), message.payload().toString());
+ });
+
+ // 4.2 订阅 _reply 主题
+ String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME);
+ client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value())
+ .onComplete(subscribeAr -> {
+ if (subscribeAr.succeeded()) {
+ log.info("[testPropertyPost][订阅响应主题成功: {}]", replyTopic);
+
+ // 5. 构建属性上报消息(Alink 协议格式)
+ String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME);
+ String payload = JsonUtils.toJsonString(MapUtil.builder()
+ .put("id", IdUtil.fastSimpleUUID())
+ .put("version", "1.0")
+ .put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
+ .put("params", IotDevicePropertyPostReqDTO.of(MapUtil.