feat:【iot】UDP 协议:兼容下行的时候,基于连接的 codec 处理

This commit is contained in:
YunaiV
2026-01-26 21:16:43 +08:00
parent 4003f4b028
commit de1a53a5f1
8 changed files with 329 additions and 240 deletions

View File

@@ -223,11 +223,9 @@ public class IotGatewayConfiguration {
@Bean
public IotUdpDownstreamSubscriber iotUdpDownstreamSubscriber(IotUdpUpstreamProtocol protocolHandler,
IotDeviceMessageService messageService,
IotDeviceService deviceService,
IotUdpSessionManager sessionManager,
IotMessageBus messageBus) {
return new IotUdpDownstreamSubscriber(protocolHandler, messageService, deviceService, sessionManager,
messageBus);
return new IotUdpDownstreamSubscriber(protocolHandler, messageService, sessionManager, messageBus);
}
}

View File

@@ -6,7 +6,6 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
@@ -25,8 +24,6 @@ public class IotUdpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
private final IotDeviceMessageService messageService;
private final IotDeviceService deviceService;
private final IotUdpSessionManager sessionManager;
private final IotMessageBus messageBus;
@@ -36,7 +33,7 @@ public class IotUdpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
@PostConstruct
public void init() {
// 初始化下游处理器
this.downstreamHandler = new IotUdpDownstreamHandler(messageService, deviceService, sessionManager, protocol);
this.downstreamHandler = new IotUdpDownstreamHandler(messageService, sessionManager, protocol);
messageBus.register(this);
log.info("[init][UDP 下游订阅者初始化完成,服务器 ID: {}Topic: {}]",

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -29,9 +30,9 @@ import java.util.concurrent.ConcurrentHashMap;
public class IotUdpSessionManager {
/**
* 设备 ID -> 设备地址(用于下行消息发送
* 设备 ID -> 会话信息(包含地址和 codecType
*/
private final Map<Long, InetSocketAddress> deviceAddressMap = new ConcurrentHashMap<>();
private final Map<Long, SessionInfo> deviceSessionMap = new ConcurrentHashMap<>();
/**
* 设备地址 Key -> 最后活跃时间(用于清理)
@@ -44,18 +45,41 @@ public class IotUdpSessionManager {
private final Map<String, Long> addressDeviceMap = new ConcurrentHashMap<>();
/**
* 更新设备地址(每次收到上行消息时调用)
* 更新设备会话(每次收到上行消息时调用)
*
* @param deviceId 设备 ID
* @param address 设备地址
* @param codecType 消息编解码类型
*/
public void updateDeviceSession(Long deviceId, InetSocketAddress address, String codecType) {
String addressKey = buildAddressKey(address);
// 更新设备会话映射
deviceSessionMap.put(deviceId, new SessionInfo().setAddress(address).setCodecType(codecType));
lastActiveTimeMap.put(addressKey, LocalDateTime.now());
addressDeviceMap.put(addressKey, deviceId);
log.debug("[updateDeviceSession][更新设备会话,设备 ID: {},地址: {}codecType: {}]", deviceId, addressKey, codecType);
}
/**
* 更新设备地址(兼容旧接口,默认不更新 codecType
*
* @param deviceId 设备 ID
* @param address 设备地址
*/
public void updateDeviceAddress(Long deviceId, InetSocketAddress address) {
String addressKey = buildAddressKey(address);
// 更新设备地址映射
deviceAddressMap.put(deviceId, address);
lastActiveTimeMap.put(addressKey, LocalDateTime.now());
addressDeviceMap.put(addressKey, deviceId);
log.debug("[updateDeviceAddress][更新设备地址,设备 ID: {},地址: {}]", deviceId, addressKey);
SessionInfo sessionInfo = deviceSessionMap.get(deviceId);
String codecType = sessionInfo != null ? sessionInfo.getCodecType() : null;
updateDeviceSession(deviceId, address, codecType);
}
/**
* 获取设备会话信息
*
* @param deviceId 设备 ID
* @return 会话信息
*/
public SessionInfo getSessionInfo(Long deviceId) {
return deviceSessionMap.get(deviceId);
}
/**
@@ -65,7 +89,7 @@ public class IotUdpSessionManager {
* @return 是否在线
*/
public boolean isDeviceOnline(Long deviceId) {
return deviceAddressMap.containsKey(deviceId);
return deviceSessionMap.containsKey(deviceId);
}
/**
@@ -87,12 +111,13 @@ public class IotUdpSessionManager {
* @return 是否发送成功
*/
public boolean sendToDevice(Long deviceId, byte[] data, DatagramSocket socket) {
InetSocketAddress address = deviceAddressMap.get(deviceId);
if (address == null) {
log.warn("[sendToDevice][设备地址不存在,设备 ID: {}]", deviceId);
SessionInfo sessionInfo = deviceSessionMap.get(deviceId);
if (sessionInfo == null || sessionInfo.getAddress() == null) {
log.warn("[sendToDevice][设备会话不存在,设备 ID: {}]", deviceId);
return false;
}
InetSocketAddress address = sessionInfo.getAddress();
try {
socket.send(Buffer.buffer(data), address.getPort(), address.getHostString(), result -> {
if (result.succeeded()) {
@@ -134,8 +159,8 @@ public class IotUdpSessionManager {
iterator.remove();
continue;
}
InetSocketAddress address = deviceAddressMap.remove(deviceId);
if (address == null) {
SessionInfo sessionInfo = deviceSessionMap.remove(deviceId);
if (sessionInfo == null) {
iterator.remove();
continue;
}
@@ -157,4 +182,22 @@ public class IotUdpSessionManager {
return address.getHostString() + ":" + address.getPort();
}
/**
* 会话信息
*/
@Data
public static class SessionInfo {
/**
* 设备地址
*/
private InetSocketAddress address;
/**
* 消息编解码类型
*/
private String codecType;
}
}

View File

@@ -1,10 +1,8 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.router;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.datagram.DatagramSocket;
import lombok.extern.slf4j.Slf4j;
@@ -19,18 +17,14 @@ public class IotUdpDownstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotUdpSessionManager sessionManager;
private final IotUdpUpstreamProtocol protocol;
public IotUdpDownstreamHandler(IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotUdpSessionManager sessionManager,
IotUdpUpstreamProtocol protocol) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.sessionManager = sessionManager;
this.protocol = protocol;
}
@@ -45,21 +39,15 @@ public class IotUdpDownstreamHandler {
log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
// 1.1 获取设备信息
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
if (deviceInfo == null) {
log.error("[handle][设备不存在,设备 ID: {}]", message.getDeviceId());
return;
}
// 1.2 检查设备是否在线(即是否有地址映射)
if (sessionManager.isDeviceOffline(message.getDeviceId())) {
// 1. 获取会话信息(包含 codecType
IotUdpSessionManager.SessionInfo sessionInfo = sessionManager.getSessionInfo(message.getDeviceId());
if (sessionInfo == null) {
log.warn("[handle][设备不在线,设备 ID: {}]", message.getDeviceId());
return;
}
// 2. 根据产品 Key 和设备名称编码消息,并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
// 2. 使用会话中的 codecType 编码消息,并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, sessionInfo.getCodecType());
DatagramSocket socket = protocol.getUdpSocket();
if (socket == null) {
log.error("[handle][UDP Socket 不可用,设备 ID: {}]", message.getDeviceId());

View File

@@ -195,8 +195,8 @@ public class IotUdpUpstreamHandler {
// 3.1 生成 JWT Token无状态
String token = deviceTokenService.createToken(device.getProductKey(), device.getDeviceName());
// 3.2 更新设备地址映射(用于下行消息)
sessionManager.updateDeviceAddress(device.getId(), senderAddress);
// 3.2 更新设备会话信息(用于下行消息,保存 codecType
sessionManager.updateDeviceSession(device.getId(), senderAddress, codecType);
// 3.3 发送上线消息
sendOnlineMessage(device);
@@ -298,8 +298,8 @@ public class IotUdpUpstreamHandler {
return;
}
// 3. 更新设备地址映射(保持最新)
sessionManager.updateDeviceAddress(device.getId(), senderAddress);
// 3. 更新设备会话信息(保持最新,保存 codecType
sessionManager.updateDeviceSession(device.getId(), senderAddress, codecType);
// 4. 将 body 设置为实际的 params发送消息到消息总线
message.setParams(body);

View File

@@ -1,21 +1,23 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -24,10 +26,16 @@ import java.util.Map;
*
* <p>测试场景直连设备IotProductDeviceTypeEnum 的 DIRECT 类型)通过 UDP 协议直接连接平台
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8092</li>
* <li>运行 {@link #testDeviceRegister()} 测试直连设备动态注册(一型一密)</li>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8093</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行 {@link #testAuth()} 获取设备 token将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
* <li>运行以下测试方法:
* <ul>
@@ -48,6 +56,10 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
private static final int SERVER_PORT = 8093;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
private static final String DEVICE_NAME = "small";
@@ -65,27 +77,32 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建请求
// 1.1 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", "auth")
.put("params", authReqDTO)
.build());
// 1.2 输出请求
log.info("[testAuth][请求体: {}]", payload);
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testAuth][响应体: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
} else {
log.warn("[testAuth][未收到响应]");
}
}
}
@@ -96,25 +113,30 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
// 1.1 构建请求UDP 协议token 放在 params 中)
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
.put("version", "1.0")
.put("params", withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
// 1.1 构建属性上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build())))
.build());
// 1.2 输出请求
log.info("[testPropertyPost][请求体: {}]", payload);
.build())),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2. 发送请求
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testPropertyPost][响应体: {}]", response);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPost][未收到响应]");
}
}
}
@@ -125,60 +147,30 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
// 1.1 构建请求UDP 协议token 放在 params 中)
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("version", "1.0")
.put("params", withToken(IotDeviceEventPostReqDTO.of(
// 1.1 构建事件上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
withToken(IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis())))
.build());
// 1.2 输出请求
log.info("[testEventPost][请求体: {}]", payload);
System.currentTimeMillis())),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testEventPost][响应体: {}]", response);
}
}
// ===================== 动态注册测试 =====================
/**
* 直连设备动态注册测试(一型一密)
* <p>
* 使用产品密钥productSecret验证身份成功后返回设备密钥deviceSecret
* <p>
* 注意:此接口不需要 Token 认证
*/
@Test
public void testDeviceRegister() throws Exception {
// 1.1 构建请求参数
IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO();
reqDTO.setProductKey(PRODUCT_KEY);
reqDTO.setDeviceName("test-" + System.currentTimeMillis());
reqDTO.setProductSecret("test-product-secret");
// 1.2 构建请求
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod())
.put("params", reqDTO)
.build());
// 1.3 输出请求
log.info("[testDeviceRegister][请求体: {}]", payload);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testDeviceRegister][响应体: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testEventPost][响应消息: {}]", response);
} else {
log.warn("[testEventPost][未收到响应]");
}
}
}
@@ -201,20 +193,18 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
return result;
}
/**
* 发送 UDP 请求并接收响应
*
* @param socket UDP Socket
* @param payload 请求
* @return 响应内容
* @param payload 请求数据
* @return 响应数据
*/
public static String sendAndReceive(DatagramSocket socket, String payload) throws Exception {
byte[] sendData = payload.getBytes(StandardCharsets.UTF_8);
public static byte[] sendAndReceive(DatagramSocket socket, byte[] payload) throws Exception {
InetAddress address = InetAddress.getByName(SERVER_HOST);
// 发送请求
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, address, SERVER_PORT);
DatagramPacket sendPacket = new DatagramPacket(payload, payload.length, address, SERVER_PORT);
socket.send(sendPacket);
// 接收响应
@@ -222,7 +212,9 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
try {
socket.receive(receivePacket);
return new String(receivePacket.getData(), 0, receivePacket.getLength(), StandardCharsets.UTF_8);
byte[] response = new byte[receivePacket.getLength()];
System.arraycopy(receivePacket.getData(), 0, response, 0, receivePacket.getLength());
return response;
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;

View File

@@ -1,10 +1,11 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO;
@@ -12,6 +13,9 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
@@ -28,9 +32,16 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd
*
* <p>测试场景网关设备IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 UDP 协议管理子设备拓扑关系
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8093</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行 {@link #testAuth()} 获取网关设备 token将返回的 token 粘贴到 {@link #GATEWAY_TOKEN} 常量</li>
* <li>运行以下测试方法:
* <ul>
@@ -50,10 +61,12 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd
@Slf4j
public class IotGatewayDeviceUdpProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8093;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
// ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) =====================
private static final String GATEWAY_PRODUCT_KEY = "m6XcS1ZJ3TW8eC0v";
private static final String GATEWAY_DEVICE_NAME = "sub-ddd";
@@ -76,28 +89,33 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建请求
// 1.1 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", "auth")
.put("params", authReqDTO)
.build());
// 1.2 输出请求
log.info("[testAuth][请求体: {}]", payload);
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testAuth][响应体: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 GATEWAY_TOKEN 常量中]");
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 GATEWAY_TOKEN 常量中]");
} else {
log.warn("[testAuth][未收到响应]");
}
}
}
@@ -120,21 +138,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
// 1.2 构建请求参数
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
params.setSubDevices(Collections.singletonList(subDeviceAuth));
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.TOPO_ADD.getMethod())
.put("version", "1.0")
.put("params", withToken(params))
.build());
// 1.3 输出请求
log.info("[testTopoAdd][请求体: {}]", payload);
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(),
withToken(params),
null, null, null);
// 1.3 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testTopoAdd][响应体: {}]", response);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoAdd][响应消息: {}]", response);
} else {
log.warn("[testTopoAdd][未收到响应]");
}
}
}
@@ -149,21 +172,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO();
params.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod())
.put("version", "1.0")
.put("params", withToken(params))
.build());
// 1.2 输出请求
log.info("[testTopoDelete][请求体: {}]", payload);
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
withToken(params),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testTopoDelete][响应体: {}]", response);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoDelete][响应消息: {}]", response);
} else {
log.warn("[testTopoDelete][未收到响应]");
}
}
}
@@ -176,21 +204,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
public void testTopoGet() throws Exception {
// 1.1 构建请求参数(目前为空,预留扩展)
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.TOPO_GET.getMethod())
.put("version", "1.0")
.put("params", withToken(params))
.build());
// 1.2 输出请求
log.info("[testTopoGet][请求体: {}]", payload);
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
withToken(params),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testTopoGet][响应体: {}]", response);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoGet][响应消息: {}]", response);
} else {
log.warn("[testTopoGet][未收到响应]");
}
}
}
@@ -209,21 +242,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO();
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
subDevice.setDeviceName("mougezishebei");
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod())
.put("version", "1.0")
.put("params", withToken(Collections.singletonList(subDevice)))
.build());
// 1.2 输出请求
log.info("[testSubDeviceRegister][请求体: {}]", payload);
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
withToken(Collections.singletonList(subDevice)),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testSubDeviceRegister][响应体: {}]", response);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testSubDeviceRegister][响应消息: {}]", response);
} else {
log.warn("[testSubDeviceRegister][未收到响应]");
}
}
}
@@ -268,21 +306,26 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
params.setProperties(gatewayProperties);
params.setEvents(gatewayEvents);
params.setSubDevices(List.of(subDeviceData));
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod())
.put("version", "1.0")
.put("params", withToken(params))
.build());
// 1.7 输出请求
log.info("[testPropertyPackPost][请求体: {}]", payload);
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(),
withToken(params),
null, null, null);
// 1.7 编码
byte[] payload = CODEC.encode(request);
log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testPropertyPackPost][响应体: {}]", response);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPackPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPackPost][未收到响应]");
}
}
}

