feat(iot):【协议改造】udp 初步改造(50%),主流程跑通

This commit is contained in:
YunaiV
2026-02-01 11:21:12 +08:00
parent 09041a24d7
commit cb301eb788
20 changed files with 1098 additions and 1364 deletions

View File

@@ -1,4 +0,0 @@
/**
* TODO @芋艿:实现一个 alink 的 xml 版本
*/
package cn.iocoder.yudao.module.iot.gateway.codec.simple;

View File

@@ -11,10 +11,6 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscr
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
@@ -42,10 +38,8 @@ public class IotGatewayConfiguration {
}
@Bean
public IotProtocolManager iotProtocolManager(IotGatewayProperties gatewayProperties,
IotMessageSerializerManager serializerManager,
IotMessageBus messageBus) {
return new IotProtocolManager(gatewayProperties, serializerManager, messageBus);
public IotProtocolManager iotProtocolManager(IotGatewayProperties gatewayProperties) {
return new IotProtocolManager(gatewayProperties);
}
/**
@@ -117,45 +111,6 @@ public class IotGatewayConfiguration {
}
/**
* IoT 网关 UDP 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.udp", name = "enabled", havingValue = "true")
@Slf4j
public static class UdpProtocolConfiguration {
@Bean(name = "udpVertx", destroyMethod = "close")
public Vertx udpVertx() {
return Vertx.vertx();
}
@Bean
public IotUdpUpstreamProtocol iotUdpUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotUdpSessionManager sessionManager,
@Qualifier("udpVertx") Vertx udpVertx) {
return new IotUdpUpstreamProtocol(gatewayProperties.getProtocol().getUdp(),
deviceService, messageService, sessionManager, udpVertx);
}
@Bean
public IotUdpDownstreamHandler iotUdpDownstreamHandler(IotDeviceMessageService messageService,
IotUdpSessionManager sessionManager,
IotUdpUpstreamProtocol protocol) {
return new IotUdpDownstreamHandler(messageService, sessionManager, protocol);
}
@Bean
public IotUdpDownstreamSubscriber iotUdpDownstreamSubscriber(IotUdpUpstreamProtocol protocolHandler,
IotUdpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotUdpDownstreamSubscriber(protocolHandler, downstreamHandler, messageBus);
}
}
/**
* IoT 网关 CoAP 协议配置类
*/

View File

@@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpConfig;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.TrustOptions;
import jakarta.validation.Valid;
@@ -35,7 +36,7 @@ public class IotGatewayProperties {
private ProtocolProperties protocol;
/**
* 协议实例列表(新版)
* 协议实例列表
*/
private List<ProtocolInstanceProperties> protocols;
@@ -89,11 +90,6 @@ public class IotGatewayProperties {
*/
private MqttProperties mqtt;
/**
* UDP 组件配置
*/
private UdpProperties udp;
/**
* CoAP 组件配置
*/
@@ -348,44 +344,6 @@ public class IotGatewayProperties {
}
@Data
public static class UdpProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 服务端口(默认 8093
*/
private Integer port = 8093;
/**
* 接收缓冲区大小(默认 64KB
*/
private Integer receiveBufferSize = 65536;
/**
* 发送缓冲区大小(默认 64KB
*/
private Integer sendBufferSize = 65536;
/**
* 会话超时时间(毫秒,默认 60 秒)
* <p>
* 用于清理不活跃的设备地址映射
*/
private Long sessionTimeoutMs = 60000L;
/**
* 会话清理间隔(毫秒,默认 30 秒)
*/
private Long sessionCleanIntervalMs = 30000L;
}
@Data
public static class CoapProperties {
@@ -525,6 +483,12 @@ public class IotGatewayProperties {
@Valid
private IotTcpConfig tcp;
/**
* UDP 协议配置
*/
@Valid
private IotUdpConfig udp;
}
}

View File

@@ -3,11 +3,11 @@ package cn.iocoder.yudao.module.iot.gateway.protocol;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.SmartLifecycle;
@@ -24,20 +24,16 @@ public class IotProtocolManager implements SmartLifecycle {
private final IotGatewayProperties gatewayProperties;
private final IotMessageSerializerManager serializerManager;
private final IotMessageBus messageBus;
/**
* 协议实例列表
*/
private final List<IotProtocol> protocols = new ArrayList<>();
@Getter
private volatile boolean running = false;
public IotProtocolManager(IotGatewayProperties gatewayProperties,
IotMessageSerializerManager serializerManager,
IotMessageBus messageBus) {
public IotProtocolManager(IotGatewayProperties gatewayProperties) {
this.gatewayProperties = gatewayProperties;
this.serializerManager = serializerManager;
this.messageBus = messageBus;
}
@Override
@@ -84,11 +80,6 @@ public class IotProtocolManager implements SmartLifecycle {
log.info("[stop][协议管理器已停止]");
}
@Override
public boolean isRunning() {
return running;
}
/**
* 创建协议实例
*
@@ -107,7 +98,8 @@ public class IotProtocolManager implements SmartLifecycle {
return createHttpProtocol(config);
case TCP:
return createTcpProtocol(config);
// TODO 后续添加其他协议类型
case UDP:
return createUdpProtocol(config);
default:
throw new IllegalArgumentException(String.format(
"[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType));
@@ -121,7 +113,7 @@ public class IotProtocolManager implements SmartLifecycle {
* @return HTTP 协议实例
*/
private IotHttpProtocol createHttpProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
return new IotHttpProtocol(config, messageBus);
return new IotHttpProtocol(config);
}
/**
@@ -131,7 +123,17 @@ public class IotProtocolManager implements SmartLifecycle {
* @return TCP 协议实例
*/
private IotTcpProtocol createTcpProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
return new IotTcpProtocol(config, messageBus, serializerManager);
return new IotTcpProtocol(config);
}
/**
* 创建 UDP 协议实例
*
* @param config 协议实例配置
* @return UDP 协议实例
*/
private IotUdpProtocol createUdpProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
return new IotUdpProtocol(config);
}
}

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
@@ -59,11 +60,12 @@ public class IotHttpProtocol implements IotProtocol {
*/
private IotHttpDownstreamSubscriber downstreamSubscriber;
public IotHttpProtocol(ProtocolInstanceProperties properties, IotMessageBus messageBus) {
public IotHttpProtocol(ProtocolInstanceProperties properties) {
this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus);
}

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
@@ -73,31 +74,30 @@ public class IotTcpProtocol implements IotProtocol {
private final IotTcpFrameCodec frameCodec;
/**
* TCP 连接管理器(每个 Protocol 实例独立)
* TCP 连接管理器
*/
private final IotTcpConnectionManager connectionManager;
public IotTcpProtocol(ProtocolInstanceProperties properties, IotMessageBus messageBus,
IotMessageSerializerManager serializerManager) {
public IotTcpProtocol(ProtocolInstanceProperties properties) {
IotTcpConfig tcpConfig = properties.getTcp();
Assert.notNull(tcpConfig, "TCP 协议配置tcp不能为空");
Assert.notNull(tcpConfig.getCodec(), "TCP 拆包配置tcp.codec不能为空");
this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// 初始化序列化器
IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(properties.getSerialize());
if (serializeType == null) {
serializeType = IotSerializeTypeEnum.JSON; // 默认 JSON
}
Assert.notNull(serializeType, "不支持的序列化类型:" + properties.getSerialize());
IotMessageSerializerManager serializerManager = SpringUtil.getBean(IotMessageSerializerManager.class);
this.serializer = serializerManager.get(serializeType);
// 初始化帧编解码器
IotTcpConfig tcpConfig = properties.getTcp();
Assert.notNull(tcpConfig, "TCP 协议配置tcp不能为空");
Assert.notNull(tcpConfig.getCodec(), "TCP 拆包配置tcp.codec不能为空");
this.frameCodec = IotTcpFrameCodecFactory.create(tcpConfig.getCodec());
// 初始化连接管理器
this.connectionManager = new IotTcpConnectionManager(tcpConfig.getMaxConnections());
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotTcpDownstreamHandler downstreamHandler = new IotTcpDownstreamHandler(connectionManager, frameCodec, serializer);
this.downstreamSubscriber = new IotTcpDownstreamSubscriber(this, downstreamHandler, messageBus);
}

View File

@@ -49,12 +49,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
*/
private final IotMessageSerializer serializer;
/**
* TCP 连接管理器(每个 Protocol 实例独立)
* TCP 连接管理器
*/
private final IotTcpConnectionManager connectionManager;
// ===================== Spring 依赖(构造时注入) =====================
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotDeviceCommonApi deviceApi;
@@ -141,7 +139,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method, BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
// 其他异常,返回 500 并重新抛出让上层关闭连接
// 其他异常,返回 500并重新抛出让上层关闭连接
log.error("[processMessage][处理消息失败,客户端 ID: {}]", clientId, e);
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
@@ -158,6 +156,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* @param message 消息信息
* @param socket 网络连接
*/
@SuppressWarnings("DuplicatedCode")
private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, NetSocket socket) {
// 1. 解析认证参数
IotDeviceAuthReqDTO authParams = JsonUtils.convertObject(message.getParams(), IotDeviceAuthReqDTO.class);
@@ -180,7 +179,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
Assert.notNull(device, "设备不存在");
// 3.1 注册连接
registerConnection(socket, device, clientId);
registerConnection(socket, device);
// 3.2 发送上线消息
sendOnlineMessage(device);
// 3.3 发送成功响应
@@ -196,6 +195,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* @param socket 网络连接
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
@SuppressWarnings("DuplicatedCode")
private void handleRegisterRequest(String clientId, IotDeviceMessage message, NetSocket socket) {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = JsonUtils.convertObject(message.getParams(), IotDeviceRegisterReqDTO.class);
@@ -246,14 +246,12 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
*
* @param socket 网络连接
* @param device 设备
* @param clientId 客户端 ID
*/
private void registerConnection(NetSocket socket, IotDeviceRespDTO device, String clientId) {
private void registerConnection(NetSocket socket, IotDeviceRespDTO device) {
IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId);
.setDeviceName(device.getDeviceName());
connectionManager.registerConnection(socket, device.getId(), connectionInfo);
}

View File

@@ -40,20 +40,6 @@ public class IotTcpConnectionManager {
this.maxConnections = maxConnections;
}
/**
* 获取当前连接数
*/
public int getConnectionCount() {
return connectionMap.size();
}
/**
* 检查是否可以接受新连接
*/
public boolean canAcceptConnection() {
return connectionMap.size() < maxConnections;
}
/**
* 注册设备连接(包含认证信息)
*
@@ -155,11 +141,6 @@ public class IotTcpConnectionManager {
*/
private String deviceName;
/**
* 客户端 ID
*/
private String clientId;
}
}

View File

@@ -0,0 +1,49 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* IoT UDP 协议配置
*
* @author 芋道源码
*/
@Data
public class IotUdpConfig {
/**
* 最大会话数
*/
@NotNull(message = "最大会话数不能为空")
@Min(value = 1, message = "最大会话数必须大于 0")
private Integer maxSessions = 1000;
/**
* 会话超时时间(毫秒)
* <p>
* 用于清理不活跃的设备地址映射
*/
@NotNull(message = "会话超时时间不能为空")
@Min(value = 1000, message = "会话超时时间必须大于 1000 毫秒")
private Long sessionTimeoutMs = 60000L;
/**
* 会话清理间隔(毫秒)
*/
@NotNull(message = "会话清理间隔不能为空")
@Min(value = 1000, message = "会话清理间隔必须大于 1000 毫秒")
private Long sessionCleanIntervalMs = 30000L;
/**
* 接收缓冲区大小(字节)
*/
@NotNull(message = "接收缓冲区大小不能为空")
@Min(value = 1024, message = "接收缓冲区大小必须大于 1024 字节")
private Integer receiveBufferSize = 65536;
/**
* 发送缓冲区大小(字节)
*/
@NotNull(message = "发送缓冲区大小不能为空")
@Min(value = 1024, message = "发送缓冲区大小必须大于 1024 字节")
private Integer sendBufferSize = 65536;
}

View File

@@ -0,0 +1,259 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.downstream.IotUdpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.downstream.IotUdpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.upstream.IotUdpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import java.util.List;
/**
* IoT UDP 协议实现
* <p>
* 基于 Vert.x 实现 UDP 服务器,接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotUdpProtocol implements IotProtocol {
/**
* 协议配置
*/
private final ProtocolInstanceProperties properties;
/**
* 服务器 ID用于消息追踪全局唯一
*/
@Getter
private final String serverId;
/**
* 运行状态
*/
@Getter
private volatile boolean running = false;
/**
* Vert.x 实例
*/
private Vertx vertx;
/**
* UDP 服务器
*/
@Getter
private DatagramSocket udpSocket;
/**
* 下行消息订阅者
*/
private final IotUdpDownstreamSubscriber downstreamSubscriber;
/**
* 消息序列化器
*/
private final IotMessageSerializer serializer;
/**
* UDP 会话管理器
*/
private final IotUdpSessionManager sessionManager;
private final IotDeviceService deviceService;
private final IotDeviceMessageService deviceMessageService;
/**
* 会话清理定时器 ID
*/
// TODO @AI会话清理是不是放到 sessionManager 更合适?
private Long cleanTimerId;
public IotUdpProtocol(ProtocolInstanceProperties properties) {
IotUdpConfig udpConfig = properties.getUdp();
Assert.notNull(udpConfig, "UDP 协议配置udp不能为空");
this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
// 初始化序列化器
IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(properties.getSerialize());
Assert.notNull(serializeType, "不支持的序列化类型:" + properties.getSerialize());
IotMessageSerializerManager serializerManager = SpringUtil.getBean(IotMessageSerializerManager.class);
this.serializer = serializerManager.get(serializeType);
// 初始化会话管理器
this.sessionManager = new IotUdpSessionManager(udpConfig.getMaxSessions());
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotUdpDownstreamHandler downstreamHandler = new IotUdpDownstreamHandler(this, sessionManager, serializer);
this.downstreamSubscriber = new IotUdpDownstreamSubscriber(this, downstreamHandler, messageBus);
}
@Override
public String getId() {
return properties.getId();
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.UDP;
}
@Override
public void start() {
if (running) {
log.warn("[start][IoT UDP 协议 {} 已经在运行中]", getId());
return;
}
// 1.1 创建 Vertx 实例
this.vertx = Vertx.vertx();
// 1.2 创建 UDP Socket 选项
IotUdpConfig udpConfig = properties.getUdp();
DatagramSocketOptions options = new DatagramSocketOptions()
.setReceiveBufferSize(udpConfig.getReceiveBufferSize())
.setSendBufferSize(udpConfig.getSendBufferSize())
.setReuseAddress(true);
// 1.3 创建 UDP Socket
udpSocket = vertx.createDatagramSocket(options);
// 1.4 创建上行消息处理器
IotUdpUpstreamHandler upstreamHandler = new IotUdpUpstreamHandler(serverId, sessionManager, serializer);
// 1.5 监听端口
udpSocket.listen(properties.getPort(), "0.0.0.0", result -> {
if (result.failed()) {
log.error("[start][IoT UDP 协议 {} 启动失败]", getId(), result.cause());
return;
}
// 设置数据包处理器
udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket));
running = true;
log.info("[start][IoT UDP 协议 {} 启动成功,端口:{}serverId{}]",
getId(), properties.getPort(), serverId);
// 启动会话清理定时器
startSessionCleanTimer(udpConfig);
// 2. 启动下行消息订阅者
// TODO @AI这里会导致 Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2992 ms, time limit is 2000 ms
this.downstreamSubscriber.start();
});
}
@Override
public void stop() {
if (!running) {
return;
}
// 1. 停止下行消息订阅者
try {
downstreamSubscriber.stop();
log.info("[stop][IoT UDP 协议 {} 下行消息订阅者已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT UDP 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
// 2.1 取消会话清理定时器
if (cleanTimerId != null) {
vertx.cancelTimer(cleanTimerId);
cleanTimerId = null;
log.info("[stop][IoT UDP 协议 {} 会话清理定时器已取消]", getId());
}
// 2.2 关闭 UDP Socket
if (udpSocket != null) {
try {
udpSocket.close().result();
log.info("[stop][IoT UDP 协议 {} 服务器已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT UDP 协议 {} 服务器停止失败]", getId(), e);
}
udpSocket = null;
}
// 2.3 关闭 Vertx 实例
if (vertx != null) {
try {
vertx.close().result();
log.info("[stop][IoT UDP 协议 {} Vertx 已关闭]", getId());
} catch (Exception e) {
log.error("[stop][IoT UDP 协议 {} Vertx 关闭失败]", getId(), e);
}
vertx = null;
}
running = false;
log.info("[stop][IoT UDP 协议 {} 已停止]", getId());
}
/**
* 启动会话清理定时器
*/
// TODO @AI这个放到
private void startSessionCleanTimer(IotUdpConfig udpConfig) {
cleanTimerId = vertx.setPeriodic(udpConfig.getSessionCleanIntervalMs(), id -> {
try {
// 1. 清理超时的设备会话,并获取离线设备列表
List<Long> offlineDeviceIds = sessionManager.cleanExpiredSessions(udpConfig.getSessionTimeoutMs());
// 2. 为每个离线设备发送离线消息
for (Long deviceId : offlineDeviceIds) {
sendOfflineMessage(deviceId);
}
if (CollUtil.isNotEmpty(offlineDeviceIds)) {
log.info("[cleanExpiredSessions][本次清理 {} 个超时设备]", offlineDeviceIds.size());
}
} catch (Exception e) {
log.error("[cleanExpiredSessions][清理超时会话失败]", e);
}
});
log.info("[startSessionCleanTimer][会话清理定时器启动,间隔:{} ms超时{} ms]",
udpConfig.getSessionCleanIntervalMs(), udpConfig.getSessionTimeoutMs());
}
/**
* 发送设备离线消息
*
* @param deviceId 设备 ID
*/
private void sendOfflineMessage(Long deviceId) {
try {
// 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceId);
if (device == null) {
log.warn("[sendOfflineMessage][设备不存在,设备 ID: {}]", deviceId);
return;
}
// 发送离线消息
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.info("[sendOfflineMessage][发送离线消息,设备 ID: {},设备名: {}]",
deviceId, device.getDeviceName());
} catch (Exception e) {
log.error("[sendOfflineMessage][发送离线消息失败,设备 ID: {}]", deviceId, e);
}
}
}

View File

@@ -1,196 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* IoT 网关 UDP 协议:接收设备上行消息
* <p>
* 采用 Vertx DatagramSocket 实现 UDP 服务器,主要功能:
* 1. 监听 UDP 端口,接收设备消息
* 2. 定期清理不活跃的设备地址映射
* 3. 提供 UDP Socket 用于下行消息发送
*
* @author 芋道源码
*/
@Slf4j
public class IotUdpUpstreamProtocol implements IotProtocol {
private static final String ID = "udp";
private final IotGatewayProperties.UdpProperties udpProperties;
private final IotDeviceService deviceService;
private final IotDeviceMessageService messageService;
private final IotUdpSessionManager sessionManager;
private final Vertx vertx;
@Getter
private final String serverId;
@Getter
private DatagramSocket udpSocket;
/**
* 会话清理定时器 ID
*/
private Long cleanTimerId;
private IotUdpUpstreamHandler upstreamHandler;
private volatile boolean running = false;
public IotUdpUpstreamProtocol(IotGatewayProperties.UdpProperties udpProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotUdpSessionManager sessionManager,
Vertx vertx) {
this.udpProperties = udpProperties;
this.deviceService = deviceService;
this.messageService = messageService;
this.sessionManager = sessionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(udpProperties.getPort());
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.UDP;
}
@Override
@PostConstruct
public void start() {
// 1. 初始化上行消息处理器
this.upstreamHandler = new IotUdpUpstreamHandler(this, messageService, deviceService, sessionManager);
// 2. 创建 UDP Socket 选项
DatagramSocketOptions options = new DatagramSocketOptions()
.setReceiveBufferSize(udpProperties.getReceiveBufferSize())
.setSendBufferSize(udpProperties.getSendBufferSize())
.setReuseAddress(true);
// 3. 创建 UDP Socket
udpSocket = vertx.createDatagramSocket(options);
// 4. 监听端口
udpSocket.listen(udpProperties.getPort(), "0.0.0.0", result -> {
if (result.failed()) {
log.error("[start][IoT 网关 UDP 协议启动失败]", result.cause());
return;
}
// 设置数据包处理器
udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket));
running = true;
log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]",
udpProperties.getPort(), udpProperties.getReceiveBufferSize(),
udpProperties.getSendBufferSize());
// 5. 启动会话清理定时器
startSessionCleanTimer();
});
}
@Override
@PreDestroy
public void stop() {
// 1. 取消会话清理定时器
if (cleanTimerId != null) {
vertx.cancelTimer(cleanTimerId);
cleanTimerId = null;
log.info("[stop][会话清理定时器已取消]");
}
// 2. 关闭 UDP Socket
if (udpSocket != null) {
try {
udpSocket.close().result();
running = false;
log.info("[stop][IoT 网关 UDP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 UDP 协议停止失败]", e);
}
}
}
@Override
public boolean isRunning() {
return running;
}
/**
* 启动会话清理定时器
*/
private void startSessionCleanTimer() {
cleanTimerId = vertx.setPeriodic(udpProperties.getSessionCleanIntervalMs(), id -> {
try {
// 1. 清理超时的设备地址映射,并获取离线设备列表
List<Long> offlineDeviceIds = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs());
// 2. 为每个离线设备发送离线消息
for (Long deviceId : offlineDeviceIds) {
sendOfflineMessage(deviceId);
}
if (CollUtil.isNotEmpty(offlineDeviceIds)) {
log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDeviceIds.size());
}
} catch (Exception e) {
log.error("[cleanExpiredMappings][清理超时会话失败]", e);
}
});
log.info("[startSessionCleanTimer][会话清理定时器启动,间隔:{} ms超时{} ms]",
udpProperties.getSessionCleanIntervalMs(), udpProperties.getSessionTimeoutMs());
}
/**
* 发送设备离线消息
*
* @param deviceId 设备 ID
*/
private void sendOfflineMessage(Long deviceId) {
try {
// 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceId);
if (device == null) {
log.warn("[sendOfflineMessage][设备不存在,设备 ID: {}]", deviceId);
return;
}
// 发送离线消息
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
messageService.sendDeviceMessage(offlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.info("[sendOfflineMessage][发送离线消息,设备 ID: {},设备名: {}]",
deviceId, device.getDeviceName());
} catch (Exception e) {
log.error("[sendOfflineMessage][发送离线消息失败,设备 ID: {}]", deviceId, e);
}
}
}

