diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java
index c0d209814e..2c41097c42 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java
@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
+import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -119,7 +120,7 @@ public class IotTcpConnectionManager {
}
try {
- socket.write(io.vertx.core.buffer.Buffer.buffer(data));
+ socket.write(Buffer.buffer(data));
log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, data.length);
return true;
} catch (Exception e) {
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java
index 32a59a982c..7448683890 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpUpstreamProtocol.java
@@ -1,5 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
+import cn.hutool.core.collection.CollUtil;
+import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
@@ -15,6 +17,8 @@ import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+
/**
* IoT 网关 UDP 协议:接收设备上行消息
*
@@ -80,19 +84,18 @@ public class IotUdpUpstreamProtocol {
// 4. 监听端口
udpSocket.listen(udpProperties.getPort(), "0.0.0.0", result -> {
- // TODO @AI:if return;简化下;成功才继续往下走;
- if (result.succeeded()) {
- // 设置数据包处理器
- udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket, this));
- log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]",
- udpProperties.getPort(), udpProperties.getReceiveBufferSize(),
- udpProperties.getSendBufferSize());
-
- // 5. 启动会话清理定时器
- startSessionCleanTimer();
- } else {
+ if (result.failed()) {
log.error("[start][IoT 网关 UDP 协议启动失败]", result.cause());
+ return;
}
+ // 设置数据包处理器
+ udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket));
+ log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]",
+ udpProperties.getPort(), udpProperties.getReceiveBufferSize(),
+ udpProperties.getSendBufferSize());
+
+ // 5. 启动会话清理定时器
+ startSessionCleanTimer();
});
}
@@ -123,16 +126,14 @@ public class IotUdpUpstreamProtocol {
cleanTimerId = vertx.setPeriodic(udpProperties.getSessionCleanIntervalMs(), id -> {
try {
// 1. 清理超时的设备地址映射,并获取离线设备列表
- // TODO @AI:兼容 jdk8,不要用 var;
- var offlineDevices = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs());
+ List offlineDeviceIds = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs());
// 2. 为每个离线设备发送离线消息
- for (var offlineInfo : offlineDevices) {
- sendOfflineMessage(offlineInfo.getDeviceId());
+ for (Long deviceId : offlineDeviceIds) {
+ sendOfflineMessage(deviceId);
}
- // TODO @AI:CollUtil.isNotEmpty ;简化下 if 判断;
- if (!offlineDevices.isEmpty()) {
- log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDevices.size());
+ if (CollUtil.isNotEmpty(offlineDeviceIds)) {
+ log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDeviceIds.size());
}
} catch (Exception e) {
log.error("[cleanExpiredMappings][清理超时会话失败]", e);
@@ -150,7 +151,7 @@ public class IotUdpUpstreamProtocol {
private void sendOfflineMessage(Long deviceId) {
try {
// 获取设备信息
- var device = deviceService.getDeviceFromCache(deviceId);
+ IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceId);
if (device == null) {
log.warn("[sendOfflineMessage][设备不存在,设备 ID: {}]", deviceId);
return;
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java
index 854bdb6145..c35d052551 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java
@@ -2,12 +2,14 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,8 +36,7 @@ public class IotUdpSessionManager {
/**
* 设备地址 Key -> 最后活跃时间(用于清理)
*/
- // TODO @AI:是不是尽量使用 LocalDateTime ?统一时间类型
- private final Map lastActiveTimeMap = new ConcurrentHashMap<>();
+ private final Map lastActiveTimeMap = new ConcurrentHashMap<>();
/**
* 设备地址 Key -> 设备 ID(反向映射,用于清理时同步)
@@ -52,22 +53,11 @@ public class IotUdpSessionManager {
String addressKey = buildAddressKey(address);
// 更新设备地址映射
deviceAddressMap.put(deviceId, address);
- lastActiveTimeMap.put(addressKey, System.currentTimeMillis());
+ lastActiveTimeMap.put(addressKey, LocalDateTime.now());
addressDeviceMap.put(addressKey, deviceId);
log.debug("[updateDeviceAddress][更新设备地址,设备 ID: {},地址: {}]", deviceId, addressKey);
}
- // TODO @AI:是不是用不到?用不掉就删除掉!简化
- /**
- * 获取设备地址(下行消息发送时使用)
- *
- * @param deviceId 设备 ID
- * @return 设备地址,如果不存在返回 null
- */
- public InetSocketAddress getDeviceAddress(Long deviceId) {
- return deviceAddressMap.get(deviceId);
- }
-
/**
* 检查设备是否在线(即是否有地址映射)
*
@@ -126,46 +116,35 @@ public class IotUdpSessionManager {
* @param timeoutMs 超时时间(毫秒)
* @return 清理的设备 ID 列表(用于发送离线消息)
*/
- // TODO @AI:目前暂时用不到 address 字段,是不是只返回 list of deviceId 就行?简化
- public java.util.List cleanExpiredMappings(long timeoutMs) {
- java.util.List offlineDevices = new java.util.ArrayList<>();
- long now = System.currentTimeMillis();
- Iterator> iterator = lastActiveTimeMap.entrySet().iterator();
+ public List cleanExpiredMappings(long timeoutMs) {
+ List offlineDeviceIds = new ArrayList<>();
+ LocalDateTime now = LocalDateTime.now();
+ LocalDateTime expireTime = now.minusNanos(timeoutMs * 1_000_000);
+ Iterator> iterator = lastActiveTimeMap.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry entry = iterator.next();
- if (now - entry.getValue() > timeoutMs) {
- String addressKey = entry.getKey();
- Long deviceId = addressDeviceMap.remove(addressKey);
- // TODO @AI:if continue,减少括号层级;
- if (deviceId != null) {
- InetSocketAddress address = deviceAddressMap.remove(deviceId);
- if (address != null) {
- // 获取设备信息用于发送离线消息
- offlineDevices.add(new DeviceOfflineInfo(deviceId, addressKey));
- log.info("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}ms 前]",
- deviceId, addressKey, now - entry.getValue());
- }
- }
- iterator.remove();
+ // 未过期,跳过
+ Map.Entry entry = iterator.next();
+ if (entry.getValue().isAfter(expireTime)) {
+ continue;
}
+ // 过期处理:记录离线设备 ID
+ String addressKey = entry.getKey();
+ Long deviceId = addressDeviceMap.remove(addressKey);
+ if (deviceId == null) {
+ iterator.remove();
+ continue;
+ }
+ InetSocketAddress address = deviceAddressMap.remove(deviceId);
+ if (address == null) {
+ iterator.remove();
+ continue;
+ }
+ offlineDeviceIds.add(deviceId);
+ log.debug("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}]",
+ deviceId, addressKey, entry.getValue());
+ iterator.remove();
}
- return offlineDevices;
- }
-
- // TODO @AI:是不是用不到?用不掉就删除掉!简化
- /**
- * 移除设备地址映射
- *
- * @param deviceId 设备 ID
- */
- public void removeDeviceAddress(Long deviceId) {
- InetSocketAddress address = deviceAddressMap.remove(deviceId);
- if (address != null) {
- String addressKey = buildAddressKey(address);
- lastActiveTimeMap.remove(addressKey);
- addressDeviceMap.remove(addressKey);
- log.debug("[removeDeviceAddress][移除设备地址,设备 ID: {},地址: {}]", deviceId, addressKey);
- }
+ return offlineDeviceIds;
}
/**
@@ -178,27 +157,4 @@ public class IotUdpSessionManager {
return address.getHostString() + ":" + address.getPort();
}
- /**
- * 设备离线信息
- */
- @Data
- public static class DeviceOfflineInfo {
-
- /**
- * 设备 ID
- */
- private final Long deviceId;
-
- /**
- * 设备地址
- */
- private final String address;
-
- public DeviceOfflineInfo(Long deviceId, String address) {
- this.deviceId = deviceId;
- this.address = address;
- }
-
- }
-
}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java
index 80b05406d3..b1fcaa3f9d 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/package-info.java
@@ -1,2 +1,6 @@
-// TODO @AI:完善下注释,参考 mqtt 的 package.json
+/**
+ * UDP 协议实现包
+ *
+ * 提供基于 Vert.x DatagramSocket 的 IoT 设备连接和消息处理功能
+ */
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java
index 77a58cfd2c..e9ae94d6e8 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/router/IotUdpUpstreamHandler.java
@@ -26,11 +26,10 @@ import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Map;
-// TODO @AI:注释里,不要出现 CoAP,避免理解成本过高;
/**
* UDP 上行消息处理器
*
- * 采用 CoAP 风格的 Token 机制(无状态,每次请求携带 token):
+ * 采用无状态 Token 机制(每次请求携带 token):
* 1. 认证请求:设备发送 auth 消息,携带 clientId、username、password
* 2. 返回 Token:服务端验证后返回 JWT token
* 3. 后续请求:每次请求在 params 中携带 token
@@ -45,6 +44,10 @@ public class IotUdpUpstreamHandler {
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
+ /**
+ * Token 参数 Key
+ */
+ private static final String PARAM_KEY_TOKEN = "token";
private final IotDeviceMessageService deviceMessageService;
@@ -70,15 +73,13 @@ public class IotUdpUpstreamHandler {
this.serverId = protocol.getServerId();
}
- // TODO @AI:protocol 这个参数如果用不到,就删除下;
/**
* 处理 UDP 数据包
*
- * @param packet 数据包
- * @param socket UDP Socket
- * @param protocol UDP 协议
+ * @param packet 数据包
+ * @param socket UDP Socket
*/
- public void handle(DatagramPacket packet, DatagramSocket socket, IotUdpUpstreamProtocol protocol) {
+ public void handle(DatagramPacket packet, DatagramSocket socket) {
InetSocketAddress senderAddress = new InetSocketAddress(packet.sender().host(), packet.sender().port());
Buffer data = packet.data();
log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]",
@@ -180,7 +181,7 @@ public class IotUdpUpstreamHandler {
return;
}
- // 3.1 生成 JWT Token(CoAP 风格)
+ // 3.1 生成 JWT Token(无状态)
String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName());
// 3.2 更新设备地址映射(用于下行消息)
@@ -212,20 +213,18 @@ public class IotUdpUpstreamHandler {
InetSocketAddress senderAddress, DatagramSocket socket) {
String addressKey = sessionManager.buildAddressKey(senderAddress);
try {
- // TODO @AI:token 需要枚举个 KEY;考虑到是通过 params 传递的话,需要获取到后,从 map 里移除掉,避免影响后续业务逻辑处理;
- // 1. 从消息中提取 token(CoAP 风格:消息体携带 token)
+ // 1.1 从消息中提取 token(无状态:消息体携带 token)
String token = null;
if (message.getParams() instanceof Map) {
- token = MapUtil.getStr((Map) message.getParams(), "token");
+ Map paramsMap = (Map) message.getParams();
+ token = (String) paramsMap.remove(PARAM_KEY_TOKEN);
}
-
if (StrUtil.isBlank(token)) {
log.warn("[handleBusinessRequest][缺少 token,来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "请先进行认证", codecType);
return;
}
-
- // 2. 验证 token,获取设备信息
+ // 1.2 验证 token,获取设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token);
if (deviceInfo == null) {
log.warn("[handleBusinessRequest][token 无效或已过期,来源: {}]", addressKey);
@@ -233,7 +232,7 @@ public class IotUdpUpstreamHandler {
return;
}
- // 3. 获取设备详细信息
+ // 2. 获取设备详细信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
@@ -243,14 +242,14 @@ public class IotUdpUpstreamHandler {
return;
}
- // 4. 更新设备地址映射(保持最新)
+ // 3. 更新设备地址映射(保持最新)
sessionManager.updateDeviceAddress(device.getId(), senderAddress);
- // 5. 发送消息到消息总线
+ // 4. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, device.getProductKey(),
device.getDeviceName(), serverId);
- // 6. 发送成功响应
+ // 5. 发送成功响应
sendSuccessResponse(socket, senderAddress, message.getRequestId(), "处理成功", codecType);
log.debug("[handleBusinessRequest][业务消息处理成功,设备 ID: {},方法: {},来源: {}]",
device.getId(), message.getMethod(), addressKey);
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java
index 6c5e6dd2e1..4f2dbfcf66 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocolIntegrationTest.java
@@ -36,13 +36,14 @@ public class IotUdpProtocolIntegrationTest {
private static final String DEVICE_NAME = "small";
private static final String PASSWORD = "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75";
+ // TODO @芋艿:1、IotDeviceAuthUtils 调整下拼接;2、password 的生成;3、后续给 http 也整个单测;4、后续给 tcp 也整个单测;5、后续给 mqtt 也整个单测;6、后续给 emqp 也整个单测
private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME;
private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY;
/**
* 设备 Token:从 {@link #testAuth()} 方法获取后,粘贴到这里
*/
- private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMwNTA1NSwiZGV2aWNlTmFtZSI6InNtYWxsIn0.mf3MEATCn5bp6cXgULunZjs8d00RGUxj96JEz0hMS7k";
+ private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMxMTY0NiwiZGV2aWNlTmFtZSI6InNtYWxsIn0.re6LCaRfKiE9VQTP3w0Brh2ScVIgrvN3H96z_snndoM";
/**
* 认证测试:获取设备 Token
@@ -107,12 +108,14 @@ public class IotUdpProtocolIntegrationTest {
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("version", "1.0")
- .put("identifier", "eat")
.put("params", MapUtil.builder()
- .put("token", TOKEN)
- .put("width", 1)
- .put("height", "2")
- .put("oneThree", "3")
+ .put("identifier", "eat")
+ .put("value", MapUtil.builder()
+ .put("width", 1)
+ .put("height", "2")
+ .put("oneThree", "3")
+ .build())
+ .put("time", System.currentTimeMillis())
.build())
.build());