feat:【iot】移除不再使用的 MQTT WebSocket 协议配置类(实现不够标准,使用 MQTT 即可)

This commit is contained in:
YunaiV
2026-01-27 08:43:03 +08:00
parent 1b4ac9fb24
commit b87bc19116
9 changed files with 3 additions and 1531 deletions

View File

@@ -12,10 +12,6 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscr
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
@@ -159,44 +155,6 @@ public class IotGatewayConfiguration {
}
/**
* IoT 网关 MQTT WebSocket 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.mqtt-ws", name = "enabled", havingValue = "true")
@Slf4j
public static class MqttWsProtocolConfiguration {
@Bean(name = "mqttWsVertx", destroyMethod = "close")
public Vertx mqttWsVertx() {
return Vertx.vertx();
}
@Bean
public IotMqttWsUpstreamProtocol iotMqttWsUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceMessageService messageService,
IotMqttWsConnectionManager connectionManager,
@Qualifier("mqttWsVertx") Vertx mqttWsVertx) {
return new IotMqttWsUpstreamProtocol(gatewayProperties.getProtocol().getMqttWs(),
messageService, connectionManager, mqttWsVertx);
}
@Bean
public IotMqttWsDownstreamHandler iotMqttWsDownstreamHandler(IotDeviceMessageService messageService,
IotDeviceService deviceService,
IotMqttWsConnectionManager connectionManager) {
return new IotMqttWsDownstreamHandler(messageService, deviceService, connectionManager);
}
@Bean
public IotMqttWsDownstreamSubscriber iotMqttWsDownstreamSubscriber(IotMqttWsUpstreamProtocol mqttWsUpstreamProtocol,
IotMqttWsDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotMqttWsDownstreamSubscriber(mqttWsUpstreamProtocol, downstreamHandler, messageBus);
}
}
/**
* IoT 网关 UDP 协议配置类
*/

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager;
import cn.hutool.core.util.StrUtil;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttEndpoint;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -166,7 +167,7 @@ public class IotMqttConnectionManager {
}
try {
endpoint.publish(topic, io.vertx.core.buffer.Buffer.buffer(payload), MqttQoS.valueOf(qos), false, retain);
endpoint.publish(topic, Buffer.buffer(payload), MqttQoS.valueOf(qos), false, retain);
log.debug("[sendToDevice][发送消息成功,设备 ID: {},主题: {}QoS: {}]", deviceId, topic, qos);
return true;
} catch (Exception e) {

View File

@@ -1,79 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDownstreamHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
/**
* IoT MQTT WebSocket 下行消息订阅器
* <p>
* 订阅消息总线的设备下行消息,并通过 WebSocket 发送到设备
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttWsDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotMqttWsUpstreamProtocol upstreamProtocol;
private final IotMqttWsDownstreamHandler downstreamHandler;
private final IotMessageBus messageBus;
public IotMqttWsDownstreamSubscriber(IotMqttWsUpstreamProtocol upstreamProtocol,
IotMqttWsDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
this.upstreamProtocol = upstreamProtocol;
this.downstreamHandler = downstreamHandler;
this.messageBus = messageBus;
}
@PostConstruct
public void init() {
messageBus.register(this);
log.info("[init][MQTT WebSocket 下行消息订阅器已启动topic: {}]", getTopic());
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(upstreamProtocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][收到下行消息deviceId: {}method: {}]",
message.getDeviceId(), message.getMethod());
try {
// 1. 校验
String method = message.getMethod();
if (StrUtil.isBlank(method)) {
log.warn("[onMessage][消息方法为空deviceId: {}]", message.getDeviceId());
return;
}
// 2. 委托给下行处理器处理业务逻辑
boolean success = downstreamHandler.handleDownstreamMessage(message);
if (success) {
log.debug("[onMessage][下行消息处理成功deviceId: {}method: {}]",
message.getDeviceId(), message.getMethod());
} else {
log.warn("[onMessage][下行消息处理失败deviceId: {}method: {}]",
message.getDeviceId(), message.getMethod());
}
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败deviceId: {}method: {}]",
message.getDeviceId(), message.getMethod(), e);
}
}
}

View File

@@ -1,146 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.ServerWebSocket;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT WebSocket 协议:接收设备上行消息
* <p>
* 基于 Vert.x 实现 MQTT over WebSocket 服务端,支持:
* - 标准 MQTT 3.1.1 协议
* - WebSocket 协议升级
* - SSL/TLS 加密wss://
* - 设备认证与连接管理
* - QoS 0/1/2 消息质量保证
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttWsUpstreamProtocol {
private final IotGatewayProperties.MqttWsProperties mqttWsProperties;
private final IotDeviceMessageService messageService;
private final IotMqttWsConnectionManager connectionManager;
private final Vertx vertx;
@Getter
private final String serverId;
private HttpServer httpServer;
public IotMqttWsUpstreamProtocol(IotGatewayProperties.MqttWsProperties mqttWsProperties,
IotDeviceMessageService messageService,
IotMqttWsConnectionManager connectionManager,
Vertx vertx) {
this.mqttWsProperties = mqttWsProperties;
this.messageService = messageService;
this.connectionManager = connectionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(mqttWsProperties.getPort());
}
@PostConstruct
public void start() {
// 创建 HTTP 服务器选项
HttpServerOptions options = new HttpServerOptions()
.setPort(mqttWsProperties.getPort())
.setIdleTimeout(mqttWsProperties.getKeepAliveTimeoutSeconds())
.setMaxWebSocketFrameSize(mqttWsProperties.getMaxFrameSize())
.setMaxWebSocketMessageSize(mqttWsProperties.getMaxMessageSize())
// 配置 WebSocket 子协议支持
.addWebSocketSubProtocol(mqttWsProperties.getSubProtocol());
// 配置 SSL如果启用
if (Boolean.TRUE.equals(mqttWsProperties.getSslEnabled())) {
options.setSsl(true)
.setKeyCertOptions(mqttWsProperties.getSslOptions().getKeyCertOptions())
.setTrustOptions(mqttWsProperties.getSslOptions().getTrustOptions());
log.info("[start][MQTT WebSocket 已启用 SSL/TLS (wss://)]");
}
// 创建 HTTP 服务器
httpServer = vertx.createHttpServer(options);
// 设置 WebSocket 处理器
httpServer.webSocketHandler(this::handleWebSocketConnection);
// 启动服务器
try {
httpServer.listen().result();
log.info("[start][IoT 网关 MQTT WebSocket 协议启动成功,端口: {},路径: {},支持子协议: {}]",
mqttWsProperties.getPort(), mqttWsProperties.getPath(),
"mqtt, mqttv3.1, " + mqttWsProperties.getSubProtocol());
} catch (Exception e) {
log.error("[start][IoT 网关 MQTT WebSocket 协议启动失败]", e);
throw e;
}
}
@PreDestroy
public void stop() {
if (httpServer != null) {
try {
// 关闭所有连接
connectionManager.closeAllConnections();
// 关闭服务器
httpServer.close().result();
log.info("[stop][IoT 网关 MQTT WebSocket 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 MQTT WebSocket 协议停止失败]", e);
}
}
}
/**
* 处理 WebSocket 连接请求
*
* @param socket WebSocket 连接
*/
private void handleWebSocketConnection(ServerWebSocket socket) {
String path = socket.path();
String subProtocol = socket.subProtocol();
log.info("[handleWebSocketConnection][收到 WebSocket 连接请求path: {}subProtocol: {}remoteAddress: {}]",
path, subProtocol, socket.remoteAddress());
// 验证路径
if (!mqttWsProperties.getPath().equals(path)) {
log.warn("[handleWebSocketConnection][WebSocket 路径不匹配拒绝连接path: {},期望: {}]",
path, mqttWsProperties.getPath());
socket.close();
return;
}
// 验证子协议
// Vert.x 已经自动进行了子协议协商,这里只需要验证是否为 MQTT 相关协议
if (subProtocol != null && !subProtocol.startsWith("mqtt")) {
log.warn("[handleWebSocketConnection][WebSocket 子协议不支持拒绝连接subProtocol: {}]", subProtocol);
socket.close();
return;
}
log.info("[handleWebSocketConnection][WebSocket 连接已接受remoteAddress: {}subProtocol: {}]",
socket.remoteAddress(), subProtocol);
// 创建处理器并处理连接
IotMqttWsUpstreamHandler handler = new IotMqttWsUpstreamHandler(
this, messageService, connectionManager);
handler.handle(socket);
}
}

