feat:【iot】websocket 协议:初始化

This commit is contained in:
YunaiV
2026-01-27 09:58:07 +08:00
parent b87bc19116
commit d2c000d64d
11 changed files with 1954 additions and 0 deletions

View File

@@ -18,6 +18,9 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnection
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
@@ -209,4 +212,37 @@ public class IotGatewayConfiguration {
}
/**
* IoT 网关 WebSocket 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.websocket", name = "enabled", havingValue = "true")
@Slf4j
public static class WebSocketProtocolConfiguration {
@Bean(name = "websocketVertx", destroyMethod = "close")
public Vertx websocketVertx() {
return Vertx.vertx();
}
@Bean
public IotWebSocketUpstreamProtocol iotWebSocketUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotWebSocketConnectionManager connectionManager,
@Qualifier("websocketVertx") Vertx websocketVertx) {
return new IotWebSocketUpstreamProtocol(gatewayProperties.getProtocol().getWebsocket(),
deviceService, messageService, connectionManager, websocketVertx);
}
@Bean
public IotWebSocketDownstreamSubscriber iotWebSocketDownstreamSubscriber(IotWebSocketUpstreamProtocol protocolHandler,
IotDeviceMessageService messageService,
IotWebSocketConnectionManager connectionManager,
IotMessageBus messageBus) {
return new IotWebSocketDownstreamSubscriber(protocolHandler, messageService, connectionManager, messageBus);
}
}
}

View File

@@ -103,6 +103,11 @@ public class IotGatewayProperties {
*/
private CoapProperties coap;
/**
* WebSocket 组件配置
*/
private WebSocketProperties websocket;
}
@Data
@@ -586,4 +591,56 @@ public class IotGatewayProperties {
}
@Data
public static class WebSocketProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 服务器端口默认8094
*/
private Integer port = 8094;
/**
* WebSocket 路径(默认:/ws
*/
@NotEmpty(message = "WebSocket 路径不能为空")
private String path = "/ws";
/**
* 最大消息大小(字节,默认 64KB
*/
private Integer maxMessageSize = 65536;
/**
* 最大帧大小(字节,默认 64KB
*/
private Integer maxFrameSize = 65536;
/**
* 空闲超时时间(秒,默认 60
*/
private Integer idleTimeoutSeconds = 60;
/**
* 是否启用 SSLwss://
*/
private Boolean sslEnabled = false;
/**
* SSL 证书路径
*/
private String sslCertPath;
/**
* SSL 私钥路径
*/
private String sslKeyPath;
}
}

View File

@@ -0,0 +1,64 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 WebSocket 下游订阅者:接收下行给设备的消息
*
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotWebSocketDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotWebSocketUpstreamProtocol protocol;
private final IotDeviceMessageService messageService;
private final IotWebSocketConnectionManager connectionManager;
private final IotMessageBus messageBus;
private IotWebSocketDownstreamHandler downstreamHandler;
@PostConstruct
public void init() {
// 初始化下游处理器
this.downstreamHandler = new IotWebSocketDownstreamHandler(messageService, connectionManager);
messageBus.register(this);
log.info("[init][WebSocket 下游订阅者初始化完成,服务器 ID: {}Topic: {}]",
protocol.getServerId(), getTopic());
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
try {
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId(), e);
}
}
}

View File

@@ -0,0 +1,112 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.PemKeyCertOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 WebSocket 协议:接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotWebSocketUpstreamProtocol {
private final IotGatewayProperties.WebSocketProperties wsProperties;
private final IotDeviceService deviceService;
private final IotDeviceMessageService messageService;
private final IotWebSocketConnectionManager connectionManager;
private final Vertx vertx;
@Getter
private final String serverId;
private HttpServer httpServer;
public IotWebSocketUpstreamProtocol(IotGatewayProperties.WebSocketProperties wsProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotWebSocketConnectionManager connectionManager,
Vertx vertx) {
this.wsProperties = wsProperties;
this.deviceService = deviceService;
this.messageService = messageService;
this.connectionManager = connectionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(wsProperties.getPort());
}
@PostConstruct
public void start() {
// 1.1 创建服务器选项
HttpServerOptions options = new HttpServerOptions()
.setPort(wsProperties.getPort())
.setIdleTimeout(wsProperties.getIdleTimeoutSeconds())
.setMaxWebSocketFrameSize(wsProperties.getMaxFrameSize())
.setMaxWebSocketMessageSize(wsProperties.getMaxMessageSize());
// 1.2 配置 SSL如果启用
if (Boolean.TRUE.equals(wsProperties.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
.setKeyPath(wsProperties.getSslKeyPath())
.setCertPath(wsProperties.getSslCertPath());
options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
}
// 2. 创建服务器并设置 WebSocket 处理器
httpServer = vertx.createHttpServer(options);
httpServer.webSocketHandler(socket -> {
// 验证路径
if (ObjUtil.notEqual(wsProperties.getPath(), socket.path())) {
log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]",
socket.path(), wsProperties.getPath());
// TODO @AI已经被废弃看看换什么其他方法
socket.reject();
return;
}
// 创建上行处理器
IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(
this, messageService, deviceService, connectionManager);
handler.handle(socket);
});
// 3. 启动服务器
try {
httpServer.listen().result();
log.info("[start][IoT 网关 WebSocket 协议启动成功,端口:{},路径:{}]",
wsProperties.getPort(), wsProperties.getPath());
} catch (Exception e) {
log.error("[start][IoT 网关 WebSocket 协议启动失败]", e);
throw e;
}
}
@PreDestroy
public void stop() {
if (httpServer != null) {
try {
httpServer.close().result();
log.info("[stop][IoT 网关 WebSocket 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 WebSocket 协议停止失败]", e);
}
}
}
}

