feat:【iot】TCP 协议:1)增加 register 协议;2)增加 gateway 相关的单测

This commit is contained in:
YunaiV
2026-01-26 21:39:59 +08:00
parent de1a53a5f1
commit 63d7bfe2d2
6 changed files with 784 additions and 2 deletions

View File

@@ -13,7 +13,7 @@ import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
/** /**
* TCP 二进制格式 {@link IotDeviceMessage} 编解码器 * TCP/UDP 二进制格式 {@link IotDeviceMessage} 编解码器
* <p> * <p>
* 二进制协议格式(所有数值使用大端序): * 二进制协议格式(所有数值使用大端序):
* *

View File

@@ -11,7 +11,7 @@ import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* TCP JSON 格式 {@link IotDeviceMessage} 编解码器 * TCP/UDP JSON 格式 {@link IotDeviceMessage} 编解码器
* *
* 采用纯 JSON 格式传输,格式如下: * 采用纯 JSON 格式传输,格式如下:
* { * {

View File

@@ -10,7 +10,10 @@ import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
@@ -120,6 +123,9 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
if (AUTH_METHOD.equals(message.getMethod())) { if (AUTH_METHOD.equals(message.getMethod())) {
// 认证请求 // 认证请求
handleAuthenticationRequest(clientId, message, codecType, socket); handleAuthenticationRequest(clientId, message, codecType, socket);
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) {
// 设备动态注册请求
handleRegisterRequest(clientId, message, codecType, socket);
} else { } else {
// 业务消息 // 业务消息
handleBusinessRequest(clientId, message, codecType, socket); handleBusinessRequest(clientId, message, codecType, socket);
@@ -190,6 +196,44 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
} }
} }
/**
* 处理设备动态注册请求(一型一密,不需要认证)
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param codecType 消息编解码类型
* @param socket 网络连接
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void handleRegisterRequest(String clientId, IotDeviceMessage message, String codecType,
NetSocket socket) {
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO registerParams = parseRegisterParams(message.getParams());
if (registerParams == null) {
log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "注册参数不完整", codecType);
return;
}
// 2. 调用动态注册
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(registerParams);
if (result.isError()) {
log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg());
sendErrorResponse(socket, message.getRequestId(), result.getMsg(), codecType);
return;
}
// 3. 发送成功响应(包含 deviceSecret
sendRegisterSuccessResponse(socket, message.getRequestId(), result.getData(), codecType);
log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]",
clientId, registerParams.getDeviceName());
} catch (Exception e) {
log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(socket, message.getRequestId(), "注册处理异常", codecType);
}
}
/** /**
* 处理业务请求 * 处理业务请求
* *
@@ -405,4 +449,73 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
} }
} }
/**
* 解析注册参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 注册参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceRegisterReqDTO parseRegisterParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof java.util.Map) {
java.util.Map<String, Object> paramMap = (java.util.Map<String, Object>) params;
String productKey = MapUtil.getStr(paramMap, "productKey");
String deviceName = MapUtil.getStr(paramMap, "deviceName");
String productSecret = MapUtil.getStr(paramMap, "productSecret");
if (StrUtil.hasBlank(productKey, deviceName, productSecret)) {
return null;
}
return new IotDeviceRegisterReqDTO()
.setProductKey(productKey)
.setDeviceName(deviceName)
.setProductSecret(productSecret);
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceRegisterReqDTO) {
return (IotDeviceRegisterReqDTO) params;
}
// 其他情况尝试 JSON 转换
String jsonStr = JsonUtils.toJsonString(params);
return JsonUtils.parseObject(jsonStr, IotDeviceRegisterReqDTO.class);
} catch (Exception e) {
log.error("[parseRegisterParams][解析注册参数({})失败]", params, e);
return null;
}
}
/**
* 发送注册成功响应(包含 deviceSecret
*
* @param socket 网络连接
* @param requestId 请求 ID
* @param registerResp 注册响应
* @param codecType 消息编解码类型
*/
private void sendRegisterSuccessResponse(NetSocket socket, String requestId,
IotDeviceRegisterRespDTO registerResp, String codecType) {
try {
// 构建响应数据
Object responseData = MapUtil.builder()
.put("success", true)
.put("deviceSecret", registerResp.getDeviceSecret())
.put("message", "注册成功")
.build();
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), responseData, 0, "注册成功");
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应失败requestId: {}]", requestId, e);
}
}
} }

View File