View File

@@ -1,259 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager;
import cn.hutool.core.collection.CollUtil;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* IoT MQTT WebSocket 连接管理器
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotMqttWsConnectionManager {
/**
* 存储设备连接
* Key: 设备标识deviceKey
* Value: WebSocket 连接
*/
private final Map<String, ServerWebSocket> connections = new ConcurrentHashMap<>();
/**
* 存储设备标识与 Socket ID 的映射
* Key: 设备标识deviceKey
* Value: Socket IDUUID
*/
private final Map<String, String> deviceKeyToSocketId = new ConcurrentHashMap<>();
/**
* 存储 Socket ID 与设备标识的映射
* Key: Socket IDUUID
* Value: 设备标识deviceKey
*/
private final Map<String, String> socketIdToDeviceKey = new ConcurrentHashMap<>();
/**
* 存储设备订阅的主题
* Key: 设备标识deviceKey
* Value: 订阅的主题集合
*/
private final Map<String, Set<String>> deviceSubscriptions = new ConcurrentHashMap<>();
/**
* 添加连接
*
* @param deviceKey 设备标识
* @param socket WebSocket 连接
* @param socketId Socket IDUUID
*/
public void addConnection(String deviceKey, ServerWebSocket socket, String socketId) {
connections.put(deviceKey, socket);
deviceKeyToSocketId.put(deviceKey, socketId);
socketIdToDeviceKey.put(socketId, deviceKey);
log.info("[addConnection][设备连接已添加deviceKey: {}socketId: {},当前连接数: {}]",
deviceKey, socketId, connections.size());
}
/**
* 移除连接
*
* @param deviceKey 设备标识
*/
public void removeConnection(String deviceKey) {
ServerWebSocket socket = connections.remove(deviceKey);
String socketId = deviceKeyToSocketId.remove(deviceKey);
if (socketId != null) {
socketIdToDeviceKey.remove(socketId);
}
if (socket != null) {
log.info("[removeConnection][设备连接已移除deviceKey: {}socketId: {},当前连接数: {}]",
deviceKey, socketId, connections.size());
}
}
/**
* 根据 Socket ID 移除连接
*
* @param socketId WebSocket 文本框架 ID
*/
public void removeConnectionBySocketId(String socketId) {
String deviceKey = socketIdToDeviceKey.remove(socketId);
if (deviceKey != null) {
connections.remove(deviceKey);
log.info("[removeConnectionBySocketId][设备连接已移除socketId: {}deviceKey: {},当前连接数: {}]",
socketId, deviceKey, connections.size());
}
}
/**
* 获取连接
*
* @param deviceKey 设备标识
* @return WebSocket 连接
*/
public ServerWebSocket getConnection(String deviceKey) {
return connections.get(deviceKey);
}
/**
* 根据 Socket ID 获取设备标识
*
* @param socketId WebSocket 文本框架 ID
* @return 设备标识
*/
public String getDeviceKeyBySocketId(String socketId) {
return socketIdToDeviceKey.get(socketId);
}
/**
* 检查设备是否在线
*
* @param deviceKey 设备标识
* @return 是否在线
*/
public boolean isOnline(String deviceKey) {
return connections.containsKey(deviceKey);
}
/**
* 获取当前连接数
*
* @return 连接数
*/
public int getConnectionCount() {
return connections.size();
}
/**
* 关闭所有连接
*/
public void closeAllConnections() {
connections.forEach((deviceKey, socket) -> {
try {
socket.close();
log.info("[closeAllConnections][关闭设备连接deviceKey: {}]", deviceKey);
} catch (Exception e) {
log.error("[closeAllConnections][关闭设备连接失败deviceKey: {}]", deviceKey, e);
}
});
connections.clear();
deviceKeyToSocketId.clear();
socketIdToDeviceKey.clear();
deviceSubscriptions.clear();
log.info("[closeAllConnections][所有连接已关闭]");
}
// ==================== 订阅管理方法 ====================
/**
* 添加订阅
*
* @param deviceKey 设备标识
* @param topic 订阅主题
*/
public void addSubscription(String deviceKey, String topic) {
deviceSubscriptions.computeIfAbsent(deviceKey, k -> new CopyOnWriteArraySet<>()).add(topic);
log.debug("[addSubscription][设备订阅主题deviceKey: {}topic: {}]", deviceKey, topic);
}
/**
* 移除订阅
*
* @param deviceKey 设备标识
* @param topic 订阅主题
*/
public void removeSubscription(String deviceKey, String topic) {
Set<String> topics = deviceSubscriptions.get(deviceKey);
if (topics != null) {
topics.remove(topic);
log.debug("[removeSubscription][设备取消订阅deviceKey: {}topic: {}]", deviceKey, topic);
}
}
/**
* 检查设备是否订阅了指定主题
* 支持 MQTT 通配符匹配(+ 和 #
*
* @param deviceKey 设备标识
* @param topic 发布主题
* @return 是否匹配
*/
public boolean isSubscribed(String deviceKey, String topic) {
Set<String> subscriptions = deviceSubscriptions.get(deviceKey);
if (CollUtil.isEmpty(subscriptions)) {
return false;
}
// 检查是否有匹配的订阅
for (String subscription : subscriptions) {
if (topicMatches(subscription, topic)) {
return true;
}
}
return false;
}
/**
* 获取设备的所有订阅
*
* @param deviceKey 设备标识
* @return 订阅主题集合
*/
public Set<String> getSubscriptions(String deviceKey) {
return deviceSubscriptions.get(deviceKey);
}
// TODO @haohao这个方法是不是也可以考虑抽到 IotMqttTopicUtils 里面去哈;感觉更简洁一点?
/**
* MQTT 主题匹配
* 支持通配符:
* - +:匹配单层主题
* - #:匹配多层主题(必须在末尾)
*
* @param subscription 订阅主题(可能包含通配符)
* @param topic 发布主题(不包含通配符)
* @return 是否匹配
*/
private boolean topicMatches(String subscription, String topic) {
// 完全匹配
if (subscription.equals(topic)) {
return true;
}
// 不包含通配符
// TODO @haohao这里要不要枚举下哈+ #
if (!subscription.contains("+") && !subscription.contains("#")) {
return false;
}
String[] subscriptionParts = subscription.split("/");
String[] topicParts = topic.split("/");
int i = 0;
for (; i < subscriptionParts.length && i < topicParts.length; i++) {
String subPart = subscriptionParts[i];
String topicPart = topicParts[i];
// # 匹配剩余所有层级,且必须在末尾
if (subPart.equals("#")) {
return i == subscriptionParts.length - 1;
}
// 不是通配符且不匹配
if (!subPart.equals("+") && !subPart.equals(topicPart)) {
return false;
}
}
// 检查是否都匹配完
return i == subscriptionParts.length && i == topicParts.length;
}
}

