!1500 feat:【iot】udp 协议 100%:完善注释、完善单测

Merge pull request !1500 from 芋道源码/feature/iot-udp
This commit is contained in:
芋道源码
2026-01-25 10:53:21 +00:00
committed by Gitee
12 changed files with 1180 additions and 2 deletions

View File

@@ -17,6 +17,9 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDowns
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
@@ -194,4 +197,39 @@ public class IotGatewayConfiguration {
}
/**
* IoT 网关 UDP 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.udp", name = "enabled", havingValue = "true")
@Slf4j
public static class UdpProtocolConfiguration {
@Bean(name = "udpVertx", destroyMethod = "close")
public Vertx udpVertx() {
return Vertx.vertx();
}
@Bean
public IotUdpUpstreamProtocol iotUdpUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotUdpSessionManager sessionManager,
@Qualifier("udpVertx") Vertx udpVertx) {
return new IotUdpUpstreamProtocol(gatewayProperties.getProtocol().getUdp(),
deviceService, messageService, sessionManager, udpVertx);
}
@Bean
public IotUdpDownstreamSubscriber iotUdpDownstreamSubscriber(IotUdpUpstreamProtocol protocolHandler,
IotDeviceMessageService messageService,
IotDeviceService deviceService,
IotUdpSessionManager sessionManager,
IotMessageBus messageBus) {
return new IotUdpDownstreamSubscriber(protocolHandler, messageService, deviceService, sessionManager,
messageBus);
}
}
}

View File

@@ -93,6 +93,11 @@ public class IotGatewayProperties {
*/
private MqttWsProperties mqttWs;
/**
* UDP 组件配置
*/
private UdpProperties udp;
}
@Data
@@ -503,4 +508,42 @@ public class IotGatewayProperties {
}
@Data
public static class UdpProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 服务端口(默认 8092
*/
private Integer port = 8092;
/**
* 接收缓冲区大小(默认 64KB
*/
private Integer receiveBufferSize = 65536;
/**
* 发送缓冲区大小(默认 64KB
*/
private Integer sendBufferSize = 65536;
/**
* 会话超时时间(毫秒,默认 60 秒)
* <p>
* 用于清理不活跃的设备地址映射
*/
private Long sessionTimeoutMs = 60000L;
/**
* 会话清理间隔(毫秒,默认 30 秒)
*/
private Long sessionCleanIntervalMs = 30000L;
}
}

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -119,7 +120,7 @@ public class IotTcpConnectionManager {
}
try {
socket.write(io.vertx.core.buffer.Buffer.buffer(data));
socket.write(Buffer.buffer(data));
log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, data.length);
return true;
} catch (Exception e) {

View File

@@ -43,7 +43,7 @@ public class IotTcpDownstreamHandler {
return;
}
// 2. 根据产品 Key 和设备名称编码消息并发送到设备
// 2. 根据产品 Key 和设备名称编码消息并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes);

View File

@@ -0,0 +1,67 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 UDP 下游订阅者:接收下行给设备的消息
*
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotUdpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotUdpUpstreamProtocol protocol;
private final IotDeviceMessageService messageService;
private final IotDeviceService deviceService;
private final IotUdpSessionManager sessionManager;
private final IotMessageBus messageBus;
private IotUdpDownstreamHandler downstreamHandler;
@PostConstruct
public void init() {
// 初始化下游处理器
this.downstreamHandler = new IotUdpDownstreamHandler(messageService, deviceService, sessionManager, protocol);
messageBus.register(this);
log.info("[init][UDP 下游订阅者初始化完成,服务器 ID: {}Topic: {}]",
protocol.getServerId(), getTopic());
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
try {
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId(), e);
}
}
}

View File