View File

@@ -1,10 +1,11 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.downstream;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import io.vertx.core.datagram.DatagramSocket;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
@@ -13,54 +14,48 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotUdpDownstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotUdpProtocol protocol;
private final IotUdpSessionManager sessionManager;
private final IotUdpUpstreamProtocol protocol;
public IotUdpDownstreamHandler(IotDeviceMessageService deviceMessageService,
IotUdpSessionManager sessionManager,
IotUdpUpstreamProtocol protocol) {
this.deviceMessageService = deviceMessageService;
this.sessionManager = sessionManager;
this.protocol = protocol;
}
/**
* 消息序列化器处理业务消息序列化/反序列化
*/
private final IotMessageSerializer serializer;
/**
* 处理下行消息
*
* @param message 下行消息
*/
public void handle(IotDeviceMessage message) {
try {
log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
// 1. 获取会话信息包含 codecType
// 1. 检查设备会话
IotUdpSessionManager.SessionInfo sessionInfo = sessionManager.getSessionInfo(message.getDeviceId());
if (sessionInfo == null) {
log.warn("[handle][设备不在线,设备 ID: {}]", message.getDeviceId());
log.warn("[handle][会话信息不存在,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
return;
}
// 2. 使用会话中的 codecType 编码消息并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, sessionInfo.getCodecType());
DatagramSocket socket = protocol.getUdpSocket();
if (socket == null) {
log.error("[handle][UDP Socket 不可用,设备 ID: {}]", message.getDeviceId());
return;
}
boolean success = sessionManager.sendToDevice(message.getDeviceId(), bytes, socket);
if (success) {
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
} else {
log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
// 2. 序列化消息
byte[] serializedData = serializer.serialize(message);
// 3. 发送到设备
boolean success = sessionManager.sendToDevice(message.getDeviceId(), serializedData, socket);
if (!success) {
throw new RuntimeException("下行消息发送失败");
}
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), serializedData.length);
} catch (Exception e) {
log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]",
message.getDeviceId(), message.getMethod(), message, e);