View File

@@ -1,15 +0,0 @@
/**
* IoT 网关 MQTT WebSocket 协议实现
* <p>
* 基于 Vert.x 实现 MQTT over WebSocket 服务端,支持:
* - 标准 MQTT 3.1.1 协议
* - WebSocket 协议升级
* - SSL/TLS 加密wss://
* - 设备认证与连接管理
* - QoS 0/1/2 消息质量保证
* - 双向消息通信(上行/下行)
*
* @author 芋道源码
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws;

View File

@@ -1,221 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
/**
* IoT MQTT WebSocket 下行消息处理器
* <p>
* 处理从消息总线发送到设备的消息,包括:
* - 属性设置
* - 服务调用
* - 事件通知
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttWsDownstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotMqttWsConnectionManager connectionManager;
/**
* 消息 ID 生成器(用于发布消息)
*/
private final AtomicInteger messageIdGenerator = new AtomicInteger(1);
public IotMqttWsDownstreamHandler(IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotMqttWsConnectionManager connectionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.connectionManager = connectionManager;
}
/**
* 处理下行消息
*
* @param message 设备消息
* @return 是否处理成功
*/
public boolean handleDownstreamMessage(IotDeviceMessage message) {
try {
// 1. 基础校验
if (message == null || message.getDeviceId() == null) {
log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]");
return false;
}
// 2. 获取设备信息
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
if (deviceInfo == null) {
log.warn("[handleDownstreamMessage][设备不存在,设备 ID{}]", message.getDeviceId());
return false;
}
// 3. 构建设备标识
String deviceKey = deviceInfo.getProductKey() + ":" + deviceInfo.getDeviceName();
// 4. 检查设备是否在线
if (!connectionManager.isOnline(deviceKey)) {
log.warn("[handleDownstreamMessage][设备离线无法发送消息deviceKey: {}]", deviceKey);
return false;
}
// 5. 构建主题
String topic = buildDownstreamTopic(message, deviceInfo);
if (StrUtil.isBlank(topic)) {
log.warn("[handleDownstreamMessage][主题构建失败,设备 ID{},方法:{}]",
message.getDeviceId(), message.getMethod());
return false;
}
// 6. 检查设备是否订阅了该主题
if (!connectionManager.isSubscribed(deviceKey, topic)) {
log.warn("[handleDownstreamMessage][设备未订阅该主题deviceKey: {}topic: {}]", deviceKey, topic);
return false;
}
// 8. 编码消息
byte[] payload = deviceMessageService.encodeDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (payload == null || payload.length == 0) {
log.warn("[handleDownstreamMessage][消息编码失败,设备 ID{}]", message.getDeviceId());
return false;
}
// 9. 发送消息到设备
return sendMessageToDevice(deviceKey, topic, payload, 1);
} catch (Exception e) {
if (message != null) {
log.error("[handleDownstreamMessage][处理下行消息异常,设备 ID{},错误:{}]",
message.getDeviceId(), e.getMessage(), e);
}
return false;
}
}
/**
* 构建下行消息主题
*
* @param message 设备消息
* @param deviceInfo 设备信息
* @return 主题
*/
private String buildDownstreamTopic(IotDeviceMessage message, IotDeviceRespDTO deviceInfo) {
String method = message.getMethod();
if (StrUtil.isBlank(method)) {
return null;
}
// 使用工具类构建主题,支持回复消息处理
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
return IotMqttTopicUtils.buildTopicByMethod(method, deviceInfo.getProductKey(),
deviceInfo.getDeviceName(), isReply);
}
/**
* 发送消息到设备
*
* @param deviceKey 设备标识productKey:deviceName
* @param topic 主题
* @param payload 消息内容
* @param qos QoS 级别
* @return 是否发送成功
*/
private boolean sendMessageToDevice(String deviceKey, String topic, byte[] payload, int qos) {
// 获取设备连接
ServerWebSocket socket = connectionManager.getConnection(deviceKey);
if (socket == null) {
log.warn("[sendMessageToDevice][设备未连接deviceKey: {}]", deviceKey);
return false;
}
try {
int messageId = qos > 0 ? generateMessageId() : 0;
// 手动编码 MQTT PUBLISH 消息
io.netty.buffer.ByteBuf byteBuf = io.netty.buffer.Unpooled.buffer();
// 固定头:消息类型(PUBLISH=3) + DUP(0) + QoS + RETAIN
int fixedHeaderByte1 = 0x30 | (qos << 1); // PUBLISH类型
byteBuf.writeByte(fixedHeaderByte1);
// 计算剩余长度
int topicLength = topic.getBytes().length;
int remainingLength = 2 + topicLength + (qos > 0 ? 2 : 0) + payload.length;
// 写入剩余长度(简化版本,假设小于 128 字节)
if (remainingLength < 128) {
byteBuf.writeByte(remainingLength);
} else {
// 处理大于 127 的情况
int x = remainingLength;
do {
int encodedByte = x % 128;
x = x / 128;
if (x > 0) {
encodedByte = encodedByte | 128;
}
byteBuf.writeByte(encodedByte);
} while (x > 0);
}
// 可变头:主题名称
byteBuf.writeShort(topicLength);
byteBuf.writeBytes(topic.getBytes());
// 可变头:消息 ID仅 QoS > 0 时)
if (qos > 0) {
byteBuf.writeShort(messageId);
}
// 有效载荷
byteBuf.writeBytes(payload);
// 发送
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
byteBuf.release();
socket.writeBinaryMessage(Buffer.buffer(bytes));
log.info("[sendMessageToDevice][消息已发送到设备deviceKey: {}topic: {}qos: {}messageId: {}]",
deviceKey, topic, qos, messageId);
return true;
} catch (Exception e) {
log.error("[sendMessageToDevice][发送消息到设备失败deviceKey: {}topic: {}]", deviceKey, topic, e);
return false;
}
}
/**
* 生成消息 ID
*
* @return 消息 ID
*/
private int generateMessageId() {
int id = messageIdGenerator.getAndIncrement();
// MQTT 消息 ID 范围是 1-65535
// TODO @haohao并发可能有问题
if (id > 65535) {
messageIdGenerator.set(1);
return 1;
}
return id;
}
}

