From 52b8e66466f009b8fef0fdd7ef8eea15c2037dac Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 18 Jan 2026 11:47:56 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91modbus-tcp=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=8E=A5=E5=85=A5=20100%=EF=BC=9A=E5=AE=8C?= =?UTF-8?q?=E5=96=84=E6=B3=A8=E9=87=8A=E3=80=81=E5=AE=8C=E5=96=84=E5=8D=95?= =?UTF-8?q?=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tcp/manager/IotTcpConnectionManager.java | 3 +- .../protocol/udp/IotUdpUpstreamProtocol.java | 39 +++---- .../udp/manager/IotUdpSessionManager.java | 106 +++++------------- .../gateway/protocol/udp/package-info.java | 6 +- .../udp/router/IotUdpUpstreamHandler.java | 35 +++--- .../udp/IotUdpProtocolIntegrationTest.java | 15 ++- 6 files changed, 84 insertions(+), 120 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java index c0d209814e..2c41097c42 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager; +import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetSocket; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -119,7 +120,7 @@ public class IotTcpConnectionManager { } try { - socket.write(io.vertx.core.buffer.Buffer.buffer(data)); + socket.write(Buffer.buffer(data)); log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, data.length); return true; } catch (Exception e) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java index 32a59a982c..7448683890 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java @@ -1,5 +1,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp; +import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; @@ -15,6 +17,8 @@ import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.util.List; + /** * IoT 网关 UDP 协议:接收设备上行消息 *

@@ -80,19 +84,18 @@ public class IotUdpUpstreamProtocol { // 4. 监听端口 udpSocket.listen(udpProperties.getPort(), "0.0.0.0", result -> { - // TODO @AI:if return;简化下;成功才继续往下走; - if (result.succeeded()) { - // 设置数据包处理器 - udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket, this)); - log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]", - udpProperties.getPort(), udpProperties.getReceiveBufferSize(), - udpProperties.getSendBufferSize()); - - // 5. 启动会话清理定时器 - startSessionCleanTimer(); - } else { + if (result.failed()) { log.error("[start][IoT 网关 UDP 协议启动失败]", result.cause()); + return; } + // 设置数据包处理器 + udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket)); + log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]", + udpProperties.getPort(), udpProperties.getReceiveBufferSize(), + udpProperties.getSendBufferSize()); + + // 5. 启动会话清理定时器 + startSessionCleanTimer(); }); } @@ -123,16 +126,14 @@ public class IotUdpUpstreamProtocol { cleanTimerId = vertx.setPeriodic(udpProperties.getSessionCleanIntervalMs(), id -> { try { // 1. 清理超时的设备地址映射,并获取离线设备列表 - // TODO @AI:兼容 jdk8,不要用 var; - var offlineDevices = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs()); + List offlineDeviceIds = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs()); // 2. 为每个离线设备发送离线消息 - for (var offlineInfo : offlineDevices) { - sendOfflineMessage(offlineInfo.getDeviceId()); + for (Long deviceId : offlineDeviceIds) { + sendOfflineMessage(deviceId); } - // TODO @AI:CollUtil.isNotEmpty ;简化下 if 判断; - if (!offlineDevices.isEmpty()) { - log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDevices.size()); + if (CollUtil.isNotEmpty(offlineDeviceIds)) { + log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDeviceIds.size()); } } catch (Exception e) { log.error("[cleanExpiredMappings][清理超时会话失败]", e); @@ -150,7 +151,7 @@ public class IotUdpUpstreamProtocol { private void sendOfflineMessage(Long deviceId) { try { // 获取设备信息 - var device = deviceService.getDeviceFromCache(deviceId); + IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceId); if (device == null) { log.warn("[sendOfflineMessage][设备不存在,设备 ID: {}]", deviceId); return; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java index 854bdb6145..c35d052551 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java @@ -2,12 +2,14 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager; import io.vertx.core.buffer.Buffer; import io.vertx.core.datagram.DatagramSocket; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; +import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -34,8 +36,7 @@ public class IotUdpSessionManager { /** * 设备地址 Key -> 最后活跃时间(用于清理) */ - // TODO @AI:是不是尽量使用 LocalDateTime ?统一时间类型 - private final Map lastActiveTimeMap = new ConcurrentHashMap<>(); + private final Map lastActiveTimeMap = new ConcurrentHashMap<>(); /** * 设备地址 Key -> 设备 ID(反向映射,用于清理时同步) @@ -52,22 +53,11 @@ public class IotUdpSessionManager { String addressKey = buildAddressKey(address); // 更新设备地址映射 deviceAddressMap.put(deviceId, address); - lastActiveTimeMap.put(addressKey, System.currentTimeMillis()); + lastActiveTimeMap.put(addressKey, LocalDateTime.now()); addressDeviceMap.put(addressKey, deviceId); log.debug("[updateDeviceAddress][更新设备地址,设备 ID: {},地址: {}]", deviceId, addressKey); } - // TODO @AI:是不是用不到?用不掉就删除掉!简化 - /** - * 获取设备地址(下行消息发送时使用) - * - * @param deviceId 设备 ID - * @return 设备地址,如果不存在返回 null - */ - public InetSocketAddress getDeviceAddress(Long deviceId) { - return deviceAddressMap.get(deviceId); - } - /** * 检查设备是否在线(即是否有地址映射) * @@ -126,46 +116,35 @@ public class IotUdpSessionManager { * @param timeoutMs 超时时间(毫秒) * @return 清理的设备 ID 列表(用于发送离线消息) */ - // TODO @AI:目前暂时用不到 address 字段,是不是只返回 list of deviceId 就行?简化 - public java.util.List cleanExpiredMappings(long timeoutMs) { - java.util.List offlineDevices = new java.util.ArrayList<>(); - long now = System.currentTimeMillis(); - Iterator> iterator = lastActiveTimeMap.entrySet().iterator(); + public List cleanExpiredMappings(long timeoutMs) { + List offlineDeviceIds = new ArrayList<>(); + LocalDateTime now = LocalDateTime.now(); + LocalDateTime expireTime = now.minusNanos(timeoutMs * 1_000_000); + Iterator> iterator = lastActiveTimeMap.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (now - entry.getValue() > timeoutMs) { - String addressKey = entry.getKey(); - Long deviceId = addressDeviceMap.remove(addressKey); - // TODO @AI:if continue,减少括号层级; - if (deviceId != null) { - InetSocketAddress address = deviceAddressMap.remove(deviceId); - if (address != null) { - // 获取设备信息用于发送离线消息 - offlineDevices.add(new DeviceOfflineInfo(deviceId, addressKey)); - log.info("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}ms 前]", - deviceId, addressKey, now - entry.getValue()); - } - } - iterator.remove(); + // 未过期,跳过 + Map.Entry entry = iterator.next(); + if (entry.getValue().isAfter(expireTime)) { + continue; } + // 过期处理:记录离线设备 ID + String addressKey = entry.getKey(); + Long deviceId = addressDeviceMap.remove(addressKey); + if (deviceId == null) { + iterator.remove(); + continue; + } + InetSocketAddress address = deviceAddressMap.remove(deviceId); + if (address == null) { + iterator.remove(); + continue; + } + offlineDeviceIds.add(deviceId); + log.debug("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}]", + deviceId, addressKey, entry.getValue()); + iterator.remove(); } - return offlineDevices; - } - - // TODO @AI:是不是用不到?用不掉就删除掉!简化 - /** - * 移除设备地址映射 - * - * @param deviceId 设备 ID - */ - public void removeDeviceAddress(Long deviceId) { - InetSocketAddress address = deviceAddressMap.remove(deviceId); - if (address != null) { - String addressKey = buildAddressKey(address); - lastActiveTimeMap.remove(addressKey); - addressDeviceMap.remove(addressKey); - log.debug("[removeDeviceAddress][移除设备地址,设备 ID: {},地址: {}]", deviceId, addressKey); - } + return offlineDeviceIds; } /** @@ -178,27 +157,4 @@ public class IotUdpSessionManager { return address.getHostString() + ":" + address.getPort(); } - /** - * 设备离线信息 - */ - @Data - public static class DeviceOfflineInfo { - - /** - * 设备 ID - */ - private final Long deviceId; - - /** - * 设备地址 - */ - private final String address; - - public DeviceOfflineInfo(Long deviceId, String address) { - this.deviceId = deviceId; - this.address = address; - } - - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java index 80b05406d3..b1fcaa3f9d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java @@ -1,2 +1,6 @@ -// TODO @AI:完善下注释,参考 mqtt 的 package.json +/** + * UDP 协议实现包 + *