View File

@@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpDownstreamHandler;
import lombok.extern.slf4j.Slf4j;
/**
@@ -16,7 +16,7 @@ public class IotUdpDownstreamSubscriber extends IotProtocolDownstreamSubscriber
private final IotUdpDownstreamHandler downstreamHandler;
public IotUdpDownstreamSubscriber(IotUdpUpstreamProtocol protocol,
public IotUdpDownstreamSubscriber(IotProtocol protocol,
IotUdpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);

View File

@@ -0,0 +1,395 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.upstream;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramPacket;
import io.vertx.core.datagram.DatagramSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import java.net.InetSocketAddress;
import java.time.LocalDateTime;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
/**
* UDP 上行消息处理器
* <p>
* 采用无状态 Token 机制(每次请求携带 token
* 1. 认证请求:设备发送 auth 消息,携带 clientId、username、password
* 2. 返回 Token服务端验证后返回 JWT token
* 3. 后续请求:每次请求在 params 中携带 token
* 4. 服务端验证:每次请求通过 IotDeviceTokenService.verifyToken() 验证
*
* @author 芋道源码
*/
@Slf4j
public class IotUdpUpstreamHandler {
private static final String AUTH_METHOD = "auth";
/**
* Token 参数 Key
*/
private static final String PARAM_KEY_TOKEN = "token";
/**
* Body 参数 Key实际请求内容
*/
private static final String PARAM_KEY_BODY = "body";
private final String serverId;
/**
* 消息序列化器(处理业务消息序列化/反序列化)
*/
private final IotMessageSerializer serializer;
/**
* UDP 会话管理器
*/
private final IotUdpSessionManager sessionManager;
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotDeviceTokenService deviceTokenService;
private final IotDeviceCommonApi deviceApi;
public IotUdpUpstreamHandler(String serverId,
IotUdpSessionManager sessionManager,
IotMessageSerializer serializer) {
Assert.notNull(serializer, "消息序列化器必须配置");
Assert.notNull(sessionManager, "会话管理器不能为空");
this.serverId = serverId;
this.sessionManager = sessionManager;
this.serializer = serializer;
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
}
// TODO @AIvertx 有 udp 的实现么?
/**
* 处理 UDP 数据包
*
* @param packet 数据包
* @param socket UDP Socket
*/
public void handle(DatagramPacket packet, DatagramSocket socket) {
InetSocketAddress senderAddress = new InetSocketAddress(packet.sender().host(), packet.sender().port());
Buffer data = packet.data();
String addressKey = sessionManager.buildAddressKey(senderAddress);
log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]", addressKey, data.length());
try {
processMessage(data, senderAddress, socket);
} catch (IllegalArgumentException e) {
// 参数校验失败,返回 400
log.warn("[handle][参数校验失败,来源: {},错误: {}]", addressKey, e.getMessage());
sendErrorResponse(socket, senderAddress, null, null, BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
// 其他异常,返回 500
log.error("[handle][处理消息失败,来源: {}]", addressKey, e);
sendErrorResponse(socket, senderAddress, null, null,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
}
}
/**
* 处理消息
*
* @param buffer 消息
* @param senderAddress 发送者地址
* @param socket UDP Socket
*/
private void processMessage(Buffer buffer, InetSocketAddress senderAddress, DatagramSocket socket) {
// 1.1 基础检查
if (ArrayUtil.isEmpty(buffer)) {
return;
}
// 1.2 反序列化消息
IotDeviceMessage message = serializer.deserialize(buffer.getBytes());
if (message == null) {
sendErrorResponse(socket, senderAddress, null, null, BAD_REQUEST.getCode(), "消息反序列化失败");
return;
}
// 2. 根据消息类型路由处理
if (AUTH_METHOD.equals(message.getMethod())) {
// 认证请求
handleAuthenticationRequest(message, senderAddress, socket);
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) {
// 设备动态注册请求
handleRegisterRequest(message, senderAddress, socket);
} else {
// 业务消息
handleBusinessRequest(message, senderAddress, socket);
}
}
/**
* 处理认证请求
*
* @param message 消息信息
* @param senderAddress 发送者地址
* @param socket UDP Socket
*/
@SuppressWarnings("DuplicatedCode")
private void handleAuthenticationRequest(IotDeviceMessage message, InetSocketAddress senderAddress,
DatagramSocket socket) {
String clientId = IdUtil.simpleUUID();
// 1. 解析认证参数
IotDeviceAuthReqDTO authParams = JsonUtils.convertObject(message.getParams(), IotDeviceAuthReqDTO.class);
Assert.notNull(authParams, "认证参数不能为空");
Assert.hasText(authParams.getUsername(), "username 不能为空");
Assert.hasText(authParams.getPassword(), "password 不能为空");
// 2.1 执行认证
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
if (authResult.isError()) {
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {}username: {}]",
clientId, authParams.getUsername());
sendErrorResponse(socket, senderAddress, message.getRequestId(), AUTH_METHOD,
authResult.getCode(), authResult.getMsg());
return;
}
// 2.2 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
Assert.notNull(deviceInfo, "解析设备信息失败");
// 2.3 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
Assert.notNull(device, "设备不存在");
// 3. 生成 JWT Token无状态
String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName());
// 4.1 注册会话
registerSession(senderAddress, device, clientId);
// 4.2 发送上线消息
sendOnlineMessage(device);
// 4.3 发送成功响应(包含 token
sendAuthSuccessResponse(socket, senderAddress, message.getRequestId(), token);
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {},来源: {}]",
device.getId(), device.getDeviceName(), sessionManager.buildAddressKey(senderAddress));
}
/**
* 处理设备动态注册请求(一型一密,不需要认证)
*
* @param message 消息信息
* @param senderAddress 发送者地址
* @param socket UDP Socket
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
@SuppressWarnings("DuplicatedCode")
private void handleRegisterRequest(IotDeviceMessage message, InetSocketAddress senderAddress,
DatagramSocket socket) {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = JsonUtils.convertObject(message.getParams(), IotDeviceRegisterReqDTO.class);
Assert.notNull(params, "注册参数不能为空");
Assert.hasText(params.getProductKey(), "productKey 不能为空");
Assert.hasText(params.getDeviceName(), "deviceName 不能为空");
// 2. 调用动态注册
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
if (result.isError()) {
log.warn("[handleRegisterRequest][注册失败,来源: {},错误: {}]",
sessionManager.buildAddressKey(senderAddress), result.getMsg());
sendErrorResponse(socket, senderAddress, message.getRequestId(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), result.getCode(), result.getMsg());
return;
}
// 3. 发送成功响应
sendSuccessResponse(socket, senderAddress, message.getRequestId(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), result.getData());
log.info("[handleRegisterRequest][注册成功,来源: {},设备名: {}]",
sessionManager.buildAddressKey(senderAddress), params.getDeviceName());
}
/**
* 处理业务请求
* <p>
* 请求参数格式:
* - tokenJWT 令牌
* - body实际请求内容可以是 Map、List 或其他类型)
*
* @param message 消息信息
* @param senderAddress 发送者地址
* @param socket UDP Socket
*/
@SuppressWarnings("unchecked")
private void handleBusinessRequest(IotDeviceMessage message, InetSocketAddress senderAddress,
DatagramSocket socket) {
String addressKey = sessionManager.buildAddressKey(senderAddress);
// 1.1 从消息中提取 token 和 body
String token = null;
Object body = null;
if (message.getParams() instanceof Map) {
Map<String, Object> paramsMap = (Map<String, Object>) message.getParams();
token = (String) paramsMap.get(PARAM_KEY_TOKEN);
body = paramsMap.get(PARAM_KEY_BODY);
}
if (StrUtil.isBlank(token)) {
log.warn("[handleBusinessRequest][缺少 token来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), message.getMethod(),
UNAUTHORIZED.getCode(), "请先进行认证");
return;
}
// 1.2 验证 token获取设备信息
IotDeviceIdentity deviceInfo = deviceTokenService.verifyToken(token);
if (deviceInfo == null) {
log.warn("[handleBusinessRequest][token 无效或已过期,来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), message.getMethod(),
UNAUTHORIZED.getCode(), "token 无效或已过期");
return;
}
// 1.3 获取设备详细信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
log.warn("[handleBusinessRequest][设备不存在,来源: {}productKey: {}deviceName: {}]",
addressKey, deviceInfo.getProductKey(), deviceInfo.getDeviceName());
sendErrorResponse(socket, senderAddress, message.getRequestId(), message.getMethod(),
BAD_REQUEST.getCode(), "设备不存在");
return;
}
// 2. 更新会话活跃时间和地址
// TODO @AI是不是合并到 sessionManager 里面更好?
IotUdpSessionManager.SessionInfo sessionInfo = sessionManager.getSessionInfo(device.getId());
if (sessionInfo != null) {
// 检查地址是否变化,变化则更新
if (!senderAddress.equals(sessionInfo.getAddress())) {
sessionManager.updateSessionAddress(device.getId(), senderAddress);
} else {
sessionManager.updateSessionActivity(device.getId());
}
}
// 3. 将 body 设置为实际的 params发送消息到消息总线
message.setParams(body);
deviceMessageService.sendDeviceMessage(message, device.getProductKey(),
device.getDeviceName(), serverId);
log.debug("[handleBusinessRequest][业务消息处理成功,设备 ID: {},方法: {},来源: {}]",
device.getId(), message.getMethod(), addressKey);
}
/**
* 注册会话信息
*
* @param address 设备地址
* @param device 设备
* @param clientId 客户端 ID
*/
private void registerSession(InetSocketAddress address, IotDeviceRespDTO device, String clientId) {
IotUdpSessionManager.SessionInfo sessionInfo = new IotUdpSessionManager.SessionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setAddress(address)
.setLastActiveTime(LocalDateTime.now());
sessionManager.registerSession(device.getId(), sessionInfo);
}
/**
* 发送设备上线消息
*
* @param device 设备信息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
}
// ===================== 发送响应消息 =====================
/**
* 发送认证成功响应(包含 token
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param token JWT Token
*/
private void sendAuthSuccessResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, String token) {
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, token,
SUCCESS.getCode(), null);
writeResponse(socket, address, responseMessage);
}
/**
* 发送成功响应
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param method 方法名
* @param data 响应数据
*/
private void sendSuccessResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, String method, Object data) {
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, SUCCESS.getCode(), null);
writeResponse(socket, address, responseMessage);
}
/**
* 发送错误响应
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param method 方法名
* @param code 错误码
* @param msg 错误消息
*/
private void sendErrorResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, String method, Integer code, String msg) {
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, code, msg);
writeResponse(socket, address, responseMessage);
}
/**
* 写入响应到 Socket
*
* @param socket UDP Socket
* @param address 目标地址
* @param responseMessage 响应消息
*/
private void writeResponse(DatagramSocket socket, InetSocketAddress address, IotDeviceMessage responseMessage) {
try {
byte[] serializedData = serializer.serialize(responseMessage);
socket.send(Buffer.buffer(serializedData), address.getPort(), address.getHostString(), result -> {
if (result.failed()) {
log.error("[writeResponse][发送响应失败,地址: {}]",
sessionManager.buildAddressKey(address), result.cause());
}
});
} catch (Exception e) {
log.error("[writeResponse][发送响应异常,地址: {}]",
sessionManager.buildAddressKey(address), e);
}
}
}