@@ -6,6 +6,7 @@ import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
@@ -37,6 +38,7 @@ import java.net.Socket;
* <li>运行以下测试方法: * <li>运行以下测试方法:
* <ul> * <ul>
* <li>{@link #testAuth()} - 设备认证</li> * <li>{@link #testAuth()} - 设备认证</li>
* <li>{@link #testDeviceRegister()} - 设备动态注册(一型一密)</li>
* <li>{@link #testPropertyPost()} - 设备属性上报</li> * <li>{@link #testPropertyPost()} - 设备属性上报</li>
* <li>{@link #testEventPost()} - 设备事件上报</li> * <li>{@link #testEventPost()} - 设备事件上报</li>
* </ul> * </ul>
@@ -98,6 +100,46 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
} }
} }
// ===================== 动态注册测试 =====================
/**
* 直连设备动态注册测试(一型一密)
* <p>
* 使用产品密钥productSecret验证身份成功后返回设备密钥deviceSecret
* <p>
* 注意:此接口不需要认证
*/
@Test
public void testDeviceRegister() throws Exception {
// 1.1 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO();
registerReqDTO.setProductKey(PRODUCT_KEY);
registerReqDTO.setDeviceName("test-tcp-" + System.currentTimeMillis());
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testDeviceRegister][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testDeviceRegister][响应消息: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
} else {
log.warn("[testDeviceRegister][未收到响应]");
}
}
}
// ===================== 直连设备属性上报测试 ===================== // ===================== 直连设备属性上报测试 =====================
/** /**

View File

@@ -0,0 +1,389 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* IoT 网关设备 TCP 协议集成测试(手动测试)
*
* <p>测试场景网关设备IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 TCP 协议管理子设备拓扑关系
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务TCP 端口 8091</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testAuth()} - 网关设备认证</li>
* <li>{@link #testTopoAdd()} - 添加子设备拓扑关系</li>
* <li>{@link #testTopoDelete()} - 删除子设备拓扑关系</li>
* <li>{@link #testTopoGet()} - 获取子设备拓扑关系</li>
* <li>{@link #testSubDeviceRegister()} - 子设备动态注册</li>
* <li>{@link #testPropertyPackPost()} - 批量上报属性(网关 + 子设备)</li>
* </ul>
* </li>
* </ol>
*
* <p>注意TCP 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息
*
* @author 芋道源码
*/
@Slf4j
public class IotGatewayDeviceTcpProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8091;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
// ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) =====================
private static final String GATEWAY_PRODUCT_KEY = "m6XcS1ZJ3TW8eC0v";
private static final String GATEWAY_DEVICE_NAME = "sub-ddd";
private static final String GATEWAY_DEVICE_SECRET = "b3d62c70f8a4495487ed1d35d61ac2b3";
// ===================== 子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
private static final String SUB_DEVICE_PRODUCT_KEY = "jAufEMTF1W6wnPhn";
private static final String SUB_DEVICE_NAME = "chazuo-it";
private static final String SUB_DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af";
// ===================== 认证测试 =====================
/**
* 网关设备认证测试
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
} else {
log.warn("[testAuth][未收到响应]");
}
}
}
// ===================== 拓扑管理测试 =====================
/**
* 添加子设备拓扑关系测试
*/
@Test
public void testTopoAdd() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testTopoAdd][认证响应: {}]", authResponse);
// 2.1 构建子设备认证信息
IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo(
SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET);
IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO()
.setClientId(subAuthInfo.getClientId())
.setUsername(subAuthInfo.getUsername())
.setPassword(subAuthInfo.getPassword());
// 2.2 构建请求参数
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
params.setSubDevices(Collections.singletonList(subDeviceAuth));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(),
params,
null, null, null);
// 2.3 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoAdd][响应消息: {}]", response);
} else {
log.warn("[testTopoAdd][未收到响应]");
}
}
}
/**
* 删除子设备拓扑关系测试
*/
@Test
public void testTopoDelete() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testTopoDelete][认证响应: {}]", authResponse);
// 2.1 构建请求参数
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO();
params.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
params,
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoDelete][响应消息: {}]", response);
} else {
log.warn("[testTopoDelete][未收到响应]");
}
}
}
/**
* 获取子设备拓扑关系测试
*/
@Test
public void testTopoGet() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testTopoGet][认证响应: {}]", authResponse);
// 2.1 构建请求参数
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
params,
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoGet][响应消息: {}]", response);
} else {
log.warn("[testTopoGet][未收到响应]");
}
}
}
// ===================== 子设备注册测试 =====================
/**
* 子设备动态注册测试
*/
@Test
public void testSubDeviceRegister() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testSubDeviceRegister][认证响应: {}]", authResponse);
// 2.1 构建请求参数
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO();
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
subDevice.setDeviceName("mougezishebei");
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
Collections.singletonList(subDevice),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testSubDeviceRegister][响应消息: {}]", response);
} else {
log.warn("[testSubDeviceRegister][未收到响应]");
}
}
}
// ===================== 批量上报测试 =====================
/**
* 批量上报属性测试(网关 + 子设备)
*/
@Test
public void testPropertyPackPost() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testPropertyPackPost][认证响应: {}]", authResponse);
// 2.1 构建【网关设备】自身属性
Map<String, Object> gatewayProperties = MapUtil.<String, Object>builder()
.put("temperature", 25.5)
.build();
// 2.2 构建【网关设备】自身事件
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build());
gatewayEvent.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> gatewayEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("statusReport", gatewayEvent)
.build();
// 2.3 构建【网关子设备】属性
Map<String, Object> subDeviceProperties = MapUtil.<String, Object>builder()
.put("power", 100)
.build();
// 2.4 构建【网关子设备】事件
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build());
subDeviceEvent.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> subDeviceEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("healthCheck", subDeviceEvent)
.build();
// 2.5 构建子设备数据
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData();
subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME));
subDeviceData.setProperties(subDeviceProperties);
subDeviceData.setEvents(subDeviceEvents);
// 2.6 构建请求参数
IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO();
params.setProperties(gatewayProperties);
params.setEvents(gatewayEvents);
params.setSubDevices(List.of(subDeviceData));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(),
params,
null, null, null);
// 2.7 编码
byte[] payload = CODEC.encode(request);
log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPackPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPackPost][未收到响应]");
}
}
}
// ===================== 辅助方法 =====================
/**
* 执行网关设备认证
*/
private IotDeviceMessage authenticate(Socket socket) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
byte[] responseBytes = sendAndReceive(socket, payload);
if (responseBytes != null) {
return CODEC.decode(responseBytes);
}
return null;
}
/**
* 发送 TCP 请求并接收响应
*/
private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception {
// 1. 发送请求
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
out.write(payload);
out.flush();
// 2.1 等待一小段时间让服务器处理
Thread.sleep(100);
// 2.2 接收响应
byte[] buffer = new byte[4096];
try {
int length = in.read(buffer);
if (length > 0) {
byte[] response = new byte[length];
System.arraycopy(buffer, 0, response, 0, length);
return response;
}
return null;
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
}
}