+ * 提供基于 Vert.x DatagramSocket 的 IoT 设备连接和消息处理功能 + */ package cn.iocoder.yudao.module.iot.gateway.protocol.udp; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java index 77a58cfd2c..e9ae94d6e8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java @@ -26,11 +26,10 @@ import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.Map; -// TODO @AI:注释里,不要出现 CoAP,避免理解成本过高; /** * UDP 上行消息处理器 *

- * 采用 CoAP 风格的 Token 机制(无状态,每次请求携带 token): + * 采用无状态 Token 机制(每次请求携带 token): * 1. 认证请求:设备发送 auth 消息,携带 clientId、username、password * 2. 返回 Token:服务端验证后返回 JWT token * 3. 后续请求:每次请求在 params 中携带 token @@ -45,6 +44,10 @@ public class IotUdpUpstreamHandler { private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE; private static final String AUTH_METHOD = "auth"; + /** + * Token 参数 Key + */ + private static final String PARAM_KEY_TOKEN = "token"; private final IotDeviceMessageService deviceMessageService; @@ -70,15 +73,13 @@ public class IotUdpUpstreamHandler { this.serverId = protocol.getServerId(); } - // TODO @AI:protocol 这个参数如果用不到,就删除下; /** * 处理 UDP 数据包 * - * @param packet 数据包 - * @param socket UDP Socket - * @param protocol UDP 协议 + * @param packet 数据包 + * @param socket UDP Socket */ - public void handle(DatagramPacket packet, DatagramSocket socket, IotUdpUpstreamProtocol protocol) { + public void handle(DatagramPacket packet, DatagramSocket socket) { InetSocketAddress senderAddress = new InetSocketAddress(packet.sender().host(), packet.sender().port()); Buffer data = packet.data(); log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]", @@ -180,7 +181,7 @@ public class IotUdpUpstreamHandler { return; } - // 3.1 生成 JWT Token(CoAP 风格) + // 3.1 生成 JWT Token(无状态) String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName()); // 3.2 更新设备地址映射(用于下行消息) @@ -212,20 +213,18 @@ public class IotUdpUpstreamHandler { InetSocketAddress senderAddress, DatagramSocket socket) { String addressKey = sessionManager.buildAddressKey(senderAddress); try { - // TODO @AI:token 需要枚举个 KEY;考虑到是通过 params 传递的话,需要获取到后,从 map 里移除掉,避免影响后续业务逻辑处理; - // 1. 从消息中提取 token(CoAP 风格:消息体携带 token) + // 1.1 从消息中提取 token(无状态:消息体携带 token) String token = null; if (message.getParams() instanceof Map) { - token = MapUtil.getStr((Map) message.getParams(), "token"); + Map paramsMap = (Map) message.getParams(); + token = (String) paramsMap.remove(PARAM_KEY_TOKEN); } - if (StrUtil.isBlank(token)) { log.warn("[handleBusinessRequest][缺少 token,来源: {}]", addressKey); sendErrorResponse(socket, senderAddress, message.getRequestId(), "请先进行认证", codecType); return; } - - // 2. 验证 token,获取设备信息 + // 1.2 验证 token,获取设备信息 IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token); if (deviceInfo == null) { log.warn("[handleBusinessRequest][token 无效或已过期,来源: {}]", addressKey); @@ -233,7 +232,7 @@ public class IotUdpUpstreamHandler { return; } - // 3. 获取设备详细信息 + // 2. 获取设备详细信息 IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); if (device == null) { @@ -243,14 +242,14 @@ public class IotUdpUpstreamHandler { return; } - // 4. 更新设备地址映射(保持最新) + // 3. 更新设备地址映射(保持最新) sessionManager.updateDeviceAddress(device.getId(), senderAddress); - // 5. 发送消息到消息总线 + // 4. 发送消息到消息总线 deviceMessageService.sendDeviceMessage(message, device.getProductKey(), device.getDeviceName(), serverId); - // 6. 发送成功响应 + // 5. 发送成功响应 sendSuccessResponse(socket, senderAddress, message.getRequestId(), "处理成功", codecType); log.debug("[handleBusinessRequest][业务消息处理成功,设备 ID: {},方法: {},来源: {}]", device.getId(), message.getMethod(), addressKey); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java index 6c5e6dd2e1..4f2dbfcf66 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java @@ -36,13 +36,14 @@ public class IotUdpProtocolIntegrationTest { private static final String DEVICE_NAME = "small"; private static final String PASSWORD = "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75"; + // TODO @芋艿:1、IotDeviceAuthUtils 调整下拼接;2、password 的生成;3、后续给 http 也整个单测;4、后续给 tcp 也整个单测;5、后续给 mqtt 也整个单测;6、后续给 emqp 也整个单测 private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME; private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY; /** * 设备 Token:从 {@link #testAuth()} 方法获取后,粘贴到这里 */ - private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMwNTA1NSwiZGV2aWNlTmFtZSI6InNtYWxsIn0.mf3MEATCn5bp6cXgULunZjs8d00RGUxj96JEz0hMS7k"; + private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMxMTY0NiwiZGV2aWNlTmFtZSI6InNtYWxsIn0.re6LCaRfKiE9VQTP3w0Brh2ScVIgrvN3H96z_snndoM"; /** * 认证测试:获取设备 Token @@ -107,12 +108,14 @@ public class IotUdpProtocolIntegrationTest { .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod()) .put("version", "1.0") - .put("identifier", "eat") .put("params", MapUtil.builder() - .put("token", TOKEN) - .put("width", 1) - .put("height", "2") - .put("oneThree", "3") + .put("identifier", "eat") + .put("value", MapUtil.builder() + .put("width", 1) + .put("height", "2") + .put("oneThree", "3") + .build()) + .put("time", System.currentTimeMillis()) .build()) .build());