View File

@@ -1,754 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.mqtt.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* IoT MQTT WebSocket 上行消息处理器
* <p>
* 处理来自设备的 MQTT 消息,包括:
* - CONNECT设备连接认证
* - PUBLISH设备发布消息
* - SUBSCRIBE设备订阅主题
* - UNSUBSCRIBE设备取消订阅
* - PINGREQ心跳请求
* - DISCONNECT设备断开连接
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttWsUpstreamHandler {
private final IotMqttWsUpstreamProtocol upstreamProtocol;
private final IotDeviceCommonApi deviceApi;
private final IotDeviceMessageService messageService;
private final IotMqttWsConnectionManager connectionManager;
/**
* 存储 WebSocket 连接到 Socket ID 的映射
* Key: WebSocket 对象
* Value: Socket IDUUID
*/
private final ConcurrentHashMap<ServerWebSocket, String> socketIdMap = new ConcurrentHashMap<>();
/**
* 存储 Socket ID 对应的设备信息
* Key: Socket IDUUID
* Value: 设备信息
*/
private final ConcurrentHashMap<String, IotDeviceRespDTO> socketDeviceMap = new ConcurrentHashMap<>();
/**
* 存储设备的消息 ID 生成器(用于 QoS > 0 的消息)
*/
private final ConcurrentHashMap<String, AtomicInteger> deviceMessageIdMap = new ConcurrentHashMap<>();
/**
* MQTT 解码通道(用于解析 WebSocket 中的 MQTT 二进制消息)
*/
private final ThreadLocal<EmbeddedChannel> decoderChannelThreadLocal = ThreadLocal
.withInitial(() -> new EmbeddedChannel(new MqttDecoder()));
/**
* MQTT 编码通道(用于编码 MQTT 响应消息)
*/
private final ThreadLocal<EmbeddedChannel> encoderChannelThreadLocal = ThreadLocal
.withInitial(() -> new EmbeddedChannel(MqttEncoder.INSTANCE));
public IotMqttWsUpstreamHandler(IotMqttWsUpstreamProtocol upstreamProtocol,
IotDeviceMessageService messageService,
IotMqttWsConnectionManager connectionManager) {
this.upstreamProtocol = upstreamProtocol;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.messageService = messageService;
this.connectionManager = connectionManager;
}
/**
* 处理 WebSocket 连接
*
* @param socket WebSocket 连接
*/
public void handle(ServerWebSocket socket) {
// 生成唯一的 Socket ID因为 MQTT 使用二进制协议textHandlerID() 会返回 null
String socketId = IdUtil.simpleUUID();
socketIdMap.put(socket, socketId);
log.info("[handle][WebSocket 连接建立socketId: {}remoteAddress: {}]",
socketId, socket.remoteAddress());
// 设置二进制数据处理器
socket.binaryMessageHandler(buffer -> {
try {
handleMqttMessage(socket, buffer);
} catch (Exception e) {
log.error("[handle][处理 MQTT 消息异常socketId: {}]", socketId, e);
socket.close();
}
});
// 设置关闭处理器
socket.closeHandler(v -> {
socketIdMap.remove(socket);
IotDeviceRespDTO device = socketDeviceMap.remove(socketId);
if (device != null) {
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
connectionManager.removeConnection(deviceKey);
deviceMessageIdMap.remove(deviceKey);
// 发送设备离线消息
sendOfflineMessage(device);
log.info("[handle][WebSocket 连接关闭deviceKey: {}socketId: {}]", deviceKey, socketId);
}
});
// 设置异常处理器
socket.exceptionHandler(e -> {
log.error("[handle][WebSocket 连接异常socketId: {}]", socketId, e);
socketIdMap.remove(socket);
IotDeviceRespDTO device = socketDeviceMap.remove(socketId);
if (device != null) {
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
connectionManager.removeConnection(deviceKey);
deviceMessageIdMap.remove(deviceKey);
}
socket.close();
});
}
/**
* 处理 MQTT 消息
*
* @param socket WebSocket 连接
* @param buffer 消息缓冲区
*/
private void handleMqttMessage(ServerWebSocket socket, Buffer buffer) {
String socketId = socketIdMap.get(socket);
ByteBuf byteBuf = Unpooled.wrappedBuffer(buffer.getBytes());
try {
// 使用 EmbeddedChannel 解码 MQTT 消息
EmbeddedChannel decoderChannel = decoderChannelThreadLocal.get();
decoderChannel.writeInbound(byteBuf.retain());
// 读取解码后的消息
MqttMessage mqttMessage = decoderChannel.readInbound();
if (mqttMessage == null) {
log.warn("[handleMqttMessage][MQTT 消息解码失败socketId: {}]", socketId);
return;
}
MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
log.debug("[handleMqttMessage][收到 MQTT 消息,类型: {}socketId: {}]", messageType, socketId);
// 根据消息类型分发处理
switch (messageType) {
case CONNECT:
handleConnect(socket, (MqttConnectMessage) mqttMessage);
break;
case PUBLISH:
handlePublish(socket, (MqttPublishMessage) mqttMessage);
break;
case PUBACK:
handlePubAck(socket, mqttMessage);
break;
case PUBREC:
handlePubRec(socket, mqttMessage);
break;
case PUBREL:
handlePubRel(socket, mqttMessage);
break;
case PUBCOMP:
handlePubComp(socket, mqttMessage);
break;
case SUBSCRIBE:
handleSubscribe(socket, (MqttSubscribeMessage) mqttMessage);
break;
case UNSUBSCRIBE:
handleUnsubscribe(socket, (MqttUnsubscribeMessage) mqttMessage);
break;
case PINGREQ:
handlePingReq(socket);
break;
case DISCONNECT:
handleDisconnect(socket);
break;
default:
log.warn("[handleMqttMessage][不支持的消息类型: {}socketId: {}]", messageType, socketId);
}
} catch (DecoderException e) {
log.error("[handleMqttMessage][MQTT 消息解码异常socketId: {}]", socketId, e);
socket.close();
} catch (Exception e) {
log.error("[handleMqttMessage][处理 MQTT 消息失败socketId: {}]", socketId, e);
socket.close();
} finally {
byteBuf.release();
}
}
/**
* 处理 CONNECT 消息(设备认证)
*/
private void handleConnect(ServerWebSocket socket, MqttConnectMessage message) {
String socketId = socketIdMap.get(socket);
try {
// 1. 解析 CONNECT 消息
MqttConnectPayload payload = message.payload();
String clientId = payload.clientIdentifier();
String username = payload.userName();
String password = payload.passwordInBytes() != null
? new String(payload.passwordInBytes(), StandardCharsets.UTF_8)
: null;
log.info("[handleConnect][收到 CONNECT 消息clientId: {}username: {}socketId: {}]",
clientId, username, socketId);
// 2. 设备认证
IotDeviceRespDTO device = authenticateDevice(clientId, username, password);
if (device == null) {
log.warn("[handleConnect][设备认证失败clientId: {}socketId: {}]", clientId, socketId);
sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
socket.close();
return;
}
// 3. 保存设备信息
socketDeviceMap.put(socketId, device);
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
connectionManager.addConnection(deviceKey, socket, socketId);
deviceMessageIdMap.put(deviceKey, new AtomicInteger(1));
log.info("[handleConnect][设备认证成功deviceId: {}deviceKey: {}socketId: {}]",
device.getId(), deviceKey, socketId);
// 4. 发送 CONNACK
sendConnAck(socket, MqttConnectReturnCode.CONNECTION_ACCEPTED);
// 5. 发送设备上线消息
sendOnlineMessage(device);
} catch (Exception e) {
log.error("[handleConnect][处理 CONNECT 消息失败socketId: {}]", socketId, e);
sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
socket.close();
}
}
/**
* 处理 PUBLISH 消息(设备发布消息)
*/
private void handlePublish(ServerWebSocket socket, MqttPublishMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePublish][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
try {
// 1. 解析 PUBLISH 消息
MqttFixedHeader fixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader();
ByteBuf payload = message.payload();
String topic = variableHeader.topicName();
int messageId = variableHeader.packetId();
MqttQoS qos = fixedHeader.qosLevel();
log.debug("[handlePublish][收到 PUBLISH 消息topic: {}messageId: {}QoS: {}deviceId: {}]",
topic, messageId, qos, device.getId());
// 2. 读取 payload
byte[] payloadBytes = new byte[payload.readableBytes()];
payload.readBytes(payloadBytes);
// 3. 解码并发送消息
IotDeviceMessage deviceMessage = messageService.decodeDeviceMessage(payloadBytes,
device.getProductKey(), device.getDeviceName());
if (deviceMessage != null) {
deviceMessage.setServerId(upstreamProtocol.getServerId());
messageService.sendDeviceMessage(deviceMessage, device.getProductKey(),
device.getDeviceName(), upstreamProtocol.getServerId());
log.info("[handlePublish][设备消息已发送method: {}deviceId: {}]",
deviceMessage.getMethod(), device.getId());
}
// 4. 根据 QoS 级别发送相应的确认消息
if (qos == MqttQoS.AT_LEAST_ONCE) {
// QoS 1发送 PUBACK
sendPubAck(socket, messageId);
} else if (qos == MqttQoS.EXACTLY_ONCE) {
// QoS 2发送 PUBREC
sendPubRec(socket, messageId);
}
// QoS 0 无需确认
} catch (Exception e) {
log.error("[handlePublish][处理 PUBLISH 消息失败deviceId: {}]", device.getId(), e);
}
}
/**
* 处理 PUBACK 消息QoS 1 确认)
*/
private void handlePubAck(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubAck][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubAck][收到 PUBACKmessageId: {}deviceId: {}]", messageId, device.getId());
}
/**
* 处理 PUBREC 消息QoS 2 第一步确认)
*/
private void handlePubRec(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubRec][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubRec][收到 PUBRECmessageId: {}deviceId: {}]", messageId, device.getId());
// 发送 PUBREL
sendPubRel(socket, messageId);
}
/**
* 处理 PUBREL 消息QoS 2 第二步)
*/
private void handlePubRel(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubRel][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubRel][收到 PUBRELmessageId: {}deviceId: {}]", messageId, device.getId());
// 发送 PUBCOMP
sendPubComp(socket, messageId);
}
/**
* 处理 PUBCOMP 消息QoS 2 完成确认)
*/
private void handlePubComp(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubComp][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubComp][收到 PUBCOMPmessageId: {}deviceId: {}]", messageId, device.getId());
}
/**
* 处理 SUBSCRIBE 消息(设备订阅主题)
*/
private void handleSubscribe(ServerWebSocket socket, MqttSubscribeMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handleSubscribe][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
try {
// 1. 解析 SUBSCRIBE 消息
int messageId = message.variableHeader().messageId();
MqttSubscribePayload payload = message.payload();
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
log.info("[handleSubscribe][设备订阅请求deviceKey: {}messageId: {},主题数量: {}]",
deviceKey, messageId, payload.topicSubscriptions().size());
// 2. 构建 QoS 列表并记录订阅信息
int[] grantedQosList = new int[payload.topicSubscriptions().size()];
for (int i = 0; i < payload.topicSubscriptions().size(); i++) {
MqttTopicSubscription subscription = payload.topicSubscriptions().get(i);
String topic = subscription.topicFilter();
grantedQosList[i] = subscription.qualityOfService().value();
// 记录订阅信息到连接管理器
connectionManager.addSubscription(deviceKey, topic);
log.info("[handleSubscribe][订阅主题: {}QoS: {}deviceKey: {}]",
topic, subscription.qualityOfService(), deviceKey);
}
// 3. 发送 SUBACK
sendSubAck(socket, messageId, grantedQosList);
} catch (Exception e) {
log.error("[handleSubscribe][处理 SUBSCRIBE 消息失败deviceId: {}]", device.getId(), e);
}
}
/**
* 处理 UNSUBSCRIBE 消息(设备取消订阅)
*/
private void handleUnsubscribe(ServerWebSocket socket, MqttUnsubscribeMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handleUnsubscribe][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
try {
// 1. 解析 UNSUBSCRIBE 消息
int messageId = message.variableHeader().messageId();
MqttUnsubscribePayload payload = message.payload();
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
log.info("[handleUnsubscribe][设备取消订阅deviceKey: {}messageId: {},主题数量: {}]",
deviceKey, messageId, payload.topics().size());
// 2. 移除订阅信息
for (String topic : payload.topics()) {
connectionManager.removeSubscription(deviceKey, topic);
log.info("[handleUnsubscribe][取消订阅主题: {}deviceKey: {}]", topic, deviceKey);
}
// 3. 发送 UNSUBACK
sendUnsubAck(socket, messageId);
} catch (Exception e) {
log.error("[handleUnsubscribe][处理 UNSUBSCRIBE 消息失败deviceId: {}]", device.getId(), e);
}
}
/**
* 处理 PINGREQ 消息(心跳请求)
*/
private void handlePingReq(ServerWebSocket socket) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePingReq][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
log.debug("[handlePingReq][收到心跳请求deviceId: {}]", device.getId());
// 发送 PINGRESP
sendPingResp(socket);
}
/**
* 处理 DISCONNECT 消息(设备断开连接)
*/
private void handleDisconnect(ServerWebSocket socket) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.remove(socketId);
if (device != null) {
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
connectionManager.removeConnection(deviceKey);
deviceMessageIdMap.remove(deviceKey);
sendOfflineMessage(device);
log.info("[handleDisconnect][设备主动断开连接deviceKey: {}]", deviceKey);
}
socket.close();
}
// ==================== 设备认证和状态相关方法 ====================
/**
* 设备认证
*/
private IotDeviceRespDTO authenticateDevice(String clientId, String username, String password) {
try {
// 1. 参数校验
if (StrUtil.hasEmpty(clientId, username, password)) {
log.warn("[authenticateDevice][认证参数不完整clientId: {}username: {}]", clientId, username);
return null;
}
// 2. 构建认证参数并调用 API
IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password);
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) {
log.warn("[authenticateDevice][设备认证失败clientId: {}]", clientId);
return null;
}
// 3. 获取设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.warn("[authenticateDevice][用户名格式不正确username: {}]", username);
return null;
}
IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO()
.setProductKey(deviceInfo.getProductKey())
.setDeviceName(deviceInfo.getDeviceName());
CommonResult<IotDeviceRespDTO> deviceResult = deviceApi.getDevice(getReqDTO);
if (!deviceResult.isSuccess() || deviceResult.getData() == null) {
log.warn("[authenticateDevice][获取设备信息失败username: {}]", username);
return null;
}
return deviceResult.getData();
} catch (Exception e) {
log.error("[authenticateDevice][设备认证异常clientId: {}]", clientId, e);
return null;
}
}
/**
* 发送设备上线消息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
messageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), upstreamProtocol.getServerId());
log.info("[sendOnlineMessage][设备上线deviceId: {}]", device.getId());
} catch (Exception e) {
log.error("[sendOnlineMessage][发送设备上线消息失败deviceId: {}]", device.getId(), e);
}
}
/**
* 发送设备离线消息
*/
private void sendOfflineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
messageService.sendDeviceMessage(offlineMessage, device.getProductKey(),
device.getDeviceName(), upstreamProtocol.getServerId());
log.info("[sendOfflineMessage][设备离线deviceId: {}]", device.getId());
} catch (Exception e) {
log.error("[sendOfflineMessage][发送设备离线消息失败deviceId: {}]", device.getId(), e);
}
}
// ==================== 发送响应消息的辅助方法 ====================
/**
* 发送 CONNACK 消息
*/
private void sendConnAck(ServerWebSocket socket, MqttConnectReturnCode returnCode) {
try {
// 构建 CONNACK 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, false);
MqttConnAckMessage connAckMessage = new MqttConnAckMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, connAckMessage);
log.debug("[sendConnAck][发送 CONNACK 消息returnCode: {}]", returnCode);
} catch (Exception e) {
log.error("[sendConnAck][发送 CONNACK 消息失败]", e);
}
}
/**
* 发送 PUBACK 消息QoS 1 确认)
*/
private void sendPubAck(ServerWebSocket socket, int messageId) {
try {
// 构建 PUBACK 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessage pubAckMessage = new MqttMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, pubAckMessage);
log.debug("[sendPubAck][发送 PUBACK 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubAck][发送 PUBACK 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 PUBREC 消息QoS 2 第一步确认)
*/
private void sendPubRec(ServerWebSocket socket, int messageId) {
try {
// 构建 PUBREC 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessage pubRecMessage = new MqttMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, pubRecMessage);
log.debug("[sendPubRec][发送 PUBREC 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubRec][发送 PUBREC 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 PUBREL 消息QoS 2 第二步)
*/
private void sendPubRel(ServerWebSocket socket, int messageId) {
try {
// 构建 PUBREL 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, pubRelMessage);
log.debug("[sendPubRel][发送 PUBREL 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubRel][发送 PUBREL 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 PUBCOMP 消息QoS 2 完成确认)
*/
private void sendPubComp(ServerWebSocket socket, int messageId) {
try {
// 构建 PUBCOMP 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessage pubCompMessage = new MqttMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, pubCompMessage);
log.debug("[sendPubComp][发送 PUBCOMP 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubComp][发送 PUBCOMP 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 SUBACK 消息
*/
private void sendSubAck(ServerWebSocket socket, int messageId, int[] grantedQosList) {
try {
// 构建 SUBACK 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubAckPayload payload = new MqttSubAckPayload(grantedQosList);
MqttSubAckMessage subAckMessage = new MqttSubAckMessage(fixedHeader, variableHeader, payload);
// 编码并发送
sendMqttMessage(socket, subAckMessage);
log.debug("[sendSubAck][发送 SUBACK 消息messageId: {},主题数量: {}]", messageId, grantedQosList.length);
} catch (Exception e) {
log.error("[sendSubAck][发送 SUBACK 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 UNSUBACK 消息
*/
private void sendUnsubAck(ServerWebSocket socket, int messageId) {
try {
// 构建 UNSUBACK 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttUnsubAckMessage unsubAckMessage = new MqttUnsubAckMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, unsubAckMessage);
log.debug("[sendUnsubAck][发送 UNSUBACK 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendUnsubAck][发送 UNSUBACK 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 PINGRESP 消息
*/
private void sendPingResp(ServerWebSocket socket) {
try {
// 构建 PINGRESP 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage pingRespMessage = new MqttMessage(fixedHeader);
// 编码并发送
sendMqttMessage(socket, pingRespMessage);
log.debug("[sendPingResp][发送 PINGRESP 消息]");
} catch (Exception e) {
log.error("[sendPingResp][发送 PINGRESP 消息失败]", e);
}
}
/**
* 发送 MQTT 消息到 WebSocket
*/
private void sendMqttMessage(ServerWebSocket socket, MqttMessage mqttMessage) {
ByteBuf byteBuf = null;
try {
// 使用 EmbeddedChannel 编码 MQTT 消息
EmbeddedChannel encoderChannel = encoderChannelThreadLocal.get();
encoderChannel.writeOutbound(mqttMessage);
// 读取编码后的 ByteBuf
byteBuf = encoderChannel.readOutbound();
if (byteBuf != null) {
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
socket.writeBinaryMessage(Buffer.buffer(bytes));
}
} finally {
if (byteBuf != null) {
byteBuf.release();
}
}
}
}

View File

@@ -115,23 +115,10 @@ yudao:
connect-timeout-seconds: 60
ssl-enabled: false
# ====================================
# 针对引入的 MQTT WebSocket 组件的配置
# ====================================
mqtt-ws:
enabled: false # 是否启用 MQTT WebSocket
port: 8083 # WebSocket 服务端口
path: /mqtt # WebSocket 路径
max-message-size: 8192 # 最大消息大小(字节)
max-frame-size: 65536 # 最大帧大小(字节)
connect-timeout-seconds: 60 # 连接超时时间(秒)
keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒)
ssl-enabled: false # 是否启用 SSLwss://
sub-protocol: mqtt # WebSocket 子协议
# ====================================
# 针对引入的 CoAP 组件的配置
# ====================================
coap:
enabled: true # 是否启用 CoAP 协议
enabled: false # 是否启用 CoAP 协议
port: 5683 # CoAP 服务端口(默认 5683
max-message-size: 1024 # 最大消息大小(字节)
ack-timeout: 2000 # ACK 超时时间(毫秒)