mirror of
https://gitee.com/zhijiantianya/ruoyi-vue-pro.git
synced 2026-03-22 13:17:17 +08:00
feat:【iot】udp 协议接入 50%:初始化,基于 soft-frolicking-breeze.md 实现
This commit is contained in:
@@ -17,6 +17,9 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDowns
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.Vertx;
|
||||
@@ -194,4 +197,39 @@ public class IotGatewayConfiguration {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* IoT 网关 UDP 协议配置类
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.udp", name = "enabled", havingValue = "true")
|
||||
@Slf4j
|
||||
public static class UdpProtocolConfiguration {
|
||||
|
||||
@Bean(name = "udpVertx", destroyMethod = "close")
|
||||
public Vertx udpVertx() {
|
||||
return Vertx.vertx();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotUdpUpstreamProtocol iotUdpUpstreamProtocol(IotGatewayProperties gatewayProperties,
|
||||
IotDeviceService deviceService,
|
||||
IotDeviceMessageService messageService,
|
||||
IotUdpSessionManager sessionManager,
|
||||
@Qualifier("udpVertx") Vertx udpVertx) {
|
||||
return new IotUdpUpstreamProtocol(gatewayProperties.getProtocol().getUdp(),
|
||||
deviceService, messageService, sessionManager, udpVertx);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotUdpDownstreamSubscriber iotUdpDownstreamSubscriber(IotUdpUpstreamProtocol protocolHandler,
|
||||
IotDeviceMessageService messageService,
|
||||
IotDeviceService deviceService,
|
||||
IotUdpSessionManager sessionManager,
|
||||
IotMessageBus messageBus) {
|
||||
return new IotUdpDownstreamSubscriber(protocolHandler, messageService, deviceService, sessionManager,
|
||||
messageBus);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -93,6 +93,11 @@ public class IotGatewayProperties {
|
||||
*/
|
||||
private MqttWsProperties mqttWs;
|
||||
|
||||
/**
|
||||
* UDP 组件配置
|
||||
*/
|
||||
private UdpProperties udp;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
@@ -503,4 +508,42 @@ public class IotGatewayProperties {
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class UdpProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* 服务端口(默认 8092)
|
||||
*/
|
||||
private Integer port = 8092;
|
||||
|
||||
/**
|
||||
* 接收缓冲区大小(默认 64KB)
|
||||
*/
|
||||
private Integer receiveBufferSize = 65536;
|
||||
|
||||
/**
|
||||
* 发送缓冲区大小(默认 64KB)
|
||||
*/
|
||||
private Integer sendBufferSize = 65536;
|
||||
|
||||
/**
|
||||
* 会话超时时间(毫秒,默认 60 秒)
|
||||
* <p>
|
||||
* 用于清理不活跃的设备地址映射
|
||||
*/
|
||||
private Long sessionTimeoutMs = 60000L;
|
||||
|
||||
/**
|
||||
* 会话清理间隔(毫秒,默认 30 秒)
|
||||
*/
|
||||
private Long sessionCleanIntervalMs = 30000L;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ public class IotTcpDownstreamHandler {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 根据产品 Key 和设备名称编码消息并发送到设备
|
||||
// 2. 根据产品 Key 和设备名称编码消息,并发送到设备
|
||||
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName());
|
||||
boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes);
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpDownstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 UDP 下游订阅者:接收下行给设备的消息
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class IotUdpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
|
||||
|
||||
private final IotUdpUpstreamProtocol protocol;
|
||||
|
||||
private final IotDeviceMessageService messageService;
|
||||
|
||||
private final IotDeviceService deviceService;
|
||||
|
||||
private final IotUdpSessionManager sessionManager;
|
||||
|
||||
private final IotMessageBus messageBus;
|
||||
|
||||
private IotUdpDownstreamHandler downstreamHandler;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
// 初始化下游处理器
|
||||
this.downstreamHandler = new IotUdpDownstreamHandler(messageService, deviceService, sessionManager, protocol);
|
||||
|
||||
messageBus.register(this);
|
||||
log.info("[init][UDP 下游订阅者初始化完成,服务器 ID: {},Topic: {}]",
|
||||
protocol.getServerId(), getTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
// 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group
|
||||
return getTopic();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(IotDeviceMessage message) {
|
||||
try {
|
||||
downstreamHandler.handle(message);
|
||||
} catch (Exception e) {
|
||||
log.error("[onMessage][处理下行消息失败,设备 ID: {},方法: {},消息 ID: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,170 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpUpstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.datagram.DatagramSocket;
|
||||
import io.vertx.core.datagram.DatagramSocketOptions;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 UDP 协议:接收设备上行消息
|
||||
* <p>
|
||||
* 采用 Vertx DatagramSocket 实现 UDP 服务器,主要功能:
|
||||
* 1. 监听 UDP 端口,接收设备消息
|
||||
* 2. 定期清理不活跃的设备地址映射
|
||||
* 3. 提供 UDP Socket 用于下行消息发送
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotUdpUpstreamProtocol {
|
||||
|
||||
private final IotGatewayProperties.UdpProperties udpProperties;
|
||||
|
||||
private final IotDeviceService deviceService;
|
||||
|
||||
private final IotDeviceMessageService messageService;
|
||||
|
||||
private final IotUdpSessionManager sessionManager;
|
||||
|
||||
private final Vertx vertx;
|
||||
|
||||
@Getter
|
||||
private final String serverId;
|
||||
|
||||
@Getter
|
||||
private DatagramSocket udpSocket;
|
||||
|
||||
/**
|
||||
* 会话清理定时器 ID
|
||||
*/
|
||||
private Long cleanTimerId;
|
||||
|
||||
private IotUdpUpstreamHandler upstreamHandler;
|
||||
|
||||
public IotUdpUpstreamProtocol(IotGatewayProperties.UdpProperties udpProperties,
|
||||
IotDeviceService deviceService,
|
||||
IotDeviceMessageService messageService,
|
||||
IotUdpSessionManager sessionManager,
|
||||
Vertx vertx) {
|
||||
this.udpProperties = udpProperties;
|
||||
this.deviceService = deviceService;
|
||||
this.messageService = messageService;
|
||||
this.sessionManager = sessionManager;
|
||||
this.vertx = vertx;
|
||||
this.serverId = IotDeviceMessageUtils.generateServerId(udpProperties.getPort());
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
// 1. 初始化上行消息处理器
|
||||
this.upstreamHandler = new IotUdpUpstreamHandler(this, messageService, deviceService, sessionManager);
|
||||
|
||||
// 2. 创建 UDP Socket 选项
|
||||
DatagramSocketOptions options = new DatagramSocketOptions()
|
||||
.setReceiveBufferSize(udpProperties.getReceiveBufferSize())
|
||||
.setSendBufferSize(udpProperties.getSendBufferSize())
|
||||
.setReuseAddress(true);
|
||||
|
||||
// 3. 创建 UDP Socket
|
||||
udpSocket = vertx.createDatagramSocket(options);
|
||||
|
||||
// 4. 监听端口
|
||||
udpSocket.listen(udpProperties.getPort(), "0.0.0.0", result -> {
|
||||
// TODO @AI:if return;简化下;成功才继续往下走;
|
||||
if (result.succeeded()) {
|
||||
// 设置数据包处理器
|
||||
udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket, this));
|
||||
log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]",
|
||||
udpProperties.getPort(), udpProperties.getReceiveBufferSize(),
|
||||
udpProperties.getSendBufferSize());
|
||||
|
||||
// 5. 启动会话清理定时器
|
||||
startSessionCleanTimer();
|
||||
} else {
|
||||
log.error("[start][IoT 网关 UDP 协议启动失败]", result.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
// 1. 取消会话清理定时器
|
||||
if (cleanTimerId != null) {
|
||||
vertx.cancelTimer(cleanTimerId);
|
||||
cleanTimerId = null;
|
||||
log.info("[stop][会话清理定时器已取消]");
|
||||
}
|
||||
|
||||
// 2. 关闭 UDP Socket
|
||||
if (udpSocket != null) {
|
||||
try {
|
||||
udpSocket.close().result();
|
||||
log.info("[stop][IoT 网关 UDP 协议已停止]");
|
||||
} catch (Exception e) {
|
||||
log.error("[stop][IoT 网关 UDP 协议停止失败]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动会话清理定时器
|
||||
*/
|
||||
private void startSessionCleanTimer() {
|
||||
cleanTimerId = vertx.setPeriodic(udpProperties.getSessionCleanIntervalMs(), id -> {
|
||||
try {
|
||||
// 1. 清理超时的设备地址映射,并获取离线设备列表
|
||||
// TODO @AI:兼容 jdk8,不要用 var;
|
||||
var offlineDevices = sessionManager.cleanExpiredMappings(udpProperties.getSessionTimeoutMs());
|
||||
|
||||
// 2. 为每个离线设备发送离线消息
|
||||
for (var offlineInfo : offlineDevices) {
|
||||
sendOfflineMessage(offlineInfo.getDeviceId());
|
||||
}
|
||||
// TODO @AI:CollUtil.isNotEmpty ;简化下 if 判断;
|
||||
if (!offlineDevices.isEmpty()) {
|
||||
log.info("[cleanExpiredMappings][本次清理 {} 个超时设备]", offlineDevices.size());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[cleanExpiredMappings][清理超时会话失败]", e);
|
||||
}
|
||||
});
|
||||
log.info("[startSessionCleanTimer][会话清理定时器启动,间隔:{} ms,超时:{} ms]",
|
||||
udpProperties.getSessionCleanIntervalMs(), udpProperties.getSessionTimeoutMs());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送设备离线消息
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
*/
|
||||
private void sendOfflineMessage(Long deviceId) {
|
||||
try {
|
||||
// 获取设备信息
|
||||
var device = deviceService.getDeviceFromCache(deviceId);
|
||||
if (device == null) {
|
||||
log.warn("[sendOfflineMessage][设备不存在,设备 ID: {}]", deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 发送离线消息
|
||||
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
|
||||
messageService.sendDeviceMessage(offlineMessage, device.getProductKey(),
|
||||
device.getDeviceName(), serverId);
|
||||
log.info("[sendOfflineMessage][发送离线消息,设备 ID: {},设备名: {}]",
|
||||
deviceId, device.getDeviceName());
|
||||
} catch (Exception e) {
|
||||
log.error("[sendOfflineMessage][发送离线消息失败,设备 ID: {}]", deviceId, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,204 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.datagram.DatagramSocket;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* IoT 网关 UDP 会话管理器
|
||||
* <p>
|
||||
* 采用无状态设计,SessionManager 主要用于:
|
||||
* 1. 管理设备地址映射(用于下行消息发送)
|
||||
* 2. 定期清理不活跃的设备地址映射
|
||||
* <p>
|
||||
* 注意:UDP 是无连接协议,上行消息通过 token 验证身份,不依赖会话状态
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class IotUdpSessionManager {
|
||||
|
||||
/**
|
||||
* 设备 ID -> 设备地址(用于下行消息发送)
|
||||
*/
|
||||
private final Map<Long, InetSocketAddress> deviceAddressMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 设备地址 Key -> 最后活跃时间(用于清理)
|
||||
*/
|
||||
// TODO @AI:是不是尽量使用 LocalDateTime ?统一时间类型
|
||||
private final Map<String, Long> lastActiveTimeMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 设备地址 Key -> 设备 ID(反向映射,用于清理时同步)
|
||||
*/
|
||||
private final Map<String, Long> addressDeviceMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 更新设备地址(每次收到上行消息时调用)
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @param address 设备地址
|
||||
*/
|
||||
public void updateDeviceAddress(Long deviceId, InetSocketAddress address) {
|
||||
String addressKey = buildAddressKey(address);
|
||||
// 更新设备地址映射
|
||||
deviceAddressMap.put(deviceId, address);
|
||||
lastActiveTimeMap.put(addressKey, System.currentTimeMillis());
|
||||
addressDeviceMap.put(addressKey, deviceId);
|
||||
log.debug("[updateDeviceAddress][更新设备地址,设备 ID: {},地址: {}]", deviceId, addressKey);
|
||||
}
|
||||
|
||||
// TODO @AI:是不是用不到?用不掉就删除掉!简化
|
||||
/**
|
||||
* 获取设备地址(下行消息发送时使用)
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @return 设备地址,如果不存在返回 null
|
||||
*/
|
||||
public InetSocketAddress getDeviceAddress(Long deviceId) {
|
||||
return deviceAddressMap.get(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查设备是否在线(即是否有地址映射)
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @return 是否在线
|
||||
*/
|
||||
public boolean isDeviceOnline(Long deviceId) {
|
||||
return deviceAddressMap.containsKey(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查设备是否离线
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @return 是否离线
|
||||
*/
|
||||
public boolean isDeviceOffline(Long deviceId) {
|
||||
return !isDeviceOnline(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息到设备
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @param data 数据
|
||||
* @param socket UDP Socket
|
||||
* @return 是否发送成功
|
||||
*/
|
||||
public boolean sendToDevice(Long deviceId, byte[] data, DatagramSocket socket) {
|
||||
InetSocketAddress address = deviceAddressMap.get(deviceId);
|
||||
if (address == null) {
|
||||
log.warn("[sendToDevice][设备地址不存在,设备 ID: {}]", deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
socket.send(Buffer.buffer(data), address.getPort(), address.getHostString(), result -> {
|
||||
if (result.succeeded()) {
|
||||
log.debug("[sendToDevice][发送消息成功,设备 ID: {},地址: {},数据长度: {} 字节]",
|
||||
deviceId, buildAddressKey(address), data.length);
|
||||
} else {
|
||||
log.error("[sendToDevice][发送消息失败,设备 ID: {},地址: {}]",
|
||||
deviceId, buildAddressKey(address), result.cause());
|
||||
}
|
||||
});
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("[sendToDevice][发送消息异常,设备 ID: {}]", deviceId, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 定期清理不活跃的设备地址映射
|
||||
*
|
||||
* @param timeoutMs 超时时间(毫秒)
|
||||
* @return 清理的设备 ID 列表(用于发送离线消息)
|
||||
*/
|
||||
// TODO @AI:目前暂时用不到 address 字段,是不是只返回 list of deviceId 就行?简化
|
||||
public java.util.List<DeviceOfflineInfo> cleanExpiredMappings(long timeoutMs) {
|
||||
java.util.List<DeviceOfflineInfo> offlineDevices = new java.util.ArrayList<>();
|
||||
long now = System.currentTimeMillis();
|
||||
Iterator<Map.Entry<String, Long>> iterator = lastActiveTimeMap.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<String, Long> entry = iterator.next();
|
||||
if (now - entry.getValue() > timeoutMs) {
|
||||
String addressKey = entry.getKey();
|
||||
Long deviceId = addressDeviceMap.remove(addressKey);
|
||||
// TODO @AI:if continue,减少括号层级;
|
||||
if (deviceId != null) {
|
||||
InetSocketAddress address = deviceAddressMap.remove(deviceId);
|
||||
if (address != null) {
|
||||
// 获取设备信息用于发送离线消息
|
||||
offlineDevices.add(new DeviceOfflineInfo(deviceId, addressKey));
|
||||
log.info("[cleanExpiredMappings][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}ms 前]",
|
||||
deviceId, addressKey, now - entry.getValue());
|
||||
}
|
||||
}
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
return offlineDevices;
|
||||
}
|
||||
|
||||
// TODO @AI:是不是用不到?用不掉就删除掉!简化
|
||||
/**
|
||||
* 移除设备地址映射
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
*/
|
||||
public void removeDeviceAddress(Long deviceId) {
|
||||
InetSocketAddress address = deviceAddressMap.remove(deviceId);
|
||||
if (address != null) {
|
||||
String addressKey = buildAddressKey(address);
|
||||
lastActiveTimeMap.remove(addressKey);
|
||||
addressDeviceMap.remove(addressKey);
|
||||
log.debug("[removeDeviceAddress][移除设备地址,设备 ID: {},地址: {}]", deviceId, addressKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建地址 Key
|
||||
*
|
||||
* @param address 地址
|
||||
* @return 地址 Key
|
||||
*/
|
||||
public String buildAddressKey(InetSocketAddress address) {
|
||||
return address.getHostString() + ":" + address.getPort();
|
||||
}
|
||||
|
||||
/**
|
||||
* 设备离线信息
|
||||
*/
|
||||
@Data
|
||||
public static class DeviceOfflineInfo {
|
||||
|
||||
/**
|
||||
* 设备 ID
|
||||
*/
|
||||
private final Long deviceId;
|
||||
|
||||
/**
|
||||
* 设备地址
|
||||
*/
|
||||
private final String address;
|
||||
|
||||
public DeviceOfflineInfo(Long deviceId, String address) {
|
||||
this.deviceId = deviceId;
|
||||
this.address = address;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
// TODO @AI:完善下注释,参考 mqtt 的 package.json
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
|
||||
@@ -0,0 +1,82 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.datagram.DatagramSocket;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 UDP 下行消息处理器
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotUdpDownstreamHandler {
|
||||
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
private final IotDeviceService deviceService;
|
||||
|
||||
private final IotUdpSessionManager sessionManager;
|
||||
|
||||
private final IotUdpUpstreamProtocol protocol;
|
||||
|
||||
public IotUdpDownstreamHandler(IotDeviceMessageService deviceMessageService,
|
||||
IotDeviceService deviceService,
|
||||
IotUdpSessionManager sessionManager,
|
||||
IotUdpUpstreamProtocol protocol) {
|
||||
this.deviceMessageService = deviceMessageService;
|
||||
this.deviceService = deviceService;
|
||||
this.sessionManager = sessionManager;
|
||||
this.protocol = protocol;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理下行消息
|
||||
*
|
||||
* @param message 下行消息
|
||||
*/
|
||||
public void handle(IotDeviceMessage message) {
|
||||
try {
|
||||
log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId());
|
||||
|
||||
// 1.1 获取设备信息
|
||||
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
|
||||
if (deviceInfo == null) {
|
||||
log.error("[handle][设备不存在,设备 ID: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
// 1.2 检查设备是否在线(即是否有地址映射)
|
||||
if (sessionManager.isDeviceOffline(message.getDeviceId())) {
|
||||
log.warn("[handle][设备不在线,设备 ID: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 根据产品 Key 和设备名称编码消息,并发送到设备
|
||||
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName());
|
||||
DatagramSocket socket = protocol.getUdpSocket();
|
||||
if (socket == null) {
|
||||
log.error("[handle][UDP Socket 不可用,设备 ID: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
boolean success = sessionManager.sendToDevice(message.getDeviceId(), bytes, socket);
|
||||
if (success) {
|
||||
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
|
||||
} else {
|
||||
log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,441 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.datagram.DatagramPacket;
|
||||
import io.vertx.core.datagram.DatagramSocket;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Map;
|
||||
|
||||
// TODO @AI:注释里,不要出现 CoAP,避免理解成本过高;
|
||||
/**
|
||||
* UDP 上行消息处理器
|
||||
* <p>
|
||||
* 采用 CoAP 风格的 Token 机制(无状态,每次请求携带 token):
|
||||
* 1. 认证请求:设备发送 auth 消息,携带 clientId、username、password
|
||||
* 2. 返回 Token:服务端验证后返回 JWT token
|
||||
* 3. 后续请求:每次请求在 params 中携带 token
|
||||
* 4. 服务端验证:每次请求通过 IotDeviceTokenService.verifyToken() 验证
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotUdpUpstreamHandler {
|
||||
|
||||
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
|
||||
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
|
||||
|
||||
private static final String AUTH_METHOD = "auth";
|
||||
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
private final IotDeviceService deviceService;
|
||||
|
||||
private final IotUdpSessionManager sessionManager;
|
||||
|
||||
private final IotDeviceTokenService deviceTokenService;
|
||||
|
||||
private final IotDeviceCommonApi deviceApi;
|
||||
|
||||
private final String serverId;
|
||||
|
||||
public IotUdpUpstreamHandler(IotUdpUpstreamProtocol protocol,
|
||||
IotDeviceMessageService deviceMessageService,
|
||||
IotDeviceService deviceService,
|
||||
IotUdpSessionManager sessionManager) {
|
||||
this.deviceMessageService = deviceMessageService;
|
||||
this.deviceService = deviceService;
|
||||
this.sessionManager = sessionManager;
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
this.serverId = protocol.getServerId();
|
||||
}
|
||||
|
||||
// TODO @AI:protocol 这个参数如果用不到,就删除下;
|
||||
/**
|
||||
* 处理 UDP 数据包
|
||||
*
|
||||
* @param packet 数据包
|
||||
* @param socket UDP Socket
|
||||
* @param protocol UDP 协议
|
||||
*/
|
||||
public void handle(DatagramPacket packet, DatagramSocket socket, IotUdpUpstreamProtocol protocol) {
|
||||
InetSocketAddress senderAddress = new InetSocketAddress(packet.sender().host(), packet.sender().port());
|
||||
Buffer data = packet.data();
|
||||
log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]",
|
||||
sessionManager.buildAddressKey(senderAddress), data.length());
|
||||
try {
|
||||
processMessage(data, senderAddress, socket);
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][处理消息失败,来源: {},错误: {}]",
|
||||
sessionManager.buildAddressKey(senderAddress), e.getMessage(), e);
|
||||
// UDP 无连接,不需要断开连接,只记录错误
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param buffer 消息
|
||||
* @param senderAddress 发送者地址
|
||||
* @param socket UDP Socket
|
||||
*/
|
||||
private void processMessage(Buffer buffer, InetSocketAddress senderAddress, DatagramSocket socket) {
|
||||
// 1. 基础检查
|
||||
if (buffer == null || buffer.length() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 获取消息格式类型
|
||||
String codecType = getMessageCodecType(buffer);
|
||||
|
||||
// 3. 解码消息
|
||||
IotDeviceMessage message;
|
||||
try {
|
||||
message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
|
||||
if (message == null) {
|
||||
log.warn("[processMessage][消息解码失败,来源: {}]", sessionManager.buildAddressKey(senderAddress));
|
||||
sendErrorResponse(socket, senderAddress, null, "消息解码失败", codecType);
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[processMessage][消息解码异常,来源: {}]", sessionManager.buildAddressKey(senderAddress), e);
|
||||
sendErrorResponse(socket, senderAddress, null, "消息解码失败: " + e.getMessage(), codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 根据消息类型路由处理
|
||||
try {
|
||||
if (AUTH_METHOD.equals(message.getMethod())) {
|
||||
// 认证请求
|
||||
handleAuthenticationRequest(message, codecType, senderAddress, socket);
|
||||
} else {
|
||||
// 业务消息
|
||||
handleBusinessRequest(message, codecType, senderAddress, socket);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[processMessage][处理消息失败,来源: {},消息方法: {}]",
|
||||
sessionManager.buildAddressKey(senderAddress), message.getMethod(), e);
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "消息处理失败", codecType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理认证请求
|
||||
*
|
||||
* @param message 消息信息
|
||||
* @param codecType 消息编解码类型
|
||||
* @param senderAddress 发送者地址
|
||||
* @param socket UDP Socket
|
||||
*/
|
||||
private void handleAuthenticationRequest(IotDeviceMessage message, String codecType,
|
||||
InetSocketAddress senderAddress, DatagramSocket socket) {
|
||||
String addressKey = sessionManager.buildAddressKey(senderAddress);
|
||||
try {
|
||||
// 1.1 解析认证参数
|
||||
IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams());
|
||||
if (authParams == null) {
|
||||
log.warn("[handleAuthenticationRequest][认证参数解析失败,来源: {}]", addressKey);
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证参数不完整", codecType);
|
||||
return;
|
||||
}
|
||||
// 1.2 执行认证
|
||||
if (!validateDeviceAuth(authParams)) {
|
||||
log.warn("[handleAuthenticationRequest][认证失败,来源: {},username: {}]",
|
||||
addressKey, authParams.getUsername());
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证失败", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2.1 解析设备信息
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
|
||||
if (deviceInfo == null) {
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "解析设备信息失败", codecType);
|
||||
return;
|
||||
}
|
||||
// 2.2 获取设备信息
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName());
|
||||
if (device == null) {
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "设备不存在", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3.1 生成 JWT Token(CoAP 风格)
|
||||
String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName());
|
||||
|
||||
// 3.2 更新设备地址映射(用于下行消息)
|
||||
sessionManager.updateDeviceAddress(device.getId(), senderAddress);
|
||||
|
||||
// 3.3 发送上线消息
|
||||
sendOnlineMessage(device);
|
||||
|
||||
// 3.4 发送成功响应(包含 token)
|
||||
sendAuthSuccessResponse(socket, senderAddress, message.getRequestId(), token, codecType);
|
||||
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {},来源: {}]",
|
||||
device.getId(), device.getDeviceName(), addressKey);
|
||||
} catch (Exception e) {
|
||||
log.error("[handleAuthenticationRequest][认证处理异常,来源: {}]", addressKey, e);
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "认证处理异常", codecType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理业务请求
|
||||
*
|
||||
* @param message 消息信息
|
||||
* @param codecType 消息编解码类型
|
||||
* @param senderAddress 发送者地址
|
||||
* @param socket UDP Socket
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void handleBusinessRequest(IotDeviceMessage message, String codecType,
|
||||
InetSocketAddress senderAddress, DatagramSocket socket) {
|
||||
String addressKey = sessionManager.buildAddressKey(senderAddress);
|
||||
try {
|
||||
// TODO @AI:token 需要枚举个 KEY;考虑到是通过 params 传递的话,需要获取到后,从 map 里移除掉,避免影响后续业务逻辑处理;
|
||||
// 1. 从消息中提取 token(CoAP 风格:消息体携带 token)
|
||||
String token = null;
|
||||
if (message.getParams() instanceof Map) {
|
||||
token = MapUtil.getStr((Map<String, Object>) message.getParams(), "token");
|
||||
}
|
||||
|
||||
if (StrUtil.isBlank(token)) {
|
||||
log.warn("[handleBusinessRequest][缺少 token,来源: {}]", addressKey);
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "请先进行认证", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 验证 token,获取设备信息
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token);
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleBusinessRequest][token 无效或已过期,来源: {}]", addressKey);
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "token 无效或已过期", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 获取设备详细信息
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName());
|
||||
if (device == null) {
|
||||
log.warn("[handleBusinessRequest][设备不存在,来源: {},productKey: {},deviceName: {}]",
|
||||
addressKey, deviceInfo.getProductKey(), deviceInfo.getDeviceName());
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "设备不存在", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 更新设备地址映射(保持最新)
|
||||
sessionManager.updateDeviceAddress(device.getId(), senderAddress);
|
||||
|
||||
// 5. 发送消息到消息总线
|
||||
deviceMessageService.sendDeviceMessage(message, device.getProductKey(),
|
||||
device.getDeviceName(), serverId);
|
||||
|
||||
// 6. 发送成功响应
|
||||
sendSuccessResponse(socket, senderAddress, message.getRequestId(), "处理成功", codecType);
|
||||
log.debug("[handleBusinessRequest][业务消息处理成功,设备 ID: {},方法: {},来源: {}]",
|
||||
device.getId(), message.getMethod(), addressKey);
|
||||
} catch (Exception e) {
|
||||
log.error("[handleBusinessRequest][业务请求处理异常,来源: {}]", addressKey, e);
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), "处理失败", codecType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取消息编解码类型
|
||||
*
|
||||
* @param buffer 消息
|
||||
* @return 消息编解码类型
|
||||
*/
|
||||
private String getMessageCodecType(Buffer buffer) {
|
||||
// 检测消息格式类型
|
||||
return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY
|
||||
: CODEC_TYPE_JSON;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送设备上线消息
|
||||
*
|
||||
* @param device 设备信息
|
||||
*/
|
||||
private void sendOnlineMessage(IotDeviceRespDTO device) {
|
||||
try {
|
||||
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
|
||||
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
|
||||
device.getDeviceName(), serverId);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证设备认证信息
|
||||
*
|
||||
* @param authParams 认证参数
|
||||
* @return 是否认证成功
|
||||
*/
|
||||
private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) {
|
||||
try {
|
||||
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
|
||||
.setClientId(authParams.getClientId()).setUsername(authParams.getUsername())
|
||||
.setPassword(authParams.getPassword()));
|
||||
result.checkError();
|
||||
return BooleanUtil.isTrue(result.getData());
|
||||
} catch (Exception e) {
|
||||
log.error("[validateDeviceAuth][设备认证异常,username: {}]", authParams.getUsername(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送认证成功响应(包含 token)
|
||||
*
|
||||
* @param socket UDP Socket
|
||||
* @param address 目标地址
|
||||
* @param requestId 请求 ID
|
||||
* @param token JWT Token
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
private void sendAuthSuccessResponse(DatagramSocket socket, InetSocketAddress address,
|
||||
String requestId, String token, String codecType) {
|
||||
try {
|
||||
Object responseData = MapUtil.builder()
|
||||
.put("success", true)
|
||||
.put("token", token)
|
||||
.put("message", "认证成功")
|
||||
.build();
|
||||
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, 0, "认证成功");
|
||||
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
|
||||
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), result -> {
|
||||
if (result.failed()) {
|
||||
log.error("[sendAuthSuccessResponse][发送认证成功响应失败,地址: {}]",
|
||||
sessionManager.buildAddressKey(address), result.cause());
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("[sendAuthSuccessResponse][发送认证成功响应异常,地址: {}]",
|
||||
sessionManager.buildAddressKey(address), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送成功响应
|
||||
*
|
||||
* @param socket UDP Socket
|
||||
* @param address 目标地址
|
||||
* @param requestId 请求 ID
|
||||
* @param message 消息
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
@SuppressWarnings("SameParameterValue")
|
||||
private void sendSuccessResponse(DatagramSocket socket, InetSocketAddress address,
|
||||
String requestId, String message, String codecType) {
|
||||
sendResponse(socket, address, true, message, requestId, codecType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送错误响应
|
||||
*
|
||||
* @param socket UDP Socket
|
||||
* @param address 目标地址
|
||||
* @param requestId 请求 ID
|
||||
* @param errorMessage 错误消息
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
private void sendErrorResponse(DatagramSocket socket, InetSocketAddress address,
|
||||
String requestId, String errorMessage, String codecType) {
|
||||
sendResponse(socket, address, false, errorMessage, requestId, codecType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送响应消息
|
||||
*
|
||||
* @param socket UDP Socket
|
||||
* @param address 目标地址
|
||||
* @param success 是否成功
|
||||
* @param message 消息
|
||||
* @param requestId 请求 ID
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
private void sendResponse(DatagramSocket socket, InetSocketAddress address, boolean success,
|
||||
String message, String requestId, String codecType) {
|
||||
try {
|
||||
Object responseData = MapUtil.builder()
|
||||
.put("success", success)
|
||||
.put("message", message)
|
||||
.build();
|
||||
|
||||
int code = success ? 0 : 401;
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, "response", responseData,
|
||||
code, message);
|
||||
|
||||
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
|
||||
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), ar -> {
|
||||
if (ar.failed()) {
|
||||
log.error("[sendResponse][发送响应失败,地址: {}]",
|
||||
sessionManager.buildAddressKey(address), ar.cause());
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("[sendResponse][发送响应异常,地址: {}]",
|
||||
sessionManager.buildAddressKey(address), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析认证参数
|
||||
*
|
||||
* @param params 参数对象(通常为 Map 类型)
|
||||
* @return 认证参数 DTO,解析失败时返回 null
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private IotDeviceAuthReqDTO parseAuthParams(Object params) {
|
||||
if (params == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
// 参数默认为 Map 类型,直接转换
|
||||
if (params instanceof Map) {
|
||||
Map<String, Object> paramMap = (Map<String, Object>) params;
|
||||
return new IotDeviceAuthReqDTO()
|
||||
.setClientId(MapUtil.getStr(paramMap, "clientId"))
|
||||
.setUsername(MapUtil.getStr(paramMap, "username"))
|
||||
.setPassword(MapUtil.getStr(paramMap, "password"));
|
||||
}
|
||||
|
||||
// 如果已经是目标类型,直接返回
|
||||
if (params instanceof IotDeviceAuthReqDTO) {
|
||||
return (IotDeviceAuthReqDTO) params;
|
||||
}
|
||||
|
||||
// 其他情况尝试 JSON 转换
|
||||
String jsonStr = JsonUtils.toJsonString(params);
|
||||
return JsonUtils.parseObject(jsonStr, IotDeviceAuthReqDTO.class);
|
||||
} catch (Exception e) {
|
||||
log.error("[parseAuthParams][解析认证参数({})失败]", params, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -96,6 +96,16 @@ yudao:
|
||||
ssl-cert-path: "classpath:certs/client.jks"
|
||||
ssl-key-path: "classpath:certs/client.jks"
|
||||
# ====================================
|
||||
# 针对引入的 UDP 组件的配置
|
||||
# ====================================
|
||||
udp:
|
||||
enabled: false # 是否启用 UDP
|
||||
port: 8092 # UDP 服务端口
|
||||
receive-buffer-size: 65536 # 接收缓冲区大小(字节,默认 64KB)
|
||||
send-buffer-size: 65536 # 发送缓冲区大小(字节,默认 64KB)
|
||||
session-timeout-ms: 60000 # 会话超时时间(毫秒,默认 60 秒)
|
||||
session-clean-interval-ms: 30000 # 会话清理间隔(毫秒,默认 30 秒)
|
||||
# ====================================
|
||||
# 针对引入的 MQTT 组件的配置
|
||||
# ====================================
|
||||
mqtt:
|
||||
|
||||
@@ -0,0 +1,157 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* IoT 网关 UDP 协议集成测试(手动测试)
|
||||
*
|
||||
* <p>使用步骤:
|
||||
* <ol>
|
||||
* <li>启动 yudao-module-iot-gateway 服务(UDP 端口 8092)</li>
|
||||
* <li>运行 {@link #testAuth()} 获取 token,将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
|
||||
* <li>运行 {@link #testPropertyPost()} 测试属性上报,或运行 {@link #testEventPost()} 测试事件上报</li>
|
||||
* </ol>
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotUdpProtocolIntegrationTest {
|
||||
|
||||
private static final String SERVER_HOST = "127.0.0.1";
|
||||
private static final int SERVER_PORT = 8092;
|
||||
private static final int TIMEOUT_MS = 5000;
|
||||
|
||||
// 设备信息(根据实际情况修改 PRODUCT_KEY、DEVICE_NAME、PASSWORD)
|
||||
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
|
||||
private static final String DEVICE_NAME = "small";
|
||||
private static final String PASSWORD = "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75";
|
||||
|
||||
private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME;
|
||||
private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY;
|
||||
|
||||
/**
|
||||
* 设备 Token:从 {@link #testAuth()} 方法获取后,粘贴到这里
|
||||
*/
|
||||
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMwNTA1NSwiZGV2aWNlTmFtZSI6InNtYWxsIn0.mf3MEATCn5bp6cXgULunZjs8d00RGUxj96JEz0hMS7k";
|
||||
|
||||
/**
|
||||
* 认证测试:获取设备 Token
|
||||
*/
|
||||
@Test
|
||||
public void testAuth() throws Exception {
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", "auth")
|
||||
.put("params", MapUtil.builder()
|
||||
.put("clientId", CLIENT_ID)
|
||||
.put("username", USERNAME)
|
||||
.put("password", PASSWORD)
|
||||
.build())
|
||||
.build());
|
||||
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
|
||||
log.info("[testAuth][请求体: {}]", payload);
|
||||
|
||||
String response = sendAndReceive(socket, payload);
|
||||
|
||||
log.info("[testAuth][响应体: {}]", response);
|
||||
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 属性上报测试
|
||||
*/
|
||||
@Test
|
||||
public void testPropertyPost() throws Exception {
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("params", MapUtil.builder()
|
||||
.put("token", TOKEN)
|
||||
.put("width", 1)
|
||||
.put("height", "2")
|
||||
.build())
|
||||
.build());
|
||||
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
|
||||
log.info("[testPropertyPost][请求体: {}]", payload);
|
||||
|
||||
String response = sendAndReceive(socket, payload);
|
||||
|
||||
log.info("[testPropertyPost][响应体: {}]", response);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 事件上报测试
|
||||
*/
|
||||
@Test
|
||||
public void testEventPost() throws Exception {
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("identifier", "eat")
|
||||
.put("params", MapUtil.builder()
|
||||
.put("token", TOKEN)
|
||||
.put("width", 1)
|
||||
.put("height", "2")
|
||||
.put("oneThree", "3")
|
||||
.build())
|
||||
.build());
|
||||
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
|
||||
log.info("[testEventPost][请求体: {}]", payload);
|
||||
|
||||
String response = sendAndReceive(socket, payload);
|
||||
|
||||
log.info("[testEventPost][响应体: {}]", response);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 UDP 请求并接收响应
|
||||
*
|
||||
* @param socket UDP Socket
|
||||
* @param payload 请求体
|
||||
* @return 响应内容
|
||||
*/
|
||||
private String sendAndReceive(DatagramSocket socket, String payload) throws Exception {
|
||||
byte[] sendData = payload.getBytes(StandardCharsets.UTF_8);
|
||||
InetAddress address = InetAddress.getByName(SERVER_HOST);
|
||||
|
||||
// 发送请求
|
||||
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, address, SERVER_PORT);
|
||||
socket.send(sendPacket);
|
||||
|
||||
// 接收响应
|
||||
byte[] receiveData = new byte[4096];
|
||||
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
|
||||
try {
|
||||
socket.receive(receivePacket);
|
||||
return new String(receivePacket.getData(), 0, receivePacket.getLength(), StandardCharsets.UTF_8);
|
||||
} catch (java.net.SocketTimeoutException e) {
|
||||
log.warn("[sendAndReceive][接收响应超时]");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user