@@ -0,0 +1,171 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* IoT 网关 UDP 协议:接收设备上行消息
* <p>
* 采用 Vertx DatagramSocket 实现 UDP 服务器,主要功能:
* 1. 监听 UDP 端口,接收设备消息
* 2. 定期清理不活跃的设备地址映射
* 3. 提供 UDP Socket 用于下行消息发送
*
* @author 芋道源码
*/
@Slf4j
public class IotUdpUpstreamProtocol {
private final IotGatewayProperties.UdpProperties udpProperties;
private final IotDeviceService deviceService;
private final IotDeviceMessageService messageService;
private final IotUdpSessionManager sessionManager;
private final Vertx vertx;
@Getter
private final String serverId;
@Getter
private DatagramSocket udpSocket;
/**
* 会话清理定时器 ID
*/
private Long cleanTimerId;
private IotUdpUpstreamHandler upstreamHandler;
public IotUdpUpstreamProtocol(IotGatewayProperties.UdpProperties udpProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotUdpSessionManager sessionManager,
Vertx vertx) {
this.udpProperties = udpProperties;
this.deviceService = deviceService;
this.messageService = messageService;
this.sessionManager = sessionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(udpProperties.getPort());
}
@PostConstruct
public void start() {
// 1. 初始化上行消息处理器
this.upstreamHandler = new IotUdpUpstreamHandler(this, messageService, deviceService, sessionManager);
// 2. 创建 UDP Socket 选项
DatagramSocketOptions options = new DatagramSocketOptions()
.setReceiveBufferSize(udpProperties.getReceiveBufferSize())
.setSendBufferSize(udpProperties.getSendBufferSize())
.setReuseAddress(true);
// 3. 创建 UDP Socket
udpSocket = vertx.createDatagramSocket(options);
// 4. 监听端口
udpSocket.listen(udpProperties.getPort(), "0.0.0.0", result -> {
if (result.failed()) {
log.error("[start][IoT 网关 UDP 协议启动失败]", result.cause());
return;
}
// 设置数据包处理器
udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket));
log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]",
udpProperties.getPort(), udpProperties.getReceiveBufferSize(),
udpProperties.getSendBufferSize());
// 5. 启动会话清理定时器
startSessionCleanTimer();
});
}
@PreDestroy
public void stop() {
// 1. 取消会话清理定时器
if (cleanTimerId != null) {
vertx.cancelTimer(cleanTimerId);
cleanTimerId = null;
log.info("[stop][会话清理定时器已取消]");
}
// 2. 关闭 UDP Socket
if (udpSocket != null) {
try {
udpSocket.close().result();
log.info("[stop][IoT 网关 UDP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 UDP 协议停止失败]", e);
}
}
}
/**
* 启动会话清理定时器
*/
private void startSessionCleanTimer() {
cleanTimerId = vertx.setPeriodic(udpProperties.getSessionCleanIntervalMs(), id -> {
try {
// 1. 清理超时的设备地址映射,并获取离线设备列表
List<Long> offlineDeviceIds = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs());
// 2. 为每个离线设备发送离线消息
for (Long deviceId : offlineDeviceIds) {
sendOfflineMessage(deviceId);
}
if (CollUtil.isNotEmpty(offlineDeviceIds)) {
log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDeviceIds.size());
}
} catch (Exception e) {
log.error("[cleanExpiredMappings][清理超时会话失败]", e);
}
});
log.info("[startSessionCleanTimer][会话清理定时器启动,间隔:{} ms超时{} ms]",
udpProperties.getSessionCleanIntervalMs(), udpProperties.getSessionTimeoutMs());
}
/**
* 发送设备离线消息
*
* @param deviceId 设备 ID
*/
private void sendOfflineMessage(Long deviceId) {
try {
// 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceId);
if (device == null) {
log.warn("[sendOfflineMessage][设备不存在,设备 ID: {}]", deviceId);
return;
}
// 发送离线消息
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
messageService.sendDeviceMessage(offlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.info("[sendOfflineMessage][发送离线消息,设备 ID: {},设备名: {}]",
deviceId, device.getDeviceName());
} catch (Exception e) {
log.error("[sendOfflineMessage][发送离线消息失败,设备 ID: {}]", deviceId, e);
}
}
}

View File

