diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 85d394f4e1..79d978c4db 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -12,10 +12,6 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscr import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; @@ -159,44 +155,6 @@ public class IotGatewayConfiguration { } - /** - * IoT 网关 MQTT WebSocket 协议配置类 - */ - @Configuration - @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.mqtt-ws", name = "enabled", havingValue = "true") - @Slf4j - public static class MqttWsProtocolConfiguration { - - @Bean(name = "mqttWsVertx", destroyMethod = "close") - public Vertx mqttWsVertx() { - return Vertx.vertx(); - } - - @Bean - public IotMqttWsUpstreamProtocol iotMqttWsUpstreamProtocol(IotGatewayProperties gatewayProperties, - IotDeviceMessageService messageService, - IotMqttWsConnectionManager connectionManager, - @Qualifier("mqttWsVertx") Vertx mqttWsVertx) { - return new IotMqttWsUpstreamProtocol(gatewayProperties.getProtocol().getMqttWs(), - messageService, connectionManager, mqttWsVertx); - } - - @Bean - public IotMqttWsDownstreamHandler iotMqttWsDownstreamHandler(IotDeviceMessageService messageService, - IotDeviceService deviceService, - IotMqttWsConnectionManager connectionManager) { - return new IotMqttWsDownstreamHandler(messageService, deviceService, connectionManager); - } - - @Bean - public IotMqttWsDownstreamSubscriber iotMqttWsDownstreamSubscriber(IotMqttWsUpstreamProtocol mqttWsUpstreamProtocol, - IotMqttWsDownstreamHandler downstreamHandler, - IotMessageBus messageBus) { - return new IotMqttWsDownstreamSubscriber(mqttWsUpstreamProtocol, downstreamHandler, messageBus); - } - - } - /** * IoT 网关 UDP 协议配置类 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java index d7c4adbd00..9fb42c5849 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager; import cn.hutool.core.util.StrUtil; import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.buffer.Buffer; import io.vertx.mqtt.MqttEndpoint; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -166,7 +167,7 @@ public class IotMqttConnectionManager { } try { - endpoint.publish(topic, io.vertx.core.buffer.Buffer.buffer(payload), MqttQoS.valueOf(qos), false, retain); + endpoint.publish(topic, Buffer.buffer(payload), MqttQoS.valueOf(qos), false, retain); log.debug("[sendToDevice][发送消息成功,设备 ID: {},主题: {},QoS: {}]", deviceId, topic, qos); return true; } catch (Exception e) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsDownstreamSubscriber.java deleted file mode 100644 index 302824d6df..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsDownstreamSubscriber.java +++ /dev/null @@ -1,79 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws; - -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; -import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDownstreamHandler; -import jakarta.annotation.PostConstruct; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT MQTT WebSocket 下行消息订阅器 - *