View File

@@ -4,7 +4,6 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.time.LocalDateTime;
@@ -17,91 +16,122 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* IoT 网关 UDP 会话管理器
* <p>
* 采用无状态设计SessionManager 主要用于
* 1. 管理设备地址映射(用于下行消息发送)
* 2. 定期清理不活跃的设备地址映射
* <p>
* 注意UDP 是无连接协议,上行消息通过 token 验证身份,不依赖会话状态
* 统一管理 UDP 会话的认证状态、设备会话和消息发送功能
* 1. 管理 UDP 会话的认证状态
* 2. 管理设备会话和在线状态
* 3. 管理消息发送到设备
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotUdpSessionManager {
/**
* 设备 ID -> 会话信息(包含地址和 codecType
* 最大会话数
*/
private final int maxSessions;
/**
* 设备 ID -> 会话信息
*/
private final Map<Long, SessionInfo> deviceSessionMap = new ConcurrentHashMap<>();
/**
* 设备地址 Key -> 最后活跃时间(用于清理)
*/
private final Map<String, LocalDateTime> lastActiveTimeMap = new ConcurrentHashMap<>();
/**
* 设备地址 Key -> 设备 ID反向映射用于清理时同步
*/
// TODO @AI1这个变量是否必须2unregisterSession 这个方法是否必须?
private final Map<String, Long> addressDeviceMap = new ConcurrentHashMap<>();
public IotUdpSessionManager(int maxSessions) {
this.maxSessions = maxSessions;
}
/**
* 更新设备会话(每次收到上行消息时调用
* 注册设备会话(包含认证信息
*
* @param deviceId 设备 ID
* @param address 设备地址
* @param codecType 消息编解码类型
* @param deviceId 设备 ID
* @param sessionInfo 会话信息
*/
public void updateDeviceSession(Long deviceId, InetSocketAddress address, String codecType) {
String addressKey = buildAddressKey(address);
// 更新设备会话映射
deviceSessionMap.put(deviceId, new SessionInfo().setAddress(address).setCodecType(codecType));
lastActiveTimeMap.put(addressKey, LocalDateTime.now());
public void registerSession(Long deviceId, SessionInfo sessionInfo) {
// 检查会话数是否已达上限
if (deviceSessionMap.size() >= maxSessions) {
throw new IllegalStateException("会话数已达上限: " + maxSessions);
}
// 如果设备已有其他会话,先清理旧会话
SessionInfo oldSessionInfo = deviceSessionMap.get(deviceId);
if (oldSessionInfo != null) {
String oldAddressKey = buildAddressKey(oldSessionInfo.getAddress());
addressDeviceMap.remove(oldAddressKey, deviceId);
log.info("[registerSession][设备已有其他会话,清理旧会话,设备 ID: {},旧地址: {}]",
deviceId, oldAddressKey);
}
// 注册新会话
String addressKey = buildAddressKey(sessionInfo.getAddress());
deviceSessionMap.put(deviceId, sessionInfo);
addressDeviceMap.put(addressKey, deviceId);
log.debug("[updateDeviceSession][更新设备会话,设备 ID: {},地址: {}codecType: {}]", deviceId, addressKey, codecType);
log.info("[registerSession][注册设备会话,设备 ID: {},地址: {}product key: {}device name: {}]",
deviceId, addressKey, sessionInfo.getProductKey(), sessionInfo.getDeviceName());
}
/**
* 更新设备地址(兼容旧接口,默认不更新 codecType
* 注销设备会话
*
* @param deviceId 设备 ID
* @param address 设备地址
*/
public void updateDeviceAddress(Long deviceId, InetSocketAddress address) {
SessionInfo sessionInfo = deviceSessionMap.get(deviceId);
String codecType = sessionInfo != null ? sessionInfo.getCodecType() : null;
updateDeviceSession(deviceId, address, codecType);
public void unregisterSession(Long deviceId) {
SessionInfo sessionInfo = deviceSessionMap.remove(deviceId);
if (sessionInfo == null) {
return;
}
String addressKey = buildAddressKey(sessionInfo.getAddress());
// 仅当 addressDeviceMap 中的 deviceId 是当前 deviceId 时才移除,避免误删新会话
addressDeviceMap.remove(addressKey, deviceId);
log.info("[unregisterSession][注销设备会话,设备 ID: {},地址: {}]", deviceId, addressKey);
}
/**
* 获取设备会话信息
* 更新会话活跃时间(每次收到上行消息时调用)
*
* @param deviceId 设备 ID
* @return 会话信息
*/
public void updateSessionActivity(Long deviceId) {
SessionInfo sessionInfo = deviceSessionMap.get(deviceId);
if (sessionInfo != null) {
sessionInfo.setLastActiveTime(LocalDateTime.now());
}
}
/**
* 更新设备会话地址(设备地址变更时调用)
*
* @param deviceId 设备 ID
* @param newAddress 新地址
*/
public void updateSessionAddress(Long deviceId, InetSocketAddress newAddress) {
SessionInfo sessionInfo = deviceSessionMap.get(deviceId);
if (sessionInfo == null) {
return;
}
// 清理旧地址映射
String oldAddressKey = buildAddressKey(sessionInfo.getAddress());
addressDeviceMap.remove(oldAddressKey, deviceId);
// 更新新地址
String newAddressKey = buildAddressKey(newAddress);
sessionInfo.setAddress(newAddress);
sessionInfo.setLastActiveTime(LocalDateTime.now());
addressDeviceMap.put(newAddressKey, deviceId);
log.debug("[updateSessionAddress][更新设备地址,设备 ID: {},新地址: {}]", deviceId, newAddressKey);
}
/**
* 获取会话信息
*/
public SessionInfo getSessionInfo(Long deviceId) {
return deviceSessionMap.get(deviceId);
}
/**
* 检查设备是否在线(即是否有地址映射)
*
* @param deviceId 设备 ID
* @return 是否在线
*/
public boolean isDeviceOnline(Long deviceId) {
return deviceSessionMap.containsKey(deviceId);
}
/**
* 检查设备是否离线
*
* @param deviceId 设备 ID
* @return 是否离线
*/
public boolean isDeviceOffline(Long deviceId) {
return !isDeviceOnline(deviceId);
}
/**
* 发送消息到设备
*
@@ -116,17 +146,16 @@ public class IotUdpSessionManager {
log.warn("[sendToDevice][设备会话不存在,设备 ID: {}]", deviceId);
return false;
}
InetSocketAddress address = sessionInfo.getAddress();
try {
socket.send(Buffer.buffer(data), address.getPort(), address.getHostString(), result -> {
if (result.succeeded()) {
log.debug("[sendToDevice][发送消息成功,设备 ID: {},地址: {},数据长度: {} 字节]",
deviceId, buildAddressKey(address), data.length);
} else {
log.error("[sendToDevice][发送消息失败,设备 ID: {},地址: {}]",
deviceId, buildAddressKey(address), result.cause());
return;
}
log.error("[sendToDevice][发送消息失败,设备 ID: {},地址: {}]",
deviceId, buildAddressKey(address), result.cause());
});
return true;
} catch (Exception e) {
@@ -136,37 +165,31 @@ public class IotUdpSessionManager {
}
/**
* 定期清理不活跃的设备地址映射
* 定期清理不活跃的设备会话
*
* @param timeoutMs 超时时间(毫秒)
* @return 清理的设备 ID 列表(用于发送离线消息)
*/
public List<Long> cleanExpiredMappings(long timeoutMs) {
public List<Long> cleanExpiredSessions(long timeoutMs) {
List<Long> offlineDeviceIds = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
LocalDateTime expireTime = now.minusNanos(timeoutMs * 1_000_000);
Iterator<Map.Entry<String, LocalDateTime>> iterator = lastActiveTimeMap.entrySet().iterator();
Iterator<Map.Entry<Long, SessionInfo>> iterator = deviceSessionMap.entrySet().iterator();
// TODO @AI改成 for each 会不会更好?
while (iterator.hasNext()) {
Map.Entry<Long, SessionInfo> entry = iterator.next();
SessionInfo sessionInfo = entry.getValue();
// 未过期,跳过
Map.Entry<String, LocalDateTime> entry = iterator.next();
if (entry.getValue().isAfter(expireTime)) {
if (sessionInfo.getLastActiveTime().isAfter(expireTime)) {
continue;
}
// 过期处理:记录离线设备 ID
String addressKey = entry.getKey();
Long deviceId = addressDeviceMap.remove(addressKey);
if (deviceId == null) {
iterator.remove();
continue;
}
SessionInfo sessionInfo = deviceSessionMap.remove(deviceId);
if (sessionInfo == null) {
iterator.remove();
continue;
}
Long deviceId = entry.getKey();
String addressKey = buildAddressKey(sessionInfo.getAddress());
addressDeviceMap.remove(addressKey, deviceId);
offlineDeviceIds.add(deviceId);
log.debug("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}]",
deviceId, addressKey, entry.getValue());
log.debug("[cleanExpiredSessions][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}]",
deviceId, addressKey, sessionInfo.getLastActiveTime());
iterator.remove();
}
return offlineDeviceIds;
@@ -183,20 +206,32 @@ public class IotUdpSessionManager {
}
/**
* 会话信息
* 会话信息(包含认证信息)
*/
@Data
public static class SessionInfo {
/**
* 设备 ID
*/
private Long deviceId;
/**
* 产品 Key
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
/**
* 设备地址
*/
private InetSocketAddress address;
/**
* 消息编解码类型
* 最后活跃时间
*/
private String codecType;
private LocalDateTime lastActiveTime;
}

View File

@@ -1,542 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramPacket;
import io.vertx.core.datagram.DatagramSocket;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Map;
/**
* UDP 上行消息处理器
* <p>
* 采用无状态 Token 机制(每次请求携带 token
* 1. 认证请求:设备发送 auth 消息,携带 clientId、username、password
* 2. 返回 Token服务端验证后返回 JWT token
* 3. 后续请求:每次请求在 params 中携带 token
* 4. 服务端验证:每次请求通过 IotDeviceTokenService.verifyToken() 验证
*
* @author 芋道源码
*/
@Slf4j
public class IotUdpUpstreamHandler {
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
/**
* Token 参数 Key
*/
private static final String PARAM_KEY_TOKEN = "token";
/**
* Body 参数 Key实际请求内容
*/
private static final String PARAM_KEY_BODY = "body";
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotUdpSessionManager sessionManager;
private final IotDeviceTokenService deviceTokenService;
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public IotUdpUpstreamHandler(IotUdpUpstreamProtocol protocol,
IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotUdpSessionManager sessionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.sessionManager = sessionManager;
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.serverId = protocol.getServerId();
}
/**
* 处理 UDP 数据包
*
* @param packet 数据包
* @param socket UDP Socket
*/
public void handle(DatagramPacket packet, DatagramSocket socket) {
InetSocketAddress senderAddress = new InetSocketAddress(packet.sender().host(), packet.sender().port());
Buffer data = packet.data();
log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]",
sessionManager.buildAddressKey(senderAddress), data.length());
try {
processMessage(data, senderAddress, socket);
} catch (Exception e) {
log.error("[handle][处理消息失败,来源: {},错误: {}]",
sessionManager.buildAddressKey(senderAddress), e.getMessage(), e);
// UDP 无连接,不需要断开连接,只记录错误
}
}
/**
* 处理消息
*
* @param buffer 消息
* @param senderAddress 发送者地址
* @param socket UDP Socket
*/
private void processMessage(Buffer buffer, InetSocketAddress senderAddress, DatagramSocket socket) {
// 1. 基础检查
if (buffer == null || buffer.length() == 0) {
return;
}
// 2. 获取消息格式类型
String codecType = getMessageCodecType(buffer);
// 3. 解码消息
IotDeviceMessage message;
try {
message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
if (message == null) {
log.warn("[processMessage][消息解码失败,来源: {}]", sessionManager.buildAddressKey(senderAddress));
sendErrorResponse(socket, senderAddress, null, "消息解码失败", codecType);
return;
}
} catch (Exception e) {
log.error("[processMessage][消息解码异常,来源: {}]", sessionManager.buildAddressKey(senderAddress), e);
sendErrorResponse(socket, senderAddress, null, "消息解码失败: " + e.getMessage(), codecType);
return;
}
// 4. 根据消息类型路由处理
try {
if (AUTH_METHOD.equals(message.getMethod())) {
// 认证请求
handleAuthenticationRequest(message, codecType, senderAddress, socket);
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) {
// 设备动态注册请求
handleRegisterRequest(message, codecType, senderAddress, socket);
} else {
// 业务消息
handleBusinessRequest(message, codecType, senderAddress, socket);
}
} catch (Exception e) {
log.error("[processMessage][处理消息失败,来源: {},消息方法: {}]",
sessionManager.buildAddressKey(senderAddress), message.getMethod(), e);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "消息处理失败", codecType);
}
}
/**
* 处理认证请求
*
* @param message 消息信息
* @param codecType 消息编解码类型
* @param senderAddress 发送者地址
* @param socket UDP Socket
*/
private void handleAuthenticationRequest(IotDeviceMessage message, String codecType,
InetSocketAddress senderAddress, DatagramSocket socket) {
String addressKey = sessionManager.buildAddressKey(senderAddress);
try {
// 1.1 解析认证参数
IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams());
if (authParams == null) {
log.warn("[handleAuthenticationRequest][认证参数解析失败,来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证参数不完整", codecType);
return;
}
// 1.2 执行认证
if (!validateDeviceAuth(authParams)) {
log.warn("[handleAuthenticationRequest][认证失败,来源: {}username: {}]",
addressKey, authParams.getUsername());
sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证失败", codecType);
return;
}
// 2.1 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
if (deviceInfo == null) {
sendErrorResponse(socket, senderAddress, message.getRequestId(), "解析设备信息失败", codecType);
return;
}
// 2.2 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
sendErrorResponse(socket, senderAddress, message.getRequestId(), "设备不存在", codecType);
return;
}
// 3.1 生成 JWT Token无状态
String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName());
// 3.2 更新设备会话信息(用于下行消息,保存 codecType
sessionManager.updateDeviceSession(device.getId(), senderAddress, codecType);
// 3.3 发送上线消息
sendOnlineMessage(device);
// 3.4 发送成功响应(包含 token
sendAuthSuccessResponse(socket, senderAddress, message.getRequestId(), token, codecType);
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {},来源: {}]",
device.getId(), device.getDeviceName(), addressKey);
} catch (Exception e) {
log.error("[handleAuthenticationRequest][认证处理异常,来源: {}]", addressKey, e);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证处理异常", codecType);
}
}
/**
* 处理设备动态注册请求(一型一密,不需要 Token
*
* @param message 消息信息
* @param codecType 消息编解码类型
* @param senderAddress 发送者地址
* @param socket UDP Socket
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void handleRegisterRequest(IotDeviceMessage message, String codecType,
InetSocketAddress senderAddress, DatagramSocket socket) {
String addressKey = sessionManager.buildAddressKey(senderAddress);
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams());
if (params == null) {
log.warn("[handleRegisterRequest][注册参数解析失败,来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "注册参数不完整", codecType);
return;
}
// 2. 调用动态注册
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
if (result.isError()) {
log.warn("[handleRegisterRequest][注册失败,来源: {},错误: {}]", addressKey, result.getMsg());
sendErrorResponse(socket, senderAddress, message.getRequestId(), result.getMsg(), codecType);
return;
}
// 3. 发送成功响应(包含 deviceSecret
sendRegisterSuccessResponse(socket, senderAddress, message.getRequestId(), result.getData(), codecType);
log.info("[handleRegisterRequest][注册成功,设备名: {},来源: {}]",
params.getDeviceName(), addressKey);
} catch (Exception e) {
log.error("[handleRegisterRequest][注册处理异常,来源: {}]", addressKey, e);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "注册处理异常", codecType);
}
}
/**
* 处理业务请求
* <p>
* 请求参数格式:
* - tokenJWT 令牌
* - body实际请求内容可以是 Map、List 或其他类型)
*
* @param message 消息信息
* @param codecType 消息编解码类型
* @param senderAddress 发送者地址
* @param socket UDP Socket
*/
@SuppressWarnings("unchecked")
private void handleBusinessRequest(IotDeviceMessage message, String codecType,
InetSocketAddress senderAddress, DatagramSocket socket) {
String addressKey = sessionManager.buildAddressKey(senderAddress);
try {
// 1.1 从消息中提取 token 和 body格式{token: "xxx", body: {...}} 或 {token: "xxx", body: [...]}
String token = null;
Object body = null;
if (message.getParams() instanceof Map) {
Map<String, Object> paramsMap = (Map<String, Object>) message.getParams();
token = (String) paramsMap.get(PARAM_KEY_TOKEN);
body = paramsMap.get(PARAM_KEY_BODY);
}
if (StrUtil.isBlank(token)) {
log.warn("[handleBusinessRequest][缺少 token来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "请先进行认证", codecType);
return;
}
// 1.2 验证 token获取设备信息
IotDeviceIdentity deviceInfo = deviceTokenService.verifyToken(token);
if (deviceInfo == null) {
log.warn("[handleBusinessRequest][token 无效或已过期,来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "token 无效或已过期", codecType);
return;
}
// 2. 获取设备详细信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
log.warn("[handleBusinessRequest][设备不存在,来源: {}productKey: {}deviceName: {}]",
addressKey, deviceInfo.getProductKey(), deviceInfo.getDeviceName());
sendErrorResponse(socket, senderAddress, message.getRequestId(), "设备不存在", codecType);
return;
}
// 3. 更新设备会话信息(保持最新,保存 codecType
sessionManager.updateDeviceSession(device.getId(), senderAddress, codecType);
// 4. 将 body 设置为实际的 params发送消息到消息总线
message.setParams(body);
deviceMessageService.sendDeviceMessage(message, device.getProductKey(),
device.getDeviceName(), serverId);
log.debug("[handleBusinessRequest][业务消息处理成功,设备 ID: {},方法: {},来源: {}]",
device.getId(), message.getMethod(), addressKey);
} catch (Exception e) {
log.error("[handleBusinessRequest][业务请求处理异常,来源: {}]", addressKey, e);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "处理失败", codecType);
}
}
/**
* 获取消息编解码类型
*
* @param buffer 消息
* @return 消息编解码类型
*/
private String getMessageCodecType(Buffer buffer) {
// 检测消息格式类型
return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY
: CODEC_TYPE_JSON;
}
/**
* 发送设备上线消息
*
* @param device 设备信息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
} catch (Exception e) {
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
}
}
/**
* 验证设备认证信息
*
* @param authParams 认证参数
* @return 是否认证成功
*/
private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) {
try {
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(authParams.getClientId()).setUsername(authParams.getUsername())
.setPassword(authParams.getPassword()));
result.checkError();
return BooleanUtil.isTrue(result.getData());
} catch (Exception e) {
log.error("[validateDeviceAuth][设备认证异常username: {}]", authParams.getUsername(), e);
return false;
}
}
/**
* 发送认证成功响应(包含 token
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param token JWT Token
* @param codecType 消息编解码类型
*/
private void sendAuthSuccessResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, String token, String codecType) {
try {
// 构建响应数据
Object responseData = MapUtil.builder()
.put("success", true)
.put("token", token)
.put("message", "认证成功")
.build();
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, 0, "认证成功");
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
// 发送响应
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), result -> {
if (result.failed()) {
log.error("[sendAuthSuccessResponse][发送认证成功响应失败,地址: {}]",
sessionManager.buildAddressKey(address), result.cause());
}
});
} catch (Exception e) {
log.error("[sendAuthSuccessResponse][发送认证成功响应异常,地址: {}]",
sessionManager.buildAddressKey(address), e);
}
}
/**
* 发送注册成功响应(包含 deviceSecret
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param registerResp 注册响应
* @param codecType 消息编解码类型
*/
private void sendRegisterSuccessResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, IotDeviceRegisterRespDTO registerResp,
String codecType) {
try {
// 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 发送响应
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), result -> {
if (result.failed()) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应失败,地址: {}]",
sessionManager.buildAddressKey(address), result.cause());
}
});
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应异常,地址: {}]",
sessionManager.buildAddressKey(address), e);
}
}
/**
* 发送错误响应
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param errorMessage 错误消息
* @param codecType 消息编解码类型
*/
private void sendErrorResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, String errorMessage, String codecType) {
sendResponse(socket, address, false, errorMessage, requestId, codecType);
}
/**
* 发送响应消息
*
* @param socket UDP Socket
* @param address 目标地址
* @param success 是否成功
* @param message 消息
* @param requestId 请求 ID
* @param codecType 消息编解码类型
*/
@SuppressWarnings("SameParameterValue")
private void sendResponse(DatagramSocket socket, InetSocketAddress address, boolean success,
String message, String requestId, String codecType) {
try {
// 构建响应数据
Object responseData = MapUtil.builder()
.put("success", success)
.put("message", message)
.build();
int code = success ? 0 : 401;
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
"response", responseData, code, message);
// 发送响应
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), ar -> {
if (ar.failed()) {
log.error("[sendResponse][发送响应失败,地址: {}]",
sessionManager.buildAddressKey(address), ar.cause());
}
});
} catch (Exception e) {
log.error("[sendResponse][发送响应异常,地址: {}]",
sessionManager.buildAddressKey(address), e);
}
}
/**
* 解析认证参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 认证参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceAuthReqDTO parseAuthParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
return new IotDeviceAuthReqDTO()
.setClientId(MapUtil.getStr(paramMap, "clientId"))
.setUsername(MapUtil.getStr(paramMap, "username"))
.setPassword(MapUtil.getStr(paramMap, "password"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceAuthReqDTO) {
return (IotDeviceAuthReqDTO) params;
}
// 其他情况尝试 JSON 转换
return JsonUtils.convertObject(params, IotDeviceAuthReqDTO.class);
} catch (Exception e) {
log.error("[parseAuthParams][解析认证参数({})失败]", params, e);
return null;
}
}
/**
* 解析注册参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 注册参数 DTO解析失败时返回 null
*/
@SuppressWarnings({"unchecked", "DuplicatedCode"})
private IotDeviceRegisterReqDTO parseRegisterParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
return new IotDeviceRegisterReqDTO()
.setProductKey(MapUtil.getStr(paramMap, "productKey"))
.setDeviceName(MapUtil.getStr(paramMap, "deviceName"))
.setProductSecret(MapUtil.getStr(paramMap, "productSecret"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceRegisterReqDTO) {
return (IotDeviceRegisterReqDTO) params;
}
// 其他情况尝试 JSON 转换
return JsonUtils.convertObject(params, IotDeviceRegisterReqDTO.class);
} catch (Exception e) {
log.error("[parseRegisterParams][解析注册参数({})失败]", params, e);
return null;
}
}
}