@@ -0,0 +1,160 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* IoT 网关 UDP 会话管理器
* <p>
* 采用无状态设计SessionManager 主要用于:
* 1. 管理设备地址映射(用于下行消息发送)
* 2. 定期清理不活跃的设备地址映射
* <p>
* 注意UDP 是无连接协议,上行消息通过 token 验证身份,不依赖会话状态
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotUdpSessionManager {
/**
* 设备 ID -> 设备地址(用于下行消息发送)
*/
private final Map<Long, InetSocketAddress> deviceAddressMap = new ConcurrentHashMap<>();
/**
* 设备地址 Key -> 最后活跃时间(用于清理)
*/
private final Map<String, LocalDateTime> lastActiveTimeMap = new ConcurrentHashMap<>();
/**
* 设备地址 Key -> 设备 ID反向映射用于清理时同步
*/
private final Map<String, Long> addressDeviceMap = new ConcurrentHashMap<>();
/**
* 更新设备地址(每次收到上行消息时调用)
*
* @param deviceId 设备 ID
* @param address 设备地址
*/
public void updateDeviceAddress(Long deviceId, InetSocketAddress address) {
String addressKey = buildAddressKey(address);
// 更新设备地址映射
deviceAddressMap.put(deviceId, address);
lastActiveTimeMap.put(addressKey, LocalDateTime.now());
addressDeviceMap.put(addressKey, deviceId);
log.debug("[updateDeviceAddress][更新设备地址,设备 ID: {},地址: {}]", deviceId, addressKey);
}
/**
* 检查设备是否在线(即是否有地址映射)
*
* @param deviceId 设备 ID
* @return 是否在线
*/
public boolean isDeviceOnline(Long deviceId) {
return deviceAddressMap.containsKey(deviceId);
}
/**
* 检查设备是否离线
*
* @param deviceId 设备 ID
* @return 是否离线
*/
public boolean isDeviceOffline(Long deviceId) {
return !isDeviceOnline(deviceId);
}
/**
* 发送消息到设备
*
* @param deviceId 设备 ID
* @param data 数据
* @param socket UDP Socket
* @return 是否发送成功
*/
public boolean sendToDevice(Long deviceId, byte[] data, DatagramSocket socket) {
InetSocketAddress address = deviceAddressMap.get(deviceId);
if (address == null) {
log.warn("[sendToDevice][设备地址不存在,设备 ID: {}]", deviceId);
return false;
}
try {
socket.send(Buffer.buffer(data), address.getPort(), address.getHostString(), result -> {
if (result.succeeded()) {
log.debug("[sendToDevice][发送消息成功,设备 ID: {},地址: {},数据长度: {} 字节]",
deviceId, buildAddressKey(address), data.length);
} else {
log.error("[sendToDevice][发送消息失败,设备 ID: {},地址: {}]",
deviceId, buildAddressKey(address), result.cause());
}
});
return true;
} catch (Exception e) {
log.error("[sendToDevice][发送消息异常,设备 ID: {}]", deviceId, e);
return false;
}
}
/**
* 定期清理不活跃的设备地址映射
*
* @param timeoutMs 超时时间(毫秒)
* @return 清理的设备 ID 列表(用于发送离线消息)
*/
public List<Long> cleanExpiredMappings(long timeoutMs) {
List<Long> offlineDeviceIds = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
LocalDateTime expireTime = now.minusNanos(timeoutMs * 1_000_000);
Iterator<Map.Entry<String, LocalDateTime>> iterator = lastActiveTimeMap.entrySet().iterator();
while (iterator.hasNext()) {
// 未过期,跳过
Map.Entry<String, LocalDateTime> entry = iterator.next();
if (entry.getValue().isAfter(expireTime)) {
continue;
}
// 过期处理:记录离线设备 ID
String addressKey = entry.getKey();
Long deviceId = addressDeviceMap.remove(addressKey);
if (deviceId == null) {
iterator.remove();
continue;
}
InetSocketAddress address = deviceAddressMap.remove(deviceId);
if (address == null) {
iterator.remove();
continue;
}
offlineDeviceIds.add(deviceId);
log.debug("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}]",
deviceId, addressKey, entry.getValue());
iterator.remove();
}
return offlineDeviceIds;
}
/**
* 构建地址 Key
*
* @param address 地址
* @return 地址 Key
*/
public String buildAddressKey(InetSocketAddress address) {
return address.getHostString() + ":" + address.getPort();
}
}

