feat(iot):【协议改造】优化 tcp、http 单测类的实现

This commit is contained in:
YunaiV
2026-02-01 03:33:09 +08:00
parent 44b1950e4a
commit 1d8ab8ff3d
6 changed files with 436 additions and 481 deletions

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
@@ -92,9 +91,7 @@ public class IotDirectDeviceHttpProtocolIntegrationTest {
String url = String.format("http://%s:%d/topic/sys/%s/%s/thing/property/post",
SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME);
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
.put("version", "1.0")
.put("params", IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
@@ -126,9 +123,7 @@ public class IotDirectDeviceHttpProtocolIntegrationTest {
String url = String.format("http://%s:%d/topic/sys/%s/%s/thing/event/post",
SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME);
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("version", "1.0")
.put("params", IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
@@ -163,10 +158,10 @@ public class IotDirectDeviceHttpProtocolIntegrationTest {
// 1.1 构建请求
String url = String.format("http://%s:%d/auth/register/device", SERVER_HOST, SERVER_PORT);
// 1.2 构建请求参数
IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO();
reqDTO.setProductKey(PRODUCT_KEY);
reqDTO.setDeviceName("test-" + System.currentTimeMillis());
reqDTO.setProductSecret("test-product-secret");
IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO()
.setProductKey(PRODUCT_KEY)
.setDeviceName("test-" + System.currentTimeMillis())
.setProductSecret("test-product-secret");
String payload = JsonUtils.toJsonString(reqDTO);
// 1.3 输出请求
log.info("[testDeviceRegister][请求 URL: {}]", url);

View File

@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
@@ -20,7 +19,6 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -121,9 +119,7 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest {
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
params.setSubDevices(Collections.singletonList(subDeviceAuth));
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.TOPO_ADD.getMethod())
.put("version", "1.0")
.put("params", params)
.build());
// 1.4 输出请求
@@ -155,9 +151,7 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest {
params.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod())
.put("version", "1.0")
.put("params", params)
.build());
// 1.3 输出请求
@@ -187,9 +181,7 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest {
// 1.2 构建请求参数(目前为空,预留扩展)
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.TOPO_GET.getMethod())
.put("version", "1.0")
.put("params", params)
.build());
// 1.3 输出请求
@@ -208,8 +200,6 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest {
// ===================== 子设备注册测试 =====================
// TODO @芋艿:待测试
/**
* 子设备动态注册测试
* <p>
@@ -227,9 +217,7 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest {
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
subDevice.setDeviceName("mougezishebei");
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod())
.put("version", "1.0")
.put("params", Collections.singletonList(subDevice))
.build());
// 1.3 输出请求
@@ -263,9 +251,9 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest {
.put("temperature", 25.5)
.build();
// 1.3 构建【网关设备】自身事件
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build());
gatewayEvent.setTime(System.currentTimeMillis());
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue()
.setValue(MapUtil.builder().put("message", "gateway started").build())
.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> gatewayEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("statusReport", gatewayEvent)
.build();
@@ -274,26 +262,24 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest {
.put("power", 100)
.build();
// 1.5 构建【网关子设备】事件
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build());
subDeviceEvent.setTime(System.currentTimeMillis());
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue()
.setValue(MapUtil.builder().put("errorCode", 0).build())
.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> subDeviceEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("healthCheck", subDeviceEvent)
.build();
// 1.6 构建子设备数据
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData();
subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME));
subDeviceData.setProperties(subDeviceProperties);
subDeviceData.setEvents(subDeviceEvents);
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData()
.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))
.setProperties(subDeviceProperties)
.setEvents(subDeviceEvents);
// 1.7 构建请求参数
IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO();
params.setProperties(gatewayProperties);
params.setEvents(gatewayEvents);
params.setSubDevices(ListUtil.of(subDeviceData));
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod())
.put("version", "1.0")
.put("params", params)
.build());
// 1.8 输出请求

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
@@ -94,9 +93,7 @@ public class IotGatewaySubDeviceHttpProtocolIntegrationTest {
String url = String.format("http://%s:%d/topic/sys/%s/%s/thing/property/post",
SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME);
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
.put("version", "1.0")
.put("params", IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("power", 100)
.put("status", "online")
@@ -130,9 +127,7 @@ public class IotGatewaySubDeviceHttpProtocolIntegrationTest {
String url = String.format("http://%s:%d/topic/sys/%s/%s/thing/event/post",
SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME);
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("version", "1.0")
.put("params", IotDeviceEventPostReqDTO.of(
"alarm",
MapUtil.<String, Object>builder()

View File

@@ -1,8 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -10,40 +8,32 @@ import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.binary.IotBinarySerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* IoT 直连设备 TCP 协议集成测试(手动测试)
*
* <p>测试场景直连设备IotProductDeviceTypeEnum 的 DIRECT 类型)通过 TCP 协议直接连接平台
*
* <p>支持两种序列化格式:
* <ul>
* <li>{@link IotJsonSerializer} - JSON 格式</li>
* <li>{@link IotBinarySerializer} - 二进制格式</li>
* </ul>
*
* <p>TCP 拆包配置(需与 application.yaml 中的 codec 配置一致):
* <ul>
* <li>type: delimiter - 基于分隔符拆包</li>
* <li>delimiter: \n - 换行符作为分隔符</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务TCP 端口 8091</li>
* <li>修改 {@link #SERIALIZER} 选择测试的序列化格式Delimiter 模式只支持 JSON</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testAuth()} - 设备认证</li>
@@ -66,16 +56,25 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
private static final int SERVER_PORT = 8091;
private static final int TIMEOUT_MS = 5000;
// TODO @AI这里可以通过 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec 么?例如说:使用 vertx vertx tcp client从而更好的复用解码逻辑
private static Vertx vertx;
private static NetClient netClient;
// ===================== 编解码器 =====================
/**
* 分隔符(需与 application.yaml 中的 delimiter 配置一致)
* 消息序列化器
*/
private static final String DELIMITER = "\n";
// ===================== 序列化器选择Delimiter 模式推荐使用 JSON =====================
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// private static final IotMessageSerializer SERIALIZER = new IotBinarySerializer();
/**
* TCP 帧编解码器
*/
private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodec.create(
new IotTcpConfig.CodecConfig() {{
setType("delimiter");
setDelimiter("\\n");
}}
);
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) =====================
@@ -83,6 +82,25 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
private static final String DEVICE_NAME = "small";
private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3";
@BeforeAll
static void setUp() {
vertx = Vertx.vertx();
NetClientOptions options = new NetClientOptions()
.setConnectTimeout(TIMEOUT_MS)
.setIdleTimeout(TIMEOUT_MS);
netClient = vertx.createNetClient(options);
}
@AfterAll
static void tearDown() {
if (netClient != null) {
netClient.close();
}
if (vertx != null) {
vertx.close();
}
}
// ===================== 认证测试 =====================
/**
@@ -90,29 +108,21 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建认证消息
// 1. 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO);
// 1.2 序列化
// TODO @AI是不是把 SERIALIZER 放到 sendAndReceive 里;
byte[] payload = SERIALIZER.serialize(request);
log.info("[testAuth][Serializer: {}, 请求消息: {}, 数据包长度: {} 字节]", SERIALIZER.getType(), request, payload.length);
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 反序列化响应
if (responseBytes != null) {
IotDeviceMessage response = SERIALIZER.deserialize(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
} else {
log.warn("[testAuth][未收到响应]");
}
// 2. 发送并接收响应
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testAuth][响应消息: {}]", response);
} finally {
socket.close();
}
}
@@ -127,29 +137,22 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testDeviceRegister() throws Exception {
// 1.1 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO();
registerReqDTO.setProductKey(PRODUCT_KEY);
registerReqDTO.setDeviceName("test-tcp-" + System.currentTimeMillis());
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
// 1.2 序列化
byte[] payload = SERIALIZER.serialize(request);
log.info("[testDeviceRegister][Serializer: {}, 请求消息: {}, 数据包长度: {} 字节]", SERIALIZER.getType(), request, payload.length);
// 1. 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO()
.setProductKey(PRODUCT_KEY)
.setDeviceName("test-tcp-" + System.currentTimeMillis())
.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO);
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 反序列化响应
if (responseBytes != null) {
IotDeviceMessage response = SERIALIZER.deserialize(responseBytes);
log.info("[testDeviceRegister][响应消息: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
} else {
log.warn("[testDeviceRegister][未收到响应]");
}
// 2. 发送并接收响应
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testDeviceRegister][响应消息: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
} finally {
socket.close();
}
}
@@ -160,35 +163,25 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testPropertyPost][认证响应: {}]", authResponse);
// 2.1 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 2. 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build()),
null, null, null);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
log.info("[testPropertyPost][Serializer: {}, 请求消息: {}]", SERIALIZER.getType(), request);
.build()));
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 反序列化响应
if (responseBytes != null) {
IotDeviceMessage response = SERIALIZER.deserialize(responseBytes);
log.info("[testPropertyPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPost][未收到响应]");
}
// 3. 发送并接收响应
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testPropertyPost][响应消息: {}]", response);
} finally {
socket.close();
}
}
@@ -199,98 +192,87 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testEventPost][认证响应: {}]", authResponse);
// 2.1 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 2. 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis()),
null, null, null);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
log.info("[testEventPost][Serializer: {}, 请求消息: {}]", SERIALIZER.getType(), request);
System.currentTimeMillis()));
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 反序列化响应
if (responseBytes != null) {
IotDeviceMessage response = SERIALIZER.deserialize(responseBytes);
log.info("[testEventPost][响应消息: {}]", response);
} else {
log.warn("[testEventPost][未收到响应]");
}
// 3. 发送并接收响应
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testEventPost][响应消息: {}]", response);
} finally {
socket.close();
}
}
// ===================== 辅助方法 =====================
/**
* 建立 TCP 连接
*
* @return 连接 Future
*/
private CompletableFuture<NetSocket> connect() {
CompletableFuture<NetSocket> future = new CompletableFuture<>();
netClient.connect(SERVER_PORT, SERVER_HOST)
.onSuccess(future::complete)
.onFailure(future::completeExceptionally);
return future;
}
/**
* 执行设备认证
*
* @param socket TCP 连接
* @return 认证响应消息
*/
private IotDeviceMessage authenticate(Socket socket) throws Exception {
private IotDeviceMessage authenticate(NetSocket socket) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = SERIALIZER.serialize(request);
byte[] responseBytes = sendAndReceive(socket, payload);
if (responseBytes != null) {
log.info("[authenticate][响应数据长度: {} 字节,首字节: 0x{}, HEX: {}]",
responseBytes.length,
String.format("%02X", responseBytes[0]),
HexUtil.encodeHexStr(responseBytes));
return SERIALIZER.deserialize(responseBytes);
}
return null;
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authInfo);
return sendAndReceive(socket, request);
}
/**
* 发送 TCP 请求并接收响应(支持 Delimiter 分隔符协议)
* <p>
* 发送格式:[消息体][分隔符]
* 接收格式:[消息体][分隔符]
* 发送消息并接收响应
*
* @param socket TCP Socket
* @param payload 请求数据(消息体,不含分隔符)
* @return 响应数据(消息体,不含分隔符)
* @param socket TCP 连接
* @param request 请求消息
* @return 响应消息
*/
private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception {
OutputStream out = socket.getOutputStream();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// 1. 发送请求(添加分隔符后缀)
out.write(payload);
out.write(DELIMITER.getBytes(StandardCharsets.UTF_8));
out.flush();
log.info("[sendAndReceive][发送数据: {} 字节(不含分隔符)]", payload.length);
// 2. 接收响应(读取到分隔符为止)
try {
String responseLine = in.readLine();
if (responseLine != null) {
byte[] response = responseLine.getBytes(StandardCharsets.UTF_8);
log.info("[sendAndReceive][接收数据: {} 字节]", response.length);
return response;
private IotDeviceMessage sendAndReceive(NetSocket socket, IotDeviceMessage request) throws Exception {
// 1. 使用 FRAME_CODEC 创建解码器
CompletableFuture<IotDeviceMessage> responseFuture = new CompletableFuture<>();
RecordParser parser = FRAME_CODEC.createDecodeParser(buffer -> {
try {
// 反序列化响应
IotDeviceMessage response = SERIALIZER.deserialize(buffer.getBytes());
responseFuture.complete(response);
} catch (Exception e) {
responseFuture.completeExceptionally(e);
}
return null;
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
});
socket.handler(parser);
// 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑)
byte[] serializedData = SERIALIZER.serialize(request);
Buffer frameData = FRAME_CODEC.encode(serializedData);
log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length());
// 2.2 发送请求
socket.write(frameData);
// 3. 等待响应
IotDeviceMessage response = responseFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
log.info("[sendAndReceive][收到响应,数据长度: {} 字节]", SERIALIZER.serialize(response).length);
return response;
}
}

View File

@@ -13,35 +13,34 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* IoT 网关设备 TCP 协议集成测试(手动测试)
*
* <p>测试场景网关设备IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 TCP 协议管理子设备拓扑关系
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务TCP 端口 8091</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testAuth()} - 网关设备认证</li>
@@ -66,10 +65,25 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
private static final int SERVER_PORT = 8091;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
private static Vertx vertx;
private static NetClient netClient;
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
// ===================== 编解码器 =====================
/**
* 消息序列化器
*/
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
/**
* TCP 帧编解码器
*/
private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodec.create(
new IotTcpConfig.CodecConfig() {{
setType("delimiter");
setDelimiter("\\n");
}}
);
// ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) =====================
@@ -83,6 +97,25 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
private static final String SUB_DEVICE_NAME = "chazuo-it";
private static final String SUB_DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af";
@BeforeAll
static void setUp() {
vertx = Vertx.vertx();
NetClientOptions options = new NetClientOptions()
.setConnectTimeout(TIMEOUT_MS)
.setIdleTimeout(TIMEOUT_MS);
netClient = vertx.createNetClient(options);
}
@AfterAll
static void tearDown() {
if (netClient != null) {
netClient.close();
}
if (vertx != null) {
vertx.close();
}
}
// ===================== 认证测试 =====================
/**
@@ -90,29 +123,22 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建认证消息
// 1. 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO);
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
} else {
log.warn("[testAuth][未收到响应]");
}
// 2. 发送并接收响应
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testAuth][响应消息: {}]", response);
} finally {
socket.close();
}
}
@@ -123,9 +149,8 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testTopoAdd() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testTopoAdd][认证响应: {}]", authResponse);
@@ -140,24 +165,16 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
// 2.2 构建请求参数
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
params.setSubDevices(Collections.singletonList(subDeviceAuth));
IotDeviceMessage request = IotDeviceMessage.of(
IotDeviceMessage request = IotDeviceMessage.requestOf(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(),
params,
null, null, null);
// 2.3 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request);
params);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoAdd][响应消息: {}]", response);
} else {
log.warn("[testTopoAdd][未收到响应]");
}
// 3. 发送并接收响应
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testTopoAdd][响应消息: {}]", response);
} finally {
socket.close();
}
}
@@ -166,35 +183,25 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testTopoDelete() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testTopoDelete][认证响应: {}]", authResponse);
// 2.1 构建请求参数
// 2. 构建请求参数
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO();
params.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
params,
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request);
params);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoDelete][响应消息: {}]", response);
} else {
log.warn("[testTopoDelete][未收到响应]");
}
// 3. 发送并接收响应
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testTopoDelete][响应消息: {}]", response);
} finally {
socket.close();
}
}
@@ -203,33 +210,23 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testTopoGet() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testTopoGet][认证响应: {}]", authResponse);
// 2.1 构建请求参数
// 2. 构建请求参数
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
params,
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request);
params);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testTopoGet][响应消息: {}]", response);
} else {
log.warn("[testTopoGet][未收到响应]");
}
// 3. 发送并接收响应
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testTopoGet][响应消息: {}]", response);
} finally {
socket.close();
}
}
@@ -240,35 +237,25 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testSubDeviceRegister() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testSubDeviceRegister][认证响应: {}]", authResponse);
// 2.1 构建请求参数
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO();
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
subDevice.setDeviceName("mougezishebei");
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 2. 构建请求参数
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO()
.setProductKey(SUB_DEVICE_PRODUCT_KEY)
.setDeviceName("mougezishebei");
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
Collections.singletonList(subDevice),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request);
Collections.singletonList(subDevice));
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testSubDeviceRegister][响应消息: {}]", response);
} else {
log.warn("[testSubDeviceRegister][未收到响应]");
}
// 3. 发送并接收响应
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testSubDeviceRegister][响应消息: {}]", response);
} finally {
socket.close();
}
}
@@ -279,9 +266,8 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testPropertyPackPost() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testPropertyPackPost][认证响应: {}]", authResponse);
@@ -291,9 +277,9 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
.put("temperature", 25.5)
.build();
// 2.2 构建【网关设备】自身事件
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build());
gatewayEvent.setTime(System.currentTimeMillis());
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue()
.setValue(MapUtil.builder().put("message", "gateway started").build())
.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> gatewayEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("statusReport", gatewayEvent)
.build();
@@ -302,97 +288,95 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
.put("power", 100)
.build();
// 2.4 构建【网关子设备】事件
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build());
subDeviceEvent.setTime(System.currentTimeMillis());
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue()
.setValue(MapUtil.builder().put("errorCode", 0).build())
.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> subDeviceEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("healthCheck", subDeviceEvent)
.build();
// 2.5 构建子设备数据
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData();
subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME));
subDeviceData.setProperties(subDeviceProperties);
subDeviceData.setEvents(subDeviceEvents);
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData()
.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))
.setProperties(subDeviceProperties)
.setEvents(subDeviceEvents);
// 2.6 构建请求参数
IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO();
params.setProperties(gatewayProperties);
params.setEvents(gatewayEvents);
params.setSubDevices(ListUtil.of(subDeviceData));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(),
params,
null, null, null);
// 2.7 编码
byte[] payload = CODEC.encode(request);
log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
params);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPackPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPackPost][未收到响应]");
}
// 3. 发送并接收响应
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testPropertyPackPost][响应消息: {}]", response);
} finally {
socket.close();
}
}
// ===================== 辅助方法 =====================
/**
* 建立 TCP 连接
*
* @return 连接 Future
*/
private CompletableFuture<NetSocket> connect() {
CompletableFuture<NetSocket> future = new CompletableFuture<>();
netClient.connect(SERVER_PORT, SERVER_HOST)
.onSuccess(future::complete)
.onFailure(future::completeExceptionally);
return future;
}
/**
* 执行网关设备认证
*
* @param socket TCP 连接
* @return 认证响应消息
*/
private IotDeviceMessage authenticate(Socket socket) throws Exception {
private IotDeviceMessage authenticate(NetSocket socket) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
byte[] responseBytes = sendAndReceive(socket, payload);
if (responseBytes != null) {
return CODEC.decode(responseBytes);
}
return null;
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authInfo);
return sendAndReceive(socket, request);
}
/**
* 发送 TCP 请求并接收响应
* 发送消息并接收响应(复用 IotTcpFrameCodec 编解码逻辑)
*
* @param socket TCP Socket
* @param payload 请求数据
* @return 响应数据
* @param socket TCP 连接
* @param request 请求消息
* @return 响应消息
*/
private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception {
// 1. 发送请求
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
out.write(payload);
out.flush();
// 2.1 等待一小段时间让服务器处理
Thread.sleep(100);
// 2.2 接收响应
byte[] buffer = new byte[4096];
try {
int length = in.read(buffer);
if (length > 0) {
byte[] response = new byte[length];
System.arraycopy(buffer, 0, response, 0, length);
return response;
private IotDeviceMessage sendAndReceive(NetSocket socket, IotDeviceMessage request) throws Exception {
// 1. 使用 FRAME_CODEC 创建解码器(复用 gateway 的拆包逻辑)
CompletableFuture<IotDeviceMessage> responseFuture = new CompletableFuture<>();
RecordParser parser = FRAME_CODEC.createDecodeParser(buffer -> {
try {
// 反序列化响应
IotDeviceMessage response = SERIALIZER.deserialize(buffer.getBytes());
responseFuture.complete(response);
} catch (Exception e) {
responseFuture.completeExceptionally(e);
}
return null;
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
});
socket.handler(parser);
// 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑)
byte[] serializedData = SERIALIZER.serialize(request);
Buffer frameData = FRAME_CODEC.encode(serializedData);
log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length());
// 2.2 发送请求
socket.write(frameData);
// 3. 等待响应
IotDeviceMessage response = responseFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
log.info("[sendAndReceive][收到响应,数据长度: {} 字节]",
SERIALIZER.serialize(response).length);
return response;
}
}

View File

@@ -1,23 +1,29 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* IoT 网关子设备 TCP 协议集成测试(手动测试)
@@ -26,17 +32,10 @@ import java.net.Socket;
*
* <p><b>重要说明子设备无法直接连接平台所有请求均由网关设备Gateway代为转发。</b>
*
* <p>支持两种编解码格式:
* <ul>
* <li>{@link IotTcpJsonDeviceMessageCodec} - JSON 格式</li>
* <li>{@link IotTcpBinaryDeviceMessageCodec} - 二进制格式</li>
* </ul>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务TCP 端口 8091</li>
* <li>确保子设备已通过 {@link IotGatewayDeviceTcpProtocolIntegrationTest#testTopoAdd()} 绑定到网关</li>
* <li>修改 {@link #CODEC} 选择测试的编解码格式</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testAuth()} - 子设备认证</li>
@@ -58,10 +57,25 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
private static final int SERVER_PORT = 8091;
private static final int TIMEOUT_MS = 5000;
// ===================== 编解码器选择(修改此处切换 JSON / Binary =====================
private static Vertx vertx;
private static NetClient netClient;
private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec();
// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec();
// ===================== 编解码器 =====================
/**
* 消息序列化器
*/
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
/**
* TCP 帧编解码器
*/
private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodec.create(
new IotTcpConfig.CodecConfig() {{
setType("delimiter");
setDelimiter("\\n");
}}
);
// ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
@@ -69,6 +83,25 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
private static final String DEVICE_NAME = "chazuo-it";
private static final String DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af";
@BeforeAll
static void setUp() {
vertx = Vertx.vertx();
NetClientOptions options = new NetClientOptions()
.setConnectTimeout(TIMEOUT_MS)
.setIdleTimeout(TIMEOUT_MS);
netClient = vertx.createNetClient(options);
}
@AfterAll
static void tearDown() {
if (netClient != null) {
netClient.close();
}
if (vertx != null) {
vertx.close();
}
}
// ===================== 认证测试 =====================
/**
@@ -76,28 +109,21 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
// 1.1 构建认证消息
// 1. 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO);
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
byte[] responseBytes = sendAndReceive(socket, payload);
// 2.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testAuth][响应消息: {}]", response);
} else {
log.warn("[testAuth][未收到响应]");
}
// 2. 发送并接收响应
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testAuth][响应消息: {}]", response);
} finally {
socket.close();
}
}
@@ -108,37 +134,27 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testPropertyPost][认证响应: {}]", authResponse);
// 2.1 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 2. 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("power", 100)
.put("status", "online")
.put("temperature", 36.5)
.build()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
.build()));
log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]");
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testPropertyPost][响应消息: {}]", response);
} else {
log.warn("[testPropertyPost][未收到响应]");
}
// 3. 发送并接收响应
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testPropertyPost][响应消息: {}]", response);
} finally {
socket.close();
}
}
@@ -149,16 +165,14 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
socket.setSoTimeout(TIMEOUT_MS);
NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
// 1. 先进行认证
IotDeviceMessage authResponse = authenticate(socket);
log.info("[testEventPost][认证响应: {}]", authResponse);
// 2.1 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 2. 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"alarm",
@@ -168,78 +182,77 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
.put("threshold", 40)
.put("current", 42)
.build(),
System.currentTimeMillis()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
System.currentTimeMillis()));
log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]");
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送请求
byte[] responseBytes = sendAndReceive(socket, payload);
// 3.2 解码响应
if (responseBytes != null) {
IotDeviceMessage response = CODEC.decode(responseBytes);
log.info("[testEventPost][响应消息: {}]", response);
} else {
log.warn("[testEventPost][未收到响应]");
}
// 3. 发送并接收响应
IotDeviceMessage response = sendAndReceive(socket, request);
log.info("[testEventPost][响应消息: {}]", response);
} finally {
socket.close();
}
}
// ===================== 辅助方法 =====================
/**
* 建立 TCP 连接
*
* @return 连接 Future
*/
private CompletableFuture<NetSocket> connect() {
CompletableFuture<NetSocket> future = new CompletableFuture<>();
netClient.connect(SERVER_PORT, SERVER_HOST)
.onSuccess(future::complete)
.onFailure(future::completeExceptionally);
return future;
}
/**
* 执行子设备认证
*
* @param socket TCP 连接
* @return 认证响应消息
*/
private IotDeviceMessage authenticate(Socket socket) throws Exception {
private IotDeviceMessage authenticate(NetSocket socket) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
byte[] responseBytes = sendAndReceive(socket, payload);
if (responseBytes != null) {
return CODEC.decode(responseBytes);
}
return null;
IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authInfo);
return sendAndReceive(socket, request);
}
/**
* 发送 TCP 请求并接收响应
* 发送消息并接收响应(复用 IotTcpFrameCodec 编解码逻辑)
*
* @param socket TCP Socket
* @param payload 请求数据
* @return 响应数据
* @param socket TCP 连接
* @param request 请求消息
* @return 响应消息
*/
private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception {
// 1. 发送请求
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
out.write(payload);
out.flush();
// 2.1 等待一小段时间让服务器处理
Thread.sleep(100);
// 2.2 接收响应
byte[] buffer = new byte[4096];
try {
int length = in.read(buffer);
if (length > 0) {
byte[] response = new byte[length];
System.arraycopy(buffer, 0, response, 0, length);
return response;
private IotDeviceMessage sendAndReceive(NetSocket socket, IotDeviceMessage request) throws Exception {
// 1. 使用 FRAME_CODEC 创建解码器(复用 gateway 的拆包逻辑)
CompletableFuture<IotDeviceMessage> responseFuture = new CompletableFuture<>();
RecordParser parser = FRAME_CODEC.createDecodeParser(buffer -> {
try {
// 反序列化响应
IotDeviceMessage response = SERIALIZER.deserialize(buffer.getBytes());
responseFuture.complete(response);
} catch (Exception e) {
responseFuture.completeExceptionally(e);
}
return null;
} catch (java.net.SocketTimeoutException e) {
log.warn("[sendAndReceive][接收响应超时]");
return null;
}
});
socket.handler(parser);
// 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑)
byte[] serializedData = SERIALIZER.serialize(request);
Buffer frameData = FRAME_CODEC.encode(serializedData);
log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length());
// 2.2 发送请求
socket.write(frameData);
// 3. 等待响应
IotDeviceMessage response = responseFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
log.info("[sendAndReceive][收到响应,数据长度: {} 字节]",
SERIALIZER.serialize(response).length);
return response;
}
}