View File

@@ -50,7 +50,7 @@ yudao:
- id: http-json
type: http
port: 8092
enabled: true
enabled: false
http:
ssl-enabled: false
# ====================================
@@ -59,7 +59,7 @@ yudao:
- id: tcp-json
type: tcp
port: 8091
enabled: true
enabled: false
serialize: json
tcp:
max-connections: 1000
@@ -75,6 +75,20 @@ yudao:
# initial-bytes-to-strip: 4 # 初始跳过的字节数
# type: fixed_length # 拆包类型length_field / delimiter / fixed_length
# fixed-length: 256 # 固定长度
# ====================================
# 针对引入的 UDP 组件的配置
# ====================================
- id: udp-json
type: udp
port: 8093
enabled: true
serialize: json
udp:
max-sessions: 1000 # 最大会话数
session-timeout-ms: 60000 # 会话超时时间(毫秒)
session-clean-interval-ms: 30000 # 会话清理间隔(毫秒)
receive-buffer-size: 65536 # 接收缓冲区大小(字节)
send-buffer-size: 65536 # 发送缓冲区大小(字节)
# 协议配置(旧版,保持兼容)
protocol:
@@ -113,16 +127,6 @@ yudao:
trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径
trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码
# ====================================
# 针对引入的 UDP 组件的配置
# ====================================
udp:
enabled: false # 是否启用 UDP
port: 8093 # UDP 服务端口
receive-buffer-size: 65536 # 接收缓冲区大小(字节,默认 64KB
send-buffer-size: 65536 # 发送缓冲区大小(字节,默认 64KB
session-timeout-ms: 60000 # 会话超时时间(毫秒,默认 60 秒)
session-clean-interval-ms: 30000 # 会话清理间隔(毫秒,默认 30 秒)
# ====================================
# 针对引入的 MQTT 组件的配置
# ====================================
mqtt:

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -9,9 +8,8 @@ import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -27,16 +25,9 @@ import java.util.Map;
*
* <p>测试场景直连设备IotProductDeviceTypeEnum 的 DIRECT 类型)通过 UDP 协议直接连接平台
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8093</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行 {@link #testAuth()} 获取设备 token将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
* <li>运行以下测试方法:
* <ul>
@@ -58,10 +49,12 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
private static final int SERVER_PORT = 8093;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
// ===================== 序列化器 =====================
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
/**
* 消息序列化器
*/
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
@@ -81,30 +74,18 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建认证消息
// 1. 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
} else {
log.warn("[testAuth][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testAuth][响应消息: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
}
// ===================== 动态注册测试 =====================
@@ -118,30 +99,18 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testDeviceRegister() throws Exception {
// 1.1 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO();
registerReqDTO.setProductKey(PRODUCT_KEY);
registerReqDTO.setDeviceName("test-udp-" + System.currentTimeMillis());
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
// 1. 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO()
.setProductKey(PRODUCT_KEY)
.setDeviceName("test-udp-" + System.currentTimeMillis())
.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testDeviceRegister][响应消息: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
} else {
log.warn("[testDeviceRegister][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testDeviceRegister][响应消息: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
}
// ===================== 直连设备属性上报测试 =====================
@@ -151,31 +120,17 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
// 1.1 构建属性上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 1. 构建属性上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build())),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
.build())));
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPost][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testPropertyPost][响应消息: {}]", response);
}
// ===================== 直连设备事件上报测试 =====================
@@ -185,31 +140,17 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
// 1.1 构建事件上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 1. 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
withToken(IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis())),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
System.currentTimeMillis())));
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testEventPost][响应消息: {}]", response);
} else {
log.warn("[testEventPost][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testEventPost][响应消息: {}]", response);
}
// ===================== 辅助方法 =====================
@@ -232,30 +173,36 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
}
/**
* 发送 UDP 请求并接收响应
* 发送 UDP 消息并接收响应
*
* @param socket UDP Socket
* @param payload 请求数据
* @return 响应数据
* @param request 请求消息
* @return 响应消息
*/
public static byte[] sendAndReceive(DatagramSocket socket, byte[] payload) throws Exception {
InetAddress address = InetAddress.getByName(SERVER_HOST);
private IotDeviceMessage sendAndReceive(IotDeviceMessage request) throws Exception {
// 1. 序列化请求
byte[] payload = SERIALIZER.serialize(request);
log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), payload.length);
// 发送请求
DatagramPacket sendPacket = new DatagramPacket(payload, payload.length, address, SERVER_PORT);
socket.send(sendPacket);
// 2. 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
InetAddress address = InetAddress.getByName(SERVER_HOST);
DatagramPacket sendPacket = new DatagramPacket(payload, payload.length, address, SERVER_PORT);
socket.send(sendPacket);
// 接收响应
byte[] receiveData = new byte[4096];
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
try {
socket.receive(receivePacket);
byte[] response = new byte[receivePacket.getLength()];
System.arraycopy(receivePacket.getData(), 0, response, 0, receivePacket.getLength());
return response;
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
// 3. 接收响应
byte[] receiveData = new byte[4096];
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
try {
socket.receive(receivePacket);
byte[] responseBytes = new byte[receivePacket.getLength()];
System.arraycopy(receivePacket.getData(), 0, responseBytes, 0, receivePacket.getLength());
log.info("[sendAndReceive][收到响应,数据长度: {} 字节]", responseBytes.length);
return SERIALIZER.deserialize(responseBytes);
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
}
}

