feat(iot):【协议改造】websocket 初步改造(50%)

This commit is contained in:
YunaiV
2026-02-01 18:43:50 +08:00
parent 3db187091c
commit d9a08094d9
19 changed files with 522 additions and 302 deletions

View File

@@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.SmartLifecycle;
@@ -158,8 +158,8 @@ public class IotProtocolManager implements SmartLifecycle {
* @param config 协议实例配置
* @return WebSocket 协议实例
*/
private IotWebSocketUpstreamProtocol createWebSocketProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
return new IotWebSocketUpstreamProtocol(config);
private IotWebSocketProtocol createWebSocketProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
return new IotWebSocketProtocol(config);
}
}

View File

@@ -57,6 +57,10 @@ public class IotTcpProtocol implements IotProtocol {
* TCP 服务器
*/
private NetServer tcpServer;
/**
* TCP 连接管理器
*/
private final IotTcpConnectionManager connectionManager;
/**
* 下行消息订阅者
@@ -67,17 +71,11 @@ public class IotTcpProtocol implements IotProtocol {
* 消息序列化器
*/
private final IotMessageSerializer serializer;
/**
* TCP 帧编解码器
*/
private final IotTcpFrameCodec frameCodec;
/**
* TCP 连接管理器
*/
private final IotTcpConnectionManager connectionManager;
public IotTcpProtocol(ProtocolInstanceProperties properties) {
IotTcpConfig tcpConfig = properties.getTcp();
Assert.notNull(tcpConfig, "TCP 协议配置tcp不能为空");

View File

@@ -29,6 +29,11 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec {
/**
* 最大帧长度64KB防止 DoS 攻击
*/
private static final int MAX_FRAME_LENGTH = 65536;
private final int lengthFieldOffset;
private final int lengthFieldLength;
private final int lengthAdjustment;
@@ -60,6 +65,7 @@ public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec {
public RecordParser createDecodeParser(Handler<Buffer> handler) {
// 创建状态机:先读取头部,再读取消息体
RecordParser parser = RecordParser.newFixed(headerLength);
parser.maxRecordSize(MAX_FRAME_LENGTH); // 设置最大记录大小,防止 DoS 攻击
final AtomicReference<Integer> bodyLength = new AtomicReference<>(null); // 消息体长度null 表示读取头部阶段
final AtomicReference<Buffer> headerBuffer = new AtomicReference<>(null); // 头部消息

View File

@@ -77,7 +77,9 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
}
@Override
@SuppressWarnings("DuplicatedCode")
public void handle(NetSocket socket) {
// TODO @AIclientId 去掉;其它模块也看看,怎么去掉下看看;
String clientId = IdUtil.simpleUUID();
log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
@@ -92,9 +94,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
});
// 2.1 设置消息处理器
// TODO @AI去掉 clientId
Handler<Buffer> messageHandler = buffer -> {
try {
processMessage(clientId, buffer, socket);
processMessage(buffer, socket);
} catch (Exception e) {
log.error("[handle][消息处理失败,客户端 ID: {},地址: {}]",
clientId, socket.remoteAddress(), e);
@@ -110,11 +113,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
/**
* 处理消息
*
* @param clientId 客户端 ID
* @param buffer 消息
* @param socket 网络连接
*/
private void processMessage(String clientId, Buffer buffer, NetSocket socket) {
private void processMessage(Buffer buffer, NetSocket socket) {
IotDeviceMessage message = null;
try {
// 1. 反序列化消息
@@ -127,29 +129,29 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 2. 根据消息类型路由处理
if (AUTH_METHOD.equals(message.getMethod())) {
// 认证请求
handleAuthenticationRequest(clientId, message, socket);
handleAuthenticationRequest(message, socket);
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) {
// 设备动态注册请求
handleRegisterRequest(clientId, message, socket);
handleRegisterRequest(message, socket);
} else {
// 业务消息
handleBusinessRequest(clientId, message, socket);
handleBusinessRequest(null, message, socket);
}
} catch (ServiceException e) {
// 业务异常,返回对应的错误码和错误信息
log.warn("[processMessage][业务异常,客户端 ID: {},错误: {}]", clientId, e.getMessage());
log.warn("[processMessage][业务异常,客户端 ID: {},错误: {}]", null, e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method, e.getCode(), e.getMessage());
} catch (IllegalArgumentException e) {
// 参数校验失败,返回 400
log.warn("[processMessage][参数校验失败,客户端 ID: {},错误: {}]", clientId, e.getMessage());
log.warn("[processMessage][参数校验失败,客户端 ID: {},错误: {}]", null, e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method, BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
// 其他异常,返回 500并重新抛出让上层关闭连接
log.error("[processMessage][处理消息失败,客户端 ID: {}]", clientId, e);
log.error("[processMessage][处理消息失败,客户端 ID: {}]", null, e);
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method,
@@ -161,12 +163,11 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
/**
* 处理认证请求
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket 网络连接
*/
@SuppressWarnings("DuplicatedCode")
private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, NetSocket socket) {
private void handleAuthenticationRequest(IotDeviceMessage message, NetSocket socket) {
// 1. 解析认证参数
IotDeviceAuthReqDTO authParams = JsonUtils.convertObject(message.getParams(), IotDeviceAuthReqDTO.class);
Assert.notNull(authParams, "认证参数不能为空");
@@ -198,13 +199,12 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
/**
* 处理设备动态注册请求(一型一密,不需要认证)
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket 网络连接
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
@SuppressWarnings("DuplicatedCode")
private void handleRegisterRequest(String clientId, IotDeviceMessage message, NetSocket socket) {
private void handleRegisterRequest(IotDeviceMessage message, NetSocket socket) {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = JsonUtils.convertObject(message.getParams(), IotDeviceRegisterReqDTO.class);
Assert.notNull(params, "注册参数不能为空");
@@ -218,7 +218,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 3. 发送成功响应
sendSuccessResponse(socket, message.getRequestId(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), result.getData());
log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]", clientId, params.getDeviceName());
log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]", null, params.getDeviceName());
}
/**

View File

@@ -55,6 +55,10 @@ public class IotUdpProtocol implements IotProtocol {
*/
@Getter
private DatagramSocket udpSocket;
/**
* UDP 会话管理器
*/
private final IotUdpSessionManager sessionManager;
/**
* 下行消息订阅者
@@ -66,11 +70,6 @@ public class IotUdpProtocol implements IotProtocol {
*/
private final IotMessageSerializer serializer;
/**
* UDP 会话管理器
*/
private final IotUdpSessionManager sessionManager;
public IotUdpProtocol(ProtocolInstanceProperties properties) {
IotUdpConfig udpConfig = properties.getUdp();
Assert.notNull(udpConfig, "UDP 协议配置udp不能为空");

View File

@@ -0,0 +1,54 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* IoT WebSocket 协议配置
*
* @author 芋道源码
*/
@Data
public class IotWebSocketConfig {
/**
* WebSocket 路径(默认:/ws
*/
@NotEmpty(message = "WebSocket 路径不能为空")
private String path = "/ws";
/**
* 最大消息大小(字节,默认 64KB
*/
@NotNull(message = "最大消息大小不能为空")
private Integer maxMessageSize = 65536;
/**
* 最大帧大小(字节,默认 64KB
*/
@NotNull(message = "最大帧大小不能为空")
private Integer maxFrameSize = 65536;
/**
* 空闲超时时间(秒,默认 60
*/
@NotNull(message = "空闲超时时间不能为空")
private Integer idleTimeoutSeconds = 60;
/**
* 是否启用 SSLwss://
*/
@NotNull(message = "是否启用 SSL 不能为空")
private Boolean sslEnabled = false;
/**
* SSL 证书路径
*/
private String sslCertPath;
/**
* SSL 私钥路径
*/
private String sslKeyPath;
}

View File

@@ -0,0 +1,218 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.upstream.IotWebSocketUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.PemKeyCertOptions;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
// TODO @AI注释调整下参考 IotTcpProtocol
/**
* IoT 网关 WebSocket 协议:接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotWebSocketProtocol implements IotProtocol {
/**
* 协议配置
*/
private final ProtocolInstanceProperties properties;
/**
* 服务器 ID用于消息追踪全局唯一
*/
@Getter
private final String serverId;
/**
* 运行状态
*/
@Getter
private volatile boolean running = false;
/**
* Vert.x 实例
*/
private Vertx vertx;
/**
* WebSocket 服务器
*/
private HttpServer httpServer;
/**
* WebSocket 连接管理器
*/
private final IotWebSocketConnectionManager connectionManager;
// TODO @AI可以不用这个变量从 properties 里面获取
/**
* WebSocket 配置
*/
private final IotWebSocketConfig wsConfig;
/**
* 下行消息订阅者
*/
private final IotWebSocketDownstreamSubscriber downstreamSubscriber;
/**
* 消息序列化器
*/
private final IotMessageSerializer serializer;
private final IotDeviceService deviceService;
private final IotDeviceMessageService messageService;
public IotWebSocketProtocol(ProtocolInstanceProperties properties) {
Assert.notNull(properties, "协议实例配置不能为空");
Assert.notNull(properties.getWebsocket(), "WebSocket 协议配置websocket不能为空");
this.properties = properties;
this.wsConfig = properties.getWebsocket();
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// 初始化序列化器
IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(properties.getSerialize());
Assert.notNull(serializeType, "不支持的序列化类型:" + properties.getSerialize());
IotMessageSerializerManager serializerManager = SpringUtil.getBean(IotMessageSerializerManager.class);
this.serializer = serializerManager.get(serializeType);
// 初始化基础依赖
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.messageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.connectionManager = new IotWebSocketConnectionManager();
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotWebSocketDownstreamHandler downstreamHandler = new IotWebSocketDownstreamHandler(serializer, connectionManager);
this.downstreamSubscriber = new IotWebSocketDownstreamSubscriber(this, downstreamHandler, messageBus);
}
@Override
public String getId() {
return properties.getId();
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.WEBSOCKET;
}
@Override
@SuppressWarnings("deprecation")
public void start() {
if (running) {
log.warn("[start][IoT WebSocket 协议 {} 已经在运行中]", getId());
return;
}
// 1.1 创建 Vertx 实例
this.vertx = Vertx.vertx();
// 1.2 创建服务器选项
HttpServerOptions options = new HttpServerOptions()
.setPort(properties.getPort())
.setIdleTimeout(wsConfig.getIdleTimeoutSeconds())
.setMaxWebSocketFrameSize(wsConfig.getMaxFrameSize())
.setMaxWebSocketMessageSize(wsConfig.getMaxMessageSize());
if (Boolean.TRUE.equals(wsConfig.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
.setKeyPath(wsConfig.getSslKeyPath())
.setCertPath(wsConfig.getSslCertPath());
options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
}
// 1.3 创建服务器并设置 WebSocket 处理器
httpServer = vertx.createHttpServer(options);
httpServer.webSocketHandler(socket -> {
// 验证路径
if (ObjUtil.notEqual(wsConfig.getPath(), socket.path())) {
log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]",
socket.path(), wsConfig.getPath());
socket.reject();
return;
}
// 创建上行处理器
IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(this,
messageService, deviceService, connectionManager, serializer);
handler.handle(socket);
});
// 1.4 启动服务器
try {
httpServer.listen().result();
running = true;
log.info("[start][IoT WebSocket 协议 {} 启动成功,端口:{},路径:{}serverId{}]",
getId(), properties.getPort(), wsConfig.getPath(), serverId);
// 2. 启动下行消息订阅者
downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT WebSocket 协议 {} 启动失败]", getId(), e);
if (httpServer != null) {
httpServer.close();
httpServer = null;
}
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e;
}
}
@Override
public void stop() {
if (!running) {
return;
}
// 1. 停止下行消息订阅者
try {
downstreamSubscriber.stop();
log.info("[stop][IoT WebSocket 协议 {} 下行消息订阅者已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT WebSocket 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
// 2.1 关闭 WebSocket 服务器
if (httpServer != null) {
try {
httpServer.close().result();
log.info("[stop][IoT WebSocket 协议 {} 服务器已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT WebSocket 协议 {} 服务器停止失败]", getId(), e);
}
httpServer = null;
}
// 2.2 关闭 Vertx 实例
if (vertx != null) {
try {
vertx.close().result();
log.info("[stop][IoT WebSocket 协议 {} Vertx 已关闭]", getId());
} catch (Exception e) {
log.error("[stop][IoT WebSocket 协议 {} Vertx 关闭失败]", getId(), e);
}
vertx = null;
}
running = false;
log.info("[stop][IoT WebSocket 协议 {} 已停止]", getId());
}
}

View File

@@ -1,135 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.PemKeyCertOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 WebSocket 协议:接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotWebSocketUpstreamProtocol implements IotProtocol {
private static final String ID = "websocket";
private final IotGatewayProperties.WebSocketProperties wsProperties;
private final IotDeviceService deviceService;
private final IotDeviceMessageService messageService;
private final IotWebSocketConnectionManager connectionManager;
private final Vertx vertx;
@Getter
private final String serverId;
private HttpServer httpServer;
private volatile boolean running = false;
public IotWebSocketUpstreamProtocol(IotGatewayProperties.WebSocketProperties wsProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotWebSocketConnectionManager connectionManager,
Vertx vertx) {
this.wsProperties = wsProperties;
this.deviceService = deviceService;
this.messageService = messageService;
this.connectionManager = connectionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(wsProperties.getPort());
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.WEBSOCKET;
}
@Override
@PostConstruct
@SuppressWarnings("deprecation")
public void start() {
// 1.1 创建服务器选项
HttpServerOptions options = new HttpServerOptions()
.setPort(wsProperties.getPort())
.setIdleTimeout(wsProperties.getIdleTimeoutSeconds())
.setMaxWebSocketFrameSize(wsProperties.getMaxFrameSize())
.setMaxWebSocketMessageSize(wsProperties.getMaxMessageSize());
// 1.2 配置 SSL如果启用
if (Boolean.TRUE.equals(wsProperties.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
.setKeyPath(wsProperties.getSslKeyPath())
.setCertPath(wsProperties.getSslCertPath());
options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
}
// 2. 创建服务器并设置 WebSocket 处理器
httpServer = vertx.createHttpServer(options);
httpServer.webSocketHandler(socket -> {
// 验证路径
if (ObjUtil.notEqual(wsProperties.getPath(), socket.path())) {
log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]",
socket.path(), wsProperties.getPath());
socket.reject();
return;
}
// 创建上行处理器
IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(this,
messageService, deviceService, connectionManager);
handler.handle(socket);
});
// 3. 启动服务器
try {
httpServer.listen().result();
running = true;
log.info("[start][IoT 网关 WebSocket 协议启动成功,端口:{},路径:{}]", wsProperties.getPort(), wsProperties.getPath());
} catch (Exception e) {
log.error("[start][IoT 网关 WebSocket 协议启动失败]", e);
throw e;
}
}
@Override
@PreDestroy
public void stop() {
if (httpServer != null) {
try {
httpServer.close().result();
running = false;
log.info("[stop][IoT 网关 WebSocket 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 WebSocket 协议停止失败]", e);
}
}
}
@Override
public boolean isRunning() {
return running;
}
}

View File

@@ -1,9 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -16,7 +17,7 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class IotWebSocketDownstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotMessageSerializer serializer;
private final IotWebSocketConnectionManager connectionManager;
@@ -37,9 +38,15 @@ public class IotWebSocketDownstreamHandler {
}
// 2. 编码消息并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getCodecType());
String jsonMessage = StrUtil.utf8Str(bytes);
boolean success = connectionManager.sendToDevice(message.getDeviceId(), jsonMessage);
byte[] bytes = serializer.serialize(message);
// TODO @AI参考别的模块的做法直接发类似 tcp 这种
boolean success;
if (serializer.getType() == IotSerializeTypeEnum.BINARY) {
success = connectionManager.sendToDevice(message.getDeviceId(), bytes);
} else {
String jsonMessage = StrUtil.utf8Str(bytes);
success = connectionManager.sendToDevice(message.getDeviceId(), jsonMessage);
}
if (success) {
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);

View File

@@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol;
import lombok.extern.slf4j.Slf4j;
/**
@@ -16,7 +16,7 @@ public class IotWebSocketDownstreamSubscriber extends IotProtocolDownstreamSubsc
private final IotWebSocketDownstreamHandler downstreamHandler;
public IotWebSocketDownstreamSubscriber(IotWebSocketUpstreamProtocol protocol,
public IotWebSocketDownstreamSubscriber(IotWebSocketProtocol protocol,
IotWebSocketDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.upstream;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
@@ -11,17 +12,19 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
@@ -36,82 +39,120 @@ import java.util.Map;
@Slf4j
public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
/**
* 默认消息编解码类型
*/
private static final String CODEC_TYPE = IotAlinkDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotWebSocketConnectionManager connectionManager;
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public IotWebSocketUpstreamHandler(IotWebSocketUpstreamProtocol protocol,
/**
* 消息序列化器处理业务消息序列化/反序列化
*/
private final IotMessageSerializer serializer;
/**
* 连接管理器
*/
private final IotWebSocketConnectionManager connectionManager;
// TODO @AI是不是可以去掉
private final boolean binaryPayload;
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotDeviceCommonApi deviceApi;
// TODO @AI参数顺序参考 IotTcpUpstreamHandler
public IotWebSocketUpstreamHandler(IotWebSocketProtocol protocol,
IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotWebSocketConnectionManager connectionManager) {
IotWebSocketConnectionManager connectionManager,
IotMessageSerializer serializer) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.connectionManager = connectionManager;
this.serializer = serializer;
this.binaryPayload = serializer.getType() == IotSerializeTypeEnum.BINARY;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.serverId = protocol.getServerId();
// TODO @AI通过 springutildeviceServicedeviceMessageService
}
@Override
@SuppressWarnings("DuplicatedCode")
public void handle(ServerWebSocket socket) {
String clientId = IdUtil.simpleUUID();
log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
// 1. 设置异常和关闭处理器
// TODO @AIclientId 去掉
socket.exceptionHandler(ex -> {
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
cleanupConnection(socket);
socket.close();
});
socket.closeHandler(v -> {
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
cleanupConnection(socket);
});
// 2. 设置文本消息处理器
socket.textMessageHandler(message -> {
try {
processMessage(clientId, message, socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
// 2. 设置消息处理器JSON 使用文本BINARY 使用二进制
// TODO @AI是不是 textbinary 保持统一用一个 mesagehandler
if (binaryPayload) {
socket.binaryMessageHandler(buffer -> {
try {
processMessage(clientId, buffer.getBytes(), socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
cleanupConnection(socket);
socket.close();
}
});
socket.textMessageHandler(message -> {
log.warn("[handle][收到文本帧但当前序列化为 BINARY断开连接客户端 ID: {},地址: {}]",
clientId, socket.remoteAddress());
cleanupConnection(socket);
socket.close();
}
});
});
} else {
socket.textMessageHandler(message -> {
try {
processMessage(clientId, StrUtil.utf8Bytes(message), socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
// TODO @AI是不是不用 cleanupConnectionclosehandler 本身就吹了了
cleanupConnection(socket);
socket.close();
}
});
socket.binaryMessageHandler(buffer -> {
try {
processMessage(clientId, buffer.getBytes(), socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
cleanupConnection(socket);
socket.close();
}
});
}
}
/**
* 处理消息
*
* @param clientId 客户端 ID
* @param message 消息JSON 字符串
* @param payload 消息负载
* @param socket WebSocket 连接
* @throws Exception 消息解码失败时抛出异常
*/
private void processMessage(String clientId, String message, ServerWebSocket socket) throws Exception {
private void processMessage(String clientId, byte[] payload, ServerWebSocket socket) throws Exception {
// 1.1 基础检查
if (StrUtil.isBlank(message)) {
if (ArrayUtil.isEmpty(payload)) {
return;
}
// 1.2 解码消息已认证连接使用其 codecType未认证连接使用默认 CODEC_TYPE
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
String codecType = connectionInfo != null ? connectionInfo.getCodecType() : CODEC_TYPE;
// 1.2 解码消息
IotDeviceMessage deviceMessage;
try {
deviceMessage = deviceMessageService.decodeDeviceMessage(
StrUtil.utf8Bytes(message), codecType);
deviceMessage = serializer.deserialize(payload);
if (deviceMessage == null) {
throw new Exception("解码后消息为空");
}
@@ -132,6 +173,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
handleBusinessRequest(clientId, deviceMessage, socket);
}
} catch (Exception e) {
// TODO @AI参考 IotTcpUpstreamHandler 处理业务参数其它
log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]",
clientId, deviceMessage.getMethod(), e);
// 发送错误响应避免客户端一直等待
@@ -153,6 +195,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) {
try {
// 1.1 解析认证参数
// TODO @AI参数解析参考 tcp 对应的 handleAuthenticationRequest
IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams());
if (authParams == null) {
log.warn("[handleAuthenticationRequest][认证参数解析失败,客户端 ID: {}]", clientId);
@@ -204,6 +247,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void handleRegisterRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) {
// TODO @AI参数解析参考 tcp 对应的 handleRegisterRequest
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams());
@@ -232,6 +276,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
}
}
// TODO @AI参考对应的 tcp handleBusinessRequest
/**
* 处理业务请求
*
@@ -270,10 +315,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = new IotWebSocketConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId)
.setCodecType(CODEC_TYPE);
// 注册连接
.setDeviceName(device.getDeviceName());
connectionManager.registerConnection(socket, device.getId(), connectionInfo);
}
@@ -314,6 +356,8 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
}
}
// ===================== 发送响应消息 =====================
/**
* 发送响应消息
*
@@ -332,8 +376,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
int code = success ? 0 : 401;
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, code, message);
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE);
socket.writeTextMessage(StrUtil.utf8Str(encodedData));
writeResponse(socket, responseMessage);
} catch (Exception e) {
log.error("[sendResponse][发送响应失败requestId: {}]", requestId, e);
}
@@ -461,11 +504,22 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 发送响应
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE);
socket.writeTextMessage(StrUtil.utf8Str(encodedData));
writeResponse(socket, responseMessage);
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应失败requestId: {}]", requestId, e);
}
}
/**
* 写入响应消息
*/
private void writeResponse(ServerWebSocket socket, IotDeviceMessage responseMessage) {
byte[] payload = serializer.serialize(responseMessage);
if (binaryPayload) {
socket.writeBinaryMessage(Buffer.buffer(payload));
} else {
socket.writeTextMessage(StrUtil.utf8Str(payload));
}
}
}

View File

@@ -1,10 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentHashMap;
* @author 芋道源码
*/
@Slf4j
@Component
public class IotWebSocketConnectionManager {
/**
@@ -69,7 +68,8 @@ public class IotWebSocketConnectionManager {
return;
}
Long deviceId = connectionInfo.getDeviceId();
deviceSocketMap.remove(deviceId);
// 仅当 deviceSocketMap 中的 socket 是当前 socket 时才移除,避免误删新连接
deviceSocketMap.remove(deviceId, socket);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
deviceId, socket.remoteAddress());
}
@@ -115,6 +115,33 @@ public class IotWebSocketConnectionManager {
}
}
// TODO @AI没必要这里加一个
/**
* 发送消息到设备(二进制消息)
*
* @param deviceId 设备 ID
* @param payload 二进制消息
* @return 是否发送成功
*/
public boolean sendToDevice(Long deviceId, byte[] payload) {
ServerWebSocket socket = deviceSocketMap.get(deviceId);
if (socket == null) {
log.warn("[sendToDevice][设备未连接,设备 ID: {}]", deviceId);
return false;
}
try {
socket.writeBinaryMessage(Buffer.buffer(payload));
log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, payload.length);
return true;
} catch (Exception e) {
log.error("[sendToDevice][发送消息失败,设备 ID: {}]", deviceId, e);
// 发送失败时清理连接
unregisterConnection(socket);
return false;
}
}
/**
* 连接信息(包含认证信息)
*/
@@ -135,15 +162,6 @@ public class IotWebSocketConnectionManager {
*/
private String deviceName;
/**
* 客户端 ID
*/
private String clientId;
/**
* 消息编解码类型(认证后确定)
*/
private String codecType;
}
}

View File

@@ -94,7 +94,7 @@ yudao:
- id: websocket-json
type: websocket
port: 8094
enabled: false
enabled: true
serialize: json
websocket:
path: /ws
@@ -159,6 +159,7 @@ yudao:
max-message-size: 8192
connect-timeout-seconds: 60
ssl-enabled: false
--- #################### 日志相关配置 ####################
# 基础日志配置

View File

@@ -270,7 +270,7 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
});
socket.handler(parser);
// 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑)
// 2.1 序列化 + 帧编码
byte[] serializedData = SERIALIZER.serialize(request);
Buffer frameData = FRAME_CODEC.encode(serializedData);
log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length());

View File

@@ -373,7 +373,7 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
});
socket.handler(parser);
// 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑)
// 2.1 序列化 + 帧编码
byte[] serializedData = SERIALIZER.serialize(request);
Buffer frameData = FRAME_CODEC.encode(serializedData);
log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length());

View File

@@ -249,7 +249,7 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
});
socket.handler(parser);
// 2.1 序列化 + 帧编码(复用 gateway 的编码逻辑)
// 2.1 序列化 + 帧编码
byte[] serializedData = SERIALIZER.serialize(request);
Buffer frameData = FRAME_CODEC.encode(serializedData);
log.info("[sendAndReceive][发送消息: {},数据长度: {} 字节]", request.getMethod(), frameData.length());

View File

@@ -10,8 +10,8 @@ import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketClient;
@@ -61,7 +61,7 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
// ===================== 编解码器选择 =====================
private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec();
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) =====================
@@ -95,10 +95,10 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
// 1.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testAuth][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 2.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
@@ -109,7 +109,7 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
// 3. 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testAuth][响应消息: {}]", responseMessage);
} else {
log.warn("[testAuth][未收到响应]");
@@ -137,10 +137,10 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
// 1.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testDeviceRegister][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 2.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
@@ -151,7 +151,7 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
// 3. 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testDeviceRegister][响应消息: {}]", responseMessage);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
} else {
@@ -186,16 +186,16 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
.put("height", "2")
.build()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testPropertyPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testPropertyPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testPropertyPost][未收到响应]");
@@ -229,16 +229,16 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testEventPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testEventPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testEventPost][未收到响应]");
@@ -308,13 +308,13 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[authenticate][发送认证请求: {}]", jsonMessage);
String response = sendAndReceive(ws, jsonMessage);
if (response != null) {
return CODEC.decode(StrUtil.utf8Bytes(response));
return SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
}
return null;
}

