feat(iot):tcp 协议完善 IotDirectDeviceUdpProtocolIntegrationTest 单测,并支持动态注册

This commit is contained in:
YunaiV
2026-01-25 20:28:42 +08:00
parent eb5fa9fd91
commit 4b67fc2d65
5 changed files with 369 additions and 166 deletions

View File

@@ -518,9 +518,9 @@ public class IotGatewayProperties {
private Boolean enabled;
/**
* 服务端口(默认 8092
* 服务端口(默认 8093
*/
private Integer port = 8092;
private Integer port = 8093;
/**
* 接收缓冲区大小(默认 64KB

View File

@@ -9,7 +9,11 @@ import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
@@ -129,6 +133,9 @@ public class IotUdpUpstreamHandler {
if (AUTH_METHOD.equals(message.getMethod())) {
// 认证请求
handleAuthenticationRequest(message, codecType, senderAddress, socket);
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) {
// 设备动态注册请求
handleRegisterRequest(message, codecType, senderAddress, socket);
} else {
// 业务消息
handleBusinessRequest(message, codecType, senderAddress, socket);
@@ -168,7 +175,7 @@ public class IotUdpUpstreamHandler {
}
// 2.1 解析设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
if (deviceInfo == null) {
sendErrorResponse(socket, senderAddress, message.getRequestId(), "解析设备信息失败", codecType);
return;
@@ -200,6 +207,45 @@ public class IotUdpUpstreamHandler {
}
}
/**
* 处理设备动态注册请求(一型一密,不需要 Token
*
* @param message 消息信息
* @param codecType 消息编解码类型
* @param senderAddress 发送者地址
* @param socket UDP Socket
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void handleRegisterRequest(IotDeviceMessage message, String codecType,
InetSocketAddress senderAddress, DatagramSocket socket) {
String addressKey = sessionManager.buildAddressKey(senderAddress);
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO registerParams = parseRegisterParams(message.getParams());
if (registerParams == null) {
log.warn("[handleRegisterRequest][注册参数解析失败,来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "注册参数不完整", codecType);
return;
}
// 2. 调用动态注册
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(registerParams);
if (result.isError()) {
log.warn("[handleRegisterRequest][注册失败,来源: {},错误: {}]", addressKey, result.getMsg());
sendErrorResponse(socket, senderAddress, message.getRequestId(), result.getMsg(), codecType);
return;
}
// 3. 发送成功响应(包含 deviceSecret
sendRegisterSuccessResponse(socket, senderAddress, message.getRequestId(), result.getData(), codecType);
log.info("[handleRegisterRequest][注册成功,设备名: {},来源: {}]",
registerParams.getDeviceName(), addressKey);
} catch (Exception e) {
log.error("[handleRegisterRequest][注册处理异常,来源: {}]", addressKey, e);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "注册处理异常", codecType);
}
}
/**
* 处理业务请求
*
@@ -225,7 +271,7 @@ public class IotUdpUpstreamHandler {
return;
}
// 1.2 验证 token获取设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token);
IotDeviceIdentity deviceInfo = deviceTokenService.verifyToken(token);
if (deviceInfo == null) {
log.warn("[handleBusinessRequest][token 无效或已过期,来源: {}]", addressKey);
sendErrorResponse(socket, senderAddress, message.getRequestId(), "token 无效或已过期", codecType);
@@ -317,14 +363,15 @@ public class IotUdpUpstreamHandler {
private void sendAuthSuccessResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, String token, String codecType) {
try {
// 构建响应数据
Object responseData = MapUtil.builder()
.put("success", true)
.put("token", token)
.put("message", "认证成功")
.build();
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, 0, "认证成功");
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
// 发送响应
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), result -> {
if (result.failed()) {
log.error("[sendAuthSuccessResponse][发送认证成功响应失败,地址: {}]",
@@ -337,6 +384,41 @@ public class IotUdpUpstreamHandler {
}
}
/**
* 发送注册成功响应(包含 deviceSecret
*
* @param socket UDP Socket
* @param address 目标地址
* @param requestId 请求 ID
* @param registerResp 注册响应
* @param codecType 消息编解码类型
*/
private void sendRegisterSuccessResponse(DatagramSocket socket, InetSocketAddress address,
String requestId, IotDeviceRegisterRespDTO registerResp,
String codecType) {
try {
// 构建响应数据
Object responseData = MapUtil.builder()
.put("success", true)
.put("deviceSecret", registerResp.getDeviceSecret())
.put("message", "注册成功")
.build();
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), responseData, 0, "注册成功");
// 发送响应
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), result -> {
if (result.failed()) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应失败,地址: {}]",
sessionManager.buildAddressKey(address), result.cause());
}
});
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应异常,地址: {}]",
sessionManager.buildAddressKey(address), e);
}
}
/**
* 发送成功响应
*
@@ -437,4 +519,46 @@ public class IotUdpUpstreamHandler {
}
}
/**
* 解析注册参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 注册参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceRegisterReqDTO parseRegisterParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
String productKey = MapUtil.getStr(paramMap, "productKey");
String deviceName = MapUtil.getStr(paramMap, "deviceName");
String productSecret = MapUtil.getStr(paramMap, "productSecret");
if (StrUtil.hasBlank(productKey, deviceName, productSecret)) {
return null;
}
return new IotDeviceRegisterReqDTO()
.setProductKey(productKey)
.setDeviceName(deviceName)
.setProductSecret(productSecret);
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceRegisterReqDTO) {
return (IotDeviceRegisterReqDTO) params;
}
// 其他情况尝试 JSON 转换
String jsonStr = JsonUtils.toJsonString(params);
return JsonUtils.parseObject(jsonStr, IotDeviceRegisterReqDTO.class);
} catch (Exception e) {
log.error("[parseRegisterParams][解析注册参数({})失败]", params, e);
return null;
}
}
}

View File

@@ -100,7 +100,7 @@ yudao:
# ====================================
udp:
enabled: false # 是否启用 UDP
port: 8092 # UDP 服务端口
port: 8093 # UDP 服务端口
receive-buffer-size: 65536 # 接收缓冲区大小(字节,默认 64KB
send-buffer-size: 65536 # 发送缓冲区大小(字节,默认 64KB
session-timeout-ms: 60000 # 会话超时时间(毫秒,默认 60 秒)

View File

@@ -0,0 +1,239 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* IoT 直连设备 UDP 协议集成测试(手动测试)
*
* <p>测试场景直连设备IotProductDeviceTypeEnum 的 DIRECT 类型)通过 UDP 协议直接连接平台
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8092</li>
* <li>运行 {@link #testDeviceRegister()} 测试直连设备动态注册(一型一密)</li>
* <li>运行 {@link #testAuth()} 获取设备 token将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testPropertyPost()} - 设备属性上报</li>
* <li>{@link #testEventPost()} - 设备事件上报</li>
* </ul>
* </li>
* </ol>
*
* <p>注意UDP 协议是无状态的,每次请求需要在 params 中携带 token与 HTTP 通过 Header 传递不同)
*
* @author 芋道源码
*/
@Slf4j
public class IotDirectDeviceUdpProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8093;
private static final int TIMEOUT_MS = 5000;
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
private static final String DEVICE_NAME = "small";
private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3";
/**
* 直连设备 Token从 {@link #testAuth()} 方法获取后,粘贴到这里
*/
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTk0ODYzOCwiZGV2aWNlTmFtZSI6InNtYWxsIn0.TrOJisXhloZ3quLBOAIyowmpq6Syp9PHiEpfj-nQ9xo";
// ===================== 认证测试 =====================
/**
* 认证测试:获取设备 Token
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建请求
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", "auth")
.put("params", authReqDTO)
.build());
// 1.2 输出请求
log.info("[testAuth][请求体: {}]", payload);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testAuth][响应体: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
}
}
// ===================== 直连设备属性上报测试 =====================
/**
* 属性上报测试
*/
@Test
public void testPropertyPost() throws Exception {
// 1.1 构建请求UDP 协议token 放在 params 中)
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
.put("version", "1.0")
.put("params", withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build())))
.build());
// 1.2 输出请求
log.info("[testPropertyPost][请求体: {}]", payload);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testPropertyPost][响应体: {}]", response);
}
}
// ===================== 直连设备事件上报测试 =====================
/**
* 事件上报测试
*/
@Test
public void testEventPost() throws Exception {
// 1.1 构建请求UDP 协议token 放在 params 中)
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("version", "1.0")
.put("params", withToken(IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis())))
.build());
// 1.2 输出请求
log.info("[testEventPost][请求体: {}]", payload);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testEventPost][响应体: {}]", response);
}
}
// ===================== 动态注册测试 =====================
/**
* 直连设备动态注册测试(一型一密)
* <p>
* 使用产品密钥productSecret验证身份成功后返回设备密钥deviceSecret
* <p>
* 注意:此接口不需要 Token 认证
*/
@Test
public void testDeviceRegister() throws Exception {
// 1.1 构建请求参数
IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO();
reqDTO.setProductKey(PRODUCT_KEY);
reqDTO.setDeviceName("test-" + System.currentTimeMillis());
reqDTO.setProductSecret("test-product-secret");
// 1.2 构建请求
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod())
.put("params", reqDTO)
.build());
// 1.3 输出请求
log.info("[testDeviceRegister][请求体: {}]", payload);
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
String response = sendAndReceive(socket, payload);
// 2.2 输出结果
log.info("[testDeviceRegister][响应体: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
}
}
// ===================== 辅助方法 =====================
/**
* 将 token 添加到 params 中
* <p>
* 支持 Map 或普通对象,通过 JSON 转换统一处理
*
* @param params 原始参数Map 或对象)
* @return 添加了 token 的 Map
*/
@SuppressWarnings("unchecked")
private Map<String, Object> withToken(Object params) {
// 1. 转成 Map
Map<String, Object> map;
if (params instanceof Map) {
map = new HashMap<>((Map<String, Object>) params);
} else {
// 对象转 Map通过 JSON 序列化再反序列化)
map = JsonUtils.parseObject(JsonUtils.toJsonString(params), Map.class);
}
// 2. 添加 token
if (map != null) {
map.put("token", TOKEN);
}
return map;
}
/**
* 发送 UDP 请求并接收响应
*
* @param socket UDP Socket
* @param payload 请求体
* @return 响应内容
*/
private String sendAndReceive(DatagramSocket socket, String payload) throws Exception {
byte[] sendData = payload.getBytes(StandardCharsets.UTF_8);
InetAddress address = InetAddress.getByName(SERVER_HOST);
// 发送请求
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, address, SERVER_PORT);
socket.send(sendPacket);
// 接收响应
byte[] receiveData = new byte[4096];
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
try {
socket.receive(receivePacket);
return new String(receivePacket.getData(), 0, receivePacket.getLength(), StandardCharsets.UTF_8);
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
}
}

