feat:【IoT 物联网】新增 MQTT 协议支持,包括上下行消息处理、连接管理及配置项

This commit is contained in:
haohao 2025-08-10 15:38:30 +08:00
parent bec3d070f0
commit 73e97d1675
9 changed files with 949 additions and 3 deletions

View File

@ -6,6 +6,10 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscr
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
@ -49,7 +53,7 @@ public class IotGatewayConfiguration {
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.emqx", name = "enabled", havingValue = "true")
@Slf4j
public static class MqttProtocolConfiguration {
public static class EmqxProtocolConfiguration {
@Bean(destroyMethod = "close")
public Vertx emqxVertx() {
@ -110,4 +114,42 @@ public class IotGatewayConfiguration {
}
/**
* IoT 网关 MQTT 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.mqtt", name = "enabled", havingValue = "true")
@Slf4j
public static class MqttProtocolConfiguration {
@Bean(destroyMethod = "close")
public Vertx mqttVertx() {
return Vertx.vertx();
}
@Bean
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotMqttConnectionManager connectionManager,
Vertx mqttVertx) {
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(),
deviceService, messageService, connectionManager, mqttVertx);
}
@Bean
public IotMqttDownstreamHandler iotMqttDownstreamHandler(IotDeviceMessageService messageService,
IotMqttConnectionManager connectionManager) {
return new IotMqttDownstreamHandler(messageService, connectionManager);
}
@Bean
public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol,
IotMqttDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, downstreamHandler, messageBus);
}
}
}

View File

@ -83,6 +83,11 @@ public class IotGatewayProperties {
*/
private TcpProperties tcp;
/**
* MQTT 组件配置
*/
private MqttProperties mqtt;
}
@Data
@ -325,4 +330,83 @@ public class IotGatewayProperties {
}
@Data
public static class MqttProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 服务器端口
*/
private Integer port = 1883;
/**
* 最大消息大小字节
*/
private Integer maxMessageSize = 8192;
/**
* 连接超时时间
*/
private Integer connectTimeoutSeconds = 60;
/**
* 保持连接超时时间
*/
private Integer keepAliveTimeoutSeconds = 300;
/**
* 是否启用 SSL
*/
private Boolean sslEnabled = false;
/**
* SSL 配置
*/
private SslOptions sslOptions = new SslOptions();
/**
* SSL 配置选项
*/
@Data
public static class SslOptions {
/**
* 密钥证书选项
*/
private io.vertx.core.net.KeyCertOptions keyCertOptions;
/**
* 信任选项
*/
private io.vertx.core.net.TrustOptions trustOptions;
/**
* SSL 证书路径
*/
private String certPath;
/**
* SSL 私钥路径
*/
private String keyPath;
/**
* 信任存储路径
*/
private String trustStorePath;
/**
* 信任存储密码
*/
private String trustStorePassword;
}
}
}

View File

@ -0,0 +1,79 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 协议下行消息订阅器
* <p>
* 负责接收来自消息总线的下行消息并委托给下行处理器进行业务处理
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotMqttUpstreamProtocol upstreamProtocol;
private final IotMqttDownstreamHandler downstreamHandler;
private final IotMessageBus messageBus;
public IotMqttDownstreamSubscriber(IotMqttUpstreamProtocol upstreamProtocol,
IotMqttDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
this.upstreamProtocol = upstreamProtocol;
this.downstreamHandler = downstreamHandler;
this.messageBus = messageBus;
}
@PostConstruct
public void subscribe() {
messageBus.register(this);
log.info("[subscribe][MQTT 协议下行消息订阅成功,主题:{}]", getTopic());
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(upstreamProtocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
try {
// 1. 校验
String method = message.getMethod();
if (method == null) {
log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]",
message.getId(), message.getDeviceId());
return;
}
// 2. 委托给下行处理器处理业务逻辑
boolean success = downstreamHandler.handleDownstreamMessage(message);
if (success) {
log.debug("[onMessage][下行消息处理成功, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
} else {
log.warn("[onMessage][下行消息处理失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
}
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId(), e);
}
}
}

View File