View File

@@ -0,0 +1,238 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
/**
* IoT 网关子设备 TCP 协议集成测试(手动测试)
*
* <p>测试场景子设备IotProductDeviceTypeEnum 的 SUB 类型)通过网关设备代理上报数据
*
* <p><b>重要说明子设备无法直接连接平台所有请求均由网关设备Gateway代为转发。</b>
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务TCP 端口 8091</li>
* <li>确保子设备已通过 {@link IotGatewayDeviceTcpProtocolIntegrationTest#testTopoAdd()} 绑定到网关</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testAuth()} - 子设备认证</li>
* <li>{@link #testPropertyPost()} - 子设备属性上报(由网关代理转发)</li>
* <li>{@link #testEventPost()} - 子设备事件上报(由网关代理转发)</li>
* </ul>
* </li>
* </ol>
*
* <p>注意TCP 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息
*
* @author 芋道源码
*/
@Slf4j
public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8091;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
// ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
private static final String PRODUCT_KEY = "jAufEMTF1W6wnPhn";
private static final String DEVICE_NAME = "chazuo-it";
private static final String DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af";
// ===================== 认证测试 =====================
/**
* 子设备认证测试
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
} else {
log.warn("[testAuth][未收到响应]");
}
}
}
// ===================== 子设备属性上报测试 =====================
/**
* 子设备属性上报测试
*/
@Test
public void testPropertyPost() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testPropertyPost][认证响应: {}]", authResponse);
// 2.1 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("power", 100)
.put("status", "online")
.put("temperature", 36.5)
.build()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]");
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPost][未收到响应]");
}
}
}
// ===================== 子设备事件上报测试 =====================
/**
* 子设备事件上报测试
*/
@Test
public void testEventPost() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testEventPost][认证响应: {}]", authResponse);
// 2.1 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"alarm",
MapUtil.<String, Object>builder()
.put("level", "warning")
.put("message", "temperature too high")
.put("threshold", 40)
.put("current", 42)
.build(),
System.currentTimeMillis()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]");
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testEventPost][响应消息: {}]", response);
} else {
log.warn("[testEventPost][未收到响应]");
}
}
}
// ===================== 辅助方法 =====================
/**
* 执行子设备认证
*/
private IotDeviceMessage authenticate(Socket socket) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
byte[] responseBytes = sendAndReceive(socket, payload);
if (responseBytes != null) {
return CODEC.decode(responseBytes);
}
return null;
}
/**
* 发送 TCP 请求并接收响应
*/
private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception {
// 1. 发送请求
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
out.write(payload);
out.flush();
// 2.1 等待一小段时间让服务器处理
Thread.sleep(100);
// 2.2 接收响应
byte[] buffer = new byte[4096];
try {
int length = in.read(buffer);
if (length > 0) {
byte[] response = new byte[length];
System.arraycopy(buffer, 0, response, 0, length);
return response;
}
return null;
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
}
}