- * 订阅消息总线的设备下行消息,并通过 WebSocket 发送到设备 - * - * @author 芋道源码 - */ -@Slf4j -public class IotMqttWsDownstreamSubscriber implements IotMessageSubscriber { - - private final IotMqttWsUpstreamProtocol upstreamProtocol; - private final IotMqttWsDownstreamHandler downstreamHandler; - private final IotMessageBus messageBus; - - public IotMqttWsDownstreamSubscriber(IotMqttWsUpstreamProtocol upstreamProtocol, - IotMqttWsDownstreamHandler downstreamHandler, - IotMessageBus messageBus) { - this.upstreamProtocol = upstreamProtocol; - this.downstreamHandler = downstreamHandler; - this.messageBus = messageBus; - } - - @PostConstruct - public void init() { - messageBus.register(this); - log.info("[init][MQTT WebSocket 下行消息订阅器已启动,topic: {}]", getTopic()); - } - - @Override - public String getTopic() { - return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(upstreamProtocol.getServerId()); - } - - @Override - public String getGroup() { - // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group - return getTopic(); - } - - @Override - public void onMessage(IotDeviceMessage message) { - log.debug("[onMessage][收到下行消息,deviceId: {},method: {}]", - message.getDeviceId(), message.getMethod()); - try { - // 1. 校验 - String method = message.getMethod(); - if (StrUtil.isBlank(method)) { - log.warn("[onMessage][消息方法为空,deviceId: {}]", message.getDeviceId()); - return; - } - - // 2. 委托给下行处理器处理业务逻辑 - boolean success = downstreamHandler.handleDownstreamMessage(message); - if (success) { - log.debug("[onMessage][下行消息处理成功,deviceId: {},method: {}]", - message.getDeviceId(), message.getMethod()); - } else { - log.warn("[onMessage][下行消息处理失败,deviceId: {},method: {}]", - message.getDeviceId(), message.getMethod()); - } - } catch (Exception e) { - log.error("[onMessage][处理下行消息失败,deviceId: {},method: {}]", - message.getDeviceId(), message.getMethod(), e); - } - } - -} - diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsUpstreamProtocol.java deleted file mode 100644 index 6944d47dad..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsUpstreamProtocol.java +++ /dev/null @@ -1,146 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws; - -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsUpstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpServer; -import io.vertx.core.http.HttpServerOptions; -import io.vertx.core.http.ServerWebSocket; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 MQTT WebSocket 协议:接收设备上行消息 - *

- * 基于 Vert.x 实现 MQTT over WebSocket 服务端,支持: - * - 标准 MQTT 3.1.1 协议 - * - WebSocket 协议升级 - * - SSL/TLS 加密(wss://) - * - 设备认证与连接管理 - * - QoS 0/1/2 消息质量保证 - * - * @author 芋道源码 - */ -@Slf4j -public class IotMqttWsUpstreamProtocol { - - private final IotGatewayProperties.MqttWsProperties mqttWsProperties; - - private final IotDeviceMessageService messageService; - - private final IotMqttWsConnectionManager connectionManager; - - private final Vertx vertx; - - @Getter - private final String serverId; - - private HttpServer httpServer; - - public IotMqttWsUpstreamProtocol(IotGatewayProperties.MqttWsProperties mqttWsProperties, - IotDeviceMessageService messageService, - IotMqttWsConnectionManager connectionManager, - Vertx vertx) { - this.mqttWsProperties = mqttWsProperties; - this.messageService = messageService; - this.connectionManager = connectionManager; - this.vertx = vertx; - this.serverId = IotDeviceMessageUtils.generateServerId(mqttWsProperties.getPort()); - } - - @PostConstruct - public void start() { - // 创建 HTTP 服务器选项 - HttpServerOptions options = new HttpServerOptions() - .setPort(mqttWsProperties.getPort()) - .setIdleTimeout(mqttWsProperties.getKeepAliveTimeoutSeconds()) - .setMaxWebSocketFrameSize(mqttWsProperties.getMaxFrameSize()) - .setMaxWebSocketMessageSize(mqttWsProperties.getMaxMessageSize()) - // 配置 WebSocket 子协议支持 - .addWebSocketSubProtocol(mqttWsProperties.getSubProtocol()); - - // 配置 SSL(如果启用) - if (Boolean.TRUE.equals(mqttWsProperties.getSslEnabled())) { - options.setSsl(true) - .setKeyCertOptions(mqttWsProperties.getSslOptions().getKeyCertOptions()) - .setTrustOptions(mqttWsProperties.getSslOptions().getTrustOptions()); - log.info("[start][MQTT WebSocket 已启用 SSL/TLS (wss://)]"); - } - - // 创建 HTTP 服务器 - httpServer = vertx.createHttpServer(options); - - // 设置 WebSocket 处理器 - httpServer.webSocketHandler(this::handleWebSocketConnection); - - // 启动服务器 - try { - httpServer.listen().result(); - log.info("[start][IoT 网关 MQTT WebSocket 协议启动成功,端口: {},路径: {},支持子协议: {}]", - mqttWsProperties.getPort(), mqttWsProperties.getPath(), - "mqtt, mqttv3.1, " + mqttWsProperties.getSubProtocol()); - } catch (Exception e) { - log.error("[start][IoT 网关 MQTT WebSocket 协议启动失败]", e); - throw e; - } - } - - @PreDestroy - public void stop() { - if (httpServer != null) { - try { - // 关闭所有连接 - connectionManager.closeAllConnections(); - - // 关闭服务器 - httpServer.close().result(); - log.info("[stop][IoT 网关 MQTT WebSocket 协议已停止]"); - } catch (Exception e) { - log.error("[stop][IoT 网关 MQTT WebSocket 协议停止失败]", e); - } - } - } - - /** - * 处理 WebSocket 连接请求 - * - * @param socket WebSocket 连接 - */ - private void handleWebSocketConnection(ServerWebSocket socket) { - String path = socket.path(); - String subProtocol = socket.subProtocol(); - - log.info("[handleWebSocketConnection][收到 WebSocket 连接请求,path: {},subProtocol: {},remoteAddress: {}]", - path, subProtocol, socket.remoteAddress()); - - // 验证路径 - if (!mqttWsProperties.getPath().equals(path)) { - log.warn("[handleWebSocketConnection][WebSocket 路径不匹配,拒绝连接,path: {},期望: {}]", - path, mqttWsProperties.getPath()); - socket.close(); - return; - } - - // 验证子协议 - // Vert.x 已经自动进行了子协议协商,这里只需要验证是否为 MQTT 相关协议 - if (subProtocol != null && !subProtocol.startsWith("mqtt")) { - log.warn("[handleWebSocketConnection][WebSocket 子协议不支持,拒绝连接,subProtocol: {}]", subProtocol); - socket.close(); - return; - } - - log.info("[handleWebSocketConnection][WebSocket 连接已接受,remoteAddress: {},subProtocol: {}]", - socket.remoteAddress(), subProtocol); - - // 创建处理器并处理连接 - IotMqttWsUpstreamHandler handler = new IotMqttWsUpstreamHandler( - this, messageService, connectionManager); - handler.handle(socket); - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java deleted file mode 100644 index fee3e359c8..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java +++ /dev/null @@ -1,259 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager; - -import cn.hutool.core.collection.CollUtil; -import io.vertx.core.http.ServerWebSocket; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; - -/** - * IoT MQTT WebSocket 连接管理器 - * - * @author 芋道源码 - */ -@Slf4j -@Component -public class IotMqttWsConnectionManager { - - /** - * 存储设备连接 - * Key: 设备标识(deviceKey) - * Value: WebSocket 连接 - */ - private final Map connections = new ConcurrentHashMap<>(); - - /** - * 存储设备标识与 Socket ID 的映射 - * Key: 设备标识(deviceKey) - * Value: Socket ID(UUID) - */ - private final Map deviceKeyToSocketId = new ConcurrentHashMap<>(); - - /** - * 存储 Socket ID 与设备标识的映射 - * Key: Socket ID(UUID) - * Value: 设备标识(deviceKey) - */ - private final Map socketIdToDeviceKey = new ConcurrentHashMap<>(); - - /** - * 存储设备订阅的主题 - * Key: 设备标识(deviceKey) - * Value: 订阅的主题集合 - */ - private final Map> deviceSubscriptions = new ConcurrentHashMap<>(); - - /** - * 添加连接 - * - * @param deviceKey 设备标识 - * @param socket WebSocket 连接 - * @param socketId Socket ID(UUID) - */ - public void addConnection(String deviceKey, ServerWebSocket socket, String socketId) { - connections.put(deviceKey, socket); - deviceKeyToSocketId.put(deviceKey, socketId); - socketIdToDeviceKey.put(socketId, deviceKey); - log.info("[addConnection][设备连接已添加,deviceKey: {},socketId: {},当前连接数: {}]", - deviceKey, socketId, connections.size()); - } - - /** - * 移除连接 - * - * @param deviceKey 设备标识 - */ - public void removeConnection(String deviceKey) { - ServerWebSocket socket = connections.remove(deviceKey); - String socketId = deviceKeyToSocketId.remove(deviceKey); - if (socketId != null) { - socketIdToDeviceKey.remove(socketId); - } - if (socket != null) { - log.info("[removeConnection][设备连接已移除,deviceKey: {},socketId: {},当前连接数: {}]", - deviceKey, socketId, connections.size()); - } - } - - /** - * 根据 Socket ID 移除连接 - * - * @param socketId WebSocket 文本框架 ID - */ - public void removeConnectionBySocketId(String socketId) { - String deviceKey = socketIdToDeviceKey.remove(socketId); - if (deviceKey != null) { - connections.remove(deviceKey); - log.info("[removeConnectionBySocketId][设备连接已移除,socketId: {},deviceKey: {},当前连接数: {}]", - socketId, deviceKey, connections.size()); - } - } - - /** - * 获取连接 - * - * @param deviceKey 设备标识 - * @return WebSocket 连接 - */ - public ServerWebSocket getConnection(String deviceKey) { - return connections.get(deviceKey); - } - - /** - * 根据 Socket ID 获取设备标识 - * - * @param socketId WebSocket 文本框架 ID - * @return 设备标识 - */ - public String getDeviceKeyBySocketId(String socketId) { - return socketIdToDeviceKey.get(socketId); - } - - /** - * 检查设备是否在线 - * - * @param deviceKey 设备标识 - * @return 是否在线 - */ - public boolean isOnline(String deviceKey) { - return connections.containsKey(deviceKey); - } - - /** - * 获取当前连接数 - * - * @return 连接数 - */ - public int getConnectionCount() { - return connections.size(); - } - - /** - * 关闭所有连接 - */ - public void closeAllConnections() { - connections.forEach((deviceKey, socket) -> { - try { - socket.close(); - log.info("[closeAllConnections][关闭设备连接,deviceKey: {}]", deviceKey); - } catch (Exception e) { - log.error("[closeAllConnections][关闭设备连接失败,deviceKey: {}]", deviceKey, e); - } - }); - connections.clear(); - deviceKeyToSocketId.clear(); - socketIdToDeviceKey.clear(); - deviceSubscriptions.clear(); - log.info("[closeAllConnections][所有连接已关闭]"); - } - - // ==================== 订阅管理方法 ==================== - - /** - * 添加订阅 - * - * @param deviceKey 设备标识 - * @param topic 订阅主题 - */ - public void addSubscription(String deviceKey, String topic) { - deviceSubscriptions.computeIfAbsent(deviceKey, k -> new CopyOnWriteArraySet<>()).add(topic); - log.debug("[addSubscription][设备订阅主题,deviceKey: {},topic: {}]", deviceKey, topic); - } - - /** - * 移除订阅 - * - * @param deviceKey 设备标识 - * @param topic 订阅主题 - */ - public void removeSubscription(String deviceKey, String topic) { - Set topics = deviceSubscriptions.get(deviceKey); - if (topics != null) { - topics.remove(topic); - log.debug("[removeSubscription][设备取消订阅,deviceKey: {},topic: {}]", deviceKey, topic); - } - } - - /** - * 检查设备是否订阅了指定主题 - * 支持 MQTT 通配符匹配(+ 和 #) - * - * @param deviceKey 设备标识 - * @param topic 发布主题 - * @return 是否匹配 - */ - public boolean isSubscribed(String deviceKey, String topic) { - Set subscriptions = deviceSubscriptions.get(deviceKey); - if (CollUtil.isEmpty(subscriptions)) { - return false; - } - - // 检查是否有匹配的订阅 - for (String subscription : subscriptions) { - if (topicMatches(subscription, topic)) { - return true; - } - } - return false; - } - - /** - * 获取设备的所有订阅 - * - * @param deviceKey 设备标识 - * @return 订阅主题集合 - */ - public Set getSubscriptions(String deviceKey) { - return deviceSubscriptions.get(deviceKey); - } - - // TODO @haohao:这个方法,是不是也可以考虑抽到 IotMqttTopicUtils 里面去哈;感觉更简洁一点? - /** - * MQTT 主题匹配 - * 支持通配符: - * - +:匹配单层主题 - * - #:匹配多层主题(必须在末尾) - * - * @param subscription 订阅主题(可能包含通配符) - * @param topic 发布主题(不包含通配符) - * @return 是否匹配 - */ - private boolean topicMatches(String subscription, String topic) { - // 完全匹配 - if (subscription.equals(topic)) { - return true; - } - - // 不包含通配符 - // TODO @haohao:这里要不要枚举下哈;+ # - if (!subscription.contains("+") && !subscription.contains("#")) { - return false; - } - - String[] subscriptionParts = subscription.split("/"); - String[] topicParts = topic.split("/"); - int i = 0; - for (; i < subscriptionParts.length && i < topicParts.length; i++) { - String subPart = subscriptionParts[i]; - String topicPart = topicParts[i]; - - // # 匹配剩余所有层级,且必须在末尾 - if (subPart.equals("#")) { - return i == subscriptionParts.length - 1; - } - - // 不是通配符且不匹配 - if (!subPart.equals("+") && !subPart.equals(topicPart)) { - return false; - } - } - - // 检查是否都匹配完 - return i == subscriptionParts.length && i == topicParts.length; - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/package-info.java deleted file mode 100644 index b9af4afe3a..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/package-info.java +++ /dev/null @@ -1,15 +0,0 @@ -/** - * IoT 网关 MQTT WebSocket 协议实现 - *

- * 基于 Vert.x 实现 MQTT over WebSocket 服务端,支持: - * - 标准 MQTT 3.1.1 协议 - * - WebSocket 协议升级 - * - SSL/TLS 加密(wss://) - * - 设备认证与连接管理 - * - QoS 0/1/2 消息质量保证 - * - 双向消息通信(上行/下行) - * - * @author 芋道源码 - */ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws; - diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java deleted file mode 100644 index 3aeb6c5c48..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java +++ /dev/null @@ -1,221 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router; - -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.ServerWebSocket; -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.atomic.AtomicInteger; - -/** - * IoT MQTT WebSocket 下行消息处理器 - *

- * 处理从消息总线发送到设备的消息,包括: - * - 属性设置 - * - 服务调用 - * - 事件通知 - * - * @author 芋道源码 - */ -@Slf4j -public class IotMqttWsDownstreamHandler { - - private final IotDeviceMessageService deviceMessageService; - - private final IotDeviceService deviceService; - - private final IotMqttWsConnectionManager connectionManager; - - /** - * 消息 ID 生成器(用于发布消息) - */ - private final AtomicInteger messageIdGenerator = new AtomicInteger(1); - - public IotMqttWsDownstreamHandler(IotDeviceMessageService deviceMessageService, - IotDeviceService deviceService, - IotMqttWsConnectionManager connectionManager) { - this.deviceMessageService = deviceMessageService; - this.deviceService = deviceService; - this.connectionManager = connectionManager; - } - - /** - * 处理下行消息 - * - * @param message 设备消息 - * @return 是否处理成功 - */ - public boolean handleDownstreamMessage(IotDeviceMessage message) { - try { - // 1. 基础校验 - if (message == null || message.getDeviceId() == null) { - log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]"); - return false; - } - - // 2. 获取设备信息 - IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId()); - if (deviceInfo == null) { - log.warn("[handleDownstreamMessage][设备不存在,设备 ID:{}]", message.getDeviceId()); - return false; - } - - // 3. 构建设备标识 - String deviceKey = deviceInfo.getProductKey() + ":" + deviceInfo.getDeviceName(); - - // 4. 检查设备是否在线 - if (!connectionManager.isOnline(deviceKey)) { - log.warn("[handleDownstreamMessage][设备离线,无法发送消息,deviceKey: {}]", deviceKey); - return false; - } - - // 5. 构建主题 - String topic = buildDownstreamTopic(message, deviceInfo); - if (StrUtil.isBlank(topic)) { - log.warn("[handleDownstreamMessage][主题构建失败,设备 ID:{},方法:{}]", - message.getDeviceId(), message.getMethod()); - return false; - } - - // 6. 检查设备是否订阅了该主题 - if (!connectionManager.isSubscribed(deviceKey, topic)) { - log.warn("[handleDownstreamMessage][设备未订阅该主题,deviceKey: {},topic: {}]", deviceKey, topic); - return false; - } - - // 8. 编码消息 - byte[] payload = deviceMessageService.encodeDeviceMessage(message, - deviceInfo.getProductKey(), deviceInfo.getDeviceName()); - if (payload == null || payload.length == 0) { - log.warn("[handleDownstreamMessage][消息编码失败,设备 ID:{}]", message.getDeviceId()); - return false; - } - - // 9. 发送消息到设备 - return sendMessageToDevice(deviceKey, topic, payload, 1); - } catch (Exception e) { - if (message != null) { - log.error("[handleDownstreamMessage][处理下行消息异常,设备 ID:{},错误:{}]", - message.getDeviceId(), e.getMessage(), e); - } - return false; - } - } - - /** - * 构建下行消息主题 - * - * @param message 设备消息 - * @param deviceInfo 设备信息 - * @return 主题 - */ - private String buildDownstreamTopic(IotDeviceMessage message, IotDeviceRespDTO deviceInfo) { - String method = message.getMethod(); - if (StrUtil.isBlank(method)) { - return null; - } - - // 使用工具类构建主题,支持回复消息处理 - boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); - return IotMqttTopicUtils.buildTopicByMethod(method, deviceInfo.getProductKey(), - deviceInfo.getDeviceName(), isReply); - } - - /** - * 发送消息到设备 - * - * @param deviceKey 设备标识(productKey:deviceName) - * @param topic 主题 - * @param payload 消息内容 - * @param qos QoS 级别 - * @return 是否发送成功 - */ - private boolean sendMessageToDevice(String deviceKey, String topic, byte[] payload, int qos) { - // 获取设备连接 - ServerWebSocket socket = connectionManager.getConnection(deviceKey); - if (socket == null) { - log.warn("[sendMessageToDevice][设备未连接,deviceKey: {}]", deviceKey); - return false; - } - - try { - int messageId = qos > 0 ? generateMessageId() : 0; - - // 手动编码 MQTT PUBLISH 消息 - io.netty.buffer.ByteBuf byteBuf = io.netty.buffer.Unpooled.buffer(); - - // 固定头:消息类型(PUBLISH=3) + DUP(0) + QoS + RETAIN - int fixedHeaderByte1 = 0x30 | (qos << 1); // PUBLISH类型 - byteBuf.writeByte(fixedHeaderByte1); - - // 计算剩余长度 - int topicLength = topic.getBytes().length; - int remainingLength = 2 + topicLength + (qos > 0 ? 2 : 0) + payload.length; - - // 写入剩余长度(简化版本,假设小于 128 字节) - if (remainingLength < 128) { - byteBuf.writeByte(remainingLength); - } else { - // 处理大于 127 的情况 - int x = remainingLength; - do { - int encodedByte = x % 128; - x = x / 128; - if (x > 0) { - encodedByte = encodedByte | 128; - } - byteBuf.writeByte(encodedByte); - } while (x > 0); - } - - // 可变头:主题名称 - byteBuf.writeShort(topicLength); - byteBuf.writeBytes(topic.getBytes()); - - // 可变头:消息 ID(仅 QoS > 0 时) - if (qos > 0) { - byteBuf.writeShort(messageId); - } - - // 有效载荷 - byteBuf.writeBytes(payload); - - // 发送 - byte[] bytes = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(bytes); - byteBuf.release(); - socket.writeBinaryMessage(Buffer.buffer(bytes)); - - log.info("[sendMessageToDevice][消息已发送到设备,deviceKey: {},topic: {},qos: {},messageId: {}]", - deviceKey, topic, qos, messageId); - return true; - } catch (Exception e) { - log.error("[sendMessageToDevice][发送消息到设备失败,deviceKey: {},topic: {}]", deviceKey, topic, e); - return false; - } - } - - /** - * 生成消息 ID - * - * @return 消息 ID - */ - private int generateMessageId() { - int id = messageIdGenerator.getAndIncrement(); - // MQTT 消息 ID 范围是 1-65535 - // TODO @haohao:并发可能有问题; - if (id > 65535) { - messageIdGenerator.set(1); - return 1; - } - return id; - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java deleted file mode 100644 index 26833fb46f..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java +++ /dev/null @@ -1,754 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router; - -import cn.hutool.core.util.BooleanUtil; -import cn.hutool.core.util.IdUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.handler.codec.DecoderException; -import io.netty.handler.codec.mqtt.*; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.ServerWebSocket; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.StandardCharsets; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * IoT MQTT WebSocket 上行消息处理器 - *

- * 处理来自设备的 MQTT 消息,包括: - * - CONNECT:设备连接认证 - * - PUBLISH:设备发布消息 - * - SUBSCRIBE:设备订阅主题 - * - UNSUBSCRIBE:设备取消订阅 - * - PINGREQ:心跳请求 - * - DISCONNECT:设备断开连接 - * - * @author 芋道源码 - */ -@Slf4j -public class IotMqttWsUpstreamHandler { - - private final IotMqttWsUpstreamProtocol upstreamProtocol; - - private final IotDeviceCommonApi deviceApi; - - private final IotDeviceMessageService messageService; - - private final IotMqttWsConnectionManager connectionManager; - - /** - * 存储 WebSocket 连接到 Socket ID 的映射 - * Key: WebSocket 对象 - * Value: Socket ID(UUID) - */ - private final ConcurrentHashMap socketIdMap = new ConcurrentHashMap<>(); - - /** - * 存储 Socket ID 对应的设备信息 - * Key: Socket ID(UUID) - * Value: 设备信息 - */ - private final ConcurrentHashMap socketDeviceMap = new ConcurrentHashMap<>(); - - /** - * 存储设备的消息 ID 生成器(用于 QoS > 0 的消息) - */ - private final ConcurrentHashMap deviceMessageIdMap = new ConcurrentHashMap<>(); - - /** - * MQTT 解码通道(用于解析 WebSocket 中的 MQTT 二进制消息) - */ - private final ThreadLocal decoderChannelThreadLocal = ThreadLocal - .withInitial(() -> new EmbeddedChannel(new MqttDecoder())); - - /** - * MQTT 编码通道(用于编码 MQTT 响应消息) - */ - private final ThreadLocal encoderChannelThreadLocal = ThreadLocal - .withInitial(() -> new EmbeddedChannel(MqttEncoder.INSTANCE)); - - public IotMqttWsUpstreamHandler(IotMqttWsUpstreamProtocol upstreamProtocol, - IotDeviceMessageService messageService, - IotMqttWsConnectionManager connectionManager) { - this.upstreamProtocol = upstreamProtocol; - this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); - this.messageService = messageService; - this.connectionManager = connectionManager; - } - - /** - * 处理 WebSocket 连接 - * - * @param socket WebSocket 连接 - */ - public void handle(ServerWebSocket socket) { - // 生成唯一的 Socket ID(因为 MQTT 使用二进制协议,textHandlerID() 会返回 null) - String socketId = IdUtil.simpleUUID(); - socketIdMap.put(socket, socketId); - - log.info("[handle][WebSocket 连接建立,socketId: {},remoteAddress: {}]", - socketId, socket.remoteAddress()); - - // 设置二进制数据处理器 - socket.binaryMessageHandler(buffer -> { - try { - handleMqttMessage(socket, buffer); - } catch (Exception e) { - log.error("[handle][处理 MQTT 消息异常,socketId: {}]", socketId, e); - socket.close(); - } - }); - - // 设置关闭处理器 - socket.closeHandler(v -> { - socketIdMap.remove(socket); - IotDeviceRespDTO device = socketDeviceMap.remove(socketId); - if (device != null) { - String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); - connectionManager.removeConnection(deviceKey); - deviceMessageIdMap.remove(deviceKey); - // 发送设备离线消息 - sendOfflineMessage(device); - log.info("[handle][WebSocket 连接关闭,deviceKey: {},socketId: {}]", deviceKey, socketId); - } - }); - - // 设置异常处理器 - socket.exceptionHandler(e -> { - log.error("[handle][WebSocket 连接异常,socketId: {}]", socketId, e); - socketIdMap.remove(socket); - IotDeviceRespDTO device = socketDeviceMap.remove(socketId); - if (device != null) { - String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); - connectionManager.removeConnection(deviceKey); - deviceMessageIdMap.remove(deviceKey); - } - socket.close(); - }); - } - - /** - * 处理 MQTT 消息 - * - * @param socket WebSocket 连接 - * @param buffer 消息缓冲区 - */ - private void handleMqttMessage(ServerWebSocket socket, Buffer buffer) { - String socketId = socketIdMap.get(socket); - ByteBuf byteBuf = Unpooled.wrappedBuffer(buffer.getBytes()); - - try { - // 使用 EmbeddedChannel 解码 MQTT 消息 - EmbeddedChannel decoderChannel = decoderChannelThreadLocal.get(); - decoderChannel.writeInbound(byteBuf.retain()); - - // 读取解码后的消息 - MqttMessage mqttMessage = decoderChannel.readInbound(); - if (mqttMessage == null) { - log.warn("[handleMqttMessage][MQTT 消息解码失败,socketId: {}]", socketId); - return; - } - - MqttMessageType messageType = mqttMessage.fixedHeader().messageType(); - log.debug("[handleMqttMessage][收到 MQTT 消息,类型: {},socketId: {}]", messageType, socketId); - - // 根据消息类型分发处理 - switch (messageType) { - case CONNECT: - handleConnect(socket, (MqttConnectMessage) mqttMessage); - break; - case PUBLISH: - handlePublish(socket, (MqttPublishMessage) mqttMessage); - break; - case PUBACK: - handlePubAck(socket, mqttMessage); - break; - case PUBREC: - handlePubRec(socket, mqttMessage); - break; - case PUBREL: - handlePubRel(socket, mqttMessage); - break; - case PUBCOMP: - handlePubComp(socket, mqttMessage); - break; - case SUBSCRIBE: - handleSubscribe(socket, (MqttSubscribeMessage) mqttMessage); - break; - case UNSUBSCRIBE: - handleUnsubscribe(socket, (MqttUnsubscribeMessage) mqttMessage); - break; - case PINGREQ: - handlePingReq(socket); - break; - case DISCONNECT: - handleDisconnect(socket); - break; - default: - log.warn("[handleMqttMessage][不支持的消息类型: {},socketId: {}]", messageType, socketId); - } - } catch (DecoderException e) { - log.error("[handleMqttMessage][MQTT 消息解码异常,socketId: {}]", socketId, e); - socket.close(); - } catch (Exception e) { - log.error("[handleMqttMessage][处理 MQTT 消息失败,socketId: {}]", socketId, e); - socket.close(); - } finally { - byteBuf.release(); - } - } - - /** - * 处理 CONNECT 消息(设备认证) - */ - private void handleConnect(ServerWebSocket socket, MqttConnectMessage message) { - String socketId = socketIdMap.get(socket); - try { - // 1. 解析 CONNECT 消息 - MqttConnectPayload payload = message.payload(); - String clientId = payload.clientIdentifier(); - String username = payload.userName(); - String password = payload.passwordInBytes() != null - ? new String(payload.passwordInBytes(), StandardCharsets.UTF_8) - : null; - - log.info("[handleConnect][收到 CONNECT 消息,clientId: {},username: {},socketId: {}]", - clientId, username, socketId); - - // 2. 设备认证 - IotDeviceRespDTO device = authenticateDevice(clientId, username, password); - if (device == null) { - log.warn("[handleConnect][设备认证失败,clientId: {},socketId: {}]", clientId, socketId); - sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); - socket.close(); - return; - } - - // 3. 保存设备信息 - socketDeviceMap.put(socketId, device); - String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); - connectionManager.addConnection(deviceKey, socket, socketId); - deviceMessageIdMap.put(deviceKey, new AtomicInteger(1)); - - log.info("[handleConnect][设备认证成功,deviceId: {},deviceKey: {},socketId: {}]", - device.getId(), deviceKey, socketId); - - // 4. 发送 CONNACK - sendConnAck(socket, MqttConnectReturnCode.CONNECTION_ACCEPTED); - - // 5. 发送设备上线消息 - sendOnlineMessage(device); - } catch (Exception e) { - log.error("[handleConnect][处理 CONNECT 消息失败,socketId: {}]", socketId, e); - sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE); - socket.close(); - } - } - - /** - * 处理 PUBLISH 消息(设备发布消息) - */ - private void handlePublish(ServerWebSocket socket, MqttPublishMessage message) { - String socketId = socketIdMap.get(socket); - IotDeviceRespDTO device = socketDeviceMap.get(socketId); - - if (device == null) { - log.warn("[handlePublish][设备未认证,socketId: {}]", socketId); - socket.close(); - return; - } - - try { - // 1. 解析 PUBLISH 消息 - MqttFixedHeader fixedHeader = message.fixedHeader(); - MqttPublishVariableHeader variableHeader = message.variableHeader(); - ByteBuf payload = message.payload(); - - String topic = variableHeader.topicName(); - int messageId = variableHeader.packetId(); - MqttQoS qos = fixedHeader.qosLevel(); - - log.debug("[handlePublish][收到 PUBLISH 消息,topic: {},messageId: {},QoS: {},deviceId: {}]", - topic, messageId, qos, device.getId()); - - // 2. 读取 payload - byte[] payloadBytes = new byte[payload.readableBytes()]; - payload.readBytes(payloadBytes); - - // 3. 解码并发送消息 - IotDeviceMessage deviceMessage = messageService.decodeDeviceMessage(payloadBytes, - device.getProductKey(), device.getDeviceName()); - if (deviceMessage != null) { - deviceMessage.setServerId(upstreamProtocol.getServerId()); - messageService.sendDeviceMessage(deviceMessage, device.getProductKey(), - device.getDeviceName(), upstreamProtocol.getServerId()); - log.info("[handlePublish][设备消息已发送,method: {},deviceId: {}]", - deviceMessage.getMethod(), device.getId()); - } - - // 4. 根据 QoS 级别发送相应的确认消息 - if (qos == MqttQoS.AT_LEAST_ONCE) { - // QoS 1:发送 PUBACK - sendPubAck(socket, messageId); - } else if (qos == MqttQoS.EXACTLY_ONCE) { - // QoS 2:发送 PUBREC - sendPubRec(socket, messageId); - } - // QoS 0 无需确认 - } catch (Exception e) { - log.error("[handlePublish][处理 PUBLISH 消息失败,deviceId: {}]", device.getId(), e); - } - } - - /** - * 处理 PUBACK 消息(QoS 1 确认) - */ - private void handlePubAck(ServerWebSocket socket, MqttMessage message) { - String socketId = socketIdMap.get(socket); - IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { - log.warn("[handlePubAck][设备未认证,socketId: {}]", socketId); - socket.close(); - return; - } - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - log.debug("[handlePubAck][收到 PUBACK,messageId: {},deviceId: {}]", messageId, device.getId()); - } - - /** - * 处理 PUBREC 消息(QoS 2 第一步确认) - */ - private void handlePubRec(ServerWebSocket socket, MqttMessage message) { - String socketId = socketIdMap.get(socket); - IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { - log.warn("[handlePubRec][设备未认证,socketId: {}]", socketId); - socket.close(); - return; - } - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - log.debug("[handlePubRec][收到 PUBREC,messageId: {},deviceId: {}]", messageId, device.getId()); - // 发送 PUBREL - sendPubRel(socket, messageId); - } - - /** - * 处理 PUBREL 消息(QoS 2 第二步) - */ - private void handlePubRel(ServerWebSocket socket, MqttMessage message) { - String socketId = socketIdMap.get(socket); - IotDeviceRespDTO device = socketDeviceMap.get(socketId); - - if (device == null) { - log.warn("[handlePubRel][设备未认证,socketId: {}]", socketId); - socket.close(); - return; - } - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - log.debug("[handlePubRel][收到 PUBREL,messageId: {},deviceId: {}]", messageId, device.getId()); - // 发送 PUBCOMP - sendPubComp(socket, messageId); - } - - /** - * 处理 PUBCOMP 消息(QoS 2 完成确认) - */ - private void handlePubComp(ServerWebSocket socket, MqttMessage message) { - String socketId = socketIdMap.get(socket); - IotDeviceRespDTO device = socketDeviceMap.get(socketId); - - if (device == null) { - log.warn("[handlePubComp][设备未认证,socketId: {}]", socketId); - socket.close(); - return; - } - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - log.debug("[handlePubComp][收到 PUBCOMP,messageId: {},deviceId: {}]", messageId, device.getId()); - } - - /** - * 处理 SUBSCRIBE 消息(设备订阅主题) - */ - private void handleSubscribe(ServerWebSocket socket, MqttSubscribeMessage message) { - String socketId = socketIdMap.get(socket); - IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { - log.warn("[handleSubscribe][设备未认证,socketId: {}]", socketId); - socket.close(); - return; - } - - try { - // 1. 解析 SUBSCRIBE 消息 - int messageId = message.variableHeader().messageId(); - MqttSubscribePayload payload = message.payload(); - String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); - - log.info("[handleSubscribe][设备订阅请求,deviceKey: {},messageId: {},主题数量: {}]", - deviceKey, messageId, payload.topicSubscriptions().size()); - - // 2. 构建 QoS 列表并记录订阅信息 - int[] grantedQosList = new int[payload.topicSubscriptions().size()]; - for (int i = 0; i < payload.topicSubscriptions().size(); i++) { - MqttTopicSubscription subscription = payload.topicSubscriptions().get(i); - String topic = subscription.topicFilter(); - grantedQosList[i] = subscription.qualityOfService().value(); - - // 记录订阅信息到连接管理器 - connectionManager.addSubscription(deviceKey, topic); - - log.info("[handleSubscribe][订阅主题: {},QoS: {},deviceKey: {}]", - topic, subscription.qualityOfService(), deviceKey); - } - - // 3. 发送 SUBACK - sendSubAck(socket, messageId, grantedQosList); - } catch (Exception e) { - log.error("[handleSubscribe][处理 SUBSCRIBE 消息失败,deviceId: {}]", device.getId(), e); - } - } - - /** - * 处理 UNSUBSCRIBE 消息(设备取消订阅) - */ - private void handleUnsubscribe(ServerWebSocket socket, MqttUnsubscribeMessage message) { - String socketId = socketIdMap.get(socket); - IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { - log.warn("[handleUnsubscribe][设备未认证,socketId: {}]", socketId); - socket.close(); - return; - } - - try { - // 1. 解析 UNSUBSCRIBE 消息 - int messageId = message.variableHeader().messageId(); - MqttUnsubscribePayload payload = message.payload(); - String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); - - log.info("[handleUnsubscribe][设备取消订阅,deviceKey: {},messageId: {},主题数量: {}]", - deviceKey, messageId, payload.topics().size()); - - // 2. 移除订阅信息 - for (String topic : payload.topics()) { - connectionManager.removeSubscription(deviceKey, topic); - log.info("[handleUnsubscribe][取消订阅主题: {},deviceKey: {}]", topic, deviceKey); - } - - // 3. 发送 UNSUBACK - sendUnsubAck(socket, messageId); - } catch (Exception e) { - log.error("[handleUnsubscribe][处理 UNSUBSCRIBE 消息失败,deviceId: {}]", device.getId(), e); - } - } - - /** - * 处理 PINGREQ 消息(心跳请求) - */ - private void handlePingReq(ServerWebSocket socket) { - String socketId = socketIdMap.get(socket); - IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { - log.warn("[handlePingReq][设备未认证,socketId: {}]", socketId); - socket.close(); - return; - } - - log.debug("[handlePingReq][收到心跳请求,deviceId: {}]", device.getId()); - // 发送 PINGRESP - sendPingResp(socket); - } - - /** - * 处理 DISCONNECT 消息(设备断开连接) - */ - private void handleDisconnect(ServerWebSocket socket) { - String socketId = socketIdMap.get(socket); - IotDeviceRespDTO device = socketDeviceMap.remove(socketId); - if (device != null) { - String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); - connectionManager.removeConnection(deviceKey); - deviceMessageIdMap.remove(deviceKey); - sendOfflineMessage(device); - log.info("[handleDisconnect][设备主动断开连接,deviceKey: {}]", deviceKey); - } - - socket.close(); - } - - // ==================== 设备认证和状态相关方法 ==================== - - /** - * 设备认证 - */ - private IotDeviceRespDTO authenticateDevice(String clientId, String username, String password) { - try { - // 1. 参数校验 - if (StrUtil.hasEmpty(clientId, username, password)) { - log.warn("[authenticateDevice][认证参数不完整,clientId: {},username: {}]", clientId, username); - return null; - } - - // 2. 构建认证参数并调用 API - IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO() - .setClientId(clientId) - .setUsername(username) - .setPassword(password); - - CommonResult authResult = deviceApi.authDevice(authParams); - if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) { - log.warn("[authenticateDevice][设备认证失败,clientId: {}]", clientId); - return null; - } - - // 3. 获取设备信息 - IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); - if (deviceInfo == null) { - log.warn("[authenticateDevice][用户名格式不正确,username: {}]", username); - return null; - } - - IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO() - .setProductKey(deviceInfo.getProductKey()) - .setDeviceName(deviceInfo.getDeviceName()); - - CommonResult deviceResult = deviceApi.getDevice(getReqDTO); - if (!deviceResult.isSuccess() || deviceResult.getData() == null) { - log.warn("[authenticateDevice][获取设备信息失败,username: {}]", username); - return null; - } - - return deviceResult.getData(); - } catch (Exception e) { - log.error("[authenticateDevice][设备认证异常,clientId: {}]", clientId, e); - return null; - } - } - - /** - * 发送设备上线消息 - */ - private void sendOnlineMessage(IotDeviceRespDTO device) { - try { - IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); - messageService.sendDeviceMessage(onlineMessage, device.getProductKey(), - device.getDeviceName(), upstreamProtocol.getServerId()); - log.info("[sendOnlineMessage][设备上线,deviceId: {}]", device.getId()); - } catch (Exception e) { - log.error("[sendOnlineMessage][发送设备上线消息失败,deviceId: {}]", device.getId(), e); - } - } - - /** - * 发送设备离线消息 - */ - private void sendOfflineMessage(IotDeviceRespDTO device) { - try { - IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); - messageService.sendDeviceMessage(offlineMessage, device.getProductKey(), - device.getDeviceName(), upstreamProtocol.getServerId()); - log.info("[sendOfflineMessage][设备离线,deviceId: {}]", device.getId()); - } catch (Exception e) { - log.error("[sendOfflineMessage][发送设备离线消息失败,deviceId: {}]", device.getId(), e); - } - } - - // ==================== 发送响应消息的辅助方法 ==================== - - /** - * 发送 CONNACK 消息 - */ - private void sendConnAck(ServerWebSocket socket, MqttConnectReturnCode returnCode) { - try { - // 构建 CONNACK 消息 - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, false); - MqttConnAckMessage connAckMessage = new MqttConnAckMessage(fixedHeader, variableHeader); - - // 编码并发送 - sendMqttMessage(socket, connAckMessage); - log.debug("[sendConnAck][发送 CONNACK 消息,returnCode: {}]", returnCode); - } catch (Exception e) { - log.error("[sendConnAck][发送 CONNACK 消息失败]", e); - } - } - - /** - * 发送 PUBACK 消息(QoS 1 确认) - */ - private void sendPubAck(ServerWebSocket socket, int messageId) { - try { - // 构建 PUBACK 消息 - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); - MqttMessage pubAckMessage = new MqttMessage(fixedHeader, variableHeader); - - // 编码并发送 - sendMqttMessage(socket, pubAckMessage); - log.debug("[sendPubAck][发送 PUBACK 消息,messageId: {}]", messageId); - } catch (Exception e) { - log.error("[sendPubAck][发送 PUBACK 消息失败,messageId: {}]", messageId, e); - } - } - - /** - * 发送 PUBREC 消息(QoS 2 第一步确认) - */ - private void sendPubRec(ServerWebSocket socket, int messageId) { - try { - // 构建 PUBREC 消息 - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); - MqttMessage pubRecMessage = new MqttMessage(fixedHeader, variableHeader); - - // 编码并发送 - sendMqttMessage(socket, pubRecMessage); - log.debug("[sendPubRec][发送 PUBREC 消息,messageId: {}]", messageId); - } catch (Exception e) { - log.error("[sendPubRec][发送 PUBREC 消息失败,messageId: {}]", messageId, e); - } - } - - /** - * 发送 PUBREL 消息(QoS 2 第二步) - */ - private void sendPubRel(ServerWebSocket socket, int messageId) { - try { - // 构建 PUBREL 消息 - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); - MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader); - - // 编码并发送 - sendMqttMessage(socket, pubRelMessage); - log.debug("[sendPubRel][发送 PUBREL 消息,messageId: {}]", messageId); - } catch (Exception e) { - log.error("[sendPubRel][发送 PUBREL 消息失败,messageId: {}]", messageId, e); - } - } - - /** - * 发送 PUBCOMP 消息(QoS 2 完成确认) - */ - private void sendPubComp(ServerWebSocket socket, int messageId) { - try { - // 构建 PUBCOMP 消息 - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); - MqttMessage pubCompMessage = new MqttMessage(fixedHeader, variableHeader); - - // 编码并发送 - sendMqttMessage(socket, pubCompMessage); - log.debug("[sendPubComp][发送 PUBCOMP 消息,messageId: {}]", messageId); - } catch (Exception e) { - log.error("[sendPubComp][发送 PUBCOMP 消息失败,messageId: {}]", messageId, e); - } - } - - /** - * 发送 SUBACK 消息 - */ - private void sendSubAck(ServerWebSocket socket, int messageId, int[] grantedQosList) { - try { - // 构建 SUBACK 消息 - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); - MqttSubAckPayload payload = new MqttSubAckPayload(grantedQosList); - MqttSubAckMessage subAckMessage = new MqttSubAckMessage(fixedHeader, variableHeader, payload); - - // 编码并发送 - sendMqttMessage(socket, subAckMessage); - log.debug("[sendSubAck][发送 SUBACK 消息,messageId: {},主题数量: {}]", messageId, grantedQosList.length); - } catch (Exception e) { - log.error("[sendSubAck][发送 SUBACK 消息失败,messageId: {}]", messageId, e); - } - } - - /** - * 发送 UNSUBACK 消息 - */ - private void sendUnsubAck(ServerWebSocket socket, int messageId) { - try { - // 构建 UNSUBACK 消息 - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); - MqttUnsubAckMessage unsubAckMessage = new MqttUnsubAckMessage(fixedHeader, variableHeader); - - // 编码并发送 - sendMqttMessage(socket, unsubAckMessage); - log.debug("[sendUnsubAck][发送 UNSUBACK 消息,messageId: {}]", messageId); - } catch (Exception e) { - log.error("[sendUnsubAck][发送 UNSUBACK 消息失败,messageId: {}]", messageId, e); - } - } - - /** - * 发送 PINGRESP 消息 - */ - private void sendPingResp(ServerWebSocket socket) { - try { - // 构建 PINGRESP 消息 - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessage pingRespMessage = new MqttMessage(fixedHeader); - - // 编码并发送 - sendMqttMessage(socket, pingRespMessage); - log.debug("[sendPingResp][发送 PINGRESP 消息]"); - } catch (Exception e) { - log.error("[sendPingResp][发送 PINGRESP 消息失败]", e); - } - } - - /** - * 发送 MQTT 消息到 WebSocket - */ - private void sendMqttMessage(ServerWebSocket socket, MqttMessage mqttMessage) { - ByteBuf byteBuf = null; - try { - // 使用 EmbeddedChannel 编码 MQTT 消息 - EmbeddedChannel encoderChannel = encoderChannelThreadLocal.get(); - encoderChannel.writeOutbound(mqttMessage); - - // 读取编码后的 ByteBuf - byteBuf = encoderChannel.readOutbound(); - if (byteBuf != null) { - byte[] bytes = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(bytes); - socket.writeBinaryMessage(Buffer.buffer(bytes)); - } - } finally { - if (byteBuf != null) { - byteBuf.release(); - } - } - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index 49d590ab4b..cce0449663 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -115,23 +115,10 @@ yudao: connect-timeout-seconds: 60 ssl-enabled: false # ==================================== - # 针对引入的 MQTT WebSocket 组件的配置 - # ==================================== - mqtt-ws: - enabled: false # 是否启用 MQTT WebSocket - port: 8083 # WebSocket 服务端口 - path: /mqtt # WebSocket 路径 - max-message-size: 8192 # 最大消息大小(字节) - max-frame-size: 65536 # 最大帧大小(字节) - connect-timeout-seconds: 60 # 连接超时时间(秒) - keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒) - ssl-enabled: false # 是否启用 SSL(wss://) - sub-protocol: mqtt # WebSocket 子协议 - # ==================================== # 针对引入的 CoAP 组件的配置 # ==================================== coap: - enabled: true # 是否启用 CoAP 协议 + enabled: false # 是否启用 CoAP 协议 port: 5683 # CoAP 服务端口(默认 5683) max-message-size: 1024 # 最大消息大小(字节) ack-timeout: 2000 # ACK 超时时间(毫秒)