@ -0,0 +1,96 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 协议接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttUpstreamProtocol {
private final IotGatewayProperties.MqttProperties mqttProperties;
private final IotDeviceService deviceService;
private final IotDeviceMessageService messageService;
private final IotMqttConnectionManager connectionManager;
private final Vertx vertx;
@Getter
private final String serverId;
private MqttServer mqttServer;
public IotMqttUpstreamProtocol(IotGatewayProperties.MqttProperties mqttProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotMqttConnectionManager connectionManager,
Vertx vertx) {
this.mqttProperties = mqttProperties;
this.deviceService = deviceService;
this.messageService = messageService;
this.connectionManager = connectionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(mqttProperties.getPort());
}
@PostConstruct
public void start() {
// 创建服务器选项
MqttServerOptions options = new MqttServerOptions();
options.setPort(mqttProperties.getPort());
options.setMaxMessageSize(mqttProperties.getMaxMessageSize());
options.setTimeoutOnConnect(mqttProperties.getConnectTimeoutSeconds());
// 配置 SSL如果启用
if (Boolean.TRUE.equals(mqttProperties.getSslEnabled())) {
options.setSsl(true).setKeyCertOptions(mqttProperties.getSslOptions().getKeyCertOptions())
.setTrustOptions(mqttProperties.getSslOptions().getTrustOptions());
}
// 创建服务器并设置连接处理器
mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(endpoint -> {
IotMqttUpstreamHandler handler = new IotMqttUpstreamHandler(this, messageService, deviceService,
connectionManager);
handler.handle(endpoint);
});
// 启动服务器
try {
mqttServer.listen().result();
log.info("[start][IoT 网关 MQTT 协议启动成功,端口:{}]", mqttProperties.getPort());
} catch (Exception e) {
log.error("[start][IoT 网关 MQTT 协议启动失败]", e);
throw e;
}
}
@PreDestroy
public void stop() {
if (mqttServer != null) {
try {
mqttServer.close().result();
log.info("[stop][IoT 网关 MQTT 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 MQTT 协议停止失败]", e);
}
}
}
}

View File

@ -0,0 +1,197 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.mqtt.MqttEndpoint;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* IoT 网关 MQTT 连接管理器
* <p>
* 统一管理 MQTT 连接的认证状态设备会话和消息发送功能
* 1. 管理 MQTT 连接的认证状态
* 2. 管理设备会话和在线状态
* 3. 管理消息发送到设备
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotMqttConnectionManager {
/**
* 连接信息映射MqttEndpoint -> 连接信息
*/
private final Map<MqttEndpoint, ConnectionInfo> connectionMap = new ConcurrentHashMap<>();
/**
* 设备 ID -> MqttEndpoint 的映射
*/
private final Map<Long, MqttEndpoint> deviceEndpointMap = new ConcurrentHashMap<>();
/**
* 安全获取 endpoint 地址
*
* @param endpoint MQTT 连接端点
* @return 地址字符串如果获取失败则返回 "unknown"
*/
private String getEndpointAddress(MqttEndpoint endpoint) {
try {
if (endpoint != null) {
return endpoint.remoteAddress().toString();
}
} catch (Exception e) {
// 忽略异常返回默认值
}
return "unknown";
}
/**
* 注册设备连接包含认证信息
*
* @param endpoint MQTT 连接端点
* @param deviceId 设备 ID
* @param connectionInfo 连接信息
*/
public void registerConnection(MqttEndpoint endpoint, Long deviceId, ConnectionInfo connectionInfo) {
// 如果设备已有其他连接先清理旧连接
MqttEndpoint oldEndpoint = deviceEndpointMap.get(deviceId);
if (oldEndpoint != null && oldEndpoint != endpoint) {
log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]",
deviceId, getEndpointAddress(oldEndpoint));
oldEndpoint.close();
// 清理旧连接的映射
connectionMap.remove(oldEndpoint);
}
connectionMap.put(endpoint, connectionInfo);
deviceEndpointMap.put(deviceId, endpoint);
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {}product key: {}device name: {}]",
deviceId, getEndpointAddress(endpoint), connectionInfo.getProductKey(), connectionInfo.getDeviceName());
}
/**
* 注销设备连接
*
* @param endpoint MQTT 连接端点
*/
public void unregisterConnection(MqttEndpoint endpoint) {
ConnectionInfo connectionInfo = connectionMap.remove(endpoint);
if (connectionInfo != null) {
Long deviceId = connectionInfo.getDeviceId();
deviceEndpointMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
deviceId, getEndpointAddress(endpoint));
}
}
/**
* 获取连接信息
*/
public ConnectionInfo getConnectionInfo(MqttEndpoint endpoint) {
return connectionMap.get(endpoint);
}
/**
* 根据设备 ID 获取连接信息
*
* @param deviceId 设备 ID
* @return 连接信息
*/
public IotMqttConnectionManager.ConnectionInfo getConnectionInfoByDeviceId(Long deviceId) {
// 通过设备 ID 获取连接端点
var endpoint = getDeviceEndpoint(deviceId);
if (endpoint == null) {
return null;
}
// 获取连接信息
return getConnectionInfo(endpoint);
}
/**
* 检查设备是否在线
*/
public boolean isDeviceOnline(Long deviceId) {
return deviceEndpointMap.containsKey(deviceId);
}
/**
* 检查设备是否离线
*/
public boolean isDeviceOffline(Long deviceId) {
return !isDeviceOnline(deviceId);
}
/**
* 发送消息到设备
*
* @param deviceId 设备 ID
* @param topic 主题
* @param payload 消息内容
* @param qos 服务质量
* @param retain 是否保留消息
* @return 是否发送成功
*/
public boolean sendToDevice(Long deviceId, String topic, byte[] payload, int qos, boolean retain) {
MqttEndpoint endpoint = deviceEndpointMap.get(deviceId);
if (endpoint == null) {
log.warn("[sendToDevice][设备离线,无法发送消息,设备 ID: {},主题: {}]", deviceId, topic);
return false;
}
try {
endpoint.publish(topic, io.vertx.core.buffer.Buffer.buffer(payload), MqttQoS.valueOf(qos), false, retain);
log.debug("[sendToDevice][发送消息成功,设备 ID: {},主题: {}QoS: {}]", deviceId, topic, qos);
return true;
} catch (Exception e) {
log.error("[sendToDevice][发送消息失败,设备 ID: {},主题: {},错误: {}]", deviceId, topic, e.getMessage());
return false;
}
}
/**
* 获取设备连接端点
*/
public MqttEndpoint getDeviceEndpoint(Long deviceId) {
return deviceEndpointMap.get(deviceId);
}
/**
* 连接信息
*/
@Data
public static class ConnectionInfo {
/**
* 设备 ID
*/
private Long deviceId;
/**
* 产品 Key
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
/**
* 客户端 ID
*/
private String clientId;
/**
* 是否已认证
*/
private boolean authenticated;
}
}