View File

@@ -0,0 +1,6 @@
/**
* UDP 协议实现包
* <p>
* 提供基于 Vert.x DatagramSocket 的 IoT 设备连接和消息处理功能
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;

View File

@@ -0,0 +1,82 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.datagram.DatagramSocket;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 UDP 下行消息处理器
*
* @author 芋道源码
*/
@Slf4j
public class IotUdpDownstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotUdpSessionManager sessionManager;
private final IotUdpUpstreamProtocol protocol;
public IotUdpDownstreamHandler(IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotUdpSessionManager sessionManager,
IotUdpUpstreamProtocol protocol) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.sessionManager = sessionManager;
this.protocol = protocol;
}
/**
* 处理下行消息
*
* @param message 下行消息
*/
public void handle(IotDeviceMessage message) {
try {
log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
// 1.1 获取设备信息
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
if (deviceInfo == null) {
log.error("[handle][设备不存在,设备 ID: {}]", message.getDeviceId());
return;
}
// 1.2 检查设备是否在线(即是否有地址映射)
if (sessionManager.isDeviceOffline(message.getDeviceId())) {
log.warn("[handle][设备不在线,设备 ID: {}]", message.getDeviceId());
return;
}
// 2. 根据产品 Key 和设备名称编码消息,并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
DatagramSocket socket = protocol.getUdpSocket();
if (socket == null) {
log.error("[handle][UDP Socket 不可用,设备 ID: {}]", message.getDeviceId());
return;
}
boolean success = sessionManager.sendToDevice(message.getDeviceId(), bytes, socket);
if (success) {
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
} else {
log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
}
} catch (Exception e) {
log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]",
message.getDeviceId(), message.getMethod(), message, e);
}
}
}

View File

