From 63d7bfe2d26ef9d971746edbf469877f325d42b2 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 26 Jan 2026 21:39:59 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91TCP=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=EF=BC=9A1=EF=BC=89=E5=A2=9E=E5=8A=A0=20regis?= =?UTF-8?q?ter=20=E5=8D=8F=E8=AE=AE=EF=BC=9B2=EF=BC=89=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=20gateway=20=E7=9B=B8=E5=85=B3=E7=9A=84=E5=8D=95=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tcp/IotTcpBinaryDeviceMessageCodec.java | 2 +- .../tcp/IotTcpJsonDeviceMessageCodec.java | 2 +- .../tcp/router/IotTcpUpstreamHandler.java | 113 +++++ ...irectDeviceTcpProtocolIntegrationTest.java | 42 ++ ...tewayDeviceTcpProtocolIntegrationTest.java | 389 ++++++++++++++++++ ...aySubDeviceTcpProtocolIntegrationTest.java | 238 +++++++++++ 6 files changed, 784 insertions(+), 2 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java index 4f42a8c2f6..05098cccbf 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java @@ -13,7 +13,7 @@ import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; /** - * TCP 二进制格式 {@link IotDeviceMessage} 编解码器 + * TCP/UDP 二进制格式 {@link IotDeviceMessage} 编解码器 *

* 二进制协议格式(所有数值使用大端序): * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java index 10ffbdf5c6..7d62ce2e0f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java @@ -11,7 +11,7 @@ import lombok.NoArgsConstructor; import org.springframework.stereotype.Component; /** - * TCP JSON 格式 {@link IotDeviceMessage} 编解码器 + * TCP/UDP JSON 格式 {@link IotDeviceMessage} 编解码器 * * 采用纯 JSON 格式传输,格式如下: * { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java index 58d7cde314..b4638a8261 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java @@ -10,7 +10,10 @@ import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; @@ -120,6 +123,9 @@ public class IotTcpUpstreamHandler implements Handler { if (AUTH_METHOD.equals(message.getMethod())) { // 认证请求 handleAuthenticationRequest(clientId, message, codecType, socket); + } else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) { + // 设备动态注册请求 + handleRegisterRequest(clientId, message, codecType, socket); } else { // 业务消息 handleBusinessRequest(clientId, message, codecType, socket); @@ -190,6 +196,44 @@ public class IotTcpUpstreamHandler implements Handler { } } + /** + * 处理设备动态注册请求(一型一密,不需要认证) + * + * @param clientId 客户端 ID + * @param message 消息信息 + * @param codecType 消息编解码类型 + * @param socket 网络连接 + * @see 阿里云 - 一型一密 + */ + private void handleRegisterRequest(String clientId, IotDeviceMessage message, String codecType, + NetSocket socket) { + try { + // 1. 解析注册参数 + IotDeviceRegisterReqDTO registerParams = parseRegisterParams(message.getParams()); + if (registerParams == null) { + log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId); + sendErrorResponse(socket, message.getRequestId(), "注册参数不完整", codecType); + return; + } + + // 2. 调用动态注册 + CommonResult result = deviceApi.registerDevice(registerParams); + if (result.isError()) { + log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg()); + sendErrorResponse(socket, message.getRequestId(), result.getMsg(), codecType); + return; + } + + // 3. 发送成功响应(包含 deviceSecret) + sendRegisterSuccessResponse(socket, message.getRequestId(), result.getData(), codecType); + log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]", + clientId, registerParams.getDeviceName()); + } catch (Exception e) { + log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e); + sendErrorResponse(socket, message.getRequestId(), "注册处理异常", codecType); + } + } + /** * 处理业务请求 * @@ -405,4 +449,73 @@ public class IotTcpUpstreamHandler implements Handler { } } + /** + * 解析注册参数 + * + * @param params 参数对象(通常为 Map 类型) + * @return 注册参数 DTO,解析失败时返回 null + */ + @SuppressWarnings("unchecked") + private IotDeviceRegisterReqDTO parseRegisterParams(Object params) { + if (params == null) { + return null; + } + + try { + // 参数默认为 Map 类型,直接转换 + if (params instanceof java.util.Map) { + java.util.Map paramMap = (java.util.Map) params; + String productKey = MapUtil.getStr(paramMap, "productKey"); + String deviceName = MapUtil.getStr(paramMap, "deviceName"); + String productSecret = MapUtil.getStr(paramMap, "productSecret"); + if (StrUtil.hasBlank(productKey, deviceName, productSecret)) { + return null; + } + return new IotDeviceRegisterReqDTO() + .setProductKey(productKey) + .setDeviceName(deviceName) + .setProductSecret(productSecret); + } + + // 如果已经是目标类型,直接返回 + if (params instanceof IotDeviceRegisterReqDTO) { + return (IotDeviceRegisterReqDTO) params; + } + + // 其他情况尝试 JSON 转换 + String jsonStr = JsonUtils.toJsonString(params); + return JsonUtils.parseObject(jsonStr, IotDeviceRegisterReqDTO.class); + } catch (Exception e) { + log.error("[parseRegisterParams][解析注册参数({})失败]", params, e); + return null; + } + } + + /** + * 发送注册成功响应(包含 deviceSecret) + * + * @param socket 网络连接 + * @param requestId 请求 ID + * @param registerResp 注册响应 + * @param codecType 消息编解码类型 + */ + private void sendRegisterSuccessResponse(NetSocket socket, String requestId, + IotDeviceRegisterRespDTO registerResp, String codecType) { + try { + // 构建响应数据 + Object responseData = MapUtil.builder() + .put("success", true) + .put("deviceSecret", registerResp.getDeviceSecret()) + .put("message", "注册成功") + .build(); + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, + IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), responseData, 0, "注册成功"); + + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType); + socket.write(Buffer.buffer(encodedData)); + } catch (Exception e) { + log.error("[sendRegisterSuccessResponse][发送注册成功响应失败,requestId: {}]", requestId, e); + } + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java index ec939521af..5ac3cf2b52 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java @@ -6,6 +6,7 @@ import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; @@ -37,6 +38,7 @@ import java.net.Socket; *

  • 运行以下测试方法: * @@ -98,6 +100,46 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { } } + // ===================== 动态注册测试 ===================== + + /** + * 直连设备动态注册测试(一型一密) + *

    + * 使用产品密钥(productSecret)验证身份,成功后返回设备密钥(deviceSecret) + *

    + * 注意:此接口不需要认证 + */ + @Test + public void testDeviceRegister() throws Exception { + // 1.1 构建注册消息 + IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO(); + registerReqDTO.setProductKey(PRODUCT_KEY); + registerReqDTO.setDeviceName("test-tcp-" + System.currentTimeMillis()); + registerReqDTO.setProductSecret("test-product-secret"); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { + log.info("[testDeviceRegister][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); + } + + // 2.1 发送请求 + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testDeviceRegister][响应消息: {}]", response); + log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); + } else { + log.warn("[testDeviceRegister][未收到响应]"); + } + } + } + // ===================== 直连设备属性上报测试 ===================== /** diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java new file mode 100644 index 0000000000..22b2cb9f44 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java @@ -0,0 +1,389 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.HexUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * IoT 网关设备 TCP 协议集成测试(手动测试) + * + *

    测试场景:网关设备(IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 TCP 协议管理子设备拓扑关系 + * + *

    支持两种编解码格式: + *

    + * + *

    使用步骤: + *

      + *
    1. 启动 yudao-module-iot-gateway 服务(TCP 端口 8091)
    2. + *
    3. 修改 {@link #CODEC} 选择测试的编解码格式
    4. + *
    5. 运行以下测试方法: + *
        + *
      • {@link #testAuth()} - 网关设备认证
      • + *
      • {@link #testTopoAdd()} - 添加子设备拓扑关系
      • + *
      • {@link #testTopoDelete()} - 删除子设备拓扑关系
      • + *
      • {@link #testTopoGet()} - 获取子设备拓扑关系
      • + *
      • {@link #testSubDeviceRegister()} - 子设备动态注册
      • + *
      • {@link #testPropertyPackPost()} - 批量上报属性(网关 + 子设备)
      • + *
      + *
    6. + *
    + * + *

    注意:TCP 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotGatewayDeviceTcpProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 8091; + private static final int TIMEOUT_MS = 5000; + + // ===================== 编解码器选择(修改此处切换 JSON / Binary) ===================== + private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec(); +// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec(); + + // ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) ===================== + private static final String GATEWAY_PRODUCT_KEY = "m6XcS1ZJ3TW8eC0v"; + private static final String GATEWAY_DEVICE_NAME = "sub-ddd"; + private static final String GATEWAY_DEVICE_SECRET = "b3d62c70f8a4495487ed1d35d61ac2b3"; + + // ===================== 子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== + private static final String SUB_DEVICE_PRODUCT_KEY = "jAufEMTF1W6wnPhn"; + private static final String SUB_DEVICE_NAME = "chazuo-it"; + private static final String SUB_DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af"; + + // ===================== 认证测试 ===================== + + /** + * 网关设备认证测试 + */ + @Test + public void testAuth() throws Exception { + // 1.1 构建认证消息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { + log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); + } + + // 2.1 发送请求 + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testAuth][响应消息: {}]", response); + } else { + log.warn("[testAuth][未收到响应]"); + } + } + } + + // ===================== 拓扑管理测试 ===================== + + /** + * 添加子设备拓扑关系测试 + */ + @Test + public void testTopoAdd() throws Exception { + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + + // 1. 先进行认证 + IotDeviceMessage authResponse = authenticate(socket); + log.info("[testTopoAdd][认证响应: {}]", authResponse); + + // 2.1 构建子设备认证信息 + IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo( + SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET); + IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO() + .setClientId(subAuthInfo.getClientId()) + .setUsername(subAuthInfo.getUsername()) + .setPassword(subAuthInfo.getPassword()); + // 2.2 构建请求参数 + IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO(); + params.setSubDevices(Collections.singletonList(subDeviceAuth)); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), + params, + null, null, null); + // 2.3 编码 + byte[] payload = CODEC.encode(request); + log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送请求 + byte[] responseBytes = sendAndReceive(socket, payload); + // 3.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testTopoAdd][响应消息: {}]", response); + } else { + log.warn("[testTopoAdd][未收到响应]"); + } + } + } + + /** + * 删除子设备拓扑关系测试 + */ + @Test + public void testTopoDelete() throws Exception { + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + + // 1. 先进行认证 + IotDeviceMessage authResponse = authenticate(socket); + log.info("[testTopoDelete][认证响应: {}]", authResponse); + + // 2.1 构建请求参数 + IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO(); + params.setSubDevices(Collections.singletonList( + new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), + params, + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送请求 + byte[] responseBytes = sendAndReceive(socket, payload); + // 3.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testTopoDelete][响应消息: {}]", response); + } else { + log.warn("[testTopoDelete][未收到响应]"); + } + } + } + + /** + * 获取子设备拓扑关系测试 + */ + @Test + public void testTopoGet() throws Exception { + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + + // 1. 先进行认证 + IotDeviceMessage authResponse = authenticate(socket); + log.info("[testTopoGet][认证响应: {}]", authResponse); + + // 2.1 构建请求参数 + IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), + params, + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送请求 + byte[] responseBytes = sendAndReceive(socket, payload); + // 3.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testTopoGet][响应消息: {}]", response); + } else { + log.warn("[testTopoGet][未收到响应]"); + } + } + } + + // ===================== 子设备注册测试 ===================== + + /** + * 子设备动态注册测试 + */ + @Test + public void testSubDeviceRegister() throws Exception { + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + + // 1. 先进行认证 + IotDeviceMessage authResponse = authenticate(socket); + log.info("[testSubDeviceRegister][认证响应: {}]", authResponse); + + // 2.1 构建请求参数 + IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO(); + subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY); + subDevice.setDeviceName("mougezishebei"); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), + Collections.singletonList(subDevice), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送请求 + byte[] responseBytes = sendAndReceive(socket, payload); + // 3.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testSubDeviceRegister][响应消息: {}]", response); + } else { + log.warn("[testSubDeviceRegister][未收到响应]"); + } + } + } + + // ===================== 批量上报测试 ===================== + + /** + * 批量上报属性测试(网关 + 子设备) + */ + @Test + public void testPropertyPackPost() throws Exception { + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + + // 1. 先进行认证 + IotDeviceMessage authResponse = authenticate(socket); + log.info("[testPropertyPackPost][认证响应: {}]", authResponse); + + // 2.1 构建【网关设备】自身属性 + Map gatewayProperties = MapUtil.builder() + .put("temperature", 25.5) + .build(); + // 2.2 构建【网关设备】自身事件 + IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); + gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build()); + gatewayEvent.setTime(System.currentTimeMillis()); + Map gatewayEvents = MapUtil.builder() + .put("statusReport", gatewayEvent) + .build(); + // 2.3 构建【网关子设备】属性 + Map subDeviceProperties = MapUtil.builder() + .put("power", 100) + .build(); + // 2.4 构建【网关子设备】事件 + IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); + subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build()); + subDeviceEvent.setTime(System.currentTimeMillis()); + Map subDeviceEvents = MapUtil.builder() + .put("healthCheck", subDeviceEvent) + .build(); + // 2.5 构建子设备数据 + IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData(); + subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)); + subDeviceData.setProperties(subDeviceProperties); + subDeviceData.setEvents(subDeviceEvents); + // 2.6 构建请求参数 + IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO(); + params.setProperties(gatewayProperties); + params.setEvents(gatewayEvents); + params.setSubDevices(List.of(subDeviceData)); + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), + params, + null, null, null); + // 2.7 编码 + byte[] payload = CODEC.encode(request); + log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送请求 + byte[] responseBytes = sendAndReceive(socket, payload); + // 3.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testPropertyPackPost][响应消息: {}]", response); + } else { + log.warn("[testPropertyPackPost][未收到响应]"); + } + } + } + + // ===================== 辅助方法 ===================== + + /** + * 执行网关设备认证 + */ + private IotDeviceMessage authenticate(Socket socket) throws Exception { + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + byte[] payload = CODEC.encode(request); + byte[] responseBytes = sendAndReceive(socket, payload); + if (responseBytes != null) { + return CODEC.decode(responseBytes); + } + return null; + } + + /** + * 发送 TCP 请求并接收响应 + */ + private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception { + // 1. 发送请求 + OutputStream out = socket.getOutputStream(); + InputStream in = socket.getInputStream(); + out.write(payload); + out.flush(); + + // 2.1 等待一小段时间让服务器处理 + Thread.sleep(100); + // 2.2 接收响应 + byte[] buffer = new byte[4096]; + try { + int length = in.read(buffer); + if (length > 0) { + byte[] response = new byte[length]; + System.arraycopy(buffer, 0, response, 0, length); + return response; + } + return null; + } catch (java.net.SocketTimeoutException e) { + log.warn("[sendAndReceive][接收响应超时]"); + return null; + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java new file mode 100644 index 0000000000..eb0cbb092d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java @@ -0,0 +1,238 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.HexUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +/** + * IoT 网关子设备 TCP 协议集成测试(手动测试) + * + *

    测试场景:子设备(IotProductDeviceTypeEnum 的 SUB 类型)通过网关设备代理上报数据 + * + *

    重要说明:子设备无法直接连接平台,所有请求均由网关设备(Gateway)代为转发。 + * + *

    支持两种编解码格式: + *

    + * + *

    使用步骤: + *

      + *
    1. 启动 yudao-module-iot-gateway 服务(TCP 端口 8091)
    2. + *
    3. 确保子设备已通过 {@link IotGatewayDeviceTcpProtocolIntegrationTest#testTopoAdd()} 绑定到网关
    4. + *
    5. 修改 {@link #CODEC} 选择测试的编解码格式
    6. + *
    7. 运行以下测试方法: + *
        + *
      • {@link #testAuth()} - 子设备认证
      • + *
      • {@link #testPropertyPost()} - 子设备属性上报(由网关代理转发)
      • + *
      • {@link #testEventPost()} - 子设备事件上报(由网关代理转发)
      • + *
      + *
    8. + *
    + * + *

    注意:TCP 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotGatewaySubDeviceTcpProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 8091; + private static final int TIMEOUT_MS = 5000; + + // ===================== 编解码器选择(修改此处切换 JSON / Binary) ===================== + private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec(); +// private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec(); + + // ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== + private static final String PRODUCT_KEY = "jAufEMTF1W6wnPhn"; + private static final String DEVICE_NAME = "chazuo-it"; + private static final String DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af"; + + // ===================== 认证测试 ===================== + + /** + * 子设备认证测试 + */ + @Test + public void testAuth() throws Exception { + // 1.1 构建认证消息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { + log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); + } + + // 2.1 发送请求 + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testAuth][响应消息: {}]", response); + } else { + log.warn("[testAuth][未收到响应]"); + } + } + } + + // ===================== 子设备属性上报测试 ===================== + + /** + * 子设备属性上报测试 + */ + @Test + public void testPropertyPost() throws Exception { + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + + // 1. 先进行认证 + IotDeviceMessage authResponse = authenticate(socket); + log.info("[testPropertyPost][认证响应: {}]", authResponse); + + // 2.1 构建属性上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(MapUtil.builder() + .put("power", 100) + .put("status", "online") + .put("temperature", 36.5) + .build()), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]"); + log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送请求 + byte[] responseBytes = sendAndReceive(socket, payload); + // 3.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testPropertyPost][响应消息: {}]", response); + } else { + log.warn("[testPropertyPost][未收到响应]"); + } + } + } + + // ===================== 子设备事件上报测试 ===================== + + /** + * 子设备事件上报测试 + */ + @Test + public void testEventPost() throws Exception { + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + + // 1. 先进行认证 + IotDeviceMessage authResponse = authenticate(socket); + log.info("[testEventPost][认证响应: {}]", authResponse); + + // 2.1 构建事件上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of( + "alarm", + MapUtil.builder() + .put("level", "warning") + .put("message", "temperature too high") + .put("threshold", 40) + .put("current", 42) + .build(), + System.currentTimeMillis()), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]"); + log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送请求 + byte[] responseBytes = sendAndReceive(socket, payload); + // 3.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testEventPost][响应消息: {}]", response); + } else { + log.warn("[testEventPost][未收到响应]"); + } + } + } + + // ===================== 辅助方法 ===================== + + /** + * 执行子设备认证 + */ + private IotDeviceMessage authenticate(Socket socket) throws Exception { + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + byte[] payload = CODEC.encode(request); + byte[] responseBytes = sendAndReceive(socket, payload); + if (responseBytes != null) { + return CODEC.decode(responseBytes); + } + return null; + } + + /** + * 发送 TCP 请求并接收响应 + */ + private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception { + // 1. 发送请求 + OutputStream out = socket.getOutputStream(); + InputStream in = socket.getInputStream(); + out.write(payload); + out.flush(); + + // 2.1 等待一小段时间让服务器处理 + Thread.sleep(100); + // 2.2 接收响应 + byte[] buffer = new byte[4096]; + try { + int length = in.read(buffer); + if (length > 0) { + byte[] response = new byte[length]; + System.arraycopy(buffer, 0, response, 0, length); + return response; + } + return null; + } catch (java.net.SocketTimeoutException e) { + log.warn("[sendAndReceive][接收响应超时]"); + return null; + } + } + +}