View File

@ -0,0 +1,6 @@
/**
* MQTT 协议实现包
* <p>
* 提供基于 Vert.x MQTT Server IoT 设备连接和消息处理功能
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;

View File

@ -0,0 +1,133 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 协议下行消息处理器
* <p>
* 专门处理下行消息的业务逻辑包括
* 1. 消息编码
* 2. 主题构建
* 3. 消息发送
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttDownstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotMqttConnectionManager connectionManager;
public IotMqttDownstreamHandler(IotDeviceMessageService deviceMessageService,
IotMqttConnectionManager connectionManager) {
this.deviceMessageService = deviceMessageService;
this.connectionManager = connectionManager;
}
/**
* 处理下行消息
*
* @param message 设备消息
* @return 是否处理成功
*/
public boolean handleDownstreamMessage(IotDeviceMessage message) {
try {
// 1. 基础校验
if (message == null || message.getDeviceId() == null) {
log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]");
return false;
}
// 2. 检查设备是否在线
if (connectionManager.isDeviceOffline(message.getDeviceId())) {
log.warn("[handleDownstreamMessage][设备离线,无法发送消息,设备 ID{}]", message.getDeviceId());
return false;
}
// 3. 获取连接信息
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId(message.getDeviceId());
if (connectionInfo == null) {
log.warn("[handleDownstreamMessage][连接信息不存在,设备 ID{}]", message.getDeviceId());
return false;
}
// 4. 编码消息
byte[] payload = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getProductKey(),
connectionInfo.getDeviceName());
if (payload == null || payload.length == 0) {
log.warn("[handleDownstreamMessage][消息编码失败,设备 ID{}]", message.getDeviceId());
return false;
}
// 5. 发送消息到设备
return sendMessageToDevice(message, connectionInfo, payload);
} catch (Exception e) {
if (message != null) {
log.error("[handleDownstreamMessage][处理下行消息异常,设备 ID{},错误:{}]",
message.getDeviceId(), e.getMessage(), e);
}
return false;
}
}
/**
* 发送消息到设备
*
* @param message 设备消息
* @param connectionInfo 连接信息
* @param payload 消息负载
* @return 是否发送成功
*/
private boolean sendMessageToDevice(IotDeviceMessage message,
IotMqttConnectionManager.ConnectionInfo connectionInfo,
byte[] payload) {
// 1. 构建主题
String topic = buildDownstreamTopic(message, connectionInfo);
if (StrUtil.isBlank(topic)) {
log.warn("[sendMessageToDevice][主题构建失败,设备 ID{},方法:{}]",
message.getDeviceId(), message.getMethod());
return false;
}
// 2. 发送消息
boolean success = connectionManager.sendToDevice(message.getDeviceId(), topic, payload, MqttQoS.AT_LEAST_ONCE.value(), false);
if (success) {
log.debug("[sendMessageToDevice][消息发送成功,设备 ID{},主题:{},方法:{}]",
message.getDeviceId(), topic, message.getMethod());
} else {
log.warn("[sendMessageToDevice][消息发送失败,设备 ID{},主题:{},方法:{}]",
message.getDeviceId(), topic, message.getMethod());
}
return success;
}
/**
* 构建下行消息主题
*
* @param message 设备消息
* @param connectionInfo 连接信息
* @return 主题
*/
private String buildDownstreamTopic(IotDeviceMessage message,
IotMqttConnectionManager.ConnectionInfo connectionInfo) {
String method = message.getMethod();
if (StrUtil.isBlank(method)) {
return null;
}
// 使用工具类构建主题支持回复消息处理
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
return IotMqttTopicUtils.buildTopicByMethod(method, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), isReply);
}
}