View File

@@ -0,0 +1,146 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager;
import io.vertx.core.http.ServerWebSocket;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* IoT 网关 WebSocket 连接管理器
* <p>
* 统一管理 WebSocket 连接的认证状态、设备会话和消息发送功能:
* 1. 管理 WebSocket 连接的认证状态
* 2. 管理设备会话和在线状态
* 3. 管理消息发送到设备
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotWebSocketConnectionManager {
/**
* 连接信息映射ServerWebSocket -> 连接信息
*/
private final Map<ServerWebSocket, ConnectionInfo> connectionMap = new ConcurrentHashMap<>();
/**
* 设备 ID -> ServerWebSocket 的映射
*/
private final Map<Long, ServerWebSocket> deviceSocketMap = new ConcurrentHashMap<>();
/**
* 注册设备连接(包含认证信息)
*
* @param socket WebSocket 连接
* @param deviceId 设备 ID
* @param connectionInfo 连接信息
*/
public void registerConnection(ServerWebSocket socket, Long deviceId, ConnectionInfo connectionInfo) {
// 如果设备已有其他连接,先清理旧连接
ServerWebSocket oldSocket = deviceSocketMap.get(deviceId);
if (oldSocket != null && oldSocket != socket) {
log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]",
deviceId, oldSocket.remoteAddress());
oldSocket.close();
// 清理旧连接的映射
connectionMap.remove(oldSocket);
}
connectionMap.put(socket, connectionInfo);
deviceSocketMap.put(deviceId, socket);
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {}product key: {}device name: {}]",
deviceId, socket.remoteAddress(), connectionInfo.getProductKey(), connectionInfo.getDeviceName());
}
/**
* 注销设备连接
*
* @param socket WebSocket 连接
*/
public void unregisterConnection(ServerWebSocket socket) {
ConnectionInfo connectionInfo = connectionMap.remove(socket);
if (connectionInfo != null) {
Long deviceId = connectionInfo.getDeviceId();
deviceSocketMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
deviceId, socket.remoteAddress());
}
}
/**
* 获取连接信息
*/
public ConnectionInfo getConnectionInfo(ServerWebSocket socket) {
return connectionMap.get(socket);
}
/**
* 根据设备 ID 获取连接信息
*/
public ConnectionInfo getConnectionInfoByDeviceId(Long deviceId) {
ServerWebSocket socket = deviceSocketMap.get(deviceId);
return socket != null ? connectionMap.get(socket) : null;
}
/**
* 发送消息到设备(文本消息)
*
* @param deviceId 设备 ID
* @param message JSON 消息
* @return 是否发送成功
*/
public boolean sendToDevice(Long deviceId, String message) {
ServerWebSocket socket = deviceSocketMap.get(deviceId);
if (socket == null) {
log.warn("[sendToDevice][设备未连接,设备 ID: {}]", deviceId);
return false;
}
try {
socket.writeTextMessage(message);
log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, message.length());
return true;
} catch (Exception e) {
log.error("[sendToDevice][发送消息失败,设备 ID: {}]", deviceId, e);
// 发送失败时清理连接
unregisterConnection(socket);
return false;
}
}
/**
* 连接信息(包含认证信息)
*/
@Data
@Accessors(chain = true)
public static class ConnectionInfo {
/**
* 设备 ID
*/
private Long deviceId;
/**
* 产品 Key
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
/**
* 客户端 ID
*/
private String clientId;
// TODO @AI增加有个 codecType 字段;后续可以使用,参考 tcp、udp然后下行的时候也基于这个 codeType 去获取;
}
}

View File