View File

@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -13,36 +12,27 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUdpProtocolIntegrationTest.sendAndReceive;
/**
* IoT 网关设备 UDP 协议集成测试(手动测试)
*
* <p>测试场景网关设备IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 UDP 协议管理子设备拓扑关系
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8093</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行 {@link #testAuth()} 获取网关设备 token将返回的 token 粘贴到 {@link #GATEWAY_TOKEN} 常量</li>
* <li>运行以下测试方法:
* <ul>
@@ -63,12 +53,16 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd
@Disabled
public class IotGatewayDeviceUdpProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8093;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
// ===================== 序列化器 =====================
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
/**
* 消息序列化器
*/
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) =====================
@@ -93,185 +87,101 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建认证消息
// 1. 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 GATEWAY_TOKEN 常量中]");
} else {
log.warn("[testAuth][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testAuth][响应消息: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 GATEWAY_TOKEN 常量中]");
}
// ===================== 拓扑管理测试 =====================
/**
* 添加子设备拓扑关系测试
* <p>
* 网关设备向平台上报需要绑定的子设备信息
*/
@Test
public void testTopoAdd() throws Exception {
// 1.1 构建子设备认证信息
// 1. 构建子设备认证信息
IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo(
SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET);
IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO()
.setClientId(subAuthInfo.getClientId())
.setUsername(subAuthInfo.getUsername())
.setPassword(subAuthInfo.getPassword());
// 1.2 构建请求参数
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
params.setSubDevices(Collections.singletonList(subDeviceAuth));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(),
withToken(params),
null, null, null);
// 1.3 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request);
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), withToken(params));
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoAdd][响应消息: {}]", response);
} else {
log.warn("[testTopoAdd][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testTopoAdd][响应消息: {}]", response);
}
/**
* 删除子设备拓扑关系测试
* <p>
* 网关设备向平台上报需要解绑的子设备信息
*/
@Test
public void testTopoDelete() throws Exception {
// 1.1 构建请求参数
// 1. 构建请求参数
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO();
params.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
withToken(params),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request);
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), withToken(params));
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoDelete][响应消息: {}]", response);
} else {
log.warn("[testTopoDelete][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testTopoDelete][响应消息: {}]", response);
}
/**
* 获取子设备拓扑关系测试
* <p>
* 网关设备向平台查询已绑定的子设备列表
*/
@Test
public void testTopoGet() throws Exception {
// 1.1 构建请求参数(目前为空,预留扩展)
// 1. 构建请求参数
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
withToken(params),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request);
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), withToken(params));
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoGet][响应消息: {}]", response);
} else {
log.warn("[testTopoGet][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testTopoGet][响应消息: {}]", response);
}
// ===================== 子设备注册测试 =====================
/**
* 子设备动态注册测试
* <p>
* 网关设备代理子设备进行动态注册,平台返回子设备的 deviceSecret
* <p>
* 注意:此接口需要网关 Token 认证
*/
@Test
public void testSubDeviceRegister() throws Exception {
// 1.1 构建请求参数
// 1. 构建请求参数
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO();
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
subDevice.setDeviceName("mougezishebei");
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
withToken(Collections.singletonList(subDevice)),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request);
withToken(Collections.singletonList(subDevice)));
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testSubDeviceRegister][响应消息: {}]", response);
} else {
log.warn("[testSubDeviceRegister][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testSubDeviceRegister][响应消息: {}]", response);
}
// ===================== 批量上报测试 =====================
/**
* 批量上报属性测试(网关 + 子设备)
* <p>
* 网关设备批量上报自身属性、事件,以及子设备的属性、事件
*/
@Test
public void testPropertyPackPost() throws Exception {
@@ -307,40 +217,18 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
params.setProperties(gatewayProperties);
params.setEvents(gatewayEvents);
params.setSubDevices(ListUtil.of(subDeviceData));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(),
withToken(params),
null, null, null);
// 1.7 编码
byte[] payload = CODEC.encode(request);
log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), withToken(params));
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPackPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPackPost][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testPropertyPackPost][响应消息: {}]", response);
}
// ===================== 辅助方法 =====================
/**
* 构建带 token 的 params
* <p>
* 返回格式:{token: "xxx", body: params}
* - tokenJWT 令牌
* - body实际请求内容可以是 Map、List 或其他类型)
*
* @param params 原始参数Map、List 或对象)
* @return 包含 token 和 body 的 Map
*/
private Map<String, Object> withToken(Object params) {
Map<String, Object> result = new HashMap<>();
@@ -349,4 +237,31 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
return result;
}
/**
* 发送 UDP 消息并接收响应
*/
private IotDeviceMessage sendAndReceive(IotDeviceMessage request) throws Exception {
byte[] payload = SERIALIZER.serialize(request);
log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), payload.length);
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
InetAddress address = InetAddress.getByName(SERVER_HOST);
DatagramPacket sendPacket = new DatagramPacket(payload, payload.length, address, SERVER_PORT);
socket.send(sendPacket);
byte[] receiveData = new byte[4096];
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
try {
socket.receive(receivePacket);
byte[] responseBytes = new byte[receivePacket.getLength()];
System.arraycopy(receivePacket.getData(), 0, responseBytes, 0, receivePacket.getLength());
return SERIALIZER.deserialize(responseBytes);
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
}
}
}