View File

@@ -1,13 +1,17 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
@@ -25,10 +29,17 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd
* <p><b>重要说明子设备无法直接连接平台所有请求均由网关设备Gateway代为转发。</b>
* <p>网关设备转发子设备请求时Token 使用子设备自己的信息。
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8093</li>
* <li>确保子设备已通过 {@link IotGatewayDeviceUdpProtocolIntegrationTest#testTopoAdd()} 绑定到网关</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行 {@link #testAuth()} 获取子设备 token将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
* <li>运行以下测试方法:
* <ul>
@@ -45,10 +56,12 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUd
@Slf4j
public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8093;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
// ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
private static final String PRODUCT_KEY = "jAufEMTF1W6wnPhn";
private static final String DEVICE_NAME = "chazuo-it";
@@ -66,27 +79,32 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建请求
// 1.1 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", "auth")
.put("params", authReqDTO)
.build());
// 1.2 输出请求
log.info("[testAuth][请求体: {}]", payload);
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testAuth][响应体: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
} else {
log.warn("[testAuth][未收到响应]");
}
}
}
@@ -97,27 +115,32 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
// 1.1 构建请求UDP 协议token 放在 params 中)
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
.put("version", "1.0")
.put("params", withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
// 1.1 构建属性上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("power", 100)
.put("status", "online")
.put("temperature", 36.5)
.build())))
.build());
// 1.2 输出请求
.build())),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]");
log.info("[testPropertyPost][请求体: {}]", payload);
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testPropertyPost][响应体: {}]", response);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPost][未收到响应]");
}
}
}
@@ -128,12 +151,11 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
// 1.1 构建请求UDP 协议token 放在 params 中)
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("version", "1.0")
.put("params", withToken(IotDeviceEventPostReqDTO.of(
// 1.1 构建事件上报消息UDP 协议token 放在 params 中)
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
withToken(IotDeviceEventPostReqDTO.of(
"alarm",
MapUtil.<String, Object>builder()
.put("level", "warning")
@@ -141,18 +163,24 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
.put("threshold", 40)
.put("current", 42)
.build(),
System.currentTimeMillis())))
.build());
// 1.2 输出请求
System.currentTimeMillis())),
null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]");
log.info("[testEventPost][请求体: {}]", payload);
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testEventPost][响应体: {}]", response);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testEventPost][响应消息: {}]", response);
} else {
log.warn("[testEventPost][未收到响应]");
}
}
}