View File

@@ -14,8 +14,8 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketClient;
@@ -67,9 +67,9 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
private static Vertx vertx;
// ===================== 编解码器选择 =====================
// ===================== 序列化器选择 =====================
private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec();
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) =====================
@@ -110,10 +110,10 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
// 1.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testAuth][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 2.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
@@ -124,7 +124,7 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
// 3. 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testAuth][响应消息: {}]", responseMessage);
} else {
log.warn("[testAuth][未收到响应]");
@@ -164,16 +164,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(),
params,
null, null, null);
// 2.3 编码
byte[] payload = CODEC.encode(request);
// 2.3 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testTopoAdd][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testTopoAdd][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testTopoAdd][响应消息: {}]", responseMessage);
} else {
log.warn("[testTopoAdd][未收到响应]");
@@ -205,16 +205,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
params,
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testTopoDelete][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testTopoDelete][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testTopoDelete][响应消息: {}]", responseMessage);
} else {
log.warn("[testTopoDelete][未收到响应]");
@@ -244,16 +244,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
params,
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testTopoGet][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testTopoGet][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testTopoGet][响应消息: {}]", responseMessage);
} else {
log.warn("[testTopoGet][未收到响应]");
@@ -287,16 +287,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
Collections.singletonList(subDevice),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testSubDeviceRegister][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testSubDeviceRegister][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testSubDeviceRegister][响应消息: {}]", responseMessage);
} else {
log.warn("[testSubDeviceRegister][未收到响应]");
@@ -358,16 +358,16 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(),
params,
null, null, null);
// 2.7 编码
byte[] payload = CODEC.encode(request);
// 2.7 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testPropertyPackPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testPropertyPackPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testPropertyPackPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testPropertyPackPost][未收到响应]");
@@ -438,13 +438,13 @@ public class IotGatewayDeviceWebSocketProtocolIntegrationTest {
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[authenticate][发送认证请求: {}]", jsonMessage);
String response = sendAndReceive(ws, jsonMessage);
if (response != null) {
return CODEC.decode(StrUtil.utf8Bytes(response));
return SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
}
return null;
}

