From 50a88a9ce76af69bd4b05ceae449a02b91ed1d29 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 5 Jan 2026 20:26:16 +0800 Subject: [PATCH] =?UTF-8?q?review=EF=BC=9A=E3=80=90iot=E3=80=91mqtt=20webs?= =?UTF-8?q?ocket=20=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../manager/IotMqttWsConnectionManager.java | 10 +++++---- .../router/IotMqttWsDownstreamHandler.java | 10 ++++----- .../router/IotMqttWsUpstreamHandler.java | 21 ------------------- 3 files changed, 11 insertions(+), 30 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java index 2bf61bea83..fee3e359c8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager; +import cn.hutool.core.collection.CollUtil; import io.vertx.core.http.ServerWebSocket; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -187,7 +188,7 @@ public class IotMqttWsConnectionManager { */ public boolean isSubscribed(String deviceKey, String topic) { Set subscriptions = deviceSubscriptions.get(deviceKey); - if (subscriptions == null || subscriptions.isEmpty()) { + if (CollUtil.isEmpty(subscriptions)) { return false; } @@ -210,6 +211,7 @@ public class IotMqttWsConnectionManager { return deviceSubscriptions.get(deviceKey); } + // TODO @haohao:这个方法,是不是也可以考虑抽到 IotMqttTopicUtils 里面去哈;感觉更简洁一点? /** * MQTT 主题匹配 * 支持通配符: @@ -227,25 +229,25 @@ public class IotMqttWsConnectionManager { } // 不包含通配符 + // TODO @haohao:这里要不要枚举下哈;+ # if (!subscription.contains("+") && !subscription.contains("#")) { return false; } String[] subscriptionParts = subscription.split("/"); String[] topicParts = topic.split("/"); - int i = 0; for (; i < subscriptionParts.length && i < topicParts.length; i++) { String subPart = subscriptionParts[i]; String topicPart = topicParts[i]; + // # 匹配剩余所有层级,且必须在末尾 if (subPart.equals("#")) { - // # 匹配剩余所有层级,且必须在末尾 return i == subscriptionParts.length - 1; } + // 不是通配符且不匹配 if (!subPart.equals("+") && !subPart.equals(topicPart)) { - // 不是通配符且不匹配 return false; } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java index 37148de7e5..3aeb6c5c48 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java @@ -148,7 +148,7 @@ public class IotMqttWsDownstreamHandler { try { int messageId = qos > 0 ? generateMessageId() : 0; - // 手动编码MQTT PUBLISH消息 + // 手动编码 MQTT PUBLISH 消息 io.netty.buffer.ByteBuf byteBuf = io.netty.buffer.Unpooled.buffer(); // 固定头:消息类型(PUBLISH=3) + DUP(0) + QoS + RETAIN @@ -159,11 +159,11 @@ public class IotMqttWsDownstreamHandler { int topicLength = topic.getBytes().length; int remainingLength = 2 + topicLength + (qos > 0 ? 2 : 0) + payload.length; - // 写入剩余长度(简化版本,假设小于128字节) + // 写入剩余长度(简化版本,假设小于 128 字节) if (remainingLength < 128) { byteBuf.writeByte(remainingLength); } else { - // 处理大于127的情况 + // 处理大于 127 的情况 int x = remainingLength; do { int encodedByte = x % 128; @@ -179,7 +179,7 @@ public class IotMqttWsDownstreamHandler { byteBuf.writeShort(topicLength); byteBuf.writeBytes(topic.getBytes()); - // 可变头:消息ID(仅QoS>0时) + // 可变头:消息 ID(仅 QoS > 0 时) if (qos > 0) { byteBuf.writeShort(messageId); } @@ -191,7 +191,6 @@ public class IotMqttWsDownstreamHandler { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); byteBuf.release(); - socket.writeBinaryMessage(Buffer.buffer(bytes)); log.info("[sendMessageToDevice][消息已发送到设备,deviceKey: {},topic: {},qos: {},messageId: {}]", @@ -211,6 +210,7 @@ public class IotMqttWsDownstreamHandler { private int generateMessageId() { int id = messageIdGenerator.getAndIncrement(); // MQTT 消息 ID 范围是 1-65535 + // TODO @haohao:并发可能有问题; if (id > 65535) { messageIdGenerator.set(1); return 1; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java index d9688b7974..d11d109502 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java @@ -253,7 +253,6 @@ public class IotMqttWsUpstreamHandler { // 5. 发送设备上线消息 sendOnlineMessage(device); - } catch (Exception e) { log.error("[handleConnect][处理 CONNECT 消息失败,socketId: {}]", socketId, e); sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE); @@ -311,7 +310,6 @@ public class IotMqttWsUpstreamHandler { sendPubRec(socket, messageId); } // QoS 0 无需确认 - } catch (Exception e) { log.error("[handlePublish][处理 PUBLISH 消息失败,deviceId: {}]", device.getId(), e); } @@ -323,7 +321,6 @@ public class IotMqttWsUpstreamHandler { private void handlePubAck(ServerWebSocket socket, MqttMessage message) { String socketId = socketIdMap.get(socket); IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { log.warn("[handlePubAck][设备未认证,socketId: {}]", socketId); socket.close(); @@ -340,7 +337,6 @@ public class IotMqttWsUpstreamHandler { private void handlePubRec(ServerWebSocket socket, MqttMessage message) { String socketId = socketIdMap.get(socket); IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { log.warn("[handlePubRec][设备未认证,socketId: {}]", socketId); socket.close(); @@ -349,7 +345,6 @@ public class IotMqttWsUpstreamHandler { int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); log.debug("[handlePubRec][收到 PUBREC,messageId: {},deviceId: {}]", messageId, device.getId()); - // 发送 PUBREL sendPubRel(socket, messageId); } @@ -369,7 +364,6 @@ public class IotMqttWsUpstreamHandler { int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); log.debug("[handlePubRel][收到 PUBREL,messageId: {},deviceId: {}]", messageId, device.getId()); - // 发送 PUBCOMP sendPubComp(socket, messageId); } @@ -397,7 +391,6 @@ public class IotMqttWsUpstreamHandler { private void handleSubscribe(ServerWebSocket socket, MqttSubscribeMessage message) { String socketId = socketIdMap.get(socket); IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { log.warn("[handleSubscribe][设备未认证,socketId: {}]", socketId); socket.close(); @@ -429,7 +422,6 @@ public class IotMqttWsUpstreamHandler { // 3. 发送 SUBACK sendSubAck(socket, messageId, grantedQosList); - } catch (Exception e) { log.error("[handleSubscribe][处理 SUBSCRIBE 消息失败,deviceId: {}]", device.getId(), e); } @@ -441,7 +433,6 @@ public class IotMqttWsUpstreamHandler { private void handleUnsubscribe(ServerWebSocket socket, MqttUnsubscribeMessage message) { String socketId = socketIdMap.get(socket); IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { log.warn("[handleUnsubscribe][设备未认证,socketId: {}]", socketId); socket.close(); @@ -465,7 +456,6 @@ public class IotMqttWsUpstreamHandler { // 3. 发送 UNSUBACK sendUnsubAck(socket, messageId); - } catch (Exception e) { log.error("[handleUnsubscribe][处理 UNSUBSCRIBE 消息失败,deviceId: {}]", device.getId(), e); } @@ -477,7 +467,6 @@ public class IotMqttWsUpstreamHandler { private void handlePingReq(ServerWebSocket socket) { String socketId = socketIdMap.get(socket); IotDeviceRespDTO device = socketDeviceMap.get(socketId); - if (device == null) { log.warn("[handlePingReq][设备未认证,socketId: {}]", socketId); socket.close(); @@ -485,7 +474,6 @@ public class IotMqttWsUpstreamHandler { } log.debug("[handlePingReq][收到心跳请求,deviceId: {}]", device.getId()); - // 发送 PINGRESP sendPingResp(socket); } @@ -496,7 +484,6 @@ public class IotMqttWsUpstreamHandler { private void handleDisconnect(ServerWebSocket socket) { String socketId = socketIdMap.get(socket); IotDeviceRespDTO device = socketDeviceMap.remove(socketId); - if (device != null) { String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); connectionManager.removeConnection(deviceKey); @@ -600,7 +587,6 @@ public class IotMqttWsUpstreamHandler { // 编码并发送 sendMqttMessage(socket, connAckMessage); - log.debug("[sendConnAck][发送 CONNACK 消息,returnCode: {}]", returnCode); } catch (Exception e) { log.error("[sendConnAck][发送 CONNACK 消息失败]", e); @@ -620,7 +606,6 @@ public class IotMqttWsUpstreamHandler { // 编码并发送 sendMqttMessage(socket, pubAckMessage); - log.debug("[sendPubAck][发送 PUBACK 消息,messageId: {}]", messageId); } catch (Exception e) { log.error("[sendPubAck][发送 PUBACK 消息失败,messageId: {}]", messageId, e); @@ -640,7 +625,6 @@ public class IotMqttWsUpstreamHandler { // 编码并发送 sendMqttMessage(socket, pubRecMessage); - log.debug("[sendPubRec][发送 PUBREC 消息,messageId: {}]", messageId); } catch (Exception e) { log.error("[sendPubRec][发送 PUBREC 消息失败,messageId: {}]", messageId, e); @@ -660,7 +644,6 @@ public class IotMqttWsUpstreamHandler { // 编码并发送 sendMqttMessage(socket, pubRelMessage); - log.debug("[sendPubRel][发送 PUBREL 消息,messageId: {}]", messageId); } catch (Exception e) { log.error("[sendPubRel][发送 PUBREL 消息失败,messageId: {}]", messageId, e); @@ -680,7 +663,6 @@ public class IotMqttWsUpstreamHandler { // 编码并发送 sendMqttMessage(socket, pubCompMessage); - log.debug("[sendPubComp][发送 PUBCOMP 消息,messageId: {}]", messageId); } catch (Exception e) { log.error("[sendPubComp][发送 PUBCOMP 消息失败,messageId: {}]", messageId, e); @@ -701,7 +683,6 @@ public class IotMqttWsUpstreamHandler { // 编码并发送 sendMqttMessage(socket, subAckMessage); - log.debug("[sendSubAck][发送 SUBACK 消息,messageId: {},主题数量: {}]", messageId, grantedQosList.length); } catch (Exception e) { log.error("[sendSubAck][发送 SUBACK 消息失败,messageId: {}]", messageId, e); @@ -721,7 +702,6 @@ public class IotMqttWsUpstreamHandler { // 编码并发送 sendMqttMessage(socket, unsubAckMessage); - log.debug("[sendUnsubAck][发送 UNSUBACK 消息,messageId: {}]", messageId); } catch (Exception e) { log.error("[sendUnsubAck][发送 UNSUBACK 消息失败,messageId: {}]", messageId, e); @@ -740,7 +720,6 @@ public class IotMqttWsUpstreamHandler { // 编码并发送 sendMqttMessage(socket, pingRespMessage); - log.debug("[sendPingResp][发送 PINGRESP 消息]"); } catch (Exception e) { log.error("[sendPingResp][发送 PINGRESP 消息失败]", e);