@@ -0,0 +1,61 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* IoT 网关 WebSocket 下行消息处理器
*
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotWebSocketDownstreamHandler {
// TODO @芋艿codeType 的处理;
private static final String CODEC_TYPE = IotWebSocketJsonDeviceMessageCodec.TYPE;
private final IotDeviceMessageService deviceMessageService;
private final IotWebSocketConnectionManager connectionManager;
/**
* 处理下行消息
*/
public void handle(IotDeviceMessage message) {
try {
log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
// 1. 获取连接信息
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId(
message.getDeviceId());
if (connectionInfo == null) {
log.error("[handle][连接信息不存在,设备 ID: {}]", message.getDeviceId());
return;
}
// 2. 编码消息并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, CODEC_TYPE);
String jsonMessage = new String(bytes, StandardCharsets.UTF_8);
boolean success = connectionManager.sendToDevice(message.getDeviceId(), jsonMessage);
if (success) {
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
} else {
log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
}
} catch (Exception e) {
log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]",
message.getDeviceId(), message.getMethod(), message, e);
}
}
}

View File

@@ -0,0 +1,482 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Handler;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* WebSocket 上行消息处理器
*
* @author 芋道源码
*/
@Slf4j
public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
// TODO @芋艿codeType 的处理;
private static final String CODEC_TYPE = IotWebSocketJsonDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotWebSocketConnectionManager connectionManager;
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public IotWebSocketUpstreamHandler(IotWebSocketUpstreamProtocol protocol,
IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotWebSocketConnectionManager connectionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.connectionManager = connectionManager;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.serverId = protocol.getServerId();
}
@Override
public void handle(ServerWebSocket socket) {
// 1. 接受 WebSocket 连接
String clientId = IdUtil.simpleUUID();
log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
// TODO @AI这个方法已经废弃看看有没其他替换的
socket.accept();
// 2.1 设置异常和关闭处理器
socket.exceptionHandler(ex -> {
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
cleanupConnection(socket);
});
socket.closeHandler(v -> {
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
cleanupConnection(socket);
});
// 2.2 设置文本消息处理器
socket.textMessageHandler(message -> {
try {
processMessage(clientId, message, socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
cleanupConnection(socket);
socket.close();
}
});
}
/**
* 处理消息
*
* @param clientId 客户端 ID
* @param message 消息JSON 字符串)
* @param socket WebSocket 连接
* @throws Exception 消息解码失败时抛出异常
*/
private void processMessage(String clientId, String message, ServerWebSocket socket) throws Exception {
// 1.1 基础检查
if (StrUtil.isBlank(message)) {
return;
}
// 1.2 解码消息
// TODO @AI应该只有初始使用 CODEC_TYPE 解析,后续基于
IotDeviceMessage deviceMessage;
try {
deviceMessage = deviceMessageService.decodeDeviceMessage(
message.getBytes(StandardCharsets.UTF_8), CODEC_TYPE);
if (deviceMessage == null) {
throw new Exception("解码后消息为空");
}
} catch (Exception e) {
throw new Exception("消息解码失败: " + e.getMessage(), e);
}
// 2. 根据消息类型路由处理
try {
if (AUTH_METHOD.equals(deviceMessage.getMethod())) {
// 认证请求
handleAuthenticationRequest(clientId, deviceMessage, socket);
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(deviceMessage.getMethod())) {
// 设备动态注册请求
handleRegisterRequest(clientId, deviceMessage, socket);
} else {
// 业务消息
handleBusinessRequest(clientId, deviceMessage, socket);
}
} catch (Exception e) {
log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]",
clientId, deviceMessage.getMethod(), e);
// 发送错误响应,避免客户端一直等待
try {
sendErrorResponse(socket, deviceMessage.getRequestId(), "消息处理失败");
} catch (Exception responseEx) {
log.error("[processMessage][发送错误响应失败,客户端 ID: {}]", clientId, responseEx);
}
}
}
/**
* 处理认证请求
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket WebSocket 连接
*/
private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) {
try {
// 1.1 解析认证参数
IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams());
if (authParams == null) {
log.warn("[handleAuthenticationRequest][认证参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "认证参数不完整");
return;
}
// 1.2 执行认证
if (!validateDeviceAuth(authParams)) {
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {}username: {}]",
clientId, authParams.getUsername());
sendErrorResponse(socket, message.getRequestId(), "认证失败");
return;
}
// 2.1 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
if (deviceInfo == null) {
sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败");
return;
}
// 2.2 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
sendErrorResponse(socket, message.getRequestId(), "设备不存在");
return;
}
// 3.1 注册连接
registerConnection(socket, device, clientId);
// 3.2 发送上线消息
sendOnlineMessage(device);
// 3.3 发送成功响应
sendSuccessResponse(socket, message.getRequestId(), "认证成功");
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
device.getId(), device.getDeviceName());
} catch (Exception e) {
log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(socket, message.getRequestId(), "认证处理异常");
}
}
/**
* 处理设备动态注册请求(一型一密,不需要认证)
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket WebSocket 连接
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void handleRegisterRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) {
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO registerParams = parseRegisterParams(message.getParams());
if (registerParams == null) {
log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "注册参数不完整");
return;
}
// 2. 调用动态注册
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(registerParams);
if (result.isError()) {
log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg());
sendErrorResponse(socket, message.getRequestId(), result.getMsg());
return;
}
// 3. 发送成功响应(包含 deviceSecret
sendRegisterSuccessResponse(socket, message.getRequestId(), result.getData());
log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]",
clientId, registerParams.getDeviceName());
} catch (Exception e) {
log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(socket, message.getRequestId(), "注册处理异常");
}
}
/**
* 处理业务请求
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket WebSocket 连接
*/
private void handleBusinessRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) {
try {
// 1. 获取认证信息并处理业务消息
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
if (connectionInfo == null) {
log.warn("[handleBusinessRequest][连接未认证,拒绝处理业务消息,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "连接未认证");
return;
}
// 2. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
log.info("[handleBusinessRequest][发送消息到消息总线,客户端 ID: {},消息: {}",
clientId, message.toString());
} catch (Exception e) {
log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e);
}
}
/**
* 注册连接信息
*
* @param socket WebSocket 连接
* @param device 设备
* @param clientId 客户端 ID
*/
private void registerConnection(ServerWebSocket socket, IotDeviceRespDTO device, String clientId) {
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = new IotWebSocketConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId);
// 注册连接
connectionManager.registerConnection(socket, device.getId(), connectionInfo);
}
/**
* 发送设备上线消息
*
* @param device 设备信息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
} catch (Exception e) {
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
}
}
/**
* 清理连接
*
* @param socket WebSocket 连接
*/
private void cleanupConnection(ServerWebSocket socket) {
try {
// 1. 发送离线消息(如果已认证)
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
if (connectionInfo != null) {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
}
// 2. 注销连接
connectionManager.unregisterConnection(socket);
} catch (Exception e) {
log.error("[cleanupConnection][清理连接失败]", e);
}
}
/**
* 发送响应消息
*
* @param socket WebSocket 连接
* @param success 是否成功
* @param message 消息
* @param requestId 请求 ID
*/
private void sendResponse(ServerWebSocket socket, boolean success, String message, String requestId) {
try {
Object responseData = MapUtil.builder()
.put("success", success)
.put("message", message)
.build();
int code = success ? 0 : 401;
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData,
code, message);
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE);
socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8));
} catch (Exception e) {
log.error("[sendResponse][发送响应失败requestId: {}]", requestId, e);
}
}
/**
* 验证设备认证信息
*
* @param authParams 认证参数
* @return 是否认证成功
*/
private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) {
try {
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(authParams.getClientId()).setUsername(authParams.getUsername())
.setPassword(authParams.getPassword()));
result.checkError();
return BooleanUtil.isTrue(result.getData());
} catch (Exception e) {
log.error("[validateDeviceAuth][设备认证异常username: {}]", authParams.getUsername(), e);
return false;
}
}
/**
* 发送错误响应
*
* @param socket WebSocket 连接
* @param requestId 请求 ID
* @param errorMessage 错误消息
*/
private void sendErrorResponse(ServerWebSocket socket, String requestId, String errorMessage) {
sendResponse(socket, false, errorMessage, requestId);
}
/**
* 发送成功响应
*
* @param socket WebSocket 连接
* @param requestId 请求 ID
* @param message 消息
*/
@SuppressWarnings("SameParameterValue")
private void sendSuccessResponse(ServerWebSocket socket, String requestId, String message) {
sendResponse(socket, true, message, requestId);
}
/**
* 解析认证参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 认证参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceAuthReqDTO parseAuthParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof java.util.Map) {
java.util.Map<String, Object> paramMap = (java.util.Map<String, Object>) params;
return new IotDeviceAuthReqDTO()
.setClientId(MapUtil.getStr(paramMap, "clientId"))
.setUsername(MapUtil.getStr(paramMap, "username"))
.setPassword(MapUtil.getStr(paramMap, "password"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceAuthReqDTO) {
return (IotDeviceAuthReqDTO) params;
}
// 其他情况尝试 JSON 转换
// TODO @芋艿:要不要优化下;
String jsonStr = JsonUtils.toJsonString(params);
return JsonUtils.convertObject(jsonStr, IotDeviceAuthReqDTO.class);
} catch (Exception e) {
log.error("[parseAuthParams][解析认证参数({})失败]", params, e);
return null;
}
}
/**
* 解析注册参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 注册参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceRegisterReqDTO parseRegisterParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof java.util.Map) {
java.util.Map<String, Object> paramMap = (java.util.Map<String, Object>) params;
String productKey = MapUtil.getStr(paramMap, "productKey");
String deviceName = MapUtil.getStr(paramMap, "deviceName");
String productSecret = MapUtil.getStr(paramMap, "productSecret");
if (StrUtil.hasBlank(productKey, deviceName, productSecret)) {
return null;
}
return new IotDeviceRegisterReqDTO()
.setProductKey(productKey)
.setDeviceName(deviceName)
.setProductSecret(productSecret);
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceRegisterReqDTO) {
return (IotDeviceRegisterReqDTO) params;
}
// 其他情况尝试 JSON 转换
String jsonStr = JsonUtils.toJsonString(params);
return JsonUtils.parseObject(jsonStr, IotDeviceRegisterReqDTO.class);
} catch (Exception e) {
log.error("[parseRegisterParams][解析注册参数({})失败]", params, e);
return null;
}
}
/**
* 发送注册成功响应(包含 deviceSecret
*
* @param socket WebSocket 连接
* @param requestId 请求 ID
* @param registerResp 注册响应
*/
private void sendRegisterSuccessResponse(ServerWebSocket socket, String requestId,
IotDeviceRegisterRespDTO registerResp) {
try {
// 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 发送响应
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE);
socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8));
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应失败requestId: {}]", requestId, e);
}
}
}

