feat:【iot】modbus-tcp 协议接入 100%:完善注释、完善单测

This commit is contained in:
YunaiV
2026-01-18 11:47:56 +08:00
parent b2fef46b2c
commit 52b8e66466
6 changed files with 84 additions and 120 deletions

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -119,7 +120,7 @@ public class IotTcpConnectionManager {
}
try {
socket.write(io.vertx.core.buffer.Buffer.buffer(data));
socket.write(Buffer.buffer(data));
log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, data.length);
return true;
} catch (Exception e) {

View File

@@ -1,5 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
@@ -15,6 +17,8 @@ import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* IoT 网关 UDP 协议:接收设备上行消息
* <p>
@@ -80,19 +84,18 @@ public class IotUdpUpstreamProtocol {
// 4. 监听端口
udpSocket.listen(udpProperties.getPort(), "0.0.0.0", result -> {
// TODO @AIif return简化下成功才继续往下走
if (result.succeeded()) {
if (result.failed()) {
log.error("[start][IoT 网关 UDP 协议启动失败]", result.cause());
return;
}
// 设置数据包处理器
udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket, this));
udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket));
log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]",
udpProperties.getPort(), udpProperties.getReceiveBufferSize(),
udpProperties.getSendBufferSize());
// 5. 启动会话清理定时器
startSessionCleanTimer();
} else {
log.error("[start][IoT 网关 UDP 协议启动失败]", result.cause());
}
});
}
@@ -123,16 +126,14 @@ public class IotUdpUpstreamProtocol {
cleanTimerId = vertx.setPeriodic(udpProperties.getSessionCleanIntervalMs(), id -> {
try {
// 1. 清理超时的设备地址映射,并获取离线设备列表
// TODO @AI兼容 jdk8不要用 var
var offlineDevices = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs());
List<Long> offlineDeviceIds = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs());
// 2. 为每个离线设备发送离线消息
for (var offlineInfo : offlineDevices) {
sendOfflineMessage(offlineInfo.getDeviceId());
for (Long deviceId : offlineDeviceIds) {
sendOfflineMessage(deviceId);
}
// TODO @AICollUtil.isNotEmpty ;简化下 if 判断;
if (!offlineDevices.isEmpty()) {
log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDevices.size());
if (CollUtil.isNotEmpty(offlineDeviceIds)) {
log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDeviceIds.size());
}
} catch (Exception e) {
log.error("[cleanExpiredMappings][清理超时会话失败]", e);
@@ -150,7 +151,7 @@ public class IotUdpUpstreamProtocol {
private void sendOfflineMessage(Long deviceId) {
try {
// 获取设备信息
var device = deviceService.getDeviceFromCache(deviceId);
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceId);
if (device == null) {
log.warn("[sendOfflineMessage][设备不存在,设备 ID: {}]", deviceId);
return;

View File

@@ -2,12 +2,14 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,8 +36,7 @@ public class IotUdpSessionManager {
/**
* 设备地址 Key -> 最后活跃时间(用于清理)
*/
// TODO @AI是不是尽量使用 LocalDateTime ?统一时间类型
private final Map<String, Long> lastActiveTimeMap = new ConcurrentHashMap<>();
private final Map<String, LocalDateTime> lastActiveTimeMap = new ConcurrentHashMap<>();
/**
* 设备地址 Key -> 设备 ID反向映射用于清理时同步
@@ -52,22 +53,11 @@ public class IotUdpSessionManager {
String addressKey = buildAddressKey(address);
// 更新设备地址映射
deviceAddressMap.put(deviceId, address);
lastActiveTimeMap.put(addressKey, System.currentTimeMillis());
lastActiveTimeMap.put(addressKey, LocalDateTime.now());
addressDeviceMap.put(addressKey, deviceId);
log.debug("[updateDeviceAddress][更新设备地址,设备 ID: {},地址: {}]", deviceId, addressKey);
}
// TODO @AI是不是用不到用不掉就删除掉简化
/**
* 获取设备地址(下行消息发送时使用)
*
* @param deviceId 设备 ID
* @return 设备地址,如果不存在返回 null
*/
public InetSocketAddress getDeviceAddress(Long deviceId) {
return deviceAddressMap.get(deviceId);
}
/**
* 检查设备是否在线(即是否有地址映射)
*
@@ -126,46 +116,35 @@ public class IotUdpSessionManager {
* @param timeoutMs 超时时间(毫秒)
* @return 清理的设备 ID 列表(用于发送离线消息)
*/
// TODO @AI目前暂时用不到 address 字段,是不是只返回 list of deviceId 就行?简化
public java.util.List<DeviceOfflineInfo> cleanExpiredMappings(long timeoutMs) {
java.util.List<DeviceOfflineInfo> offlineDevices = new java.util.ArrayList<>();
long now = System.currentTimeMillis();
Iterator<Map.Entry<String, Long>> iterator = lastActiveTimeMap.entrySet().iterator();
public List<Long> cleanExpiredMappings(long timeoutMs) {
List<Long> offlineDeviceIds = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
LocalDateTime expireTime = now.minusNanos(timeoutMs * 1_000_000);
Iterator<Map.Entry<String, LocalDateTime>> iterator = lastActiveTimeMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> entry = iterator.next();
if (now - entry.getValue() > timeoutMs) {
// 未过期,跳过
Map.Entry<String, LocalDateTime> entry = iterator.next();
if (entry.getValue().isAfter(expireTime)) {
continue;
}
// 过期处理:记录离线设备 ID
String addressKey = entry.getKey();
Long deviceId = addressDeviceMap.remove(addressKey);
// TODO @AIif continue减少括号层级
if (deviceId != null) {
if (deviceId == null) {
iterator.remove();
continue;
}
InetSocketAddress address = deviceAddressMap.remove(deviceId);
if (address != null) {
// 获取设备信息用于发送离线消息
offlineDevices.add(new DeviceOfflineInfo(deviceId, addressKey));
log.info("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}ms 前]",
deviceId, addressKey, now - entry.getValue());
}
if (address == null) {
iterator.remove();
continue;
}
offlineDeviceIds.add(deviceId);
log.debug("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}]",
deviceId, addressKey, entry.getValue());
iterator.remove();
}
}
return offlineDevices;
}
// TODO @AI是不是用不到用不掉就删除掉简化
/**
* 移除设备地址映射
*
* @param deviceId 设备 ID
*/
public void removeDeviceAddress(Long deviceId) {
InetSocketAddress address = deviceAddressMap.remove(deviceId);
if (address != null) {
String addressKey = buildAddressKey(address);
lastActiveTimeMap.remove(addressKey);
addressDeviceMap.remove(addressKey);
log.debug("[removeDeviceAddress][移除设备地址,设备 ID: {},地址: {}]", deviceId, addressKey);
}
return offlineDeviceIds;
}
/**
@@ -178,27 +157,4 @@ public class IotUdpSessionManager {
return address.getHostString() + ":" + address.getPort();
}
/**
* 设备离线信息
*/
@Data
public static class DeviceOfflineInfo {
/**
* 设备 ID
*/
private final Long deviceId;
/**
* 设备地址
*/
private final String address;
public DeviceOfflineInfo(Long deviceId, String address) {
this.deviceId = deviceId;
this.address = address;
}
}
}

View File

@@ -1,2 +1,6 @@
// TODO @AI完善下注释参考 mqtt 的 package.json
/**
* UDP 协议实现包
* <p>
* 提供基于 Vert.x DatagramSocket 的 IoT 设备连接和消息处理功能
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;

View File

@@ -26,11 +26,10 @@ import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Map;
// TODO @AI注释里不要出现 CoAP避免理解成本过高
/**
* UDP 上行消息处理器
* <p>
* 采用 CoAP 风格的 Token 机制(无状态,每次请求携带 token
* 采用无状态 Token 机制(每次请求携带 token
* 1. 认证请求:设备发送 auth 消息,携带 clientId、username、password
* 2. 返回 Token服务端验证后返回 JWT token
* 3. 后续请求:每次请求在 params 中携带 token
@@ -45,6 +44,10 @@ public class IotUdpUpstreamHandler {
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
/**
* Token 参数 Key
*/
private static final String PARAM_KEY_TOKEN = "token";
private final IotDeviceMessageService deviceMessageService;
@@ -70,15 +73,13 @@ public class IotUdpUpstreamHandler {
this.serverId = protocol.getServerId();
}
// TODO @AIprotocol 这个参数如果用不到,就删除下;
/**
* 处理 UDP 数据包
*
* @param packet 数据包
* @param socket UDP Socket
* @param protocol UDP 协议
*/
public void handle(DatagramPacket packet, DatagramSocket socket, IotUdpUpstreamProtocol protocol) {
public void handle(DatagramPacket packet, DatagramSocket socket) {
InetSocketAddress senderAddress = new InetSocketAddress(packet.sender().host(), packet.sender().port());
Buffer data = packet.data();
log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]",
@@ -180,7 +181,7 @@ public class IotUdpUpstreamHandler {
return;
}
// 3.1 生成 JWT TokenCoAP 风格
// 3.1 生成 JWT Token无状态
String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName());
// 3.2 更新设备地址映射(用于下行消息)
@@ -212,20 +213,18 @@ public class IotUdpUpstreamHandler {
InetSocketAddress senderAddress, DatagramSocket socket) {
String addressKey = sessionManager.buildAddressKey(senderAddress);
try {
// TODO @AItoken 需要枚举个 KEY考虑到是通过 params 传递的话,需要获取到后,从 map 里移除掉,避免影响后续业务逻辑处理;
// 1. 从消息中提取 tokenCoAP 风格:消息体携带 token
// 1.1 从消息中提取 token无状态消息体携带 token
String token = null;
if (message.getParams() instanceof Map) {
token = MapUtil.getStr((Map<String, Object>) message.getParams(), "token");
Map<String, Object> paramsMap = (Map<String, Object>) message.getParams();
token = (String) paramsMap.remove(PARAM_KEY_TOKEN);
}
if (StrUtil.isBlank(token)) {
log.warn("[handleBusinessRequest][缺少 token来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "请先进行认证", codecType);
return;
}
// 2. 验证 token获取设备信息
// 1.2 验证 token获取设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token);
if (deviceInfo == null) {
log.warn("[handleBusinessRequest][token 无效或已过期,来源: {}]", addressKey);
@@ -233,7 +232,7 @@ public class IotUdpUpstreamHandler {
return;
}
// 3. 获取设备详细信息
// 2. 获取设备详细信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
@@ -243,14 +242,14 @@ public class IotUdpUpstreamHandler {
return;
}
// 4. 更新设备地址映射(保持最新)
// 3. 更新设备地址映射(保持最新)
sessionManager.updateDeviceAddress(device.getId(), senderAddress);
// 5. 发送消息到消息总线
// 4. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, device.getProductKey(),
device.getDeviceName(), serverId);
// 6. 发送成功响应
// 5. 发送成功响应
sendSuccessResponse(socket, senderAddress, message.getRequestId(), "处理成功", codecType);
log.debug("[handleBusinessRequest][业务消息处理成功,设备 ID: {},方法: {},来源: {}]",
device.getId(), message.getMethod(), addressKey);

View File

@@ -36,13 +36,14 @@ public class IotUdpProtocolIntegrationTest {
private static final String DEVICE_NAME = "small";
private static final String PASSWORD = "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75";
// TODO @芋艿1、IotDeviceAuthUtils 调整下拼接2、password 的生成3、后续给 http 也整个单测4、后续给 tcp 也整个单测5、后续给 mqtt 也整个单测6、后续给 emqp 也整个单测
private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME;
private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY;
/**
* 设备 Token从 {@link #testAuth()} 方法获取后,粘贴到这里
*/
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMwNTA1NSwiZGV2aWNlTmFtZSI6InNtYWxsIn0.mf3MEATCn5bp6cXgULunZjs8d00RGUxj96JEz0hMS7k";
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMxMTY0NiwiZGV2aWNlTmFtZSI6InNtYWxsIn0.re6LCaRfKiE9VQTP3w0Brh2ScVIgrvN3H96z_snndoM";
/**
* 认证测试:获取设备 Token
@@ -107,13 +108,15 @@ public class IotUdpProtocolIntegrationTest {
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("version", "1.0")
.put("identifier", "eat")
.put("params", MapUtil.builder()
.put("token", TOKEN)
.put("identifier", "eat")
.put("value", MapUtil.builder()
.put("width", 1)
.put("height", "2")
.put("oneThree", "3")
.build())
.put("time", System.currentTimeMillis())
.build())
.build());
try (DatagramSocket socket = new DatagramSocket()) {