@@ -0,0 +1,440 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramPacket;
import io.vertx.core.datagram.DatagramSocket;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Map;
/**
* UDP 上行消息处理器
* <p>
* 采用无状态 Token 机制(每次请求携带 token
* 1. 认证请求:设备发送 auth 消息,携带 clientId、username、password
* 2. 返回 Token服务端验证后返回 JWT token
* 3. 后续请求:每次请求在 params 中携带 token
* 4. 服务端验证:每次请求通过 IotDeviceTokenService.verifyToken() 验证
*
* @author 芋道源码
*/
@Slf4j
public class IotUdpUpstreamHandler {
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
/**
* Token 参数 Key
*/
private static final String PARAM_KEY_TOKEN = "token";
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotUdpSessionManager sessionManager;
private final IotDeviceTokenService deviceTokenService;
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public IotUdpUpstreamHandler(IotUdpUpstreamProtocol protocol,
IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotUdpSessionManager sessionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.sessionManager = sessionManager;
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.serverId = protocol.getServerId();
}
/**
* 处理 UDP 数据包
*
* @param packet 数据包
* @param socket UDP Socket
*/
public void handle(DatagramPacket packet, DatagramSocket socket) {
InetSocketAddress senderAddress = new InetSocketAddress(packet.sender().host(), packet.sender().port());
Buffer data = packet.data();
log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]",
sessionManager.buildAddressKey(senderAddress), data.length());
try {
processMessage(data, senderAddress, socket);
} catch (Exception e) {
log.error("[handle][处理消息失败,来源: {},错误: {}]",
sessionManager.buildAddressKey(senderAddress), e.getMessage(), e);
// UDP 无连接,不需要断开连接,只记录错误
}
}
/**
* 处理消息
*
* @param buffer 消息
* @param senderAddress 发送者地址
* @param socket UDP Socket
*/
private void processMessage(Buffer buffer, InetSocketAddress senderAddress, DatagramSocket socket) {
// 1. 基础检查
if (buffer == null || buffer.length() == 0) {
return;
}
// 2. 获取消息格式类型
String codecType = getMessageCodecType(buffer);
// 3. 解码消息
IotDeviceMessage message;
try {
message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
if (message == null) {
log.warn("[processMessage][消息解码失败,来源: {}]", sessionManager.buildAddressKey(senderAddress));
sendErrorResponse(socket, senderAddress, null, "消息解码失败", codecType);
return;
}
} catch (Exception e) {
log.error("[processMessage][消息解码异常,来源: {}]", sessionManager.buildAddressKey(senderAddress), e);
sendErrorResponse(socket, senderAddress, null, "消息解码失败: " + e.getMessage(), codecType);
return;
}
// 4. 根据消息类型路由处理
try {
if (AUTH_METHOD.equals(message.getMethod())) {
// 认证请求
handleAuthenticationRequest(message, codecType, senderAddress, socket);
} else {
// 业务消息
handleBusinessRequest(message, codecType, senderAddress, socket);
}
} catch (Exception e) {
log.error("[processMessage][处理消息失败,来源: {},消息方法: {}]",
sessionManager.buildAddressKey(senderAddress), message.getMethod(), e);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "消息处理失败", codecType);
}
}
/**
* 处理认证请求
*
* @param message 消息信息
* @param codecType 消息编解码类型
* @param senderAddress 发送者地址
* @param socket UDP Socket
*/
private void handleAuthenticationRequest(IotDeviceMessage message, String codecType,
InetSocketAddress senderAddress, DatagramSocket socket) {
String addressKey = sessionManager.buildAddressKey(senderAddress);
try {
// 1.1 解析认证参数
IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams());
if (authParams == null) {
log.warn("[handleAuthenticationRequest][认证参数解析失败,来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证参数不完整", codecType);
return;
}
// 1.2 执行认证
if (!validateDeviceAuth(authParams)) {
log.warn("[handleAuthenticationRequest][认证失败,来源: {}username: {}]",
addressKey, authParams.getUsername());
sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证失败", codecType);
return;
}
// 2.1 解析设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
if (deviceInfo == null) {
sendErrorResponse(socket, senderAddress, message.getRequestId(), "解析设备信息失败", codecType);
return;
}
// 2.2 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
sendErrorResponse(socket, senderAddress, message.getRequestId(), "设备不存在", codecType);
return;
}
// 3.1 生成 JWT Token无状态
String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName());
// 3.2 更新设备地址映射(用于下行消息)
sessionManager.updateDeviceAddress(device.getId(), senderAddress);
// 3.3 发送上线消息
sendOnlineMessage(device);
// 3.4 发送成功响应(包含 token
sendAuthSuccessResponse(socket, senderAddress, message.getRequestId(), token, codecType);
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {},来源: {}]",
device.getId(), device.getDeviceName(), addressKey);
} catch (Exception e) {
log.error("[handleAuthenticationRequest][认证处理异常,来源: {}]", addressKey, e);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证处理异常", codecType);
}
}
/**
* 处理业务请求
*
* @param message 消息信息
* @param codecType 消息编解码类型
* @param senderAddress 发送者地址
* @param socket UDP Socket
*/
@SuppressWarnings("unchecked")
private void handleBusinessRequest(IotDeviceMessage message, String codecType,
InetSocketAddress senderAddress, DatagramSocket socket) {
String addressKey = sessionManager.buildAddressKey(senderAddress);
try {
// 1.1 从消息中提取 token无状态消息体携带 token
String token = null;
if (message.getParams() instanceof Map) {
Map<String, Object> paramsMap = (Map<String, Object>) message.getParams();
token = (String) paramsMap.remove(PARAM_KEY_TOKEN);
}
if (StrUtil.isBlank(token)) {
log.warn("[handleBusinessRequest][缺少 token来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "请先进行认证", codecType);
return;
}
// 1.2 验证 token获取设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token);
if (deviceInfo == null) {
log.warn("[handleBusinessRequest][token 无效或已过期,来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "token 无效或已过期", codecType);
return;
}
// 2. 获取设备详细信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
log.warn("[handleBusinessRequest][设备不存在,来源: {}productKey: {}deviceName: {}]",
addressKey, deviceInfo.getProductKey(), deviceInfo.getDeviceName());
sendErrorResponse(socket, senderAddress, message.getRequestId(), "设备不存在", codecType);
return;
}
// 3. 更新设备地址映射(保持最新)
sessionManager.updateDeviceAddress(device.getId(), senderAddress);
// 4. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, device.getProductKey(),
device.getDeviceName(), serverId);
// 5. 发送成功响应
sendSuccessResponse(socket, senderAddress, message.getRequestId(), "处理成功", codecType);
log.debug("[handleBusinessRequest][业务消息处理成功,设备 ID: {},方法: {},来源: {}]",
device.getId(), message.getMethod(), addressKey);
} catch (Exception e) {
log.error("[handleBusinessRequest][业务请求处理异常,来源: {}]", addressKey, e);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "处理失败", codecType);
}
}
/**
* 获取消息编解码类型
*
* @param buffer 消息
* @return 消息编解码类型
*/
private String getMessageCodecType(Buffer buffer) {
// 检测消息格式类型
return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY
: CODEC_TYPE_JSON;
}
/**
* 发送设备上线消息
*
* @param device 设备信息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
} catch (Exception e) {
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
}
}
/**
* 验证设备认证信息
*
* @param authParams 认证参数
* @return 是否认证成功
*/
private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) {
try {
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(authParams.getClientId()).setUsername(authParams.getUsername())
.setPassword(authParams.getPassword()));
result.checkError();
return BooleanUtil.isTrue(result.getData());
} catch (Exception e) {
log.error("[validateDeviceAuth][设备认证异常username: {}]", authParams.getUsername(), e);
return false;
}
}
/**
* 发送认证成功响应(包含 token
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param token JWT Token
* @param codecType 消息编解码类型
*/
private void sendAuthSuccessResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, String token, String codecType) {
try {
Object responseData = MapUtil.builder()
.put("success", true)
.put("token", token)
.put("message", "认证成功")
.build();
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, 0, "认证成功");
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), result -> {
if (result.failed()) {
log.error("[sendAuthSuccessResponse][发送认证成功响应失败,地址: {}]",
sessionManager.buildAddressKey(address), result.cause());
}
});
} catch (Exception e) {
log.error("[sendAuthSuccessResponse][发送认证成功响应异常,地址: {}]",
sessionManager.buildAddressKey(address), e);
}
}
/**
* 发送成功响应
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param message 消息
* @param codecType 消息编解码类型
*/
@SuppressWarnings("SameParameterValue")
private void sendSuccessResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, String message, String codecType) {
sendResponse(socket, address, true, message, requestId, codecType);
}
/**
* 发送错误响应
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param errorMessage 错误消息
* @param codecType 消息编解码类型
*/
private void sendErrorResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, String errorMessage, String codecType) {
sendResponse(socket, address, false, errorMessage, requestId, codecType);
}
/**
* 发送响应消息
*
* @param socket UDP Socket
* @param address 目标地址
* @param success 是否成功
* @param message 消息
* @param requestId 请求 ID
* @param codecType 消息编解码类型
*/
private void sendResponse(DatagramSocket socket, InetSocketAddress address, boolean success,
String message, String requestId, String codecType) {
try {
Object responseData = MapUtil.builder()
.put("success", success)
.put("message", message)
.build();
int code = success ? 0 : 401;
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, "response", responseData,
code, message);
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), ar -> {
if (ar.failed()) {
log.error("[sendResponse][发送响应失败,地址: {}]",
sessionManager.buildAddressKey(address), ar.cause());
}
});
} catch (Exception e) {
log.error("[sendResponse][发送响应异常,地址: {}]",
sessionManager.buildAddressKey(address), e);
}
}
/**
* 解析认证参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 认证参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceAuthReqDTO parseAuthParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
return new IotDeviceAuthReqDTO()
.setClientId(MapUtil.getStr(paramMap, "clientId"))
.setUsername(MapUtil.getStr(paramMap, "username"))
.setPassword(MapUtil.getStr(paramMap, "password"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceAuthReqDTO) {
return (IotDeviceAuthReqDTO) params;
}
// 其他情况尝试 JSON 转换
String jsonStr = JsonUtils.toJsonString(params);
return JsonUtils.parseObject(jsonStr, IotDeviceAuthReqDTO.class);
} catch (Exception e) {
log.error("[parseAuthParams][解析认证参数({})失败]", params, e);
return null;
}
}
}

View File

@@ -96,6 +96,16 @@ yudao:
ssl-cert-path: "classpath:certs/client.jks"
ssl-key-path: "classpath:certs/client.jks"
# ====================================
# 针对引入的 UDP 组件的配置
# ====================================
udp:
enabled: false # 是否启用 UDP
port: 8092 # UDP 服务端口
receive-buffer-size: 65536 # 接收缓冲区大小(字节,默认 64KB
send-buffer-size: 65536 # 发送缓冲区大小(字节,默认 64KB
session-timeout-ms: 60000 # 会话超时时间(毫秒,默认 60 秒)
session-clean-interval-ms: 30000 # 会话清理间隔(毫秒,默认 30 秒)
# ====================================
# 针对引入的 MQTT 组件的配置
# ====================================
mqtt:

View File

@@ -0,0 +1,160 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
/**
* IoT 网关 UDP 协议集成测试(手动测试)
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8092</li>
* <li>运行 {@link #testAuth()} 获取 token将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
* <li>运行 {@link #testPropertyPost()} 测试属性上报,或运行 {@link #testEventPost()} 测试事件上报</li>
* </ol>
*
* @author 芋道源码
*/
@Slf4j
public class IotUdpProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8092;
private static final int TIMEOUT_MS = 5000;
// 设备信息(根据实际情况修改 PRODUCT_KEY、DEVICE_NAME、PASSWORD
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
private static final String DEVICE_NAME = "small";
private static final String PASSWORD = "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75";
// TODO @芋艿1、IotDeviceAuthUtils 调整下拼接2、password 的生成3、后续给 http 也整个单测4、后续给 tcp 也整个单测5、后续给 mqtt 也整个单测6、后续给 emqp 也整个单测
private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME;
private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY;
/**
* 设备 Token从 {@link #testAuth()} 方法获取后,粘贴到这里
*/
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMxMTY0NiwiZGV2aWNlTmFtZSI6InNtYWxsIn0.re6LCaRfKiE9VQTP3w0Brh2ScVIgrvN3H96z_snndoM";
/**
* 认证测试:获取设备 Token
*/
@Test
public void testAuth() throws Exception {
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", "auth")
.put("params", MapUtil.builder()
.put("clientId", CLIENT_ID)
.put("username", USERNAME)
.put("password", PASSWORD)
.build())
.build());
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
log.info("[testAuth][请求体: {}]", payload);
String response = sendAndReceive(socket, payload);
log.info("[testAuth][响应体: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
}
}
/**
* 属性上报测试
*/
@Test
public void testPropertyPost() throws Exception {
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
.put("version", "1.0")
.put("params", MapUtil.builder()
.put("token", TOKEN)
.put("width", 1)
.put("height", "2")
.build())
.build());
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
log.info("[testPropertyPost][请求体: {}]", payload);
String response = sendAndReceive(socket, payload);
log.info("[testPropertyPost][响应体: {}]", response);
}
}
/**
* 事件上报测试
*/
@Test
public void testEventPost() throws Exception {
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("version", "1.0")
.put("params", MapUtil.builder()
.put("identifier", "eat")
.put("value", MapUtil.builder()
.put("width", 1)
.put("height", "2")
.put("oneThree", "3")
.build())
.put("time", System.currentTimeMillis())
.build())
.build());
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
log.info("[testEventPost][请求体: {}]", payload);
String response = sendAndReceive(socket, payload);
log.info("[testEventPost][响应体: {}]", response);
}
}
/**
* 发送 UDP 请求并接收响应
*
* @param socket UDP Socket
* @param payload 请求体
* @return 响应内容
*/
private String sendAndReceive(DatagramSocket socket, String payload) throws Exception {
byte[] sendData = payload.getBytes(StandardCharsets.UTF_8);
InetAddress address = InetAddress.getByName(SERVER_HOST);
// 发送请求
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, address, SERVER_PORT);
socket.send(sendPacket);
// 接收响应
byte[] receiveData = new byte[4096];
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
try {
socket.receive(receivePacket);
return new String(receivePacket.getData(), 0, receivePacket.getLength(), StandardCharsets.UTF_8);
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
}
}