View File

@@ -123,6 +123,17 @@ yudao:
max-message-size: 1024 # 最大消息大小(字节)
ack-timeout: 2000 # ACK 超时时间(毫秒)
max-retransmit: 4 # 最大重传次数
# ====================================
# 针对引入的 WebSocket 组件的配置
# ====================================
websocket:
enabled: false # 是否启用 WebSocket 协议
port: 8094 # WebSocket 服务端口(默认 8094
path: /ws # WebSocket 路径(默认 /ws
max-message-size: 65536 # 最大消息大小(字节,默认 64KB
max-frame-size: 65536 # 最大帧大小(字节,默认 64KB
idle-timeout-seconds: 60 # 空闲超时时间(秒,默认 60
ssl-enabled: false # 是否启用 SSLwss://
--- #################### 日志相关配置 ####################
@@ -144,6 +155,7 @@ logging:
cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.mqttws: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.coap: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.websocket: DEBUG
# 根日志级别
root: INFO

View File

@@ -0,0 +1,365 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketConnectOptions;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* IoT 直连设备 WebSocket 协议集成测试(手动测试)
*
* <p>测试场景直连设备IotProductDeviceTypeEnum 的 DIRECT 类型)通过 WebSocket 协议直接连接平台
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务WebSocket 端口 8094</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testAuth()} - 设备认证</li>
* <li>{@link #testDeviceRegister()} - 设备动态注册(一型一密)</li>
* <li>{@link #testPropertyPost()} - 设备属性上报</li>
* <li>{@link #testEventPost()} - 设备事件上报</li>
* </ul>
* </li>
* </ol>
*
* <p>注意WebSocket 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息
*
* @author 芋道源码
*/
@Slf4j
public class IotDirectDeviceWebSocketProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8094;
private static final String WS_PATH = "/ws";
private static final int TIMEOUT_SECONDS = 5;
// 编解码器
private static final IotDeviceMessageCodec CODEC = new IotWebSocketJsonDeviceMessageCodec();
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) =====================
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
private static final String DEVICE_NAME = "small";
private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3";
// Vert.x 实例
private static Vertx vertx;
@BeforeAll
public static void setUp() {
vertx = Vertx.vertx();
}
@AfterAll
public static void tearDown() {
if (vertx != null) {
vertx.close();
}
}
// ===================== 认证测试 =====================
/**
* 认证测试:获取设备 Token
*/
@Test
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
// 1. 创建 WebSocket 连接
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testAuth][WebSocket 连接成功]");
// 设置消息处理器
ws.textMessageHandler(message -> {
log.info("[testAuth][收到响应: {}]", message);
responseRef.set(message);
ws.close();
latch.countDown();
});
// 2. 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 3. 编码并发送
byte[] payload = CODEC.encode(request);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testAuth][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testAuth][WebSocket 连接失败]", ar.cause());
latch.countDown();
}
});
// 4. 等待响应
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (completed && responseRef.get() != null) {
IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testAuth][解码响应: {}]", response);
} else {
log.warn("[testAuth][测试超时或未收到响应]");
}
}
// ===================== 动态注册测试 =====================
/**
* 直连设备动态注册测试(一型一密)
*/
@Test
public void testDeviceRegister() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testDeviceRegister][WebSocket 连接成功]");
ws.textMessageHandler(message -> {
log.info("[testDeviceRegister][收到响应: {}]", message);
responseRef.set(message);
ws.close();
latch.countDown();
});
// 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO();
registerReqDTO.setProductKey(PRODUCT_KEY);
registerReqDTO.setDeviceName("test-ws-" + System.currentTimeMillis());
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testDeviceRegister][发送注册请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testDeviceRegister][WebSocket 连接失败]", ar.cause());
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (completed && responseRef.get() != null) {
IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testDeviceRegister][解码响应: {}]", response);
} else {
log.warn("[testDeviceRegister][测试超时或未收到响应]");
}
}
// ===================== 直连设备属性上报测试 =====================
/**
* 属性上报测试
*/
@Test
public void testPropertyPost() throws Exception {
CountDownLatch latch = new CountDownLatch(2); // 认证 + 属性上报
AtomicReference<String> authResponseRef = new AtomicReference<>();
AtomicReference<String> propertyResponseRef = new AtomicReference<>();
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testPropertyPost][WebSocket 连接成功]");
final boolean[] authenticated = {false};
ws.textMessageHandler(message -> {
log.info("[testPropertyPost][收到响应: {}]", message);
if (!authenticated[0]) {
authResponseRef.set(message);
authenticated[0] = true;
latch.countDown();
// 认证成功后发送属性上报
IotDeviceMessage propertyRequest = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build()),
null, null, null);
byte[] payload = CODEC.encode(propertyRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testPropertyPost][发送属性上报请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
propertyResponseRef.set(message);
ws.close();
latch.countDown();
}
});
// 先发送认证请求
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(authRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testPropertyPost][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testPropertyPost][WebSocket 连接失败]", ar.cause());
latch.countDown();
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
if (completed) {
if (authResponseRef.get() != null) {
IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testPropertyPost][认证响应: {}]", authResponse);
}
if (propertyResponseRef.get() != null) {
IotDeviceMessage propertyResponse = CODEC.decode(propertyResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testPropertyPost][属性上报响应: {}]", propertyResponse);
}
} else {
log.warn("[testPropertyPost][测试超时]");
}
}
// ===================== 直连设备事件上报测试 =====================
/**
* 事件上报测试
*/
@Test
public void testEventPost() throws Exception {
CountDownLatch latch = new CountDownLatch(2); // 认证 + 事件上报
AtomicReference<String> authResponseRef = new AtomicReference<>();
AtomicReference<String> eventResponseRef = new AtomicReference<>();
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testEventPost][WebSocket 连接成功]");
final boolean[] authenticated = {false};
ws.textMessageHandler(message -> {
log.info("[testEventPost][收到响应: {}]", message);
if (!authenticated[0]) {
authResponseRef.set(message);
authenticated[0] = true;
latch.countDown();
// 认证成功后发送事件上报
IotDeviceMessage eventRequest = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis()),
null, null, null);
byte[] payload = CODEC.encode(eventRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testEventPost][发送事件上报请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
eventResponseRef.set(message);
ws.close();
latch.countDown();
}
});
// 先发送认证请求
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(authRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testEventPost][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testEventPost][WebSocket 连接失败]", ar.cause());
latch.countDown();
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
if (completed) {
if (authResponseRef.get() != null) {
IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testEventPost][认证响应: {}]", authResponse);
}
if (eventResponseRef.get() != null) {
IotDeviceMessage eventResponse = CODEC.decode(eventResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testEventPost][事件上报响应: {}]", eventResponse);
}
} else {
log.warn("[testEventPost][测试超时]");
}
}
}