View File

@ -0,0 +1,298 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* MQTT 上行消息处理器
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttUpstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotMqttConnectionManager connectionManager;
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol,
IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotMqttConnectionManager connectionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.connectionManager = connectionManager;
this.serverId = protocol.getServerId();
}
/**
* 处理 MQTT 连接
*
* @param endpoint MQTT 连接端点
*/
public void handle(MqttEndpoint endpoint) {
String clientId = endpoint.clientIdentifier();
String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null;
String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null;
log.debug("[handle][设备连接请求,客户端 ID: {},用户名: {},地址: {}]",
clientId, username, getEndpointAddress(endpoint));
// 1. 先进行认证
if (!authenticateDevice(clientId, username, password, endpoint)) {
log.warn("[handle][设备认证失败,拒绝连接,客户端 ID: {},用户名: {}]", clientId, username);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return;
}
log.info("[handle][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username);
// 设置异常和关闭处理器
endpoint.exceptionHandler(ex -> {
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, getEndpointAddress(endpoint));
cleanupConnection(endpoint);
});
endpoint.closeHandler(v -> {
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, getEndpointAddress(endpoint));
cleanupConnection(endpoint);
});
// 设置消息处理器
endpoint.publishHandler(message -> {
try {
processMessage(clientId, message.topicName(), message.payload().getBytes(), endpoint);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, getEndpointAddress(endpoint), e.getMessage());
cleanupConnection(endpoint);
endpoint.close();
}
});
// 设置订阅处理器
endpoint.subscribeHandler(subscribe -> {
log.debug("[handle][设备订阅,客户端 ID: {},主题: {}]", clientId, subscribe.topicSubscriptions());
// 提取 QoS 列表
List<MqttQoS> grantedQoSLevels = subscribe.topicSubscriptions().stream()
.map(MqttTopicSubscription::qualityOfService)
.collect(java.util.stream.Collectors.toList());
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels);
});
// 设置取消订阅处理器
endpoint.unsubscribeHandler(unsubscribe -> {
log.debug("[handle][设备取消订阅,客户端 ID: {},主题: {}]", clientId, unsubscribe.topics());
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
// 设置断开连接处理器
endpoint.disconnectHandler(v -> {
log.debug("[handle][设备断开连接,客户端 ID: {}]", clientId);
cleanupConnection(endpoint);
});
// 接受连接
endpoint.accept(false);
}
/**
* 处理消息
*
* @param clientId 客户端 ID
* @param topic 主题
* @param payload 消息内容
* @param endpoint MQTT 连接端点
* @throws Exception 消息解码失败时抛出异常
*/
private void processMessage(String clientId, String topic, byte[] payload, MqttEndpoint endpoint) throws Exception {
// 1. 基础检查
if (payload == null || payload.length == 0) {
return;
}
// 2. 解析主题获取 productKey deviceName
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
log.warn("[processMessage][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic);
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
// 3. 解码消息使用从 topic 解析的 productKey deviceName
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[processMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
// 4. 处理业务消息认证已在连接时完成
handleBusinessRequest(clientId, message, productKey, deviceName, endpoint);
}
/**
* MQTT 连接时进行设备认证
*
* @param clientId 客户端 ID
* @param username 用户名
* @param password 密码
* @param endpoint MQTT 连接端点
* @return 认证是否成功
*/
private boolean authenticateDevice(String clientId, String username, String password, MqttEndpoint endpoint) {
try {
// 1. 参数校验
if (StrUtil.hasEmpty(clientId, username, password)) {
log.warn("[authenticateDevice][认证参数不完整,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
// 2. 构建认证参数
IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password);
// 3. 调用设备认证 API
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) {
log.warn("[authenticateDevice][设备认证失败,客户端 ID: {},用户名: {},错误: {}]",
clientId, username, authResult.getMsg());
return false;
}
// 4. 获取设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.warn("[authenticateDevice][用户名格式不正确,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO();
getReqDTO.setProductKey(deviceInfo.getProductKey());
getReqDTO.setDeviceName(deviceInfo.getDeviceName());
CommonResult<IotDeviceRespDTO> deviceResult = deviceApi.getDevice(getReqDTO);
if (!deviceResult.isSuccess() || deviceResult.getData() == null) {
log.warn("[authenticateDevice][获取设备信息失败,客户端 ID: {},用户名: {},错误: {}]",
clientId, username, deviceResult.getMsg());
return false;
}
// 5. 注册连接
IotDeviceRespDTO device = deviceResult.getData();
registerConnection(endpoint, device, clientId);
// 6. 发送设备上线消息
sendOnlineMessage(device);
return true;
} catch (Exception e) {
log.error("[authenticateDevice][设备认证异常,客户端 ID: {},用户名: {}]", clientId, username, e);
return false;
}
}
/**
* 处理业务请求
*/
private void handleBusinessRequest(String clientId, IotDeviceMessage message, String productKey, String deviceName,
MqttEndpoint endpoint) {
// 发送消息到消息总线
message.setServerId(serverId);
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
}
/**
* 注册连接
*/
private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device,
String clientId) {
IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo();
connectionInfo.setDeviceId(device.getId());
connectionInfo.setProductKey(device.getProductKey());
connectionInfo.setDeviceName(device.getDeviceName());
connectionInfo.setClientId(clientId);
connectionInfo.setAuthenticated(true);
connectionManager.registerConnection(endpoint, device.getId(), connectionInfo);
}
/**
* 发送设备上线消息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName());
} catch (Exception e) {
log.error("[sendOnlineMessage][发送设备上线消息失败,设备 ID: {},错误: {}]", device.getId(), e.getMessage());
}
}
/**
* 安全获取 endpoint 地址
*
* @param endpoint MQTT 连接端点
* @return 地址字符串如果获取失败则返回 "unknown"
*/
private String getEndpointAddress(MqttEndpoint endpoint) {
try {
if (endpoint != null) {
return endpoint.remoteAddress().toString();
}
} catch (Exception e) {
// 忽略异常返回默认值
}
return "unknown";
}
/**
* 清理连接
*/
private void cleanupConnection(MqttEndpoint endpoint) {
try {
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint);
if (connectionInfo != null) {
// 发送设备离线消息
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
log.info("[cleanupConnection][设备离线,设备 ID: {},设备名称: {}]",
connectionInfo.getDeviceId(), connectionInfo.getDeviceName());
}
// 注销连接
connectionManager.unregisterConnection(endpoint);
} catch (Exception e) {
log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]",
endpoint.clientIdentifier(), e.getMessage());
}
}
}

View File

@ -48,13 +48,13 @@ yudao:
# 针对引入的 HTTP 组件的配置
# ====================================
http:
enabled: true
enabled: false
server-port: 8092
# ====================================
# 针对引入的 EMQX 组件的配置
# ====================================
emqx:
enabled: true
enabled: false
http-port: 8090 # MQTT HTTP 服务端口
mqtt-host: 127.0.0.1 # MQTT Broker 地址
mqtt-port: 1883 # MQTT Broker 端口
@ -95,6 +95,16 @@ yudao:
ssl-enabled: false
ssl-cert-path: "classpath:certs/client.jks"
ssl-key-path: "classpath:certs/client.jks"
# ====================================
# 针对引入的 MQTT 组件的配置
# ====================================
mqtt:
enabled: true
port: 1883
max-message-size: 8192
connect-timeout-seconds: 60
keep-alive-timeout-seconds: 300
ssl-enabled: false
--- #################### 日志相关配置 ####################
@ -113,6 +123,7 @@ logging:
# 开发环境详细日志
cn.iocoder.yudao.module.iot.gateway.protocol.emqx: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG
# 根日志级别
root: INFO