From de1a53a5f14af453f3c22459c409db6d26e5f0e5 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 26 Jan 2026 21:16:43 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91UDP=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=EF=BC=9A=E5=85=BC=E5=AE=B9=E4=B8=8B=E8=A1=8C?= =?UTF-8?q?=E7=9A=84=E6=97=B6=E5=80=99=EF=BC=8C=E5=9F=BA=E4=BA=8E=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E7=9A=84=20codec=20=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 4 +- .../udp/IotUdpDownstreamSubscriber.java | 5 +- .../udp/manager/IotUdpSessionManager.java | 73 +++++-- .../udp/router/IotUdpDownstreamHandler.java | 22 +-- .../udp/router/IotUdpUpstreamHandler.java | 8 +- ...irectDeviceUdpProtocolIntegrationTest.java | 164 ++++++++-------- ...tewayDeviceUdpProtocolIntegrationTest.java | 183 +++++++++++------- ...aySubDeviceUdpProtocolIntegrationTest.java | 110 +++++++---- 8 files changed, 329 insertions(+), 240 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 1b475e9fce..85d394f4e1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -223,11 +223,9 @@ public class IotGatewayConfiguration { @Bean public IotUdpDownstreamSubscriber iotUdpDownstreamSubscriber(IotUdpUpstreamProtocol protocolHandler, IotDeviceMessageService messageService, - IotDeviceService deviceService, IotUdpSessionManager sessionManager, IotMessageBus messageBus) { - return new IotUdpDownstreamSubscriber(protocolHandler, messageService, deviceService, sessionManager, - messageBus); + return new IotUdpDownstreamSubscriber(protocolHandler, messageService, sessionManager, messageBus); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpDownstreamSubscriber.java index 29a2afa159..1bfa46bff1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpDownstreamSubscriber.java @@ -6,7 +6,6 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpDownstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; @@ -25,8 +24,6 @@ public class IotUdpDownstreamSubscriber implements IotMessageSubscriber 设备地址(用于下行消息发送) + * 设备 ID -> 会话信息(包含地址和 codecType) */ - private final Map deviceAddressMap = new ConcurrentHashMap<>(); + private final Map deviceSessionMap = new ConcurrentHashMap<>(); /** * 设备地址 Key -> 最后活跃时间(用于清理) @@ -44,18 +45,41 @@ public class IotUdpSessionManager { private final Map addressDeviceMap = new ConcurrentHashMap<>(); /** - * 更新设备地址(每次收到上行消息时调用) + * 更新设备会话(每次收到上行消息时调用) + * + * @param deviceId 设备 ID + * @param address 设备地址 + * @param codecType 消息编解码类型 + */ + public void updateDeviceSession(Long deviceId, InetSocketAddress address, String codecType) { + String addressKey = buildAddressKey(address); + // 更新设备会话映射 + deviceSessionMap.put(deviceId, new SessionInfo().setAddress(address).setCodecType(codecType)); + lastActiveTimeMap.put(addressKey, LocalDateTime.now()); + addressDeviceMap.put(addressKey, deviceId); + log.debug("[updateDeviceSession][更新设备会话,设备 ID: {},地址: {},codecType: {}]", deviceId, addressKey, codecType); + } + + /** + * 更新设备地址(兼容旧接口,默认不更新 codecType) * * @param deviceId 设备 ID * @param address 设备地址 */ public void updateDeviceAddress(Long deviceId, InetSocketAddress address) { - String addressKey = buildAddressKey(address); - // 更新设备地址映射 - deviceAddressMap.put(deviceId, address); - lastActiveTimeMap.put(addressKey, LocalDateTime.now()); - addressDeviceMap.put(addressKey, deviceId); - log.debug("[updateDeviceAddress][更新设备地址,设备 ID: {},地址: {}]", deviceId, addressKey); + SessionInfo sessionInfo = deviceSessionMap.get(deviceId); + String codecType = sessionInfo != null ? sessionInfo.getCodecType() : null; + updateDeviceSession(deviceId, address, codecType); + } + + /** + * 获取设备会话信息 + * + * @param deviceId 设备 ID + * @return 会话信息 + */ + public SessionInfo getSessionInfo(Long deviceId) { + return deviceSessionMap.get(deviceId); } /** @@ -65,7 +89,7 @@ public class IotUdpSessionManager { * @return 是否在线 */ public boolean isDeviceOnline(Long deviceId) { - return deviceAddressMap.containsKey(deviceId); + return deviceSessionMap.containsKey(deviceId); } /** @@ -87,12 +111,13 @@ public class IotUdpSessionManager { * @return 是否发送成功 */ public boolean sendToDevice(Long deviceId, byte[] data, DatagramSocket socket) { - InetSocketAddress address = deviceAddressMap.get(deviceId); - if (address == null) { - log.warn("[sendToDevice][设备地址不存在,设备 ID: {}]", deviceId); + SessionInfo sessionInfo = deviceSessionMap.get(deviceId); + if (sessionInfo == null || sessionInfo.getAddress() == null) { + log.warn("[sendToDevice][设备会话不存在,设备 ID: {}]", deviceId); return false; } + InetSocketAddress address = sessionInfo.getAddress(); try { socket.send(Buffer.buffer(data), address.getPort(), address.getHostString(), result -> { if (result.succeeded()) { @@ -134,8 +159,8 @@ public class IotUdpSessionManager { iterator.remove(); continue; } - InetSocketAddress address = deviceAddressMap.remove(deviceId); - if (address == null) { + SessionInfo sessionInfo = deviceSessionMap.remove(deviceId); + if (sessionInfo == null) { iterator.remove(); continue; } @@ -157,4 +182,22 @@ public class IotUdpSessionManager { return address.getHostString() + ":" + address.getPort(); } + /** + * 会话信息 + */ + @Data + public static class SessionInfo { + + /** + * 设备地址 + */ + private InetSocketAddress address; + + /** + * 消息编解码类型 + */ + private String codecType; + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpDownstreamHandler.java index c8da38ccc4..6aeb2cb7aa 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpDownstreamHandler.java @@ -1,10 +1,8 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.datagram.DatagramSocket; import lombok.extern.slf4j.Slf4j; @@ -19,18 +17,14 @@ public class IotUdpDownstreamHandler { private final IotDeviceMessageService deviceMessageService; - private final IotDeviceService deviceService; - private final IotUdpSessionManager sessionManager; private final IotUdpUpstreamProtocol protocol; public IotUdpDownstreamHandler(IotDeviceMessageService deviceMessageService, - IotDeviceService deviceService, IotUdpSessionManager sessionManager, IotUdpUpstreamProtocol protocol) { this.deviceMessageService = deviceMessageService; - this.deviceService = deviceService; this.sessionManager = sessionManager; this.protocol = protocol; } @@ -45,21 +39,15 @@ public class IotUdpDownstreamHandler { log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]", message.getDeviceId(), message.getMethod(), message.getId()); - // 1.1 获取设备信息 - IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId()); - if (deviceInfo == null) { - log.error("[handle][设备不存在,设备 ID: {}]", message.getDeviceId()); - return; - } - // 1.2 检查设备是否在线(即是否有地址映射) - if (sessionManager.isDeviceOffline(message.getDeviceId())) { + // 1. 获取会话信息(包含 codecType) + IotUdpSessionManager.SessionInfo sessionInfo = sessionManager.getSessionInfo(message.getDeviceId()); + if (sessionInfo == null) { log.warn("[handle][设备不在线,设备 ID: {}]", message.getDeviceId()); return; } - // 2. 根据产品 Key 和设备名称编码消息,并发送到设备 - byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), - deviceInfo.getDeviceName()); + // 2. 使用会话中的 codecType 编码消息,并发送到设备 + byte[] bytes = deviceMessageService.encodeDeviceMessage(message, sessionInfo.getCodecType()); DatagramSocket socket = protocol.getUdpSocket(); if (socket == null) { log.error("[handle][UDP Socket 不可用,设备 ID: {}]", message.getDeviceId()); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java index e982af340f..80ab76a8e5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java @@ -195,8 +195,8 @@ public class IotUdpUpstreamHandler { // 3.1 生成 JWT Token(无状态) String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName()); - // 3.2 更新设备地址映射(用于下行消息) - sessionManager.updateDeviceAddress(device.getId(), senderAddress); + // 3.2 更新设备会话信息(用于下行消息,保存 codecType) + sessionManager.updateDeviceSession(device.getId(), senderAddress, codecType); // 3.3 发送上线消息 sendOnlineMessage(device); @@ -298,8 +298,8 @@ public class IotUdpUpstreamHandler { return; } - // 3. 更新设备地址映射(保持最新) - sessionManager.updateDeviceAddress(device.getId(), senderAddress); + // 3. 更新设备会话信息(保持最新,保存 codecType) + sessionManager.updateDeviceSession(device.getId(), senderAddress, codecType); // 4. 将 body 设置为实际的 params,发送消息到消息总线 message.setParams(body); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java index 837c29ce62..e15212f54a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java @@ -1,21 +1,23 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp; import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.HexUtil; import cn.hutool.core.util.IdUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; -import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -24,10 +26,16 @@ import java.util.Map; * *

测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 UDP 协议直接连接平台 * + *

支持两种编解码格式: + *

    + *
  • {@link IotTcpJsonDeviceMessageCodec} - JSON 格式
  • + *
  • {@link IotTcpBinaryDeviceMessageCodec} - 二进制格式
  • + *
+ * *

使用步骤: *

    - *
  1. 启动 yudao-module-iot-gateway 服务(UDP 端口 8092)
  2. - *
  3. 运行 {@link #testDeviceRegister()} 测试直连设备动态注册(一型一密)
  4. + *
  5. 启动 yudao-module-iot-gateway 服务(UDP 端口 8093)
  6. + *
  7. 修改 {@link #CODEC} 选择测试的编解码格式
  8. *
  9. 运行 {@link #testAuth()} 获取设备 token,将返回的 token 粘贴到 {@link #TOKEN} 常量
  10. *
  11. 运行以下测试方法: *
      @@ -48,6 +56,10 @@ public class IotDirectDeviceUdpProtocolIntegrationTest { private static final int SERVER_PORT = 8093; private static final int TIMEOUT_MS = 5000; + // ===================== 编解码器选择(修改此处切换 JSON / Binary) ===================== + private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec(); +// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec(); + // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; private static final String DEVICE_NAME = "small"; @@ -65,27 +77,32 @@ public class IotDirectDeviceUdpProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - // 1.1 构建请求 + // 1.1 构建认证消息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", "auth") - .put("params", authReqDTO) - .build()); - // 1.2 输出请求 - log.info("[testAuth][请求体: {}]", payload); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { + log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); + } // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testAuth][响应体: {}]", response); - log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]"); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testAuth][响应消息: {}]", response); + log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]"); + } else { + log.warn("[testAuth][未收到响应]"); + } } } @@ -96,25 +113,30 @@ public class IotDirectDeviceUdpProtocolIntegrationTest { */ @Test public void testPropertyPost() throws Exception { - // 1.1 构建请求(UDP 协议:token 放在 params 中) - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()) - .put("version", "1.0") - .put("params", withToken(IotDevicePropertyPostReqDTO.of(MapUtil.builder() + // 1.1 构建属性上报消息(UDP 协议:token 放在 params 中) + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + withToken(IotDevicePropertyPostReqDTO.of(MapUtil.builder() .put("width", 1) .put("height", "2") - .build()))) - .build()); - // 1.2 输出请求 - log.info("[testPropertyPost][请求体: {}]", payload); + .build())), + null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); - // 2. 发送请求 + // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testPropertyPost][响应体: {}]", response); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testPropertyPost][响应消息: {}]", response); + } else { + log.warn("[testPropertyPost][未收到响应]"); + } } } @@ -125,60 +147,30 @@ public class IotDirectDeviceUdpProtocolIntegrationTest { */ @Test public void testEventPost() throws Exception { - // 1.1 构建请求(UDP 协议:token 放在 params 中) - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod()) - .put("version", "1.0") - .put("params", withToken(IotDeviceEventPostReqDTO.of( + // 1.1 构建事件上报消息(UDP 协议:token 放在 params 中) + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + withToken(IotDeviceEventPostReqDTO.of( "eat", MapUtil.builder().put("rice", 3).build(), - System.currentTimeMillis()))) - .build()); - // 1.2 输出请求 - log.info("[testEventPost][请求体: {}]", payload); + System.currentTimeMillis())), + null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testEventPost][响应体: {}]", response); - } - } - - // ===================== 动态注册测试 ===================== - - /** - * 直连设备动态注册测试(一型一密) - *

      - * 使用产品密钥(productSecret)验证身份,成功后返回设备密钥(deviceSecret) - *

      - * 注意:此接口不需要 Token 认证 - */ - @Test - public void testDeviceRegister() throws Exception { - // 1.1 构建请求参数 - IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO(); - reqDTO.setProductKey(PRODUCT_KEY); - reqDTO.setDeviceName("test-" + System.currentTimeMillis()); - reqDTO.setProductSecret("test-product-secret"); - // 1.2 构建请求 - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod()) - .put("params", reqDTO) - .build()); - // 1.3 输出请求 - log.info("[testDeviceRegister][请求体: {}]", payload); - - // 2.1 发送请求 - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testDeviceRegister][响应体: {}]", response); - log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testEventPost][响应消息: {}]", response); + } else { + log.warn("[testEventPost][未收到响应]"); + } } } @@ -201,20 +193,18 @@ public class IotDirectDeviceUdpProtocolIntegrationTest { return result; } - /** * 发送 UDP 请求并接收响应 * * @param socket UDP Socket - * @param payload 请求体 - * @return 响应内容 + * @param payload 请求数据 + * @return 响应数据 */ - public static String sendAndReceive(DatagramSocket socket, String payload) throws Exception { - byte[] sendData = payload.getBytes(StandardCharsets.UTF_8); + public static byte[] sendAndReceive(DatagramSocket socket, byte[] payload) throws Exception { InetAddress address = InetAddress.getByName(SERVER_HOST); // 发送请求 - DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, address, SERVER_PORT); + DatagramPacket sendPacket = new DatagramPacket(payload, payload.length, address, SERVER_PORT); socket.send(sendPacket); // 接收响应 @@ -222,7 +212,9 @@ public class IotDirectDeviceUdpProtocolIntegrationTest { DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); try { socket.receive(receivePacket); - return new String(receivePacket.getData(), 0, receivePacket.getLength(), StandardCharsets.UTF_8); + byte[] response = new byte[receivePacket.getLength()]; + System.arraycopy(receivePacket.getData(), 0, response, 0, receivePacket.getLength()); + return response; } catch (java.net.SocketTimeoutException e) { log.warn("[sendAndReceive][接收响应超时]"); return null; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewayDeviceUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewayDeviceUdpProtocolIntegrationTest.java index 16694720ee..e58f5bbc55 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewayDeviceUdpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewayDeviceUdpProtocolIntegrationTest.java @@ -1,10 +1,11 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp; import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.HexUtil; import cn.hutool.core.util.IdUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO; @@ -12,6 +13,9 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO; import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO; import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; @@ -28,9 +32,16 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd * *

      测试场景:网关设备(IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 UDP 协议管理子设备拓扑关系 * + *

      支持两种编解码格式: + *

        + *
      • {@link IotTcpJsonDeviceMessageCodec} - JSON 格式
      • + *
      • {@link IotTcpBinaryDeviceMessageCodec} - 二进制格式
      • + *
      + * *

      使用步骤: *

        *
      1. 启动 yudao-module-iot-gateway 服务(UDP 端口 8093)
      2. + *
      3. 修改 {@link #CODEC} 选择测试的编解码格式
      4. *
      5. 运行 {@link #testAuth()} 获取网关设备 token,将返回的 token 粘贴到 {@link #GATEWAY_TOKEN} 常量
      6. *
      7. 运行以下测试方法: *
          @@ -50,10 +61,12 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd @Slf4j public class IotGatewayDeviceUdpProtocolIntegrationTest { - private static final String SERVER_HOST = "127.0.0.1"; - private static final int SERVER_PORT = 8093; private static final int TIMEOUT_MS = 5000; + // ===================== 编解码器选择(修改此处切换 JSON / Binary) ===================== + private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec(); +// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec(); + // ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) ===================== private static final String GATEWAY_PRODUCT_KEY = "m6XcS1ZJ3TW8eC0v"; private static final String GATEWAY_DEVICE_NAME = "sub-ddd"; @@ -76,28 +89,33 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - // 1.1 构建请求 + // 1.1 构建认证消息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", "auth") - .put("params", authReqDTO) - .build()); - // 1.2 输出请求 - log.info("[testAuth][请求体: {}]", payload); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { + log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); + } // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testAuth][响应体: {}]", response); - log.info("[testAuth][请将返回的 token 复制到 GATEWAY_TOKEN 常量中]"); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testAuth][响应消息: {}]", response); + log.info("[testAuth][请将返回的 token 复制到 GATEWAY_TOKEN 常量中]"); + } else { + log.warn("[testAuth][未收到响应]"); + } } } @@ -120,21 +138,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest { // 1.2 构建请求参数 IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO(); params.setSubDevices(Collections.singletonList(subDeviceAuth)); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.TOPO_ADD.getMethod()) - .put("version", "1.0") - .put("params", withToken(params)) - .build()); - // 1.3 输出请求 - log.info("[testTopoAdd][请求体: {}]", payload); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), + withToken(params), + null, null, null); + // 1.3 编码 + byte[] payload = CODEC.encode(request); + log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request); // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testTopoAdd][响应体: {}]", response); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testTopoAdd][响应消息: {}]", response); + } else { + log.warn("[testTopoAdd][未收到响应]"); + } } } @@ -149,21 +172,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest { IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO(); params.setSubDevices(Collections.singletonList( new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod()) - .put("version", "1.0") - .put("params", withToken(params)) - .build()); - // 1.2 输出请求 - log.info("[testTopoDelete][请求体: {}]", payload); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), + withToken(params), + null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request); // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testTopoDelete][响应体: {}]", response); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testTopoDelete][响应消息: {}]", response); + } else { + log.warn("[testTopoDelete][未收到响应]"); + } } } @@ -176,21 +204,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest { public void testTopoGet() throws Exception { // 1.1 构建请求参数(目前为空,预留扩展) IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.TOPO_GET.getMethod()) - .put("version", "1.0") - .put("params", withToken(params)) - .build()); - // 1.2 输出请求 - log.info("[testTopoGet][请求体: {}]", payload); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), + withToken(params), + null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request); // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testTopoGet][响应体: {}]", response); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testTopoGet][响应消息: {}]", response); + } else { + log.warn("[testTopoGet][未收到响应]"); + } } } @@ -209,21 +242,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest { IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO(); subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY); subDevice.setDeviceName("mougezishebei"); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod()) - .put("version", "1.0") - .put("params", withToken(Collections.singletonList(subDevice))) - .build()); - // 1.2 输出请求 - log.info("[testSubDeviceRegister][请求体: {}]", payload); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), + withToken(Collections.singletonList(subDevice)), + null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request); // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testSubDeviceRegister][响应体: {}]", response); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testSubDeviceRegister][响应消息: {}]", response); + } else { + log.warn("[testSubDeviceRegister][未收到响应]"); + } } } @@ -268,21 +306,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest { params.setProperties(gatewayProperties); params.setEvents(gatewayEvents); params.setSubDevices(List.of(subDeviceData)); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod()) - .put("version", "1.0") - .put("params", withToken(params)) - .build()); - // 1.7 输出请求 - log.info("[testPropertyPackPost][请求体: {}]", payload); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), + withToken(params), + null, null, null); + // 1.7 编码 + byte[] payload = CODEC.encode(request); + log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testPropertyPackPost][响应体: {}]", response); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testPropertyPackPost][响应消息: {}]", response); + } else { + log.warn("[testPropertyPackPost][未收到响应]"); + } } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewaySubDeviceUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewaySubDeviceUdpProtocolIntegrationTest.java index 644d89b63b..ff775196ff 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewaySubDeviceUdpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewaySubDeviceUdpProtocolIntegrationTest.java @@ -1,13 +1,17 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp; import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.HexUtil; import cn.hutool.core.util.IdUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; @@ -25,10 +29,17 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd *

          重要说明:子设备无法直接连接平台,所有请求均由网关设备(Gateway)代为转发。 *

          网关设备转发子设备请求时,Token 使用子设备自己的信息。 * + *

          支持两种编解码格式: + *

            + *
          • {@link IotTcpJsonDeviceMessageCodec} - JSON 格式
          • + *
          • {@link IotTcpBinaryDeviceMessageCodec} - 二进制格式
          • + *
          + * *

          使用步骤: *

            *
          1. 启动 yudao-module-iot-gateway 服务(UDP 端口 8093)
          2. *
          3. 确保子设备已通过 {@link IotGatewayDeviceUdpProtocolIntegrationTest#testTopoAdd()} 绑定到网关
          4. + *
          5. 修改 {@link #CODEC} 选择测试的编解码格式
          6. *
          7. 运行 {@link #testAuth()} 获取子设备 token,将返回的 token 粘贴到 {@link #TOKEN} 常量
          8. *
          9. 运行以下测试方法: *
              @@ -45,10 +56,12 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd @Slf4j public class IotGatewaySubDeviceUdpProtocolIntegrationTest { - private static final String SERVER_HOST = "127.0.0.1"; - private static final int SERVER_PORT = 8093; private static final int TIMEOUT_MS = 5000; + // ===================== 编解码器选择(修改此处切换 JSON / Binary) ===================== + private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec(); +// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec(); + // ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== private static final String PRODUCT_KEY = "jAufEMTF1W6wnPhn"; private static final String DEVICE_NAME = "chazuo-it"; @@ -66,27 +79,32 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - // 1.1 构建请求 + // 1.1 构建认证消息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", "auth") - .put("params", authReqDTO) - .build()); - // 1.2 输出请求 - log.info("[testAuth][请求体: {}]", payload); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { + log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); + } // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testAuth][响应体: {}]", response); - log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]"); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testAuth][响应消息: {}]", response); + log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]"); + } else { + log.warn("[testAuth][未收到响应]"); + } } } @@ -97,27 +115,32 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest { */ @Test public void testPropertyPost() throws Exception { - // 1.1 构建请求(UDP 协议:token 放在 params 中) - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()) - .put("version", "1.0") - .put("params", withToken(IotDevicePropertyPostReqDTO.of(MapUtil.builder() + // 1.1 构建属性上报消息(UDP 协议:token 放在 params 中) + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + withToken(IotDevicePropertyPostReqDTO.of(MapUtil.builder() .put("power", 100) .put("status", "online") .put("temperature", 36.5) - .build()))) - .build()); - // 1.2 输出请求 + .build())), + null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]"); - log.info("[testPropertyPost][请求体: {}]", payload); + log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testPropertyPost][响应体: {}]", response); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testPropertyPost][响应消息: {}]", response); + } else { + log.warn("[testPropertyPost][未收到响应]"); + } } } @@ -128,12 +151,11 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest { */ @Test public void testEventPost() throws Exception { - // 1.1 构建请求(UDP 协议:token 放在 params 中) - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod()) - .put("version", "1.0") - .put("params", withToken(IotDeviceEventPostReqDTO.of( + // 1.1 构建事件上报消息(UDP 协议:token 放在 params 中) + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + withToken(IotDeviceEventPostReqDTO.of( "alarm", MapUtil.builder() .put("level", "warning") @@ -141,18 +163,24 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest { .put("threshold", 40) .put("current", 42) .build(), - System.currentTimeMillis()))) - .build()); - // 1.2 输出请求 + System.currentTimeMillis())), + null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]"); - log.info("[testEventPost][请求体: {}]", payload); + log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testEventPost][响应体: {}]", response); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testEventPost][响应消息: {}]", response); + } else { + log.warn("[testEventPost][未收到响应]"); + } } }