diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotDirectDeviceHttpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotDirectDeviceHttpProtocolIntegrationTest.java index 8dd36cc635..ea412a2079 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotDirectDeviceHttpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotDirectDeviceHttpProtocolIntegrationTest.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.IdUtil; import cn.hutool.http.HttpResponse; import cn.hutool.http.HttpUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; @@ -92,9 +91,7 @@ public class IotDirectDeviceHttpProtocolIntegrationTest { String url = String.format("http://%s:%d/topic/sys/%s/%s/thing/property/post", SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME); String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()) - .put("version", "1.0") .put("params", IotDevicePropertyPostReqDTO.of(MapUtil.builder() .put("width", 1) .put("height", "2") @@ -126,9 +123,7 @@ public class IotDirectDeviceHttpProtocolIntegrationTest { String url = String.format("http://%s:%d/topic/sys/%s/%s/thing/event/post", SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME); String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod()) - .put("version", "1.0") .put("params", IotDeviceEventPostReqDTO.of( "eat", MapUtil.builder().put("rice", 3).build(), @@ -163,10 +158,10 @@ public class IotDirectDeviceHttpProtocolIntegrationTest { // 1.1 构建请求 String url = String.format("http://%s:%d/auth/register/device", SERVER_HOST, SERVER_PORT); // 1.2 构建请求参数 - IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO(); - reqDTO.setProductKey(PRODUCT_KEY); - reqDTO.setDeviceName("test-" + System.currentTimeMillis()); - reqDTO.setProductSecret("test-product-secret"); + IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO() + .setProductKey(PRODUCT_KEY) + .setDeviceName("test-" + System.currentTimeMillis()) + .setProductSecret("test-product-secret"); String payload = JsonUtils.toJsonString(reqDTO); // 1.3 输出请求 log.info("[testDeviceRegister][请求 URL: {}]", url); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotGatewayDeviceHttpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotGatewayDeviceHttpProtocolIntegrationTest.java index 354c4d6858..779c588b76 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotGatewayDeviceHttpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotGatewayDeviceHttpProtocolIntegrationTest.java @@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.IdUtil; import cn.hutool.http.HttpResponse; import cn.hutool.http.HttpUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; @@ -20,7 +19,6 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.Collections; -import java.util.List; import java.util.Map; @@ -121,9 +119,7 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest { IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO(); params.setSubDevices(Collections.singletonList(subDeviceAuth)); String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.TOPO_ADD.getMethod()) - .put("version", "1.0") .put("params", params) .build()); // 1.4 输出请求 @@ -155,9 +151,7 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest { params.setSubDevices(Collections.singletonList( new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod()) - .put("version", "1.0") .put("params", params) .build()); // 1.3 输出请求 @@ -187,9 +181,7 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest { // 1.2 构建请求参数(目前为空,预留扩展) IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.TOPO_GET.getMethod()) - .put("version", "1.0") .put("params", params) .build()); // 1.3 输出请求 @@ -208,8 +200,6 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest { // ===================== 子设备注册测试 ===================== - // TODO @芋艿:待测试 - /** * 子设备动态注册测试 *

@@ -227,9 +217,7 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest { subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY); subDevice.setDeviceName("mougezishebei"); String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod()) - .put("version", "1.0") .put("params", Collections.singletonList(subDevice)) .build()); // 1.3 输出请求 @@ -263,9 +251,9 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest { .put("temperature", 25.5) .build(); // 1.3 构建【网关设备】自身事件 - IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); - gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build()); - gatewayEvent.setTime(System.currentTimeMillis()); + IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue() + .setValue(MapUtil.builder().put("message", "gateway started").build()) + .setTime(System.currentTimeMillis()); Map gatewayEvents = MapUtil.builder() .put("statusReport", gatewayEvent) .build(); @@ -274,26 +262,24 @@ public class IotGatewayDeviceHttpProtocolIntegrationTest { .put("power", 100) .build(); // 1.5 构建【网关子设备】事件 - IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); - subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build()); - subDeviceEvent.setTime(System.currentTimeMillis()); + IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue() + .setValue(MapUtil.builder().put("errorCode", 0).build()) + .setTime(System.currentTimeMillis()); Map subDeviceEvents = MapUtil.builder() .put("healthCheck", subDeviceEvent) .build(); // 1.6 构建子设备数据 - IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData(); - subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)); - subDeviceData.setProperties(subDeviceProperties); - subDeviceData.setEvents(subDeviceEvents); + IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData() + .setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)) + .setProperties(subDeviceProperties) + .setEvents(subDeviceEvents); // 1.7 构建请求参数 IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO(); params.setProperties(gatewayProperties); params.setEvents(gatewayEvents); params.setSubDevices(ListUtil.of(subDeviceData)); String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod()) - .put("version", "1.0") .put("params", params) .build()); // 1.8 输出请求 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotGatewaySubDeviceHttpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotGatewaySubDeviceHttpProtocolIntegrationTest.java index cfebdbe3f8..f6b9399bcc 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotGatewaySubDeviceHttpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotGatewaySubDeviceHttpProtocolIntegrationTest.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.IdUtil; import cn.hutool.http.HttpResponse; import cn.hutool.http.HttpUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; @@ -94,9 +93,7 @@ public class IotGatewaySubDeviceHttpProtocolIntegrationTest { String url = String.format("http://%s:%d/topic/sys/%s/%s/thing/property/post", SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME); String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()) - .put("version", "1.0") .put("params", IotDevicePropertyPostReqDTO.of(MapUtil.builder() .put("power", 100) .put("status", "online") @@ -130,9 +127,7 @@ public class IotGatewaySubDeviceHttpProtocolIntegrationTest { String url = String.format("http://%s:%d/topic/sys/%s/%s/thing/event/post", SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME); String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod()) - .put("version", "1.0") .put("params", IotDeviceEventPostReqDTO.of( "alarm", MapUtil.builder() diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java index b386cd1455..29b751152b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java @@ -1,8 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.HexUtil; -import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -10,40 +8,32 @@ import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec; import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; -import cn.iocoder.yudao.module.iot.gateway.serialize.binary.IotBinarySerializer; import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * IoT 直连设备 TCP 协议集成测试(手动测试) * *

测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 TCP 协议直接连接平台 * - *

支持两种序列化格式: - *

- * - *

TCP 拆包配置(需与 application.yaml 中的 codec 配置一致): - *

- * *

使用步骤: *

    *
  1. 启动 yudao-module-iot-gateway 服务(TCP 端口 8091)
  2. - *
  3. 修改 {@link #SERIALIZER} 选择测试的序列化格式(Delimiter 模式只支持 JSON)
  4. *
  5. 运行以下测试方法: *
      *
    • {@link #testAuth()} - 设备认证
    • @@ -66,16 +56,25 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { private static final int SERVER_PORT = 8091; private static final int TIMEOUT_MS = 5000; - // TODO @AI:这里可以通过 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/codec 么?例如说:使用 vertx vertx tcp client???从而更好的复用解码逻辑; + private static Vertx vertx; + private static NetClient netClient; + + // ===================== 编解码器 ===================== + /** - * 分隔符(需与 application.yaml 中的 delimiter 配置一致) + * 消息序列化器 */ - private static final String DELIMITER = "\n"; - - // ===================== 序列化器选择(Delimiter 模式推荐使用 JSON) ===================== - private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); -// private static final IotMessageSerializer SERIALIZER = new IotBinarySerializer(); + + /** + * TCP 帧编解码器 + */ + private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodec.create( + new IotTcpConfig.CodecConfig() {{ + setType("delimiter"); + setDelimiter("\\n"); + }} + ); // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) ===================== @@ -83,6 +82,25 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { private static final String DEVICE_NAME = "small"; private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3"; + @BeforeAll + static void setUp() { + vertx = Vertx.vertx(); + NetClientOptions options = new NetClientOptions() + .setConnectTimeout(TIMEOUT_MS) + .setIdleTimeout(TIMEOUT_MS); + netClient = vertx.createNetClient(options); + } + + @AfterAll + static void tearDown() { + if (netClient != null) { + netClient.close(); + } + if (vertx != null) { + vertx.close(); + } + } + // ===================== 认证测试 ===================== /** @@ -90,29 +108,21 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - // 1.1 构建认证消息 + // 1. 构建认证消息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO); - // 1.2 序列化 - // TODO @AI:是不是把 SERIALIZER 放到 sendAndReceive 里; - byte[] payload = SERIALIZER.serialize(request); - log.info("[testAuth][Serializer: {}, 请求消息: {}, 数据包长度: {} 字节]", SERIALIZER.getType(), request, payload.length); - // 2.1 发送请求 - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - byte[] responseBytes = sendAndReceive(socket, payload); - // 2.2 反序列化响应 - if (responseBytes != null) { - IotDeviceMessage response = SERIALIZER.deserialize(responseBytes); - log.info("[testAuth][响应消息: {}]", response); - } else { - log.warn("[testAuth][未收到响应]"); - } + // 2. 发送并接收响应 + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testAuth][响应消息: {}]", response); + } finally { + socket.close(); } } @@ -127,29 +137,22 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { */ @Test public void testDeviceRegister() throws Exception { - // 1.1 构建注册消息 - IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO(); - registerReqDTO.setProductKey(PRODUCT_KEY); - registerReqDTO.setDeviceName("test-tcp-" + System.currentTimeMillis()); - registerReqDTO.setProductSecret("test-product-secret"); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null); - // 1.2 序列化 - byte[] payload = SERIALIZER.serialize(request); - log.info("[testDeviceRegister][Serializer: {}, 请求消息: {}, 数据包长度: {} 字节]", SERIALIZER.getType(), request, payload.length); + // 1. 构建注册消息 + IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO() + .setProductKey(PRODUCT_KEY) + .setDeviceName("test-tcp-" + System.currentTimeMillis()) + .setProductSecret("test-product-secret"); + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO); - // 2.1 发送请求 - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - byte[] responseBytes = sendAndReceive(socket, payload); - // 2.2 反序列化响应 - if (responseBytes != null) { - IotDeviceMessage response = SERIALIZER.deserialize(responseBytes); - log.info("[testDeviceRegister][响应消息: {}]", response); - log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); - } else { - log.warn("[testDeviceRegister][未收到响应]"); - } + // 2. 发送并接收响应 + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testDeviceRegister][响应消息: {}]", response); + log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); + } finally { + socket.close(); } } @@ -160,35 +163,25 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { */ @Test public void testPropertyPost() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { // 1. 先进行认证 IotDeviceMessage authResponse = authenticate(socket); log.info("[testPropertyPost][认证响应: {}]", authResponse); - // 2.1 构建属性上报消息 - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), + // 2. 构建属性上报消息 + IotDeviceMessage request = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), IotDevicePropertyPostReqDTO.of(MapUtil.builder() .put("width", 1) .put("height", "2") - .build()), - null, null, null); - // 2.2 序列化 - byte[] payload = SERIALIZER.serialize(request); - log.info("[testPropertyPost][Serializer: {}, 请求消息: {}]", SERIALIZER.getType(), request); + .build())); - // 3.1 发送请求 - byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 反序列化响应 - if (responseBytes != null) { - IotDeviceMessage response = SERIALIZER.deserialize(responseBytes); - log.info("[testPropertyPost][响应消息: {}]", response); - } else { - log.warn("[testPropertyPost][未收到响应]"); - } + // 3. 发送并接收响应 + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testPropertyPost][响应消息: {}]", response); + } finally { + socket.close(); } } @@ -199,98 +192,87 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { */ @Test public void testEventPost() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { // 1. 先进行认证 IotDeviceMessage authResponse = authenticate(socket); log.info("[testEventPost][认证响应: {}]", authResponse); - // 2.1 构建事件上报消息 - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), + // 2. 构建事件上报消息 + IotDeviceMessage request = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), IotDeviceEventPostReqDTO.of( "eat", MapUtil.builder().put("rice", 3).build(), - System.currentTimeMillis()), - null, null, null); - // 2.2 序列化 - byte[] payload = SERIALIZER.serialize(request); - log.info("[testEventPost][Serializer: {}, 请求消息: {}]", SERIALIZER.getType(), request); + System.currentTimeMillis())); - // 3.1 发送请求 - byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 反序列化响应 - if (responseBytes != null) { - IotDeviceMessage response = SERIALIZER.deserialize(responseBytes); - log.info("[testEventPost][响应消息: {}]", response); - } else { - log.warn("[testEventPost][未收到响应]"); - } + // 3. 发送并接收响应 + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testEventPost][响应消息: {}]", response); + } finally { + socket.close(); } } // ===================== 辅助方法 ===================== + /** + * 建立 TCP 连接 + * + * @return 连接 Future + */ + private CompletableFuture connect() { + CompletableFuture future = new CompletableFuture<>(); + netClient.connect(SERVER_PORT, SERVER_HOST) + .onSuccess(future::complete) + .onFailure(future::completeExceptionally); + return future; + } + /** * 执行设备认证 * * @param socket TCP 连接 * @return 认证响应消息 */ - private IotDeviceMessage authenticate(Socket socket) throws Exception { + private IotDeviceMessage authenticate(NetSocket socket) throws Exception { IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - byte[] payload = SERIALIZER.serialize(request); - byte[] responseBytes = sendAndReceive(socket, payload); - if (responseBytes != null) { - log.info("[authenticate][响应数据长度: {} 字节,首字节: 0x{}, HEX: {}]", - responseBytes.length, - String.format("%02X", responseBytes[0]), - HexUtil.encodeHexStr(responseBytes)); - return SERIALIZER.deserialize(responseBytes); - } - return null; + IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authInfo); + return sendAndReceive(socket, request); } /** - * 发送 TCP 请求并接收响应(支持 Delimiter 分隔符协议) - *

      - * 发送格式:[消息体][分隔符] - * 接收格式:[消息体][分隔符] + * 发送消息并接收响应 * - * @param socket TCP Socket - * @param payload 请求数据(消息体,不含分隔符) - * @return 响应数据(消息体,不含分隔符) + * @param socket TCP 连接 + * @param request 请求消息 + * @return 响应消息 */ - private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception { - OutputStream out = socket.getOutputStream(); - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); - - // 1. 发送请求(添加分隔符后缀) - out.write(payload); - out.write(DELIMITER.getBytes(StandardCharsets.UTF_8)); - out.flush(); - log.info("[sendAndReceive][发送数据: {} 字节(不含分隔符)]", payload.length); - - // 2. 接收响应(读取到分隔符为止) - try { - String responseLine = in.readLine(); - if (responseLine != null) { - byte[] response = responseLine.getBytes(StandardCharsets.UTF_8); - log.info("[sendAndReceive][接收数据: {} 字节]", response.length); - return response; + private IotDeviceMessage sendAndReceive(NetSocket socket, IotDeviceMessage request) throws Exception { + // 1. 使用 FRAME_CODEC 创建解码器 + CompletableFuture responseFuture = new CompletableFuture<>(); + RecordParser parser = FRAME_CODEC.createDecodeParser(buffer -> { + try { + // 反序列化响应 + IotDeviceMessage response = SERIALIZER.deserialize(buffer.getBytes()); + responseFuture.complete(response); + } catch (Exception e) { + responseFuture.completeExceptionally(e); } - return null; - } catch (java.net.SocketTimeoutException e) { - log.warn("[sendAndReceive][接收响应超时]"); - return null; - } + }); + socket.handler(parser); + + // 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑) + byte[] serializedData = SERIALIZER.serialize(request); + Buffer frameData = FRAME_CODEC.encode(serializedData); + log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length()); + // 2.2 发送请求 + socket.write(frameData); + + // 3. 等待响应 + IotDeviceMessage response = responseFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + log.info("[sendAndReceive][收到响应,数据长度: {} 字节]", SERIALIZER.serialize(response).length); + return response; } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java index b417ceb9fa..171bf12fcb 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java @@ -13,35 +13,34 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO; import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO; import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; import java.util.Collections; -import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * IoT 网关设备 TCP 协议集成测试(手动测试) * *

      测试场景:网关设备(IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 TCP 协议管理子设备拓扑关系 * - *

      支持两种编解码格式: - *

        - *
      • {@link IotTcpJsonDeviceMessageCodec} - JSON 格式
      • - *
      • {@link IotTcpBinaryDeviceMessageCodec} - 二进制格式
      • - *
      - * *

      使用步骤: *

        *
      1. 启动 yudao-module-iot-gateway 服务(TCP 端口 8091)
      2. - *
      3. 修改 {@link #CODEC} 选择测试的编解码格式
      4. *
      5. 运行以下测试方法: *
          *
        • {@link #testAuth()} - 网关设备认证
        • @@ -66,10 +65,25 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { private static final int SERVER_PORT = 8091; private static final int TIMEOUT_MS = 5000; - // ===================== 编解码器选择(修改此处切换 JSON / Binary) ===================== + private static Vertx vertx; + private static NetClient netClient; - private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec(); -// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec(); + // ===================== 编解码器 ===================== + + /** + * 消息序列化器 + */ + private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); + + /** + * TCP 帧编解码器 + */ + private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodec.create( + new IotTcpConfig.CodecConfig() {{ + setType("delimiter"); + setDelimiter("\\n"); + }} + ); // ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) ===================== @@ -83,6 +97,25 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { private static final String SUB_DEVICE_NAME = "chazuo-it"; private static final String SUB_DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af"; + @BeforeAll + static void setUp() { + vertx = Vertx.vertx(); + NetClientOptions options = new NetClientOptions() + .setConnectTimeout(TIMEOUT_MS) + .setIdleTimeout(TIMEOUT_MS); + netClient = vertx.createNetClient(options); + } + + @AfterAll + static void tearDown() { + if (netClient != null) { + netClient.close(); + } + if (vertx != null) { + vertx.close(); + } + } + // ===================== 认证测试 ===================== /** @@ -90,29 +123,22 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - // 1.1 构建认证消息 + // 1. 构建认证消息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - // 1.2 编码 - byte[] payload = CODEC.encode(request); - log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO); - // 2.1 发送请求 - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - byte[] responseBytes = sendAndReceive(socket, payload); - // 2.2 解码响应 - if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); - log.info("[testAuth][响应消息: {}]", response); - } else { - log.warn("[testAuth][未收到响应]"); - } + // 2. 发送并接收响应 + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testAuth][响应消息: {}]", response); + } finally { + socket.close(); } } @@ -123,9 +149,8 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { */ @Test public void testTopoAdd() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { // 1. 先进行认证 IotDeviceMessage authResponse = authenticate(socket); log.info("[testTopoAdd][认证响应: {}]", authResponse); @@ -140,24 +165,16 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { // 2.2 构建请求参数 IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO(); params.setSubDevices(Collections.singletonList(subDeviceAuth)); - IotDeviceMessage request = IotDeviceMessage.of( + IotDeviceMessage request = IotDeviceMessage.requestOf( IdUtil.fastSimpleUUID(), IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), - params, - null, null, null); - // 2.3 编码 - byte[] payload = CODEC.encode(request); - log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request); + params); - // 3.1 发送请求 - byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 解码响应 - if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); - log.info("[testTopoAdd][响应消息: {}]", response); - } else { - log.warn("[testTopoAdd][未收到响应]"); - } + // 3. 发送并接收响应 + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testTopoAdd][响应消息: {}]", response); + } finally { + socket.close(); } } @@ -166,35 +183,25 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { */ @Test public void testTopoDelete() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { // 1. 先进行认证 IotDeviceMessage authResponse = authenticate(socket); log.info("[testTopoDelete][认证响应: {}]", authResponse); - // 2.1 构建请求参数 + // 2. 构建请求参数 IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO(); params.setSubDevices(Collections.singletonList( new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), + IotDeviceMessage request = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), - params, - null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); - log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request); + params); - // 3.1 发送请求 - byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 解码响应 - if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); - log.info("[testTopoDelete][响应消息: {}]", response); - } else { - log.warn("[testTopoDelete][未收到响应]"); - } + // 3. 发送并接收响应 + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testTopoDelete][响应消息: {}]", response); + } finally { + socket.close(); } } @@ -203,33 +210,23 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { */ @Test public void testTopoGet() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { // 1. 先进行认证 IotDeviceMessage authResponse = authenticate(socket); log.info("[testTopoGet][认证响应: {}]", authResponse); - // 2.1 构建请求参数 + // 2. 构建请求参数 IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), + IotDeviceMessage request = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), - params, - null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); - log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request); + params); - // 3.1 发送请求 - byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 解码响应 - if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); - log.info("[testTopoGet][响应消息: {}]", response); - } else { - log.warn("[testTopoGet][未收到响应]"); - } + // 3. 发送并接收响应 + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testTopoGet][响应消息: {}]", response); + } finally { + socket.close(); } } @@ -240,35 +237,25 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { */ @Test public void testSubDeviceRegister() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { // 1. 先进行认证 IotDeviceMessage authResponse = authenticate(socket); log.info("[testSubDeviceRegister][认证响应: {}]", authResponse); - // 2.1 构建请求参数 - IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO(); - subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY); - subDevice.setDeviceName("mougezishebei"); - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), + // 2. 构建请求参数 + IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO() + .setProductKey(SUB_DEVICE_PRODUCT_KEY) + .setDeviceName("mougezishebei"); + IotDeviceMessage request = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), - Collections.singletonList(subDevice), - null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); - log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request); + Collections.singletonList(subDevice)); - // 3.1 发送请求 - byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 解码响应 - if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); - log.info("[testSubDeviceRegister][响应消息: {}]", response); - } else { - log.warn("[testSubDeviceRegister][未收到响应]"); - } + // 3. 发送并接收响应 + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testSubDeviceRegister][响应消息: {}]", response); + } finally { + socket.close(); } } @@ -279,9 +266,8 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { */ @Test public void testPropertyPackPost() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { // 1. 先进行认证 IotDeviceMessage authResponse = authenticate(socket); log.info("[testPropertyPackPost][认证响应: {}]", authResponse); @@ -291,9 +277,9 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { .put("temperature", 25.5) .build(); // 2.2 构建【网关设备】自身事件 - IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); - gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build()); - gatewayEvent.setTime(System.currentTimeMillis()); + IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue() + .setValue(MapUtil.builder().put("message", "gateway started").build()) + .setTime(System.currentTimeMillis()); Map gatewayEvents = MapUtil.builder() .put("statusReport", gatewayEvent) .build(); @@ -302,97 +288,95 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { .put("power", 100) .build(); // 2.4 构建【网关子设备】事件 - IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); - subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build()); - subDeviceEvent.setTime(System.currentTimeMillis()); + IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue() + .setValue(MapUtil.builder().put("errorCode", 0).build()) + .setTime(System.currentTimeMillis()); Map subDeviceEvents = MapUtil.builder() .put("healthCheck", subDeviceEvent) .build(); // 2.5 构建子设备数据 - IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData(); - subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)); - subDeviceData.setProperties(subDeviceProperties); - subDeviceData.setEvents(subDeviceEvents); + IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData() + .setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)) + .setProperties(subDeviceProperties) + .setEvents(subDeviceEvents); // 2.6 构建请求参数 IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO(); params.setProperties(gatewayProperties); params.setEvents(gatewayEvents); params.setSubDevices(ListUtil.of(subDeviceData)); - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), + IotDeviceMessage request = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), - params, - null, null, null); - // 2.7 编码 - byte[] payload = CODEC.encode(request); - log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + params); - // 3.1 发送请求 - byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 解码响应 - if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); - log.info("[testPropertyPackPost][响应消息: {}]", response); - } else { - log.warn("[testPropertyPackPost][未收到响应]"); - } + // 3. 发送并接收响应 + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testPropertyPackPost][响应消息: {}]", response); + } finally { + socket.close(); } } // ===================== 辅助方法 ===================== + /** + * 建立 TCP 连接 + * + * @return 连接 Future + */ + private CompletableFuture connect() { + CompletableFuture future = new CompletableFuture<>(); + netClient.connect(SERVER_PORT, SERVER_HOST) + .onSuccess(future::complete) + .onFailure(future::completeExceptionally); + return future; + } + /** * 执行网关设备认证 * * @param socket TCP 连接 * @return 认证响应消息 */ - private IotDeviceMessage authenticate(Socket socket) throws Exception { + private IotDeviceMessage authenticate(NetSocket socket) throws Exception { IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - byte[] payload = CODEC.encode(request); - byte[] responseBytes = sendAndReceive(socket, payload); - if (responseBytes != null) { - return CODEC.decode(responseBytes); - } - return null; + IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authInfo); + return sendAndReceive(socket, request); } /** - * 发送 TCP 请求并接收响应 + * 发送消息并接收响应(复用 IotTcpFrameCodec 编解码逻辑) * - * @param socket TCP Socket - * @param payload 请求数据 - * @return 响应数据 + * @param socket TCP 连接 + * @param request 请求消息 + * @return 响应消息 */ - private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception { - // 1. 发送请求 - OutputStream out = socket.getOutputStream(); - InputStream in = socket.getInputStream(); - out.write(payload); - out.flush(); - - // 2.1 等待一小段时间让服务器处理 - Thread.sleep(100); - // 2.2 接收响应 - byte[] buffer = new byte[4096]; - try { - int length = in.read(buffer); - if (length > 0) { - byte[] response = new byte[length]; - System.arraycopy(buffer, 0, response, 0, length); - return response; + private IotDeviceMessage sendAndReceive(NetSocket socket, IotDeviceMessage request) throws Exception { + // 1. 使用 FRAME_CODEC 创建解码器(复用 gateway 的拆包逻辑) + CompletableFuture responseFuture = new CompletableFuture<>(); + RecordParser parser = FRAME_CODEC.createDecodeParser(buffer -> { + try { + // 反序列化响应 + IotDeviceMessage response = SERIALIZER.deserialize(buffer.getBytes()); + responseFuture.complete(response); + } catch (Exception e) { + responseFuture.completeExceptionally(e); } - return null; - } catch (java.net.SocketTimeoutException e) { - log.warn("[sendAndReceive][接收响应超时]"); - return null; - } + }); + socket.handler(parser); + + // 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑) + byte[] serializedData = SERIALIZER.serialize(request); + Buffer frameData = FRAME_CODEC.encode(serializedData); + log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length()); + // 2.2 发送请求 + socket.write(frameData); + + // 3. 等待响应 + IotDeviceMessage response = responseFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + log.info("[sendAndReceive][收到响应,数据长度: {} 字节]", + SERIALIZER.serialize(response).length); + return response; } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java index c918b474c3..4354313e1a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java @@ -1,23 +1,29 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * IoT 网关子设备 TCP 协议集成测试(手动测试) @@ -26,17 +32,10 @@ import java.net.Socket; * *

          重要说明:子设备无法直接连接平台,所有请求均由网关设备(Gateway)代为转发。 * - *

          支持两种编解码格式: - *

            - *
          • {@link IotTcpJsonDeviceMessageCodec} - JSON 格式
          • - *
          • {@link IotTcpBinaryDeviceMessageCodec} - 二进制格式
          • - *
          - * *

          使用步骤: *

            *
          1. 启动 yudao-module-iot-gateway 服务(TCP 端口 8091)
          2. *
          3. 确保子设备已通过 {@link IotGatewayDeviceTcpProtocolIntegrationTest#testTopoAdd()} 绑定到网关
          4. - *
          5. 修改 {@link #CODEC} 选择测试的编解码格式
          6. *
          7. 运行以下测试方法: *
              *
            • {@link #testAuth()} - 子设备认证
            • @@ -58,10 +57,25 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { private static final int SERVER_PORT = 8091; private static final int TIMEOUT_MS = 5000; - // ===================== 编解码器选择(修改此处切换 JSON / Binary) ===================== + private static Vertx vertx; + private static NetClient netClient; - private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec(); -// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec(); + // ===================== 编解码器 ===================== + + /** + * 消息序列化器 + */ + private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); + + /** + * TCP 帧编解码器 + */ + private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodec.create( + new IotTcpConfig.CodecConfig() {{ + setType("delimiter"); + setDelimiter("\\n"); + }} + ); // ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== @@ -69,6 +83,25 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { private static final String DEVICE_NAME = "chazuo-it"; private static final String DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af"; + @BeforeAll + static void setUp() { + vertx = Vertx.vertx(); + NetClientOptions options = new NetClientOptions() + .setConnectTimeout(TIMEOUT_MS) + .setIdleTimeout(TIMEOUT_MS); + netClient = vertx.createNetClient(options); + } + + @AfterAll + static void tearDown() { + if (netClient != null) { + netClient.close(); + } + if (vertx != null) { + vertx.close(); + } + } + // ===================== 认证测试 ===================== /** @@ -76,28 +109,21 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - // 1.1 构建认证消息 + // 1. 构建认证消息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) .setPassword(authInfo.getPassword()); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - // 1.2 编码 - byte[] payload = CODEC.encode(request); - log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authReqDTO); - // 2.1 发送请求 - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - byte[] responseBytes = sendAndReceive(socket, payload); - // 2.2 解码响应 - if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); - log.info("[testAuth][响应消息: {}]", response); - } else { - log.warn("[testAuth][未收到响应]"); - } + // 2. 发送并接收响应 + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testAuth][响应消息: {}]", response); + } finally { + socket.close(); } } @@ -108,37 +134,27 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { */ @Test public void testPropertyPost() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { // 1. 先进行认证 IotDeviceMessage authResponse = authenticate(socket); log.info("[testPropertyPost][认证响应: {}]", authResponse); - // 2.1 构建属性上报消息 - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), + // 2. 构建属性上报消息 + IotDeviceMessage request = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), IotDevicePropertyPostReqDTO.of(MapUtil.builder() .put("power", 100) .put("status", "online") .put("temperature", 36.5) - .build()), - null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); + .build())); log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]"); - log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); - // 3.1 发送请求 - byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 解码响应 - if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); - log.info("[testPropertyPost][响应消息: {}]", response); - } else { - log.warn("[testPropertyPost][未收到响应]"); - } + // 3. 发送并接收响应 + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testPropertyPost][响应消息: {}]", response); + } finally { + socket.close(); } } @@ -149,16 +165,14 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { */ @Test public void testEventPost() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - + NetSocket socket = connect().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + try { // 1. 先进行认证 IotDeviceMessage authResponse = authenticate(socket); log.info("[testEventPost][认证响应: {}]", authResponse); - // 2.1 构建事件上报消息 - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), + // 2. 构建事件上报消息 + IotDeviceMessage request = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), IotDeviceEventPostReqDTO.of( "alarm", @@ -168,78 +182,77 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { .put("threshold", 40) .put("current", 42) .build(), - System.currentTimeMillis()), - null, null, null); - // 2.2 编码 - byte[] payload = CODEC.encode(request); + System.currentTimeMillis())); log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]"); - log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); - // 3.1 发送请求 - byte[] responseBytes = sendAndReceive(socket, payload); - // 3.2 解码响应 - if (responseBytes != null) { - IotDeviceMessage response = CODEC.decode(responseBytes); - log.info("[testEventPost][响应消息: {}]", response); - } else { - log.warn("[testEventPost][未收到响应]"); - } + // 3. 发送并接收响应 + IotDeviceMessage response = sendAndReceive(socket, request); + log.info("[testEventPost][响应消息: {}]", response); + } finally { + socket.close(); } } // ===================== 辅助方法 ===================== + /** + * 建立 TCP 连接 + * + * @return 连接 Future + */ + private CompletableFuture connect() { + CompletableFuture future = new CompletableFuture<>(); + netClient.connect(SERVER_PORT, SERVER_HOST) + .onSuccess(future::complete) + .onFailure(future::completeExceptionally); + return future; + } + /** * 执行子设备认证 * * @param socket TCP 连接 * @return 认证响应消息 */ - private IotDeviceMessage authenticate(Socket socket) throws Exception { + private IotDeviceMessage authenticate(NetSocket socket) throws Exception { IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); - byte[] payload = CODEC.encode(request); - byte[] responseBytes = sendAndReceive(socket, payload); - if (responseBytes != null) { - return CODEC.decode(responseBytes); - } - return null; + IotDeviceMessage request = IotDeviceMessage.requestOf("auth", authInfo); + return sendAndReceive(socket, request); } /** - * 发送 TCP 请求并接收响应 + * 发送消息并接收响应(复用 IotTcpFrameCodec 编解码逻辑) * - * @param socket TCP Socket - * @param payload 请求数据 - * @return 响应数据 + * @param socket TCP 连接 + * @param request 请求消息 + * @return 响应消息 */ - private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception { - // 1. 发送请求 - OutputStream out = socket.getOutputStream(); - InputStream in = socket.getInputStream(); - out.write(payload); - out.flush(); - - // 2.1 等待一小段时间让服务器处理 - Thread.sleep(100); - // 2.2 接收响应 - byte[] buffer = new byte[4096]; - try { - int length = in.read(buffer); - if (length > 0) { - byte[] response = new byte[length]; - System.arraycopy(buffer, 0, response, 0, length); - return response; + private IotDeviceMessage sendAndReceive(NetSocket socket, IotDeviceMessage request) throws Exception { + // 1. 使用 FRAME_CODEC 创建解码器(复用 gateway 的拆包逻辑) + CompletableFuture responseFuture = new CompletableFuture<>(); + RecordParser parser = FRAME_CODEC.createDecodeParser(buffer -> { + try { + // 反序列化响应 + IotDeviceMessage response = SERIALIZER.deserialize(buffer.getBytes()); + responseFuture.complete(response); + } catch (Exception e) { + responseFuture.completeExceptionally(e); } - return null; - } catch (java.net.SocketTimeoutException e) { - log.warn("[sendAndReceive][接收响应超时]"); - return null; - } + }); + socket.handler(parser); + + // 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑) + byte[] serializedData = SERIALIZER.serialize(request); + Buffer frameData = FRAME_CODEC.encode(serializedData); + log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length()); + // 2.2 发送请求 + socket.write(frameData); + + // 3. 等待响应 + IotDeviceMessage response = responseFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + log.info("[sendAndReceive][收到响应,数据长度: {} 字节]", + SERIALIZER.serialize(response).length); + return response; } }