diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 3e573efdde..68266b9cca 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -17,6 +17,9 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDowns import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; @@ -194,4 +197,39 @@ public class IotGatewayConfiguration { } + /** + * IoT 网关 UDP 协议配置类 + */ + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.udp", name = "enabled", havingValue = "true") + @Slf4j + public static class UdpProtocolConfiguration { + + @Bean(name = "udpVertx", destroyMethod = "close") + public Vertx udpVertx() { + return Vertx.vertx(); + } + + @Bean + public IotUdpUpstreamProtocol iotUdpUpstreamProtocol(IotGatewayProperties gatewayProperties, + IotDeviceService deviceService, + IotDeviceMessageService messageService, + IotUdpSessionManager sessionManager, + @Qualifier("udpVertx") Vertx udpVertx) { + return new IotUdpUpstreamProtocol(gatewayProperties.getProtocol().getUdp(), + deviceService, messageService, sessionManager, udpVertx); + } + + @Bean + public IotUdpDownstreamSubscriber iotUdpDownstreamSubscriber(IotUdpUpstreamProtocol protocolHandler, + IotDeviceMessageService messageService, + IotDeviceService deviceService, + IotUdpSessionManager sessionManager, + IotMessageBus messageBus) { + return new IotUdpDownstreamSubscriber(protocolHandler, messageService, deviceService, sessionManager, + messageBus); + } + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 7655a3759e..a577f88f40 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -93,6 +93,11 @@ public class IotGatewayProperties { */ private MqttWsProperties mqttWs; + /** + * UDP 组件配置 + */ + private UdpProperties udp; + } @Data @@ -503,4 +508,42 @@ public class IotGatewayProperties { } + @Data + public static class UdpProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + + /** + * 服务端口(默认 8092) + */ + private Integer port = 8092; + + /** + * 接收缓冲区大小(默认 64KB) + */ + private Integer receiveBufferSize = 65536; + + /** + * 发送缓冲区大小(默认 64KB) + */ + private Integer sendBufferSize = 65536; + + /** + * 会话超时时间(毫秒,默认 60 秒) + *

+ * 用于清理不活跃的设备地址映射 + */ + private Long sessionTimeoutMs = 60000L; + + /** + * 会话清理间隔(毫秒,默认 30 秒) + */ + private Long sessionCleanIntervalMs = 30000L; + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java index 3ee31d82e4..68d4bdfaac 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java @@ -43,7 +43,7 @@ public class IotTcpDownstreamHandler { return; } - // 2. 根据产品 Key 和设备名称编码消息并发送到设备 + // 2. 根据产品 Key 和设备名称编码消息,并发送到设备 byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpDownstreamSubscriber.java new file mode 100644 index 0000000000..29a2afa159 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpDownstreamSubscriber.java @@ -0,0 +1,67 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.udp; + +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 UDP 下游订阅者:接收下行给设备的消息 + * + * @author 芋道源码 + */ +@Slf4j +@RequiredArgsConstructor +public class IotUdpDownstreamSubscriber implements IotMessageSubscriber { + + private final IotUdpUpstreamProtocol protocol; + + private final IotDeviceMessageService messageService; + + private final IotDeviceService deviceService; + + private final IotUdpSessionManager sessionManager; + + private final IotMessageBus messageBus; + + private IotUdpDownstreamHandler downstreamHandler; + + @PostConstruct + public void init() { + // 初始化下游处理器 + this.downstreamHandler = new IotUdpDownstreamHandler(messageService, deviceService, sessionManager, protocol); + + messageBus.register(this); + log.info("[init][UDP 下游订阅者初始化完成,服务器 ID: {},Topic: {}]", + protocol.getServerId(), getTopic()); + } + + @Override + public String getTopic() { + return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId()); + } + + @Override + public String getGroup() { + // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group + return getTopic(); + } + + @Override + public void onMessage(IotDeviceMessage message) { + try { + downstreamHandler.handle(message); + } catch (Exception e) { + log.error("[onMessage][处理下行消息失败,设备 ID: {},方法: {},消息 ID: {}]", + message.getDeviceId(), message.getMethod(), message.getId(), e); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java new file mode 100644 index 0000000000..32a59a982c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java @@ -0,0 +1,170 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.udp; + +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Vertx; +import io.vertx.core.datagram.DatagramSocket; +import io.vertx.core.datagram.DatagramSocketOptions; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 UDP 协议:接收设备上行消息 + *