View File

@@ -0,0 +1,356 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketConnectOptions;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* IoT 网关设备 WebSocket 协议集成测试(手动测试)
*
* <p>测试场景网关设备IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 WebSocket 协议管理子设备拓扑关系
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务WebSocket 端口 8094</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testAuth()} - 网关设备认证</li>
* <li>{@link #testTopoAdd()} - 添加子设备拓扑关系</li>
* <li>{@link #testTopoDelete()} - 删除子设备拓扑关系</li>
* <li>{@link #testTopoGet()} - 获取子设备拓扑关系</li>
* <li>{@link #testSubDeviceRegister()} - 子设备动态注册</li>
* <li>{@link #testPropertyPackPost()} - 批量上报属性(网关 + 子设备)</li>
* </ul>
* </li>
* </ol>
*
* <p>注意WebSocket 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息
*
* @author 芋道源码
*/
@Slf4j
public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8094;
private static final String WS_PATH = "/ws";
private static final int TIMEOUT_SECONDS = 5;
// 编解码器
private static final IotDeviceMessageCodec CODEC = new IotWebSocketJsonDeviceMessageCodec();
// ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) =====================
private static final String GATEWAY_PRODUCT_KEY = "m6XcS1ZJ3TW8eC0v";
private static final String GATEWAY_DEVICE_NAME = "sub-ddd";
private static final String GATEWAY_DEVICE_SECRET = "b3d62c70f8a4495487ed1d35d61ac2b3";
// ===================== 子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
private static final String SUB_DEVICE_PRODUCT_KEY = "jAufEMTF1W6wnPhn";
private static final String SUB_DEVICE_NAME = "chazuo-it";
private static final String SUB_DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af";
// Vert.x 实例
private static Vertx vertx;
@BeforeAll
public static void setUp() {
vertx = Vertx.vertx();
}
@AfterAll
public static void tearDown() {
if (vertx != null) {
vertx.close();
}
}
// ===================== 认证测试 =====================
/**
* 网关设备认证测试
*/
@Test
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testAuth][WebSocket 连接成功]");
ws.textMessageHandler(message -> {
log.info("[testAuth][收到响应: {}]", message);
responseRef.set(message);
ws.close();
latch.countDown();
});
// 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testAuth][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testAuth][WebSocket 连接失败]", ar.cause());
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (completed && responseRef.get() != null) {
IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testAuth][解码响应: {}]", response);
} else {
log.warn("[testAuth][测试超时或未收到响应]");
}
}
// ===================== 拓扑管理测试 =====================
/**
* 添加子设备拓扑关系测试
*/
@Test
public void testTopoAdd() throws Exception {
executeAuthenticatedRequest("testTopoAdd", ws -> {
// 构建子设备认证信息
IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo(
SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET);
IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO()
.setClientId(subAuthInfo.getClientId())
.setUsername(subAuthInfo.getUsername())
.setPassword(subAuthInfo.getPassword());
// 构建请求参数
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
params.setSubDevices(Collections.singletonList(subDeviceAuth));
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(),
params,
null, null, null);
});
}
/**
* 删除子设备拓扑关系测试
*/
@Test
public void testTopoDelete() throws Exception {
executeAuthenticatedRequest("testTopoDelete", ws -> {
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO();
params.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
params,
null, null, null);
});
}
/**
* 获取子设备拓扑关系测试
*/
@Test
public void testTopoGet() throws Exception {
executeAuthenticatedRequest("testTopoGet", ws -> {
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
params,
null, null, null);
});
}
// ===================== 子设备注册测试 =====================
/**
* 子设备动态注册测试
*/
@Test
public void testSubDeviceRegister() throws Exception {
executeAuthenticatedRequest("testSubDeviceRegister", ws -> {
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO();
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
subDevice.setDeviceName("mougezishebei-ws");
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
Collections.singletonList(subDevice),
null, null, null);
});
}
// ===================== 批量上报测试 =====================
/**
* 批量上报属性测试(网关 + 子设备)
*/
@Test
public void testPropertyPackPost() throws Exception {
executeAuthenticatedRequest("testPropertyPackPost", ws -> {
// 构建【网关设备】自身属性
Map<String, Object> gatewayProperties = MapUtil.<String, Object>builder()
.put("temperature", 25.5)
.build();
// 构建【网关设备】自身事件
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build());
gatewayEvent.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> gatewayEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("statusReport", gatewayEvent)
.build();
// 构建【网关子设备】属性
Map<String, Object> subDeviceProperties = MapUtil.<String, Object>builder()
.put("power", 100)
.build();
// 构建【网关子设备】事件
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build());
subDeviceEvent.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> subDeviceEvents = MapUtil.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("healthCheck", subDeviceEvent)
.build();
// 构建子设备数据
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData();
subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME));
subDeviceData.setProperties(subDeviceProperties);
subDeviceData.setEvents(subDeviceEvents);
// 构建请求参数
IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO();
params.setProperties(gatewayProperties);
params.setEvents(gatewayEvents);
params.setSubDevices(List.of(subDeviceData));
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(),
params,
null, null, null);
});
}
// ===================== 辅助方法 =====================
/**
* 执行需要认证的请求
*
* @param testName 测试名称
* @param requestSupplier 请求消息提供者
*/
private void executeAuthenticatedRequest(String testName, java.util.function.Function<WebSocket, IotDeviceMessage> requestSupplier) throws Exception {
CountDownLatch latch = new CountDownLatch(2);
AtomicReference<String> authResponseRef = new AtomicReference<>();
AtomicReference<String> businessResponseRef = new AtomicReference<>();
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[{}][WebSocket 连接成功]", testName);
final boolean[] authenticated = {false};
ws.textMessageHandler(message -> {
log.info("[{}][收到响应: {}]", testName, message);
if (!authenticated[0]) {
authResponseRef.set(message);
authenticated[0] = true;
latch.countDown();
// 认证成功后发送业务请求
IotDeviceMessage businessRequest = requestSupplier.apply(ws);
byte[] payload = CODEC.encode(businessRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[{}][发送业务请求: {}]", testName, jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
businessResponseRef.set(message);
ws.close();
latch.countDown();
}
});
// 先发送认证请求
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(authRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[{}][发送认证请求: {}]", testName, jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[{}][WebSocket 连接失败]", testName, ar.cause());
latch.countDown();
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
if (completed) {
if (authResponseRef.get() != null) {
IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[{}][认证响应: {}]", testName, authResponse);
}
if (businessResponseRef.get() != null) {
IotDeviceMessage businessResponse = CODEC.decode(businessResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[{}][业务响应: {}]", testName, businessResponse);
}
} else {
log.warn("[{}][测试超时]", testName);
}
}
}

View File

@@ -0,0 +1,263 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketConnectOptions;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* IoT 网关子设备 WebSocket 协议集成测试(手动测试)
*
* <p>测试场景子设备IotProductDeviceTypeEnum 的 SUB 类型)通过网关设备代理上报数据
*
* <p><b>重要说明子设备无法直接连接平台所有请求均由网关设备Gateway代为转发。</b>
*
* <p>使用步骤:
* <ol>
* <li>启动 yudao-module-iot-gateway 服务WebSocket 端口 8094</li>
* <li>确保子设备已通过 {@link IotGatewayDeviceWebSocketProtocolIntegrationTest#testTopoAdd()} 绑定到网关</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testAuth()} - 子设备认证</li>
* <li>{@link #testPropertyPost()} - 子设备属性上报(由网关代理转发)</li>
* <li>{@link #testEventPost()} - 子设备事件上报(由网关代理转发)</li>
* </ul>
* </li>
* </ol>
*
* <p>注意WebSocket 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息
*
* @author 芋道源码
*/
@Slf4j
public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 8094;
private static final String WS_PATH = "/ws";
private static final int TIMEOUT_SECONDS = 5;
// 编解码器
private static final IotDeviceMessageCodec CODEC = new IotWebSocketJsonDeviceMessageCodec();
// ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
private static final String PRODUCT_KEY = "jAufEMTF1W6wnPhn";
private static final String DEVICE_NAME = "chazuo-it";
private static final String DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af";
// Vert.x 实例
private static Vertx vertx;
@BeforeAll
public static void setUp() {
vertx = Vertx.vertx();
}
@AfterAll
public static void tearDown() {
if (vertx != null) {
vertx.close();
}
}
// ===================== 认证测试 =====================
/**
* 子设备认证测试
*/
@Test
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> responseRef = new AtomicReference<>();
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[testAuth][WebSocket 连接成功]");
ws.textMessageHandler(message -> {
log.info("[testAuth][收到响应: {}]", message);
responseRef.set(message);
ws.close();
latch.countDown();
});
// 构建认证消息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[testAuth][发送认证请求: {}]", jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[testAuth][WebSocket 连接失败]", ar.cause());
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (completed && responseRef.get() != null) {
IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[testAuth][解码响应: {}]", response);
} else {
log.warn("[testAuth][测试超时或未收到响应]");
}
}
// ===================== 子设备属性上报测试 =====================
/**
* 子设备属性上报测试
*/
@Test
public void testPropertyPost() throws Exception {
executeAuthenticatedRequest("testPropertyPost", ws -> {
log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]");
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("power", 100)
.put("status", "online")
.put("temperature", 36.5)
.build()),
null, null, null);
});
}
// ===================== 子设备事件上报测试 =====================
/**
* 子设备事件上报测试
*/
@Test
public void testEventPost() throws Exception {
executeAuthenticatedRequest("testEventPost", ws -> {
log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]");
return IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"alarm",
MapUtil.<String, Object>builder()
.put("level", "warning")
.put("message", "temperature too high")
.put("threshold", 40)
.put("current", 42)
.build(),
System.currentTimeMillis()),
null, null, null);
});
}
// ===================== 辅助方法 =====================
/**
* 执行需要认证的请求
*
* @param testName 测试名称
* @param requestSupplier 请求消息提供者
*/
private void executeAuthenticatedRequest(String testName, java.util.function.Function<WebSocket, IotDeviceMessage> requestSupplier) throws Exception {
CountDownLatch latch = new CountDownLatch(2);
AtomicReference<String> authResponseRef = new AtomicReference<>();
AtomicReference<String> businessResponseRef = new AtomicReference<>();
HttpClient client = vertx.createHttpClient();
WebSocketConnectOptions options = new WebSocketConnectOptions()
.setHost(SERVER_HOST)
.setPort(SERVER_PORT)
.setURI(WS_PATH);
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
log.info("[{}][WebSocket 连接成功]", testName);
final boolean[] authenticated = {false};
ws.textMessageHandler(message -> {
log.info("[{}][收到响应: {}]", testName, message);
if (!authenticated[0]) {
authResponseRef.set(message);
authenticated[0] = true;
latch.countDown();
// 认证成功后发送业务请求
IotDeviceMessage businessRequest = requestSupplier.apply(ws);
byte[] payload = CODEC.encode(businessRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[{}][发送业务请求: {}]", testName, jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
businessResponseRef.set(message);
ws.close();
latch.countDown();
}
});
// 先发送认证请求
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(authRequest);
String jsonMessage = new String(payload, StandardCharsets.UTF_8);
log.info("[{}][发送认证请求: {}]", testName, jsonMessage);
ws.writeTextMessage(jsonMessage);
} else {
log.error("[{}][WebSocket 连接失败]", testName, ar.cause());
latch.countDown();
latch.countDown();
}
});
boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
if (completed) {
if (authResponseRef.get() != null) {
IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[{}][认证响应: {}]", testName, authResponse);
}
if (businessResponseRef.get() != null) {
IotDeviceMessage businessResponse = CODEC.decode(businessResponseRef.get().getBytes(StandardCharsets.UTF_8));
log.info("[{}][业务响应: {}]", testName, businessResponse);
}
} else {
log.warn("[{}][测试超时]", testName);
}
}
}