View File

@@ -1,26 +1,24 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUdpProtocolIntegrationTest.sendAndReceive;
/**
* IoT 网关子设备 UDP 协议集成测试(手动测试)
*
@@ -29,17 +27,10 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd
* <p><b>重要说明子设备无法直接连接平台所有请求均由网关设备Gateway代为转发。</b>
* <p>网关设备转发子设备请求时Token 使用子设备自己的信息。
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8093</li>
* <li>确保子设备已通过 {@link IotGatewayDeviceUdpProtocolIntegrationTest#testTopoAdd()} 绑定到网关</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行 {@link #testAuth()} 获取子设备 token将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
* <li>运行以下测试方法:
* <ul>
@@ -57,12 +48,16 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd
@Disabled
public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8093;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
// ===================== 序列化器 =====================
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
/**
* 消息序列化器
*/
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
@@ -82,30 +77,18 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建认证消息
// 1. 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
} else {
log.warn("[testAuth][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testAuth][响应消息: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
}
// ===================== 子设备属性上报测试 =====================
@@ -115,33 +98,19 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
// 1.1 构建属性上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 1. 构建属性上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("power", 100)
.put("status", "online")
.put("temperature", 36.5)
.build())),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
.build())));
log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]");
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPost][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testPropertyPost][响应消息: {}]", response);
}
// ===================== 子设备事件上报测试 =====================
@@ -151,9 +120,8 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
// 1.1 构建事件上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 1. 构建事件上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
withToken(IotDeviceEventPostReqDTO.of(
"alarm",
@@ -163,38 +131,18 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
.put("threshold", 40)
.put("current", 42)
.build(),
System.currentTimeMillis())),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
System.currentTimeMillis())));
log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]");
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testEventPost][响应消息: {}]", response);
} else {
log.warn("[testEventPost][未收到响应]");
}
}
// 2. 发送并接收响应
IotDeviceMessage response = sendAndReceive(request);
log.info("[testEventPost][响应消息: {}]", response);
}
// ===================== 辅助方法 =====================
/**
* 构建带 token 的 params
* <p>
* 返回格式:{token: "xxx", body: params}
* - tokenJWT 令牌
* - body实际请求内容可以是 Map、List 或其他类型)
*
* @param params 原始参数Map、List 或对象)
* @return 包含 token 和 body 的 Map
*/
private Map<String, Object> withToken(Object params) {
Map<String, Object> result = new HashMap<>();
@@ -203,4 +151,31 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
return result;
}
/**
* 发送 UDP 消息并接收响应
*/
private IotDeviceMessage sendAndReceive(IotDeviceMessage request) throws Exception {
byte[] payload = SERIALIZER.serialize(request);
log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), payload.length);
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
InetAddress address = InetAddress.getByName(SERVER_HOST);
DatagramPacket sendPacket = new DatagramPacket(payload, payload.length, address, SERVER_PORT);
socket.send(sendPacket);
byte[] receiveData = new byte[4096];
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
try {
socket.receive(receivePacket);
byte[] responseBytes = new byte[receivePacket.getLength()];
System.arraycopy(receivePacket.getData(), 0, responseBytes, 0, receivePacket.getLength());
return SERIALIZER.deserialize(responseBytes);
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
}
}
}