+ * 采用 Vertx DatagramSocket 实现 UDP 服务器,主要功能: + * 1. 监听 UDP 端口,接收设备消息 + * 2. 定期清理不活跃的设备地址映射 + * 3. 提供 UDP Socket 用于下行消息发送 + * + * @author 芋道源码 + */ +@Slf4j +public class IotUdpUpstreamProtocol { + + private final IotGatewayProperties.UdpProperties udpProperties; + + private final IotDeviceService deviceService; + + private final IotDeviceMessageService messageService; + + private final IotUdpSessionManager sessionManager; + + private final Vertx vertx; + + @Getter + private final String serverId; + + @Getter + private DatagramSocket udpSocket; + + /** + * 会话清理定时器 ID + */ + private Long cleanTimerId; + + private IotUdpUpstreamHandler upstreamHandler; + + public IotUdpUpstreamProtocol(IotGatewayProperties.UdpProperties udpProperties, + IotDeviceService deviceService, + IotDeviceMessageService messageService, + IotUdpSessionManager sessionManager, + Vertx vertx) { + this.udpProperties = udpProperties; + this.deviceService = deviceService; + this.messageService = messageService; + this.sessionManager = sessionManager; + this.vertx = vertx; + this.serverId = IotDeviceMessageUtils.generateServerId(udpProperties.getPort()); + } + + @PostConstruct + public void start() { + // 1. 初始化上行消息处理器 + this.upstreamHandler = new IotUdpUpstreamHandler(this, messageService, deviceService, sessionManager); + + // 2. 创建 UDP Socket 选项 + DatagramSocketOptions options = new DatagramSocketOptions() + .setReceiveBufferSize(udpProperties.getReceiveBufferSize()) + .setSendBufferSize(udpProperties.getSendBufferSize()) + .setReuseAddress(true); + + // 3. 创建 UDP Socket + udpSocket = vertx.createDatagramSocket(options); + + // 4. 监听端口 + udpSocket.listen(udpProperties.getPort(), "0.0.0.0", result -> { + // TODO @AI:if return;简化下;成功才继续往下走; + if (result.succeeded()) { + // 设置数据包处理器 + udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket, this)); + log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]", + udpProperties.getPort(), udpProperties.getReceiveBufferSize(), + udpProperties.getSendBufferSize()); + + // 5. 启动会话清理定时器 + startSessionCleanTimer(); + } else { + log.error("[start][IoT 网关 UDP 协议启动失败]", result.cause()); + } + }); + } + + @PreDestroy + public void stop() { + // 1. 取消会话清理定时器 + if (cleanTimerId != null) { + vertx.cancelTimer(cleanTimerId); + cleanTimerId = null; + log.info("[stop][会话清理定时器已取消]"); + } + + // 2. 关闭 UDP Socket + if (udpSocket != null) { + try { + udpSocket.close().result(); + log.info("[stop][IoT 网关 UDP 协议已停止]"); + } catch (Exception e) { + log.error("[stop][IoT 网关 UDP 协议停止失败]", e); + } + } + } + + /** + * 启动会话清理定时器 + */ + private void startSessionCleanTimer() { + cleanTimerId = vertx.setPeriodic(udpProperties.getSessionCleanIntervalMs(), id -> { + try { + // 1. 清理超时的设备地址映射,并获取离线设备列表 + // TODO @AI:兼容 jdk8,不要用 var; + var offlineDevices = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs()); + + // 2. 为每个离线设备发送离线消息 + for (var offlineInfo : offlineDevices) { + sendOfflineMessage(offlineInfo.getDeviceId()); + } + // TODO @AI:CollUtil.isNotEmpty ;简化下 if 判断; + if (!offlineDevices.isEmpty()) { + log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDevices.size()); + } + } catch (Exception e) { + log.error("[cleanExpiredMappings][清理超时会话失败]", e); + } + }); + log.info("[startSessionCleanTimer][会话清理定时器启动,间隔:{} ms,超时:{} ms]", + udpProperties.getSessionCleanIntervalMs(), udpProperties.getSessionTimeoutMs()); + } + + /** + * 发送设备离线消息 + * + * @param deviceId 设备 ID + */ + private void sendOfflineMessage(Long deviceId) { + try { + // 获取设备信息 + var device = deviceService.getDeviceFromCache(deviceId); + if (device == null) { + log.warn("[sendOfflineMessage][设备不存在,设备 ID: {}]", deviceId); + return; + } + + // 发送离线消息 + IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); + messageService.sendDeviceMessage(offlineMessage, device.getProductKey(), + device.getDeviceName(), serverId); + log.info("[sendOfflineMessage][发送离线消息,设备 ID: {},设备名: {}]", + deviceId, device.getDeviceName()); + } catch (Exception e) { + log.error("[sendOfflineMessage][发送离线消息失败,设备 ID: {}]", deviceId, e); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java new file mode 100644 index 0000000000..854bdb6145 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java @@ -0,0 +1,204 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.datagram.DatagramSocket; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * IoT 网关 UDP 会话管理器 + *

+ * 采用无状态设计,SessionManager 主要用于: + * 1. 管理设备地址映射(用于下行消息发送) + * 2. 定期清理不活跃的设备地址映射 + *

+ * 注意:UDP 是无连接协议,上行消息通过 token 验证身份,不依赖会话状态 + * + * @author 芋道源码 + */ +@Slf4j +@Component +public class IotUdpSessionManager { + + /** + * 设备 ID -> 设备地址(用于下行消息发送) + */ + private final Map deviceAddressMap = new ConcurrentHashMap<>(); + + /** + * 设备地址 Key -> 最后活跃时间(用于清理) + */ + // TODO @AI:是不是尽量使用 LocalDateTime ?统一时间类型 + private final Map lastActiveTimeMap = new ConcurrentHashMap<>(); + + /** + * 设备地址 Key -> 设备 ID(反向映射,用于清理时同步) + */ + private final Map addressDeviceMap = new ConcurrentHashMap<>(); + + /** + * 更新设备地址(每次收到上行消息时调用) + * + * @param deviceId 设备 ID + * @param address 设备地址 + */ + public void updateDeviceAddress(Long deviceId, InetSocketAddress address) { + String addressKey = buildAddressKey(address); + // 更新设备地址映射 + deviceAddressMap.put(deviceId, address); + lastActiveTimeMap.put(addressKey, System.currentTimeMillis()); + addressDeviceMap.put(addressKey, deviceId); + log.debug("[updateDeviceAddress][更新设备地址,设备 ID: {},地址: {}]", deviceId, addressKey); + } + + // TODO @AI:是不是用不到?用不掉就删除掉!简化 + /** + * 获取设备地址(下行消息发送时使用) + * + * @param deviceId 设备 ID + * @return 设备地址,如果不存在返回 null + */ + public InetSocketAddress getDeviceAddress(Long deviceId) { + return deviceAddressMap.get(deviceId); + } + + /** + * 检查设备是否在线(即是否有地址映射) + * + * @param deviceId 设备 ID + * @return 是否在线 + */ + public boolean isDeviceOnline(Long deviceId) { + return deviceAddressMap.containsKey(deviceId); + } + + /** + * 检查设备是否离线 + * + * @param deviceId 设备 ID + * @return 是否离线 + */ + public boolean isDeviceOffline(Long deviceId) { + return !isDeviceOnline(deviceId); + } + + /** + * 发送消息到设备 + * + * @param deviceId 设备 ID + * @param data 数据 + * @param socket UDP Socket + * @return 是否发送成功 + */ + public boolean sendToDevice(Long deviceId, byte[] data, DatagramSocket socket) { + InetSocketAddress address = deviceAddressMap.get(deviceId); + if (address == null) { + log.warn("[sendToDevice][设备地址不存在,设备 ID: {}]", deviceId); + return false; + } + + try { + socket.send(Buffer.buffer(data), address.getPort(), address.getHostString(), result -> { + if (result.succeeded()) { + log.debug("[sendToDevice][发送消息成功,设备 ID: {},地址: {},数据长度: {} 字节]", + deviceId, buildAddressKey(address), data.length); + } else { + log.error("[sendToDevice][发送消息失败,设备 ID: {},地址: {}]", + deviceId, buildAddressKey(address), result.cause()); + } + }); + return true; + } catch (Exception e) { + log.error("[sendToDevice][发送消息异常,设备 ID: {}]", deviceId, e); + return false; + } + } + + /** + * 定期清理不活跃的设备地址映射 + * + * @param timeoutMs 超时时间(毫秒) + * @return 清理的设备 ID 列表(用于发送离线消息) + */ + // TODO @AI:目前暂时用不到 address 字段,是不是只返回 list of deviceId 就行?简化 + public java.util.List cleanExpiredMappings(long timeoutMs) { + java.util.List offlineDevices = new java.util.ArrayList<>(); + long now = System.currentTimeMillis(); + Iterator> iterator = lastActiveTimeMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (now - entry.getValue() > timeoutMs) { + String addressKey = entry.getKey(); + Long deviceId = addressDeviceMap.remove(addressKey); + // TODO @AI:if continue,减少括号层级; + if (deviceId != null) { + InetSocketAddress address = deviceAddressMap.remove(deviceId); + if (address != null) { + // 获取设备信息用于发送离线消息 + offlineDevices.add(new DeviceOfflineInfo(deviceId, addressKey)); + log.info("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}ms 前]", + deviceId, addressKey, now - entry.getValue()); + } + } + iterator.remove(); + } + } + return offlineDevices; + } + + // TODO @AI:是不是用不到?用不掉就删除掉!简化 + /** + * 移除设备地址映射 + * + * @param deviceId 设备 ID + */ + public void removeDeviceAddress(Long deviceId) { + InetSocketAddress address = deviceAddressMap.remove(deviceId); + if (address != null) { + String addressKey = buildAddressKey(address); + lastActiveTimeMap.remove(addressKey); + addressDeviceMap.remove(addressKey); + log.debug("[removeDeviceAddress][移除设备地址,设备 ID: {},地址: {}]", deviceId, addressKey); + } + } + + /** + * 构建地址 Key + * + * @param address 地址 + * @return 地址 Key + */ + public String buildAddressKey(InetSocketAddress address) { + return address.getHostString() + ":" + address.getPort(); + } + + /** + * 设备离线信息 + */ + @Data + public static class DeviceOfflineInfo { + + /** + * 设备 ID + */ + private final Long deviceId; + + /** + * 设备地址 + */ + private final String address; + + public DeviceOfflineInfo(Long deviceId, String address) { + this.deviceId = deviceId; + this.address = address; + } + + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java new file mode 100644 index 0000000000..80b05406d3 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java @@ -0,0 +1,2 @@ +// TODO @AI:完善下注释,参考 mqtt 的 package.json +package cn.iocoder.yudao.module.iot.gateway.protocol.udp; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpDownstreamHandler.java new file mode 100644 index 0000000000..c8da38ccc4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpDownstreamHandler.java @@ -0,0 +1,82 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router; + +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.datagram.DatagramSocket; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 UDP 下行消息处理器 + * + * @author 芋道源码 + */ +@Slf4j +public class IotUdpDownstreamHandler { + + private final IotDeviceMessageService deviceMessageService; + + private final IotDeviceService deviceService; + + private final IotUdpSessionManager sessionManager; + + private final IotUdpUpstreamProtocol protocol; + + public IotUdpDownstreamHandler(IotDeviceMessageService deviceMessageService, + IotDeviceService deviceService, + IotUdpSessionManager sessionManager, + IotUdpUpstreamProtocol protocol) { + this.deviceMessageService = deviceMessageService; + this.deviceService = deviceService; + this.sessionManager = sessionManager; + this.protocol = protocol; + } + + /** + * 处理下行消息 + * + * @param message 下行消息 + */ + public void handle(IotDeviceMessage message) { + try { + log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]", + message.getDeviceId(), message.getMethod(), message.getId()); + + // 1.1 获取设备信息 + IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId()); + if (deviceInfo == null) { + log.error("[handle][设备不存在,设备 ID: {}]", message.getDeviceId()); + return; + } + // 1.2 检查设备是否在线(即是否有地址映射) + if (sessionManager.isDeviceOffline(message.getDeviceId())) { + log.warn("[handle][设备不在线,设备 ID: {}]", message.getDeviceId()); + return; + } + + // 2. 根据产品 Key 和设备名称编码消息,并发送到设备 + byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), + deviceInfo.getDeviceName()); + DatagramSocket socket = protocol.getUdpSocket(); + if (socket == null) { + log.error("[handle][UDP Socket 不可用,设备 ID: {}]", message.getDeviceId()); + return; + } + boolean success = sessionManager.sendToDevice(message.getDeviceId(), bytes, socket); + if (success) { + log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]", + message.getDeviceId(), message.getMethod(), message.getId(), bytes.length); + } else { + log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]", + message.getDeviceId(), message.getMethod(), message.getId()); + } + } catch (Exception e) { + log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]", + message.getDeviceId(), message.getMethod(), message, e); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java new file mode 100644 index 0000000000..77a58cfd2c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java @@ -0,0 +1,441 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager; +import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.datagram.DatagramPacket; +import io.vertx.core.datagram.DatagramSocket; +import lombok.extern.slf4j.Slf4j; + +import java.net.InetSocketAddress; +import java.util.Map; + +// TODO @AI:注释里,不要出现 CoAP,避免理解成本过高; +/** + * UDP 上行消息处理器 + *

+ * 采用 CoAP 风格的 Token 机制(无状态,每次请求携带 token): + * 1. 认证请求:设备发送 auth 消息,携带 clientId、username、password + * 2. 返回 Token:服务端验证后返回 JWT token + * 3. 后续请求:每次请求在 params 中携带 token + * 4. 服务端验证:每次请求通过 IotDeviceTokenService.verifyToken() 验证 + * + * @author 芋道源码 + */ +@Slf4j +public class IotUdpUpstreamHandler { + + private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE; + private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE; + + private static final String AUTH_METHOD = "auth"; + + private final IotDeviceMessageService deviceMessageService; + + private final IotDeviceService deviceService; + + private final IotUdpSessionManager sessionManager; + + private final IotDeviceTokenService deviceTokenService; + + private final IotDeviceCommonApi deviceApi; + + private final String serverId; + + public IotUdpUpstreamHandler(IotUdpUpstreamProtocol protocol, + IotDeviceMessageService deviceMessageService, + IotDeviceService deviceService, + IotUdpSessionManager sessionManager) { + this.deviceMessageService = deviceMessageService; + this.deviceService = deviceService; + this.sessionManager = sessionManager; + this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.serverId = protocol.getServerId(); + } + + // TODO @AI:protocol 这个参数如果用不到,就删除下; + /** + * 处理 UDP 数据包 + * + * @param packet 数据包 + * @param socket UDP Socket + * @param protocol UDP 协议 + */ + public void handle(DatagramPacket packet, DatagramSocket socket, IotUdpUpstreamProtocol protocol) { + InetSocketAddress senderAddress = new InetSocketAddress(packet.sender().host(), packet.sender().port()); + Buffer data = packet.data(); + log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]", + sessionManager.buildAddressKey(senderAddress), data.length()); + try { + processMessage(data, senderAddress, socket); + } catch (Exception e) { + log.error("[handle][处理消息失败,来源: {},错误: {}]", + sessionManager.buildAddressKey(senderAddress), e.getMessage(), e); + // UDP 无连接,不需要断开连接,只记录错误 + } + } + + /** + * 处理消息 + * + * @param buffer 消息 + * @param senderAddress 发送者地址 + * @param socket UDP Socket + */ + private void processMessage(Buffer buffer, InetSocketAddress senderAddress, DatagramSocket socket) { + // 1. 基础检查 + if (buffer == null || buffer.length() == 0) { + return; + } + + // 2. 获取消息格式类型 + String codecType = getMessageCodecType(buffer); + + // 3. 解码消息 + IotDeviceMessage message; + try { + message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType); + if (message == null) { + log.warn("[processMessage][消息解码失败,来源: {}]", sessionManager.buildAddressKey(senderAddress)); + sendErrorResponse(socket, senderAddress, null, "消息解码失败", codecType); + return; + } + } catch (Exception e) { + log.error("[processMessage][消息解码异常,来源: {}]", sessionManager.buildAddressKey(senderAddress), e); + sendErrorResponse(socket, senderAddress, null, "消息解码失败: " + e.getMessage(), codecType); + return; + } + + // 4. 根据消息类型路由处理 + try { + if (AUTH_METHOD.equals(message.getMethod())) { + // 认证请求 + handleAuthenticationRequest(message, codecType, senderAddress, socket); + } else { + // 业务消息 + handleBusinessRequest(message, codecType, senderAddress, socket); + } + } catch (Exception e) { + log.error("[processMessage][处理消息失败,来源: {},消息方法: {}]", + sessionManager.buildAddressKey(senderAddress), message.getMethod(), e); + sendErrorResponse(socket, senderAddress, message.getRequestId(), "消息处理失败", codecType); + } + } + + /** + * 处理认证请求 + * + * @param message 消息信息 + * @param codecType 消息编解码类型 + * @param senderAddress 发送者地址 + * @param socket UDP Socket + */ + private void handleAuthenticationRequest(IotDeviceMessage message, String codecType, + InetSocketAddress senderAddress, DatagramSocket socket) { + String addressKey = sessionManager.buildAddressKey(senderAddress); + try { + // 1.1 解析认证参数 + IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams()); + if (authParams == null) { + log.warn("[handleAuthenticationRequest][认证参数解析失败,来源: {}]", addressKey); + sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证参数不完整", codecType); + return; + } + // 1.2 执行认证 + if (!validateDeviceAuth(authParams)) { + log.warn("[handleAuthenticationRequest][认证失败,来源: {},username: {}]", + addressKey, authParams.getUsername()); + sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证失败", codecType); + return; + } + + // 2.1 解析设备信息 + IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername()); + if (deviceInfo == null) { + sendErrorResponse(socket, senderAddress, message.getRequestId(), "解析设备信息失败", codecType); + return; + } + // 2.2 获取设备信息 + IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), + deviceInfo.getDeviceName()); + if (device == null) { + sendErrorResponse(socket, senderAddress, message.getRequestId(), "设备不存在", codecType); + return; + } + + // 3.1 生成 JWT Token(CoAP 风格) + String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName()); + + // 3.2 更新设备地址映射(用于下行消息) + sessionManager.updateDeviceAddress(device.getId(), senderAddress); + + // 3.3 发送上线消息 + sendOnlineMessage(device); + + // 3.4 发送成功响应(包含 token) + sendAuthSuccessResponse(socket, senderAddress, message.getRequestId(), token, codecType); + log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {},来源: {}]", + device.getId(), device.getDeviceName(), addressKey); + } catch (Exception e) { + log.error("[handleAuthenticationRequest][认证处理异常,来源: {}]", addressKey, e); + sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证处理异常", codecType); + } + } + + /** + * 处理业务请求 + * + * @param message 消息信息 + * @param codecType 消息编解码类型 + * @param senderAddress 发送者地址 + * @param socket UDP Socket + */ + @SuppressWarnings("unchecked") + private void handleBusinessRequest(IotDeviceMessage message, String codecType, + InetSocketAddress senderAddress, DatagramSocket socket) { + String addressKey = sessionManager.buildAddressKey(senderAddress); + try { + // TODO @AI:token 需要枚举个 KEY;考虑到是通过 params 传递的话,需要获取到后,从 map 里移除掉,避免影响后续业务逻辑处理; + // 1. 从消息中提取 token(CoAP 风格:消息体携带 token) + String token = null; + if (message.getParams() instanceof Map) { + token = MapUtil.getStr((Map) message.getParams(), "token"); + } + + if (StrUtil.isBlank(token)) { + log.warn("[handleBusinessRequest][缺少 token,来源: {}]", addressKey); + sendErrorResponse(socket, senderAddress, message.getRequestId(), "请先进行认证", codecType); + return; + } + + // 2. 验证 token,获取设备信息 + IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token); + if (deviceInfo == null) { + log.warn("[handleBusinessRequest][token 无效或已过期,来源: {}]", addressKey); + sendErrorResponse(socket, senderAddress, message.getRequestId(), "token 无效或已过期", codecType); + return; + } + + // 3. 获取设备详细信息 + IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), + deviceInfo.getDeviceName()); + if (device == null) { + log.warn("[handleBusinessRequest][设备不存在,来源: {},productKey: {},deviceName: {}]", + addressKey, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + sendErrorResponse(socket, senderAddress, message.getRequestId(), "设备不存在", codecType); + return; + } + + // 4. 更新设备地址映射(保持最新) + sessionManager.updateDeviceAddress(device.getId(), senderAddress); + + // 5. 发送消息到消息总线 + deviceMessageService.sendDeviceMessage(message, device.getProductKey(), + device.getDeviceName(), serverId); + + // 6. 发送成功响应 + sendSuccessResponse(socket, senderAddress, message.getRequestId(), "处理成功", codecType); + log.debug("[handleBusinessRequest][业务消息处理成功,设备 ID: {},方法: {},来源: {}]", + device.getId(), message.getMethod(), addressKey); + } catch (Exception e) { + log.error("[handleBusinessRequest][业务请求处理异常,来源: {}]", addressKey, e); + sendErrorResponse(socket, senderAddress, message.getRequestId(), "处理失败", codecType); + } + } + + /** + * 获取消息编解码类型 + * + * @param buffer 消息 + * @return 消息编解码类型 + */ + private String getMessageCodecType(Buffer buffer) { + // 检测消息格式类型 + return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY + : CODEC_TYPE_JSON; + } + + /** + * 发送设备上线消息 + * + * @param device 设备信息 + */ + private void sendOnlineMessage(IotDeviceRespDTO device) { + try { + IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); + deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(), + device.getDeviceName(), serverId); + } catch (Exception e) { + log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e); + } + } + + /** + * 验证设备认证信息 + * + * @param authParams 认证参数 + * @return 是否认证成功 + */ + private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) { + try { + CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() + .setClientId(authParams.getClientId()).setUsername(authParams.getUsername()) + .setPassword(authParams.getPassword())); + result.checkError(); + return BooleanUtil.isTrue(result.getData()); + } catch (Exception e) { + log.error("[validateDeviceAuth][设备认证异常,username: {}]", authParams.getUsername(), e); + return false; + } + } + + /** + * 发送认证成功响应(包含 token) + * + * @param socket UDP Socket + * @param address 目标地址 + * @param requestId 请求 ID + * @param token JWT Token + * @param codecType 消息编解码类型 + */ + private void sendAuthSuccessResponse(DatagramSocket socket, InetSocketAddress address, + String requestId, String token, String codecType) { + try { + Object responseData = MapUtil.builder() + .put("success", true) + .put("token", token) + .put("message", "认证成功") + .build(); + + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, 0, "认证成功"); + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType); + socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), result -> { + if (result.failed()) { + log.error("[sendAuthSuccessResponse][发送认证成功响应失败,地址: {}]", + sessionManager.buildAddressKey(address), result.cause()); + } + }); + } catch (Exception e) { + log.error("[sendAuthSuccessResponse][发送认证成功响应异常,地址: {}]", + sessionManager.buildAddressKey(address), e); + } + } + + /** + * 发送成功响应 + * + * @param socket UDP Socket + * @param address 目标地址 + * @param requestId 请求 ID + * @param message 消息 + * @param codecType 消息编解码类型 + */ + @SuppressWarnings("SameParameterValue") + private void sendSuccessResponse(DatagramSocket socket, InetSocketAddress address, + String requestId, String message, String codecType) { + sendResponse(socket, address, true, message, requestId, codecType); + } + + /** + * 发送错误响应 + * + * @param socket UDP Socket + * @param address 目标地址 + * @param requestId 请求 ID + * @param errorMessage 错误消息 + * @param codecType 消息编解码类型 + */ + private void sendErrorResponse(DatagramSocket socket, InetSocketAddress address, + String requestId, String errorMessage, String codecType) { + sendResponse(socket, address, false, errorMessage, requestId, codecType); + } + + /** + * 发送响应消息 + * + * @param socket UDP Socket + * @param address 目标地址 + * @param success 是否成功 + * @param message 消息 + * @param requestId 请求 ID + * @param codecType 消息编解码类型 + */ + private void sendResponse(DatagramSocket socket, InetSocketAddress address, boolean success, + String message, String requestId, String codecType) { + try { + Object responseData = MapUtil.builder() + .put("success", success) + .put("message", message) + .build(); + + int code = success ? 0 : 401; + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, "response", responseData, + code, message); + + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType); + socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), ar -> { + if (ar.failed()) { + log.error("[sendResponse][发送响应失败,地址: {}]", + sessionManager.buildAddressKey(address), ar.cause()); + } + }); + } catch (Exception e) { + log.error("[sendResponse][发送响应异常,地址: {}]", + sessionManager.buildAddressKey(address), e); + } + } + + /** + * 解析认证参数 + * + * @param params 参数对象(通常为 Map 类型) + * @return 认证参数 DTO,解析失败时返回 null + */ + @SuppressWarnings("unchecked") + private IotDeviceAuthReqDTO parseAuthParams(Object params) { + if (params == null) { + return null; + } + + try { + // 参数默认为 Map 类型,直接转换 + if (params instanceof Map) { + Map paramMap = (Map) params; + return new IotDeviceAuthReqDTO() + .setClientId(MapUtil.getStr(paramMap, "clientId")) + .setUsername(MapUtil.getStr(paramMap, "username")) + .setPassword(MapUtil.getStr(paramMap, "password")); + } + + // 如果已经是目标类型,直接返回 + if (params instanceof IotDeviceAuthReqDTO) { + return (IotDeviceAuthReqDTO) params; + } + + // 其他情况尝试 JSON 转换 + String jsonStr = JsonUtils.toJsonString(params); + return JsonUtils.parseObject(jsonStr, IotDeviceAuthReqDTO.class); + } catch (Exception e) { + log.error("[parseAuthParams][解析认证参数({})失败]", params, e); + return null; + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index f633f1c60b..b85ed8332a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -96,6 +96,16 @@ yudao: ssl-cert-path: "classpath:certs/client.jks" ssl-key-path: "classpath:certs/client.jks" # ==================================== + # 针对引入的 UDP 组件的配置 + # ==================================== + udp: + enabled: false # 是否启用 UDP + port: 8092 # UDP 服务端口 + receive-buffer-size: 65536 # 接收缓冲区大小(字节,默认 64KB) + send-buffer-size: 65536 # 发送缓冲区大小(字节,默认 64KB) + session-timeout-ms: 60000 # 会话超时时间(毫秒,默认 60 秒) + session-clean-interval-ms: 30000 # 会话清理间隔(毫秒,默认 30 秒) + # ==================================== # 针对引入的 MQTT 组件的配置 # ==================================== mqtt: diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java new file mode 100644 index 0000000000..6c5e6dd2e1 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java @@ -0,0 +1,157 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.udp; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.nio.charset.StandardCharsets; + +/** + * IoT 网关 UDP 协议集成测试(手动测试) + * + *

使用步骤: + *

    + *
  1. 启动 yudao-module-iot-gateway 服务(UDP 端口 8092)
  2. + *
  3. 运行 {@link #testAuth()} 获取 token,将返回的 token 粘贴到 {@link #TOKEN} 常量
  4. + *
  5. 运行 {@link #testPropertyPost()} 测试属性上报,或运行 {@link #testEventPost()} 测试事件上报
  6. + *
+ * + * @author 芋道源码 + */ +@Slf4j +public class IotUdpProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 8092; + private static final int TIMEOUT_MS = 5000; + + // 设备信息(根据实际情况修改 PRODUCT_KEY、DEVICE_NAME、PASSWORD) + private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; + private static final String DEVICE_NAME = "small"; + private static final String PASSWORD = "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75"; + + private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME; + private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY; + + /** + * 设备 Token:从 {@link #testAuth()} 方法获取后,粘贴到这里 + */ + private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMwNTA1NSwiZGV2aWNlTmFtZSI6InNtYWxsIn0.mf3MEATCn5bp6cXgULunZjs8d00RGUxj96JEz0hMS7k"; + + /** + * 认证测试:获取设备 Token + */ + @Test + public void testAuth() throws Exception { + String payload = JsonUtils.toJsonString(MapUtil.builder() + .put("id", IdUtil.fastSimpleUUID()) + .put("method", "auth") + .put("params", MapUtil.builder() + .put("clientId", CLIENT_ID) + .put("username", USERNAME) + .put("password", PASSWORD) + .build()) + .build()); + + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(TIMEOUT_MS); + + log.info("[testAuth][请求体: {}]", payload); + + String response = sendAndReceive(socket, payload); + + log.info("[testAuth][响应体: {}]", response); + log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]"); + } + } + + /** + * 属性上报测试 + */ + @Test + public void testPropertyPost() throws Exception { + String payload = JsonUtils.toJsonString(MapUtil.builder() + .put("id", IdUtil.fastSimpleUUID()) + .put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()) + .put("version", "1.0") + .put("params", MapUtil.builder() + .put("token", TOKEN) + .put("width", 1) + .put("height", "2") + .build()) + .build()); + + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(TIMEOUT_MS); + + log.info("[testPropertyPost][请求体: {}]", payload); + + String response = sendAndReceive(socket, payload); + + log.info("[testPropertyPost][响应体: {}]", response); + } + } + + /** + * 事件上报测试 + */ + @Test + public void testEventPost() throws Exception { + String payload = JsonUtils.toJsonString(MapUtil.builder() + .put("id", IdUtil.fastSimpleUUID()) + .put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod()) + .put("version", "1.0") + .put("identifier", "eat") + .put("params", MapUtil.builder() + .put("token", TOKEN) + .put("width", 1) + .put("height", "2") + .put("oneThree", "3") + .build()) + .build()); + + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(TIMEOUT_MS); + + log.info("[testEventPost][请求体: {}]", payload); + + String response = sendAndReceive(socket, payload); + + log.info("[testEventPost][响应体: {}]", response); + } + } + + /** + * 发送 UDP 请求并接收响应 + * + * @param socket UDP Socket + * @param payload 请求体 + * @return 响应内容 + */ + private String sendAndReceive(DatagramSocket socket, String payload) throws Exception { + byte[] sendData = payload.getBytes(StandardCharsets.UTF_8); + InetAddress address = InetAddress.getByName(SERVER_HOST); + + // 发送请求 + DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, address, SERVER_PORT); + socket.send(sendPacket); + + // 接收响应 + byte[] receiveData = new byte[4096]; + DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); + try { + socket.receive(receivePacket); + return new String(receivePacket.getData(), 0, receivePacket.getLength(), StandardCharsets.UTF_8); + } catch (java.net.SocketTimeoutException e) { + log.warn("[sendAndReceive][接收响应超时]"); + return null; + } + } + +}