mirror of
https://gitee.com/yudaocode/yudao-boot-mini.git
synced 2026-03-22 05:27:15 +08:00
feat:【iot】UDP 协议:1)兼容 TOKEN 在 list 的情况,基于 token、body 拆分;2)新增网关设备的单测;3)去掉 udp 默认响应,避免响应多次。
This commit is contained in:
@@ -52,6 +52,10 @@ public class IotUdpUpstreamHandler {
|
||||
* Token 参数 Key
|
||||
*/
|
||||
private static final String PARAM_KEY_TOKEN = "token";
|
||||
/**
|
||||
* Body 参数 Key(实际请求内容)
|
||||
*/
|
||||
private static final String PARAM_KEY_BODY = "body";
|
||||
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
@@ -248,6 +252,10 @@ public class IotUdpUpstreamHandler {
|
||||
|
||||
/**
|
||||
* 处理业务请求
|
||||
* <p>
|
||||
* 请求参数格式:
|
||||
* - token:JWT 令牌
|
||||
* - body:实际请求内容(可以是 Map、List 或其他类型)
|
||||
*
|
||||
* @param message 消息信息
|
||||
* @param codecType 消息编解码类型
|
||||
@@ -259,11 +267,13 @@ public class IotUdpUpstreamHandler {
|
||||
InetSocketAddress senderAddress, DatagramSocket socket) {
|
||||
String addressKey = sessionManager.buildAddressKey(senderAddress);
|
||||
try {
|
||||
// 1.1 从消息中提取 token(无状态:消息体携带 token)
|
||||
// 1.1 从消息中提取 token 和 body(格式:{token: "xxx", body: {...}} 或 {token: "xxx", body: [...]})
|
||||
String token = null;
|
||||
Object body = null;
|
||||
if (message.getParams() instanceof Map) {
|
||||
Map<String, Object> paramsMap = (Map<String, Object>) message.getParams();
|
||||
token = (String) paramsMap.remove(PARAM_KEY_TOKEN);
|
||||
token = (String) paramsMap.get(PARAM_KEY_TOKEN);
|
||||
body = paramsMap.get(PARAM_KEY_BODY);
|
||||
}
|
||||
if (StrUtil.isBlank(token)) {
|
||||
log.warn("[handleBusinessRequest][缺少 token,来源: {}]", addressKey);
|
||||
@@ -291,12 +301,10 @@ public class IotUdpUpstreamHandler {
|
||||
// 3. 更新设备地址映射(保持最新)
|
||||
sessionManager.updateDeviceAddress(device.getId(), senderAddress);
|
||||
|
||||
// 4. 发送消息到消息总线
|
||||
// 4. 将 body 设置为实际的 params,发送消息到消息总线
|
||||
message.setParams(body);
|
||||
deviceMessageService.sendDeviceMessage(message, device.getProductKey(),
|
||||
device.getDeviceName(), serverId);
|
||||
|
||||
// 5. 发送成功响应
|
||||
sendSuccessResponse(socket, senderAddress, message.getRequestId(), "处理成功", codecType);
|
||||
log.debug("[handleBusinessRequest][业务消息处理成功,设备 ID: {},方法: {},来源: {}]",
|
||||
device.getId(), message.getMethod(), addressKey);
|
||||
} catch (Exception e) {
|
||||
@@ -419,21 +427,6 @@ public class IotUdpUpstreamHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送成功响应
|
||||
*
|
||||
* @param socket UDP Socket
|
||||
* @param address 目标地址
|
||||
* @param requestId 请求 ID
|
||||
* @param message 消息
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
@SuppressWarnings("SameParameterValue")
|
||||
private void sendSuccessResponse(DatagramSocket socket, InetSocketAddress address,
|
||||
String requestId, String message, String codecType) {
|
||||
sendResponse(socket, address, true, message, requestId, codecType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送错误响应
|
||||
*
|
||||
@@ -458,18 +451,20 @@ public class IotUdpUpstreamHandler {
|
||||
* @param requestId 请求 ID
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
@SuppressWarnings("SameParameterValue")
|
||||
private void sendResponse(DatagramSocket socket, InetSocketAddress address, boolean success,
|
||||
String message, String requestId, String codecType) {
|
||||
try {
|
||||
// 构建响应数据
|
||||
Object responseData = MapUtil.builder()
|
||||
.put("success", success)
|
||||
.put("message", message)
|
||||
.build();
|
||||
|
||||
int code = success ? 0 : 401;
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, "response", responseData,
|
||||
code, message);
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
|
||||
"response", responseData, code, message);
|
||||
|
||||
// 发送响应
|
||||
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
|
||||
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), ar -> {
|
||||
if (ar.failed()) {
|
||||
|
||||
@@ -109,7 +109,7 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
|
||||
// 1.2 输出请求
|
||||
log.info("[testPropertyPost][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
// 2. 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
@@ -185,30 +185,23 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
|
||||
// ===================== 辅助方法 =====================
|
||||
|
||||
/**
|
||||
* 将 token 添加到 params 中
|
||||
* 构建带 token 的 params
|
||||
* <p>
|
||||
* 支持 Map 或普通对象,通过 JSON 转换统一处理
|
||||
* 返回格式:{token: "xxx", body: params}
|
||||
* - token:JWT 令牌
|
||||
* - body:实际请求内容(可以是 Map、List 或其他类型)
|
||||
*
|
||||
* @param params 原始参数(Map 或对象)
|
||||
* @return 添加了 token 的 Map
|
||||
* @param params 原始参数(Map、List 或对象)
|
||||
* @return 包含 token 和 body 的 Map
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, Object> withToken(Object params) {
|
||||
// 1. 转成 Map
|
||||
Map<String, Object> map;
|
||||
if (params instanceof Map) {
|
||||
map = new HashMap<>((Map<String, Object>) params);
|
||||
} else {
|
||||
// 对象转 Map(通过 JSON 序列化再反序列化)
|
||||
map = JsonUtils.parseObject(JsonUtils.toJsonString(params), Map.class);
|
||||
}
|
||||
// 2. 添加 token
|
||||
if (map != null) {
|
||||
map.put("token", TOKEN);
|
||||
}
|
||||
return map;
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
result.put("token", TOKEN);
|
||||
result.put("body", params);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 发送 UDP 请求并接收响应
|
||||
*
|
||||
@@ -216,7 +209,7 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
|
||||
* @param payload 请求体
|
||||
* @return 响应内容
|
||||
*/
|
||||
private String sendAndReceive(DatagramSocket socket, String payload) throws Exception {
|
||||
public static String sendAndReceive(DatagramSocket socket, String payload) throws Exception {
|
||||
byte[] sendData = payload.getBytes(StandardCharsets.UTF_8);
|
||||
InetAddress address = InetAddress.getByName(SERVER_HOST);
|
||||
|
||||
|
||||
@@ -0,0 +1,308 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.net.DatagramSocket;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUdpProtocolIntegrationTest.sendAndReceive;
|
||||
|
||||
/**
|
||||
* IoT 网关设备 UDP 协议集成测试(手动测试)
|
||||
*
|
||||
* <p>测试场景:网关设备(IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 UDP 协议管理子设备拓扑关系
|
||||
*
|
||||
* <p>使用步骤:
|
||||
* <ol>
|
||||
* <li>启动 yudao-module-iot-gateway 服务(UDP 端口 8093)</li>
|
||||
* <li>运行 {@link #testAuth()} 获取网关设备 token,将返回的 token 粘贴到 {@link #GATEWAY_TOKEN} 常量</li>
|
||||
* <li>运行以下测试方法:
|
||||
* <ul>
|
||||
* <li>{@link #testTopoAdd()} - 添加子设备拓扑关系</li>
|
||||
* <li>{@link #testTopoDelete()} - 删除子设备拓扑关系</li>
|
||||
* <li>{@link #testTopoGet()} - 获取子设备拓扑关系</li>
|
||||
* <li>{@link #testSubDeviceRegister()} - 子设备动态注册</li>
|
||||
* <li>{@link #testPropertyPackPost()} - 批量上报属性(网关 + 子设备)</li>
|
||||
* </ul>
|
||||
* </li>
|
||||
* </ol>
|
||||
*
|
||||
* <p>注意:UDP 协议是无状态的,每次请求需要在 params 中携带 token(与 HTTP 通过 Header 传递不同)
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotGatewayDeviceUdpProtocolIntegrationTest {
|
||||
|
||||
private static final String SERVER_HOST = "127.0.0.1";
|
||||
private static final int SERVER_PORT = 8093;
|
||||
private static final int TIMEOUT_MS = 5000;
|
||||
|
||||
// ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) =====================
|
||||
private static final String GATEWAY_PRODUCT_KEY = "m6XcS1ZJ3TW8eC0v";
|
||||
private static final String GATEWAY_DEVICE_NAME = "sub-ddd";
|
||||
private static final String GATEWAY_DEVICE_SECRET = "b3d62c70f8a4495487ed1d35d61ac2b3";
|
||||
|
||||
/**
|
||||
* 网关设备 Token:从 {@link #testAuth()} 方法获取后,粘贴到这里
|
||||
*/
|
||||
private static final String GATEWAY_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoibTZYY1MxWkozVFc4ZUMwdiIsImV4cCI6MTc2OTk1NDcxNSwiZGV2aWNlTmFtZSI6InN1Yi1kZGQifQ.Vg5iateNrpg0FVQI2eJomggxrYXGpwug8wsz9BsVr5w";
|
||||
|
||||
// ===================== 子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
|
||||
private static final String SUB_DEVICE_PRODUCT_KEY = "jAufEMTF1W6wnPhn";
|
||||
private static final String SUB_DEVICE_NAME = "chazuo-it";
|
||||
private static final String SUB_DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af";
|
||||
|
||||
// ===================== 认证测试 =====================
|
||||
|
||||
/**
|
||||
* 网关设备认证测试:获取网关设备 Token
|
||||
*/
|
||||
@Test
|
||||
public void testAuth() throws Exception {
|
||||
// 1.1 构建请求
|
||||
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
|
||||
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
|
||||
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
|
||||
.setClientId(authInfo.getClientId())
|
||||
.setUsername(authInfo.getUsername())
|
||||
.setPassword(authInfo.getPassword());
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", "auth")
|
||||
.put("params", authReqDTO)
|
||||
.build());
|
||||
// 1.2 输出请求
|
||||
log.info("[testAuth][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
// 2.2 输出结果
|
||||
log.info("[testAuth][响应体: {}]", response);
|
||||
log.info("[testAuth][请将返回的 token 复制到 GATEWAY_TOKEN 常量中]");
|
||||
}
|
||||
}
|
||||
|
||||
// ===================== 拓扑管理测试 =====================
|
||||
|
||||
/**
|
||||
* 添加子设备拓扑关系测试
|
||||
* <p>
|
||||
* 网关设备向平台上报需要绑定的子设备信息
|
||||
*/
|
||||
@Test
|
||||
public void testTopoAdd() throws Exception {
|
||||
// 1.1 构建子设备认证信息
|
||||
IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo(
|
||||
SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET);
|
||||
IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO()
|
||||
.setClientId(subAuthInfo.getClientId())
|
||||
.setUsername(subAuthInfo.getUsername())
|
||||
.setPassword(subAuthInfo.getPassword());
|
||||
// 1.2 构建请求参数
|
||||
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
|
||||
params.setSubDevices(Collections.singletonList(subDeviceAuth));
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.TOPO_ADD.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("params", withToken(params))
|
||||
.build());
|
||||
// 1.3 输出请求
|
||||
log.info("[testTopoAdd][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
// 2.2 输出结果
|
||||
log.info("[testTopoAdd][响应体: {}]", response);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除子设备拓扑关系测试
|
||||
* <p>
|
||||
* 网关设备向平台上报需要解绑的子设备信息
|
||||
*/
|
||||
@Test
|
||||
public void testTopoDelete() throws Exception {
|
||||
// 1.1 构建请求参数
|
||||
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO();
|
||||
params.setSubDevices(Collections.singletonList(
|
||||
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("params", withToken(params))
|
||||
.build());
|
||||
// 1.2 输出请求
|
||||
log.info("[testTopoDelete][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
// 2.2 输出结果
|
||||
log.info("[testTopoDelete][响应体: {}]", response);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取子设备拓扑关系测试
|
||||
* <p>
|
||||
* 网关设备向平台查询已绑定的子设备列表
|
||||
*/
|
||||
@Test
|
||||
public void testTopoGet() throws Exception {
|
||||
// 1.1 构建请求参数(目前为空,预留扩展)
|
||||
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.TOPO_GET.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("params", withToken(params))
|
||||
.build());
|
||||
// 1.2 输出请求
|
||||
log.info("[testTopoGet][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
// 2.2 输出结果
|
||||
log.info("[testTopoGet][响应体: {}]", response);
|
||||
}
|
||||
}
|
||||
|
||||
// ===================== 子设备注册测试 =====================
|
||||
|
||||
/**
|
||||
* 子设备动态注册测试
|
||||
* <p>
|
||||
* 网关设备代理子设备进行动态注册,平台返回子设备的 deviceSecret
|
||||
* <p>
|
||||
* 注意:此接口需要网关 Token 认证
|
||||
*/
|
||||
@Test
|
||||
public void testSubDeviceRegister() throws Exception {
|
||||
// 1.1 构建请求参数
|
||||
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO();
|
||||
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
|
||||
subDevice.setDeviceName("mougezishebei");
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("params", withToken(Collections.singletonList(subDevice)))
|
||||
.build());
|
||||
// 1.2 输出请求
|
||||
log.info("[testSubDeviceRegister][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
// 2.2 输出结果
|
||||
log.info("[testSubDeviceRegister][响应体: {}]", response);
|
||||
}
|
||||
}
|
||||
|
||||
// ===================== 批量上报测试 =====================
|
||||
|
||||
/**
|
||||
* 批量上报属性测试(网关 + 子设备)
|
||||
* <p>
|
||||
* 网关设备批量上报自身属性、事件,以及子设备的属性、事件
|
||||
*/
|
||||
@Test
|
||||
public void testPropertyPackPost() throws Exception {
|
||||
// 1.1 构建【网关设备】自身属性
|
||||
Map<String, Object> gatewayProperties = MapUtil.<String, Object>builder()
|
||||
.put("temperature", 25.5)
|
||||
.build();
|
||||
// 1.2 构建【网关设备】自身事件
|
||||
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
|
||||
gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build());
|
||||
gatewayEvent.setTime(System.currentTimeMillis());
|
||||
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> gatewayEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
|
||||
.put("statusReport", gatewayEvent)
|
||||
.build();
|
||||
// 1.3 构建【网关子设备】属性
|
||||
Map<String, Object> subDeviceProperties = MapUtil.<String, Object>builder()
|
||||
.put("power", 100)
|
||||
.build();
|
||||
// 1.4 构建【网关子设备】事件
|
||||
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
|
||||
subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build());
|
||||
subDeviceEvent.setTime(System.currentTimeMillis());
|
||||
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> subDeviceEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
|
||||
.put("healthCheck", subDeviceEvent)
|
||||
.build();
|
||||
// 1.5 构建子设备数据
|
||||
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData();
|
||||
subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME));
|
||||
subDeviceData.setProperties(subDeviceProperties);
|
||||
subDeviceData.setEvents(subDeviceEvents);
|
||||
// 1.6 构建请求参数
|
||||
IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO();
|
||||
params.setProperties(gatewayProperties);
|
||||
params.setEvents(gatewayEvents);
|
||||
params.setSubDevices(List.of(subDeviceData));
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("params", withToken(params))
|
||||
.build());
|
||||
// 1.7 输出请求
|
||||
log.info("[testPropertyPackPost][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
// 2.2 输出结果
|
||||
log.info("[testPropertyPackPost][响应体: {}]", response);
|
||||
}
|
||||
}
|
||||
|
||||
// ===================== 辅助方法 =====================
|
||||
|
||||
/**
|
||||
* 构建带 token 的 params
|
||||
* <p>
|
||||
* 返回格式:{token: "xxx", body: params}
|
||||
* - token:JWT 令牌
|
||||
* - body:实际请求内容(可以是 Map、List 或其他类型)
|
||||
*
|
||||
* @param params 原始参数(Map、List 或对象)
|
||||
* @return 包含 token 和 body 的 Map
|
||||
*/
|
||||
private Map<String, Object> withToken(Object params) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
result.put("token", GATEWAY_TOKEN);
|
||||
result.put("body", params);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,178 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.net.DatagramSocket;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotDirectDeviceUdpProtocolIntegrationTest.sendAndReceive;
|
||||
|
||||
/**
|
||||
* IoT 网关子设备 UDP 协议集成测试(手动测试)
|
||||
*
|
||||
* <p>测试场景:子设备(IotProductDeviceTypeEnum 的 SUB 类型)通过网关设备代理上报数据
|
||||
*
|
||||
* <p><b>重要说明:子设备无法直接连接平台,所有请求均由网关设备(Gateway)代为转发。</b>
|
||||
* <p>网关设备转发子设备请求时,Token 使用子设备自己的信息。
|
||||
*
|
||||
* <p>使用步骤:
|
||||
* <ol>
|
||||
* <li>启动 yudao-module-iot-gateway 服务(UDP 端口 8093)</li>
|
||||
* <li>确保子设备已通过 {@link IotGatewayDeviceUdpProtocolIntegrationTest#testTopoAdd()} 绑定到网关</li>
|
||||
* <li>运行 {@link #testAuth()} 获取子设备 token,将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
|
||||
* <li>运行以下测试方法:
|
||||
* <ul>
|
||||
* <li>{@link #testPropertyPost()} - 子设备属性上报(由网关代理转发)</li>
|
||||
* <li>{@link #testEventPost()} - 子设备事件上报(由网关代理转发)</li>
|
||||
* </ul>
|
||||
* </li>
|
||||
* </ol>
|
||||
*
|
||||
* <p>注意:UDP 协议是无状态的,每次请求需要在 params 中携带 token(与 HTTP 通过 Header 传递不同)
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
|
||||
|
||||
private static final String SERVER_HOST = "127.0.0.1";
|
||||
private static final int SERVER_PORT = 8093;
|
||||
private static final int TIMEOUT_MS = 5000;
|
||||
|
||||
// ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
|
||||
private static final String PRODUCT_KEY = "jAufEMTF1W6wnPhn";
|
||||
private static final String DEVICE_NAME = "chazuo-it";
|
||||
private static final String DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af";
|
||||
|
||||
/**
|
||||
* 网关子设备 Token:从 {@link #testAuth()} 方法获取后,粘贴到这里
|
||||
*/
|
||||
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiakF1ZkVNVEYxVzZ3blBobiIsImV4cCI6MTc2OTk1NDY3OSwiZGV2aWNlTmFtZSI6ImNoYXp1by1pdCJ9.jfbUAoU0xkJl4UvO-NUvcJ6yITPRgUjQ4MKATPuwneg";
|
||||
|
||||
// ===================== 认证测试 =====================
|
||||
|
||||
/**
|
||||
* 子设备认证测试:获取子设备 Token
|
||||
*/
|
||||
@Test
|
||||
public void testAuth() throws Exception {
|
||||
// 1.1 构建请求
|
||||
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
|
||||
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
|
||||
.setClientId(authInfo.getClientId())
|
||||
.setUsername(authInfo.getUsername())
|
||||
.setPassword(authInfo.getPassword());
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", "auth")
|
||||
.put("params", authReqDTO)
|
||||
.build());
|
||||
// 1.2 输出请求
|
||||
log.info("[testAuth][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
// 2.2 输出结果
|
||||
log.info("[testAuth][响应体: {}]", response);
|
||||
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
|
||||
}
|
||||
}
|
||||
|
||||
// ===================== 子设备属性上报测试 =====================
|
||||
|
||||
/**
|
||||
* 子设备属性上报测试
|
||||
*/
|
||||
@Test
|
||||
public void testPropertyPost() throws Exception {
|
||||
// 1.1 构建请求(UDP 协议:token 放在 params 中)
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("params", withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
|
||||
.put("power", 100)
|
||||
.put("status", "online")
|
||||
.put("temperature", 36.5)
|
||||
.build())))
|
||||
.build());
|
||||
// 1.2 输出请求
|
||||
log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]");
|
||||
log.info("[testPropertyPost][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
// 2.2 输出结果
|
||||
log.info("[testPropertyPost][响应体: {}]", response);
|
||||
}
|
||||
}
|
||||
|
||||
// ===================== 子设备事件上报测试 =====================
|
||||
|
||||
/**
|
||||
* 子设备事件上报测试
|
||||
*/
|
||||
@Test
|
||||
public void testEventPost() throws Exception {
|
||||
// 1.1 构建请求(UDP 协议:token 放在 params 中)
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("params", withToken(IotDeviceEventPostReqDTO.of(
|
||||
"alarm",
|
||||
MapUtil.<String, Object>builder()
|
||||
.put("level", "warning")
|
||||
.put("message", "temperature too high")
|
||||
.put("threshold", 40)
|
||||
.put("current", 42)
|
||||
.build(),
|
||||
System.currentTimeMillis())))
|
||||
.build());
|
||||
// 1.2 输出请求
|
||||
log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]");
|
||||
log.info("[testEventPost][请求体: {}]", payload);
|
||||
|
||||
// 2.1 发送请求
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(TIMEOUT_MS);
|
||||
String response = sendAndReceive(socket, payload);
|
||||
// 2.2 输出结果
|
||||
log.info("[testEventPost][响应体: {}]", response);
|
||||
}
|
||||
}
|
||||
|
||||
// ===================== 辅助方法 =====================
|
||||
|
||||
/**
|
||||
* 构建带 token 的 params
|
||||
* <p>
|
||||
* 返回格式:{token: "xxx", body: params}
|
||||
* - token:JWT 令牌
|
||||
* - body:实际请求内容(可以是 Map、List 或其他类型)
|
||||
*
|
||||
* @param params 原始参数(Map、List 或对象)
|
||||
* @return 包含 token 和 body 的 Map
|
||||
*/
|
||||
private Map<String, Object> withToken(Object params) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
result.put("token", TOKEN);
|
||||
result.put("body", params);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user