review:【iot】mqtt websocket 协议

This commit is contained in:
YunaiV
2026-01-05 20:26:16 +08:00
parent 6fdd91d01e
commit 50a88a9ce7
3 changed files with 11 additions and 30 deletions

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager;
import cn.hutool.core.collection.CollUtil;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -187,7 +188,7 @@ public class IotMqttWsConnectionManager {
*/
public boolean isSubscribed(String deviceKey, String topic) {
Set<String> subscriptions = deviceSubscriptions.get(deviceKey);
if (subscriptions == null || subscriptions.isEmpty()) {
if (CollUtil.isEmpty(subscriptions)) {
return false;
}
@@ -210,6 +211,7 @@ public class IotMqttWsConnectionManager {
return deviceSubscriptions.get(deviceKey);
}
// TODO @haohao这个方法是不是也可以考虑抽到 IotMqttTopicUtils 里面去哈;感觉更简洁一点?
/**
* MQTT 主题匹配
* 支持通配符:
@@ -227,25 +229,25 @@ public class IotMqttWsConnectionManager {
}
// 不包含通配符
// TODO @haohao这里要不要枚举下哈+ #
if (!subscription.contains("+") && !subscription.contains("#")) {
return false;
}
String[] subscriptionParts = subscription.split("/");
String[] topicParts = topic.split("/");
int i = 0;
for (; i < subscriptionParts.length && i < topicParts.length; i++) {
String subPart = subscriptionParts[i];
String topicPart = topicParts[i];
// # 匹配剩余所有层级,且必须在末尾
if (subPart.equals("#")) {
// # 匹配剩余所有层级,且必须在末尾
return i == subscriptionParts.length - 1;
}
// 不是通配符且不匹配
if (!subPart.equals("+") && !subPart.equals(topicPart)) {
// 不是通配符且不匹配
return false;
}
}

View File

@@ -148,7 +148,7 @@ public class IotMqttWsDownstreamHandler {
try {
int messageId = qos > 0 ? generateMessageId() : 0;
// 手动编码MQTT PUBLISH消息
// 手动编码 MQTT PUBLISH 消息
io.netty.buffer.ByteBuf byteBuf = io.netty.buffer.Unpooled.buffer();
// 固定头:消息类型(PUBLISH=3) + DUP(0) + QoS + RETAIN
@@ -159,11 +159,11 @@ public class IotMqttWsDownstreamHandler {
int topicLength = topic.getBytes().length;
int remainingLength = 2 + topicLength + (qos > 0 ? 2 : 0) + payload.length;
// 写入剩余长度简化版本假设小于128字节
// 写入剩余长度(简化版本,假设小于 128 字节)
if (remainingLength < 128) {
byteBuf.writeByte(remainingLength);
} else {
// 处理大于127的情况
// 处理大于 127 的情况
int x = remainingLength;
do {
int encodedByte = x % 128;
@@ -179,7 +179,7 @@ public class IotMqttWsDownstreamHandler {
byteBuf.writeShort(topicLength);
byteBuf.writeBytes(topic.getBytes());
// 可变头消息ID仅QoS>0时)
// 可变头:消息 ID QoS > 0 时)
if (qos > 0) {
byteBuf.writeShort(messageId);
}
@@ -191,7 +191,6 @@ public class IotMqttWsDownstreamHandler {
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
byteBuf.release();
socket.writeBinaryMessage(Buffer.buffer(bytes));
log.info("[sendMessageToDevice][消息已发送到设备deviceKey: {}topic: {}qos: {}messageId: {}]",
@@ -211,6 +210,7 @@ public class IotMqttWsDownstreamHandler {
private int generateMessageId() {
int id = messageIdGenerator.getAndIncrement();
// MQTT 消息 ID 范围是 1-65535
// TODO @haohao并发可能有问题
if (id > 65535) {
messageIdGenerator.set(1);
return 1;

View File

@@ -253,7 +253,6 @@ public class IotMqttWsUpstreamHandler {
// 5. 发送设备上线消息
sendOnlineMessage(device);
} catch (Exception e) {
log.error("[handleConnect][处理 CONNECT 消息失败socketId: {}]", socketId, e);
sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
@@ -311,7 +310,6 @@ public class IotMqttWsUpstreamHandler {
sendPubRec(socket, messageId);
}
// QoS 0 无需确认
} catch (Exception e) {
log.error("[handlePublish][处理 PUBLISH 消息失败deviceId: {}]", device.getId(), e);
}
@@ -323,7 +321,6 @@ public class IotMqttWsUpstreamHandler {
private void handlePubAck(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubAck][设备未认证socketId: {}]", socketId);
socket.close();
@@ -340,7 +337,6 @@ public class IotMqttWsUpstreamHandler {
private void handlePubRec(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubRec][设备未认证socketId: {}]", socketId);
socket.close();
@@ -349,7 +345,6 @@ public class IotMqttWsUpstreamHandler {
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubRec][收到 PUBRECmessageId: {}deviceId: {}]", messageId, device.getId());
// 发送 PUBREL
sendPubRel(socket, messageId);
}
@@ -369,7 +364,6 @@ public class IotMqttWsUpstreamHandler {
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubRel][收到 PUBRELmessageId: {}deviceId: {}]", messageId, device.getId());
// 发送 PUBCOMP
sendPubComp(socket, messageId);
}
@@ -397,7 +391,6 @@ public class IotMqttWsUpstreamHandler {
private void handleSubscribe(ServerWebSocket socket, MqttSubscribeMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handleSubscribe][设备未认证socketId: {}]", socketId);
socket.close();
@@ -429,7 +422,6 @@ public class IotMqttWsUpstreamHandler {
// 3. 发送 SUBACK
sendSubAck(socket, messageId, grantedQosList);
} catch (Exception e) {
log.error("[handleSubscribe][处理 SUBSCRIBE 消息失败deviceId: {}]", device.getId(), e);
}
@@ -441,7 +433,6 @@ public class IotMqttWsUpstreamHandler {
private void handleUnsubscribe(ServerWebSocket socket, MqttUnsubscribeMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handleUnsubscribe][设备未认证socketId: {}]", socketId);
socket.close();
@@ -465,7 +456,6 @@ public class IotMqttWsUpstreamHandler {
// 3. 发送 UNSUBACK
sendUnsubAck(socket, messageId);
} catch (Exception e) {
log.error("[handleUnsubscribe][处理 UNSUBSCRIBE 消息失败deviceId: {}]", device.getId(), e);
}
@@ -477,7 +467,6 @@ public class IotMqttWsUpstreamHandler {
private void handlePingReq(ServerWebSocket socket) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePingReq][设备未认证socketId: {}]", socketId);
socket.close();
@@ -485,7 +474,6 @@ public class IotMqttWsUpstreamHandler {
}
log.debug("[handlePingReq][收到心跳请求deviceId: {}]", device.getId());
// 发送 PINGRESP
sendPingResp(socket);
}
@@ -496,7 +484,6 @@ public class IotMqttWsUpstreamHandler {
private void handleDisconnect(ServerWebSocket socket) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.remove(socketId);
if (device != null) {
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
connectionManager.removeConnection(deviceKey);
@@ -600,7 +587,6 @@ public class IotMqttWsUpstreamHandler {
// 编码并发送
sendMqttMessage(socket, connAckMessage);
log.debug("[sendConnAck][发送 CONNACK 消息returnCode: {}]", returnCode);
} catch (Exception e) {
log.error("[sendConnAck][发送 CONNACK 消息失败]", e);
@@ -620,7 +606,6 @@ public class IotMqttWsUpstreamHandler {
// 编码并发送
sendMqttMessage(socket, pubAckMessage);
log.debug("[sendPubAck][发送 PUBACK 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubAck][发送 PUBACK 消息失败messageId: {}]", messageId, e);
@@ -640,7 +625,6 @@ public class IotMqttWsUpstreamHandler {
// 编码并发送
sendMqttMessage(socket, pubRecMessage);
log.debug("[sendPubRec][发送 PUBREC 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubRec][发送 PUBREC 消息失败messageId: {}]", messageId, e);
@@ -660,7 +644,6 @@ public class IotMqttWsUpstreamHandler {
// 编码并发送
sendMqttMessage(socket, pubRelMessage);
log.debug("[sendPubRel][发送 PUBREL 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubRel][发送 PUBREL 消息失败messageId: {}]", messageId, e);
@@ -680,7 +663,6 @@ public class IotMqttWsUpstreamHandler {
// 编码并发送
sendMqttMessage(socket, pubCompMessage);
log.debug("[sendPubComp][发送 PUBCOMP 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubComp][发送 PUBCOMP 消息失败messageId: {}]", messageId, e);
@@ -701,7 +683,6 @@ public class IotMqttWsUpstreamHandler {
// 编码并发送
sendMqttMessage(socket, subAckMessage);
log.debug("[sendSubAck][发送 SUBACK 消息messageId: {},主题数量: {}]", messageId, grantedQosList.length);
} catch (Exception e) {
log.error("[sendSubAck][发送 SUBACK 消息失败messageId: {}]", messageId, e);
@@ -721,7 +702,6 @@ public class IotMqttWsUpstreamHandler {
// 编码并发送
sendMqttMessage(socket, unsubAckMessage);
log.debug("[sendUnsubAck][发送 UNSUBACK 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendUnsubAck][发送 UNSUBACK 消息失败messageId: {}]", messageId, e);
@@ -740,7 +720,6 @@ public class IotMqttWsUpstreamHandler {
// 编码并发送
sendMqttMessage(socket, pingRespMessage);
log.debug("[sendPingResp][发送 PINGRESP 消息]");
} catch (Exception e) {
log.error("[sendPingResp][发送 PINGRESP 消息失败]", e);