View File

@@ -9,8 +9,8 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketClient;
@@ -60,9 +60,9 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
private static Vertx vertx;
// ===================== 编解码器选择 =====================
// ===================== 序列化器选择 =====================
private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec();
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
@@ -96,10 +96,10 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
// 1.2 编码
byte[] payload = CODEC.encode(request);
// 1.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testAuth][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testAuth][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 2.1 创建 WebSocket 连接(同步)
WebSocket ws = createWebSocketConnection();
@@ -110,7 +110,7 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
// 3. 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testAuth][响应消息: {}]", responseMessage);
} else {
log.warn("[testAuth][未收到响应]");
@@ -146,16 +146,16 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
.put("temperature", 36.5)
.build()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testPropertyPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testPropertyPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testPropertyPost][未收到响应]");
@@ -195,16 +195,16 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
.build(),
System.currentTimeMillis()),
null, null, null);
// 2.2 编码
byte[] payload = CODEC.encode(request);
// 2.2 序列化
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request);
log.info("[testEventPost][Serialize: {}, 请求消息: {}]", SERIALIZER.getType(), request);
// 3.1 发送并等待响应
String response = sendAndReceive(ws, jsonMessage);
// 3.2 解码响应
if (response != null) {
IotDeviceMessage responseMessage = CODEC.decode(StrUtil.utf8Bytes(response));
IotDeviceMessage responseMessage = SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
log.info("[testEventPost][响应消息: {}]", responseMessage);
} else {
log.warn("[testEventPost][未收到响应]");
@@ -274,13 +274,13 @@ public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest {
.setPassword(authInfo.getPassword());
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null);
byte[] payload = CODEC.encode(request);
byte[] payload = SERIALIZER.serialize(request);
String jsonMessage = StrUtil.utf8Str(payload);
log.info("[authenticate][发送认证请求: {}]", jsonMessage);
String response = sendAndReceive(ws, jsonMessage);
if (response != null) {
return CODEC.decode(StrUtil.utf8Bytes(response));
return SERIALIZER.deserialize(StrUtil.utf8Bytes(response));
}
return null;
}