feat(iot): 完善 WebSocket 协议实现,优化代码质量

1. 配置属性校验完善
   - CoAP 配置添加 @NotNull 校验注解,替换 TODO 注释

2. WebSocket 协议核心优化
   - ConnectionInfo 新增 codecType 字段,支持动态编解码类型
   - 上行/下行处理器根据连接的 codecType 进行消息编解码
   - 使用 StrUtil.utf8Str/utf8Bytes 替换 StandardCharsets 硬编码

3. 包注释完善
   - http/tcp package-info.java 添加规范的包级注释

4. 单元测试重构
   - 使用 WebSocketClient.connect() 替换废弃的 HttpClient.webSocket()
   - 提取公共方法,简化测试代码结构
This commit is contained in:
YunaiV
2026-01-27 21:09:00 +08:00
parent 610ae6d532
commit 0072482af8
10 changed files with 662 additions and 592 deletions

View File

@@ -568,25 +568,25 @@ public class IotGatewayProperties {
/**
* 服务端口CoAP 默认端口 5683
*/
// TODO @AI默认不为空
@NotNull(message = "服务端口不能为空")
private Integer port = 5683;
/**
* 最大消息大小(字节)
*/
// TODO @AI默认不为空
@NotNull(message = "最大消息大小不能为空")
private Integer maxMessageSize = 1024;
/**
* ACK 超时时间(毫秒)
*/
// TODO @AI默认不为空
@NotNull(message = "ACK 超时时间不能为空")
private Integer ackTimeout = 2000;
/**
* 最大重传次数
*/
// TODO @AI默认不为空
@NotNull(message = "最大重传次数不能为空")
private Integer maxRetransmit = 4;
}

View File

@@ -1,2 +1,6 @@
// TODO @AI参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java 完善注释;
package cn.iocoder.yudao.module.iot.gateway.protocol.http;
/**
* HTTP 协议实现包
* <p>
* 提供基于 Vert.x HTTP Server 的 IoT 设备连接和消息处理功能
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.http;

View File

@@ -1,2 +1,6 @@
// TODO @AI参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
/**
* TCP 协议实现包
* <p>
* 提供基于 Vert.x TCP Server 的 IoT 设备连接和消息处理功能
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;

View File

@@ -53,6 +53,7 @@ public class IotWebSocketUpstreamProtocol {
}
@PostConstruct
@SuppressWarnings("deprecation")
public void start() {
// 1.1 创建服务器选项
HttpServerOptions options = new HttpServerOptions()
@@ -73,15 +74,14 @@ public class IotWebSocketUpstreamProtocol {
httpServer.webSocketHandler(socket -> {
// 验证路径
if (ObjUtil.notEqual(wsProperties.getPath(), socket.path())) {
log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]", socket.path(), wsProperties.getPath());
// TODO @AI已经被废弃看看换什么其他方法
log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]",
socket.path(), wsProperties.getPath());
socket.reject();
return;
}
// 创建上行处理器
IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(
this, messageService, deviceService, connectionManager);
IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(this,
messageService, deviceService, connectionManager);
handler.handle(socket);
});

View File

@@ -139,8 +139,10 @@ public class IotWebSocketConnectionManager {
* 客户端 ID
*/
private String clientId;
// TODO @AI增加有个 codecType 字段;后续可以使用,参考 tcp、udp然后下行的时候也基于这个 codeType 去获取;
/**
* 消息编解码类型(认证后确定)
*/
private String codecType;
}

View File