View File

@@ -1,160 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
/**
* IoT 网关 UDP 协议集成测试(手动测试)
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务UDP 端口 8092</li>
* <li>运行 {@link #testAuth()} 获取 token将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
* <li>运行 {@link #testPropertyPost()} 测试属性上报,或运行 {@link #testEventPost()} 测试事件上报</li>
* </ol>
*
* @author 芋道源码
*/
@Slf4j
public class IotUdpProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8092;
private static final int TIMEOUT_MS = 5000;
// 设备信息(根据实际情况修改 PRODUCT_KEY、DEVICE_NAME、PASSWORD
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
private static final String DEVICE_NAME = "small";
private static final String PASSWORD = "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75";
// TODO @芋艿1、IotDeviceAuthUtils 调整下拼接2、password 的生成3、后续给 http 也整个单测4、后续给 tcp 也整个单测5、后续给 mqtt 也整个单测6、后续给 emqp 也整个单测
private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME;
private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY;
/**
* 设备 Token从 {@link #testAuth()} 方法获取后,粘贴到这里
*/
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMxMTY0NiwiZGV2aWNlTmFtZSI6InNtYWxsIn0.re6LCaRfKiE9VQTP3w0Brh2ScVIgrvN3H96z_snndoM";
/**
* 认证测试:获取设备 Token
*/
@Test
public void testAuth() throws Exception {
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", "auth")
.put("params", MapUtil.builder()
.put("clientId", CLIENT_ID)
.put("username", USERNAME)
.put("password", PASSWORD)
.build())
.build());
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
log.info("[testAuth][请求体: {}]", payload);
String response = sendAndReceive(socket, payload);
log.info("[testAuth][响应体: {}]", response);
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
}
}
/**
* 属性上报测试
*/
@Test
public void testPropertyPost() throws Exception {
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
.put("version", "1.0")
.put("params", MapUtil.builder()
.put("token", TOKEN)
.put("width", 1)
.put("height", "2")
.build())
.build());
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
log.info("[testPropertyPost][请求体: {}]", payload);
String response = sendAndReceive(socket, payload);
log.info("[testPropertyPost][响应体: {}]", response);
}
}
/**
* 事件上报测试
*/
@Test
public void testEventPost() throws Exception {
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("version", "1.0")
.put("params", MapUtil.builder()
.put("identifier", "eat")
.put("value", MapUtil.builder()
.put("width", 1)
.put("height", "2")
.put("oneThree", "3")
.build())
.put("time", System.currentTimeMillis())
.build())
.build());
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(TIMEOUT_MS);
log.info("[testEventPost][请求体: {}]", payload);
String response = sendAndReceive(socket, payload);
log.info("[testEventPost][响应体: {}]", response);
}
}
/**
* 发送 UDP 请求并接收响应
*
* @param socket UDP Socket
* @param payload 请求体
* @return 响应内容
*/
private String sendAndReceive(DatagramSocket socket, String payload) throws Exception {
byte[] sendData = payload.getBytes(StandardCharsets.UTF_8);
InetAddress address = InetAddress.getByName(SERVER_HOST);
// 发送请求
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, address, SERVER_PORT);
socket.send(sendPacket);
// 接收响应
byte[] receiveData = new byte[4096];
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
try {
socket.receive(receivePacket);
return new String(receivePacket.getData(), 0, receivePacket.getLength(), StandardCharsets.UTF_8);
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
}
}