@@ -1,14 +1,12 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* IoT 网关 WebSocket 下行消息处理器
*
@@ -18,9 +16,6 @@ import java.nio.charset.StandardCharsets;
@RequiredArgsConstructor
public class IotWebSocketDownstreamHandler {
// TODO @芋艿codeType 的处理;
private static final String CODEC_TYPE = IotWebSocketJsonDeviceMessageCodec.TYPE;
private final IotDeviceMessageService deviceMessageService;
private final IotWebSocketConnectionManager connectionManager;
@@ -42,8 +37,8 @@ public class IotWebSocketDownstreamHandler {
}
// 2. 编码消息并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, CODEC_TYPE);
String jsonMessage = new String(bytes, StandardCharsets.UTF_8);
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getCodecType());
String jsonMessage = StrUtil.utf8Str(bytes);
boolean success = connectionManager.sendToDevice(message.getDeviceId(), jsonMessage);
if (success) {
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",

View File

@@ -25,7 +25,6 @@ import io.vertx.core.Handler;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* WebSocket 上行消息处理器
@@ -35,7 +34,9 @@ import java.nio.charset.StandardCharsets;
@Slf4j
public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
// TODO @芋艿codeType 的处理;
/**
* 默认消息编解码类型
*/
private static final String CODEC_TYPE = IotWebSocketJsonDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
@@ -63,13 +64,10 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
@Override
public void handle(ServerWebSocket socket) {
// 1. 接受 WebSocket 连接
String clientId = IdUtil.simpleUUID();
log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
// TODO @AI这个方法已经废弃看看有没其他替换的
socket.accept();
// 2.1 设置异常和关闭处理器
// 1. 设置异常和关闭处理器
socket.exceptionHandler(ex -> {
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
cleanupConnection(socket);
@@ -79,7 +77,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
cleanupConnection(socket);
});
// 2.2 设置文本消息处理器
// 2. 设置文本消息处理器
socket.textMessageHandler(message -> {
try {
processMessage(clientId, message, socket);
@@ -105,12 +103,13 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
if (StrUtil.isBlank(message)) {
return;
}
// 1.2 解码消息
// TODO @AI应该只有初始使用 CODEC_TYPE 解析,后续基于
// 1.2 解码消息(已认证连接使用其 codecType未认证连接使用默认 CODEC_TYPE
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
String codecType = connectionInfo != null ? connectionInfo.getCodecType() : CODEC_TYPE;
IotDeviceMessage deviceMessage;
try {
deviceMessage = deviceMessageService.decodeDeviceMessage(
message.getBytes(StandardCharsets.UTF_8), CODEC_TYPE);
StrUtil.utf8Bytes(message), codecType);
if (deviceMessage == null) {
throw new Exception("解码后消息为空");
}
@@ -269,7 +268,8 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId);
.setClientId(clientId)
.setCodecType(CODEC_TYPE);
// 注册连接
connectionManager.registerConnection(socket, device.getId(), connectionInfo);
}
@@ -330,7 +330,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, code, message);
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE);
socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8));
socket.writeTextMessage(StrUtil.utf8Str(encodedData));
} catch (Exception e) {
log.error("[sendResponse][发送响应失败requestId: {}]", requestId, e);
}
@@ -472,7 +472,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 发送响应
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE);
socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8));
socket.writeTextMessage(StrUtil.utf8Str(encodedData));
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应失败requestId: {}]", requestId, e);
}

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -12,15 +13,14 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketClient;
import io.vertx.core.http.WebSocketConnectOptions;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -81,119 +81,83 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
// ===================== 认证测试 =====================
// TODO @AI参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java 或 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java 类,优化代码结构
/**
* 认证测试:获取设备 Token
*/
@Test
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
// 1.1 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 1. 创建 WebSocket 连接
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
// 2.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testAuth][WebSocket 连接成功]");
// TODO @AI这里有告警Deprecate /instead use WebSocketClient.connect(WebSocketConnectOptions)
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testAuth][WebSocket 连接成功]");
// 2.2 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 设置消息处理器
ws.textMessageHandler(message -> {
log.info("[testAuth][收到响应: {}]", message);
responseRef.set(message);
ws.close();
latch.countDown();
});
// 2. 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 3. 编码并发送
byte[] payload = CODEC.encode(request);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testAuth][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testAuth][WebSocket 连接失败]", ar.cause());
latch.countDown();
}
});
// 4. 等待响应
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (completed && responseRef.get() != null) {
IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testAuth][解码响应: {}]", response);
// 3. 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testAuth][响应消息: {}]", responseMessage);
} else {
log.warn("[testAuth][测试超时或未收到响应]");
log.warn("[testAuth][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 动态注册测试 =====================
/**
* 直连设备动态注册测试(一型一密)
* <p>
* 使用产品密钥productSecret验证身份成功后返回设备密钥deviceSecret
* <p>
* 注意:此接口不需要认证
*/
@Test
public void testDeviceRegister() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
// 1.1 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO();
registerReqDTO.setProductKey(PRODUCT_KEY);
registerReqDTO.setDeviceName("test-ws-" + System.currentTimeMillis());
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request);
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
// 2.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testDeviceRegister][WebSocket 连接成功]");
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testDeviceRegister][WebSocket 连接成功]");
// 2.2 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
ws.textMessageHandler(message -> {
log.info("[testDeviceRegister][收到响应: {}]", message);
responseRef.set(message);
ws.close();
latch.countDown();
});
// 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO();
registerReqDTO.setProductKey(PRODUCT_KEY);
registerReqDTO.setDeviceName("test-ws-" + System.currentTimeMillis());
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testDeviceRegister][发送注册请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testDeviceRegister][WebSocket 连接失败]", ar.cause());
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (completed && responseRef.get() != null) {
IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testDeviceRegister][解码响应: {}]", response);
// 3. 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testDeviceRegister][响应消息: {}]", responseMessage);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
} else {
log.warn("[testDeviceRegister][测试超时或未收到响应]");
log.warn("[testDeviceRegister][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 直连设备属性上报测试 =====================
@@ -203,82 +167,40 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
CountDownLatch latch = new CountDownLatch(2); // 认证 + 属性上报
AtomicReference<String> authResponseRef = new AtomicReference<>();
AtomicReference<String> propertyResponseRef = new AtomicReference<>();
// 1.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testPropertyPost][WebSocket 连接成功]");
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
// 1.2 先进行认证
IotDeviceMessage authResponse = authenticate(ws);
log.info("[testPropertyPost][认证响应: {}]", authResponse);
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testPropertyPost][WebSocket 连接成功]");
// 2.1 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
final boolean[] authenticated = {false};
ws.textMessageHandler(message -> {
log.info("[testPropertyPost][收到响应: {}]", message);
if (!authenticated[0]) {
authResponseRef.set(message);
authenticated[0] = true;
latch.countDown();
// 认证成功后发送属性上报
IotDeviceMessage propertyRequest = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build()),
null, null, null);
byte[] payload = CODEC.encode(propertyRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testPropertyPost][发送属性上报请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
propertyResponseRef.set(message);
ws.close();
latch.countDown();
}
});
// 先发送认证请求
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(authRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testPropertyPost][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testPropertyPost][WebSocket 连接失败]", ar.cause());
latch.countDown();
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
if (completed) {
if (authResponseRef.get() != null) {
IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testPropertyPost][认证响应: {}]", authResponse);
}
if (propertyResponseRef.get() != null) {
IotDeviceMessage propertyResponse = CODEC.decode(propertyResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testPropertyPost][属性上报响应: {}]", propertyResponse);
}
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testPropertyPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testPropertyPost][测试超时]");
log.warn("[testPropertyPost][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 直连设备事件上报测试 =====================
@@ -288,82 +210,111 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
CountDownLatch latch = new CountDownLatch(2); // 认证 + 事件上报
AtomicReference<String> authResponseRef = new AtomicReference<>();
AtomicReference<String> eventResponseRef = new AtomicReference<>();
// 1.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testEventPost][WebSocket 连接成功]");
HttpClient client = vertx.createHttpClient();
// 1.2 先进行认证
IotDeviceMessage authResponse = authenticate(ws);
log.info("[testEventPost][认证响应: {}]", authResponse);
// 2.1 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testEventPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testEventPost][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 辅助方法 =====================
/**
* 创建 WebSocket 连接(同步)
*
* @return WebSocket 连接
*/
private WebSocket createWebSocketConnection() throws Exception {
WebSocketClient wsClient = vertx.createWebSocketClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
return wsClient.connect(options).toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testEventPost][WebSocket 连接成功]");
/**
* 发送消息并等待响应(同步)
*
* @param ws WebSocket 连接
* @param message 请求消息
* @return 响应消息
*/
public static String sendAndReceive(WebSocket ws, String message) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
final boolean[] authenticated = {false};
ws.textMessageHandler(message -> {
log.info("[testEventPost][收到响应: {}]", message);
if (!authenticated[0]) {
authResponseRef.set(message);
authenticated[0] = true;
latch.countDown();
// 认证成功后发送事件上报
IotDeviceMessage eventRequest = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis()),
null, null, null);
byte[] payload = CODEC.encode(eventRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testEventPost][发送事件上报请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
eventResponseRef.set(message);
ws.close();
latch.countDown();
}
});
// 先发送认证请求
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(authRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testEventPost][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testEventPost][WebSocket 连接失败]", ar.cause());
latch.countDown();
latch.countDown();
}
// 设置消息处理器
ws.textMessageHandler(response -> {
log.info("[sendAndReceive][收到响应: {}]", response);
responseRef.set(response);
latch.countDown();
});
boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
if (completed) {
if (authResponseRef.get() != null) {
IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testEventPost][认证响应: {}]", authResponse);
}
if (eventResponseRef.get() != null) {
IotDeviceMessage eventResponse = CODEC.decode(eventResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testEventPost][事件上报响应: {}]", eventResponse);
}
} else {
log.warn("[testEventPost][测试超时]");
// 发送请求
log.info("[sendAndReceive][发送请求: {}]", message);
ws.writeTextMessage(message);
// 等待响应
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
log.warn("[sendAndReceive][等待响应超时]");
}
return responseRef.get();
}
/**
* 执行设备认证(同步)
*
* @param ws WebSocket 连接
* @return 认证响应消息
*/
private IotDeviceMessage authenticate(WebSocket ws) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[authenticate][发送认证请求: {}]", jsonMessage);
String response = sendAndReceive(ws, jsonMessage);
if (response != null) {
return CODEC.decode(StrUtil.utf8Bytes(response));
}
return null;
}
}

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -15,15 +16,14 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketClient;
import io.vertx.core.http.WebSocketConnectOptions;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -100,53 +100,36 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
// 1.1 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request);
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
// 2.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testAuth][WebSocket 连接成功]");
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testAuth][WebSocket 连接成功]");
// 2.2 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
ws.textMessageHandler(message -> {
log.info("[testAuth][收到响应: {}]", message);
responseRef.set(message);
ws.close();
latch.countDown();
});
// 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testAuth][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testAuth][WebSocket 连接失败]", ar.cause());
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (completed && responseRef.get() != null) {
IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testAuth][解码响应: {}]", response);
// 3. 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testAuth][响应消息: {}]", responseMessage);
} else {
log.warn("[testAuth][测试超时或未收到响应]");
log.warn("[testAuth][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 拓扑管理测试 =====================
@@ -156,23 +139,46 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testTopoAdd() throws Exception {
executeAuthenticatedRequest("testTopoAdd", ws -> {
// 构建子设备认证信息
IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo(
SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET);
IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO()
.setClientId(subAuthInfo.getClientId())
.setUsername(subAuthInfo.getUsername())
.setPassword(subAuthInfo.getPassword());
// 构建请求参数
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
params.setSubDevices(Collections.singletonList(subDeviceAuth));
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(),
params,
null, null, null);
});
// 1.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testTopoAdd][WebSocket 连接成功]");
// 1.2 先进行认证
IotDeviceMessage authResponse = authenticate(ws);
log.info("[testTopoAdd][认证响应: {}]", authResponse);
// 2.1 构建子设备认证信息
IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo(
SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET);
IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO()
.setClientId(subAuthInfo.getClientId())
.setUsername(subAuthInfo.getUsername())
.setPassword(subAuthInfo.getPassword());
// 2.2 构建请求参数
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
params.setSubDevices(Collections.singletonList(subDeviceAuth));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(),
params,
null, null, null);
// 2.3 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testTopoAdd][响应消息: {}]", responseMessage);
} else {
log.warn("[testTopoAdd][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
/**
@@ -180,16 +186,40 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testTopoDelete() throws Exception {
executeAuthenticatedRequest("testTopoDelete", ws -> {
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO();
params.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
params,
null, null, null);
});
// 1.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testTopoDelete][WebSocket 连接成功]");
// 1.2 先进行认证
IotDeviceMessage authResponse = authenticate(ws);
log.info("[testTopoDelete][认证响应: {}]", authResponse);
// 2.1 构建请求参数
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO();
params.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
params,
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testTopoDelete][响应消息: {}]", responseMessage);
} else {
log.warn("[testTopoDelete][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
/**
@@ -197,14 +227,38 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testTopoGet() throws Exception {
executeAuthenticatedRequest("testTopoGet", ws -> {
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
params,
null, null, null);
});
// 1.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testTopoGet][WebSocket 连接成功]");
// 1.2 先进行认证
IotDeviceMessage authResponse = authenticate(ws);
log.info("[testTopoGet][认证响应: {}]", authResponse);
// 2.1 构建请求参数
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
params,
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testTopoGet][响应消息: {}]", responseMessage);
} else {
log.warn("[testTopoGet][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 子设备注册测试 =====================
@@ -214,16 +268,40 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testSubDeviceRegister() throws Exception {
executeAuthenticatedRequest("testSubDeviceRegister", ws -> {
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO();
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
subDevice.setDeviceName("mougezishebei-ws");
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
Collections.singletonList(subDevice),
null, null, null);
});
// 1.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testSubDeviceRegister][WebSocket 连接成功]");
// 1.2 先进行认证
IotDeviceMessage authResponse = authenticate(ws);
log.info("[testSubDeviceRegister][认证响应: {}]", authResponse);
// 2.1 构建请求参数
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO();
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
subDevice.setDeviceName("mougezishebei-ws");
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
Collections.singletonList(subDevice),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testSubDeviceRegister][响应消息: {}]", responseMessage);
} else {
log.warn("[testSubDeviceRegister][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 批量上报测试 =====================
@@ -233,126 +311,140 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testPropertyPackPost() throws Exception {
executeAuthenticatedRequest("testPropertyPackPost", ws -> {
// 构建【网关设备】自身属性
Map<String, Object> gatewayProperties = MapUtil.<String, Object>builder()
.put("temperature", 25.5)
.build();
// 构建【网关设备】自身事件
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build());
gatewayEvent.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> gatewayEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("statusReport", gatewayEvent)
.build();
// 构建【网关设备】属性
Map<String, Object> subDeviceProperties = MapUtil.<String, Object>builder()
.put("power", 100)
.build();
// 构建【网关子设备】事件
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build());
subDeviceEvent.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> subDeviceEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("healthCheck", subDeviceEvent)
.build();
// 构建子设备数据
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData();
subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME));
subDeviceData.setProperties(subDeviceProperties);
subDeviceData.setEvents(subDeviceEvents);
// 构建请求参数
IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO();
params.setProperties(gatewayProperties);
params.setEvents(gatewayEvents);
params.setSubDevices(List.of(subDeviceData));
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(),
params,
null, null, null);
});
// 1.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testPropertyPackPost][WebSocket 连接成功]");
// 1.2 先进行认证
IotDeviceMessage authResponse = authenticate(ws);
log.info("[testPropertyPackPost][认证响应: {}]", authResponse);
// 2.1 构建【网关设备】自身属性
Map<String, Object> gatewayProperties = MapUtil.<String, Object>builder()
.put("temperature", 25.5)
.build();
// 2.2 构建【网关设备】自身事件
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build());
gatewayEvent.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> gatewayEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("statusReport", gatewayEvent)
.build();
// 2.3 构建【网关子设备】属性
Map<String, Object> subDeviceProperties = MapUtil.<String, Object>builder()
.put("power", 100)
.build();
// 2.4 构建【网关子设备】事件
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build());
subDeviceEvent.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> subDeviceEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("healthCheck", subDeviceEvent)
.build();
// 2.5 构建子设备数据
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData();
subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME));
subDeviceData.setProperties(subDeviceProperties);
subDeviceData.setEvents(subDeviceEvents);
// 2.6 构建请求参数
IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO();
params.setProperties(gatewayProperties);
params.setEvents(gatewayEvents);
params.setSubDevices(List.of(subDeviceData));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(),
params,
null, null, null);
// 2.7 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testPropertyPackPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testPropertyPackPost][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 辅助方法 =====================
/**
* 执行需要认证的请求
* 创建 WebSocket 连接(同步)
*
* @param testName 测试名称
* @param requestSupplier 请求消息提供者
* @return WebSocket 连接
*/
private void executeAuthenticatedRequest(String testName, java.util.function.Function<WebSocket, IotDeviceMessage> requestSupplier) throws Exception {
CountDownLatch latch = new CountDownLatch(2);
AtomicReference<String> authResponseRef = new AtomicReference<>();
AtomicReference<String> businessResponseRef = new AtomicReference<>();
HttpClient client = vertx.createHttpClient();
private WebSocket createWebSocketConnection() throws Exception {
WebSocketClient wsClient = vertx.createWebSocketClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
return wsClient.connect(options).toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[{}][WebSocket 连接成功]", testName);
/**
* 发送消息并等待响应(同步)
*
* @param ws WebSocket 连接
* @param message 请求消息
* @return 响应消息
*/
private String sendAndReceive(WebSocket ws, String message) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
final boolean[] authenticated = {false};
ws.textMessageHandler(message -> {
log.info("[{}][收到响应: {}]", testName, message);
if (!authenticated[0]) {
authResponseRef.set(message);
authenticated[0] = true;
latch.countDown();
// 认证成功后发送业务请求
IotDeviceMessage businessRequest = requestSupplier.apply(ws);
byte[] payload = CODEC.encode(businessRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[{}][发送业务请求: {}]", testName, jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
businessResponseRef.set(message);
ws.close();
latch.countDown();
}
});
// 先发送认证请求
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(authRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[{}][发送认证请求: {}]", testName, jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[{}][WebSocket 连接失败]", testName, ar.cause());
latch.countDown();
latch.countDown();
}
// 设置消息处理器
ws.textMessageHandler(response -> {
log.info("[sendAndReceive][收到响应: {}]", response);
responseRef.set(response);
latch.countDown();
});
boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
if (completed) {
if (authResponseRef.get() != null) {
IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[{}][认证响应: {}]", testName, authResponse);
}
if (businessResponseRef.get() != null) {
IotDeviceMessage businessResponse = CODEC.decode(businessResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[{}][业务响应: {}]", testName, businessResponse);
}
} else {
log.warn("[{}][测试超时]", testName);
// 发送请求
log.info("[sendAndReceive][发送请求: {}]", message);
ws.writeTextMessage(message);
// 等待响应
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
log.warn("[sendAndReceive][等待响应超时]");
}
return responseRef.get();
}
/**
* 执行网关设备认证(同步)
*
* @param ws WebSocket 连接
* @return 认证响应消息
*/
private IotDeviceMessage authenticate(WebSocket ws) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[authenticate][发送认证请求: {}]", jsonMessage);
String response = sendAndReceive(ws, jsonMessage);
if (response != null) {
return CODEC.decode(StrUtil.utf8Bytes(response));
}
return null;
}
}

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -11,15 +12,14 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketClient;
import io.vertx.core.http.WebSocketConnectOptions;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -87,52 +87,35 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
// 1.1 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request);
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
// 2.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testAuth][WebSocket 连接成功]");
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testAuth][WebSocket 连接成功]");
// 2.2 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
ws.textMessageHandler(message -> {
log.info("[testAuth][收到响应: {}]", message);
responseRef.set(message);
ws.close();
latch.countDown();
});
// 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testAuth][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testAuth][WebSocket 连接失败]", ar.cause());
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (completed && responseRef.get() != null) {
IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testAuth][解码响应: {}]", response);
// 3. 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testAuth][响应消息: {}]", responseMessage);
} else {
log.warn("[testAuth][测试超时或未收到响应]");
log.warn("[testAuth][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 子设备属性上报测试 =====================
@@ -142,18 +125,42 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
executeAuthenticatedRequest("testPropertyPost", ws -> {
log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]");
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("power", 100)
.put("status", "online")
.put("temperature", 36.5)
.build()),
null, null, null);
});
// 1.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testPropertyPost][WebSocket 连接成功]");
// 1.2 先进行认证
IotDeviceMessage authResponse = authenticate(ws);
log.info("[testPropertyPost][认证响应: {}]", authResponse);
log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]");
// 2.1 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("power", 100)
.put("status", "online")
.put("temperature", 36.5)
.build()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testPropertyPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testPropertyPost][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 子设备事件上报测试 =====================
@@ -163,102 +170,117 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
executeAuthenticatedRequest("testEventPost", ws -> {
log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]");
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"alarm",
MapUtil.<String, Object>builder()
.put("level", "warning")
.put("message", "temperature too high")
.put("threshold", 40)
.put("current", 42)
.build(),
System.currentTimeMillis()),
null, null, null);
});
// 1.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
log.info("[testEventPost][WebSocket 连接成功]");
// 1.2 先进行认证
IotDeviceMessage authResponse = authenticate(ws);
log.info("[testEventPost][认证响应: {}]", authResponse);
log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]");
// 2.1 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"alarm",
MapUtil.<String, Object>builder()
.put("level", "warning")
.put("message", "temperature too high")
.put("threshold", 40)
.put("current", 42)
.build(),
System.currentTimeMillis()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
log.info("[testEventPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testEventPost][未收到响应]");
}
// 4. 关闭连接
ws.close();
}
// ===================== 辅助方法 =====================
/**
* 执行需要认证的请求
* 创建 WebSocket 连接(同步)
*
* @param testName 测试名称
* @param requestSupplier 请求消息提供者
* @return WebSocket 连接
*/
private void executeAuthenticatedRequest(String testName, java.util.function.Function<WebSocket, IotDeviceMessage> requestSupplier) throws Exception {
CountDownLatch latch = new CountDownLatch(2);
AtomicReference<String> authResponseRef = new AtomicReference<>();
AtomicReference<String> businessResponseRef = new AtomicReference<>();
HttpClient client = vertx.createHttpClient();
private WebSocket createWebSocketConnection() throws Exception {
WebSocketClient wsClient = vertx.createWebSocketClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
return wsClient.connect(options).toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[{}][WebSocket 连接成功]", testName);
/**
* 发送消息并等待响应(同步)
*
* @param ws WebSocket 连接
* @param message 请求消息
* @return 响应消息
*/
private String sendAndReceive(WebSocket ws, String message) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
final boolean[] authenticated = {false};
ws.textMessageHandler(message -> {
log.info("[{}][收到响应: {}]", testName, message);
if (!authenticated[0]) {
authResponseRef.set(message);
authenticated[0] = true;
latch.countDown();
// 认证成功后发送业务请求
IotDeviceMessage businessRequest = requestSupplier.apply(ws);
byte[] payload = CODEC.encode(businessRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[{}][发送业务请求: {}]", testName, jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
businessResponseRef.set(message);
ws.close();
latch.countDown();
}
});
// 先发送认证请求
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(authRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[{}][发送认证请求: {}]", testName, jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[{}][WebSocket 连接失败]", testName, ar.cause());
latch.countDown();
latch.countDown();
}
// 设置消息处理器
ws.textMessageHandler(response -> {
log.info("[sendAndReceive][收到响应: {}]", response);
responseRef.set(response);
latch.countDown();
});
boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
if (completed) {
if (authResponseRef.get() != null) {
IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[{}][认证响应: {}]", testName, authResponse);
}
if (businessResponseRef.get() != null) {
IotDeviceMessage businessResponse = CODEC.decode(businessResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[{}][业务响应: {}]", testName, businessResponse);
}
} else {
log.warn("[{}][测试超时]", testName);
// 发送请求
log.info("[sendAndReceive][发送请求: {}]", message);
ws.writeTextMessage(message);
// 等待响应
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
log.warn("[sendAndReceive][等待响应超时]");
}
return responseRef.get();
}
/**
* 执行子设备认证(同步)
*
* @param ws WebSocket 连接
* @return 认证响应消息
*/
private IotDeviceMessage authenticate(WebSocket ws) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[authenticate][发送认证请求: {}]", jsonMessage);
String response = sendAndReceive(ws, jsonMessage);
if (response != null) {
return CODEC.decode(StrUtil.utf8Bytes(response));
}
return null;
}
}