mirror of
https://gitee.com/zhijiantianya/ruoyi-vue-pro.git
synced 2026-03-22 05:07:17 +08:00
feat(iot):【协议改造】优化各种 code review 的代码
This commit is contained in:
@@ -125,7 +125,11 @@ public class IotHttpProtocol implements IotProtocol {
|
||||
this.downstreamSubscriber.start();
|
||||
} catch (Exception e) {
|
||||
log.error("[start][IoT HTTP 协议 {} 启动失败]", getId(), e);
|
||||
// 启动失败时关闭 Vertx
|
||||
// 启动失败时关闭资源
|
||||
if (httpServer != null) {
|
||||
httpServer.close();
|
||||
httpServer = null;
|
||||
}
|
||||
if (vertx != null) {
|
||||
vertx.close();
|
||||
vertx = null;
|
||||
|
||||
@@ -26,13 +26,6 @@ public class IotMqttConfig {
|
||||
@Min(value = 1, message = "连接超时时间不能小于 1 秒")
|
||||
private Integer connectTimeoutSeconds = 60;
|
||||
|
||||
/**
|
||||
* 保持连接超时时间(秒)
|
||||
*/
|
||||
@NotNull(message = "保持连接超时时间不能为空")
|
||||
@Min(value = 1, message = "保持连接超时时间不能小于 1 秒")
|
||||
private Integer keepAliveTimeoutSeconds = 300;
|
||||
|
||||
/**
|
||||
* 是否启用 SSL
|
||||
*/
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttAuthHandler;
|
||||
@@ -15,6 +17,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqt
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttUpstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.vertx.core.Vertx;
|
||||
@@ -26,13 +29,10 @@ import io.vertx.mqtt.MqttTopicSubscription;
|
||||
import io.vertx.mqtt.messages.MqttPublishMessage;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 协议:接收设备上行消息
|
||||
*
|
||||
@@ -249,11 +249,22 @@ public class IotMqttProtocol implements IotProtocol {
|
||||
// 3.2 设置 QoS 2 消息的 PUBREL 处理器
|
||||
endpoint.publishReleaseHandler(endpoint::publishComplete);
|
||||
|
||||
// 4.1 设置订阅处理器
|
||||
// 4.1 设置订阅处理器(带 ACL 校验)
|
||||
endpoint.subscribeHandler(subscribe -> {
|
||||
List<String> topicNames = convertList(subscribe.topicSubscriptions(), MqttTopicSubscription::topicName);
|
||||
log.debug("[handleEndpoint][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames);
|
||||
List<MqttQoS> grantedQoSLevels = convertList(subscribe.topicSubscriptions(), MqttTopicSubscription::qualityOfService);
|
||||
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint);
|
||||
List<MqttQoS> grantedQoSLevels = new ArrayList<>();
|
||||
for (MqttTopicSubscription sub : subscribe.topicSubscriptions()) {
|
||||
String topicName = sub.topicName();
|
||||
// 校验主题是否属于当前设备
|
||||
if (connectionInfo != null && IotMqttTopicUtils.isTopicSubscribeAllowed(
|
||||
topicName, connectionInfo.getProductKey(), connectionInfo.getDeviceName())) {
|
||||
grantedQoSLevels.add(sub.qualityOfService());
|
||||
log.debug("[handleEndpoint][订阅成功,客户端 ID: {},主题: {}]", clientId, topicName);
|
||||
} else {
|
||||
log.warn("[handleEndpoint][订阅被拒绝,客户端 ID: {},主题: {}]", clientId, topicName);
|
||||
grantedQoSLevels.add(MqttQoS.FAILURE);
|
||||
}
|
||||
}
|
||||
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels);
|
||||
});
|
||||
// 4.2 设置取消订阅处理器
|
||||
|
||||
@@ -30,7 +30,6 @@ public abstract class IotMqttAbstractHandler {
|
||||
protected final IotMqttConnectionManager connectionManager;
|
||||
protected final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
// done @AI:基于 method 通过 IotMqttTopicUtils.buildTopicByMethod 计算 reply topic
|
||||
/**
|
||||
* 发送成功响应到设备
|
||||
*
|
||||
@@ -43,20 +42,8 @@ public abstract class IotMqttAbstractHandler {
|
||||
*/
|
||||
protected void sendSuccessResponse(MqttEndpoint endpoint, String productKey, String deviceName,
|
||||
String requestId, String method, Object data) {
|
||||
try {
|
||||
// 1. 构建响应消息
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, 0, null);
|
||||
|
||||
// 2. 编码消息(使用默认编解码器)
|
||||
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
|
||||
|
||||
// 3. 构建响应主题并发送
|
||||
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true);
|
||||
endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false);
|
||||
log.debug("[sendSuccessResponse][发送成功响应,主题: {}]", replyTopic);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendSuccessResponse][发送成功响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e);
|
||||
}
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, 0, null);
|
||||
writeResponse(endpoint, productKey, deviceName, method, responseMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -72,22 +59,32 @@ public abstract class IotMqttAbstractHandler {
|
||||
*/
|
||||
protected void sendErrorResponse(MqttEndpoint endpoint, String productKey, String deviceName,
|
||||
String requestId, String method, Integer errorCode, String errorMessage) {
|
||||
try {
|
||||
// 1. 构建响应消息
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, errorCode, errorMessage);
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, errorCode, errorMessage);
|
||||
writeResponse(endpoint, productKey, deviceName, method, responseMessage);
|
||||
}
|
||||
|
||||
// 2. 编码消息(使用默认编解码器)
|
||||
/**
|
||||
* 写入响应消息到设备
|
||||
*
|
||||
* @param endpoint MQTT 连接端点
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @param method 方法名
|
||||
* @param responseMessage 响应消息
|
||||
*/
|
||||
private void writeResponse(MqttEndpoint endpoint, String productKey, String deviceName,
|
||||
String method, IotDeviceMessage responseMessage) {
|
||||
try {
|
||||
// 1. 编码消息(使用默认编解码器)
|
||||
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
|
||||
|
||||
// 3. 构建响应主题并发送
|
||||
// 2. 构建响应主题,并发送
|
||||
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true);
|
||||
endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false);
|
||||
log.debug("[sendErrorResponse][发送错误响应,主题: {}]", replyTopic);
|
||||
log.debug("[writeResponse][发送响应,主题: {},code: {}]", replyTopic, responseMessage.getCode());
|
||||
} catch (Exception e) {
|
||||
log.error("[sendErrorResponse][发送错误响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e);
|
||||
log.error("[writeResponse][发送响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO @AI:当前 sendSuccessResponse/sendErrorResponse 已足够清晰,暂不抽取 writeResponse(必须抽取!!!)
|
||||
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ public class IotMqttAuthHandler extends IotMqttAbstractHandler {
|
||||
.setProductKey(device.getProductKey())
|
||||
.setDeviceName(device.getDeviceName())
|
||||
.setRemoteAddress(connectionManager.getEndpointAddress(endpoint));
|
||||
connectionManager.registerConnection(endpoint, device.getId(), connectionInfo);
|
||||
connectionManager.registerConnection(endpoint, connectionInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -77,6 +77,9 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler {
|
||||
// 接受连接,并发送错误响应
|
||||
endpoint.accept(false);
|
||||
sendErrorResponse(endpoint, productKey, deviceName, null, method, 500, e.getMessage());
|
||||
} finally {
|
||||
// 注册完成后关闭连接(一型一密只用于获取 deviceSecret,不保持连接)
|
||||
endpoint.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.common.exception.ServiceException;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
|
||||
@@ -50,12 +49,10 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
|
||||
}
|
||||
// 1.2 解析主题,获取 productKey 和 deviceName
|
||||
String[] topicParts = topic.split("/");
|
||||
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
|
||||
log.warn("[handleBusinessRequest][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic);
|
||||
return;
|
||||
}
|
||||
productKey = topicParts[2];
|
||||
deviceName = topicParts[3];
|
||||
productKey = ArrayUtil.get(topicParts, 2);
|
||||
deviceName = ArrayUtil.get(topicParts, 3);
|
||||
Assert.notBlank(productKey, "产品 Key 不能为空");
|
||||
Assert.notBlank(deviceName, "设备名称不能为空");
|
||||
// 1.3 校验设备信息,防止伪造设备消息
|
||||
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint);
|
||||
Assert.notNull(connectionInfo, "无法获取连接信息");
|
||||
|
||||
@@ -73,25 +73,24 @@ public class IotMqttConnectionManager {
|
||||
* 注册设备连接(包含认证信息)
|
||||
*
|
||||
* @param endpoint MQTT 连接端点
|
||||
* @param deviceId 设备 ID
|
||||
* @param connectionInfo 连接信息
|
||||
*/
|
||||
// TODO @AI:移除掉 deviceId ???参考别的 tcp 等模块协议
|
||||
public void registerConnection(MqttEndpoint endpoint, Long deviceId, ConnectionInfo connectionInfo) {
|
||||
public void registerConnection(MqttEndpoint endpoint, ConnectionInfo connectionInfo) {
|
||||
Long deviceId = connectionInfo.getDeviceId();
|
||||
// 如果设备已有其他连接,先清理旧连接
|
||||
MqttEndpoint oldEndpoint = deviceEndpointMap.get(deviceId);
|
||||
if (oldEndpoint != null && oldEndpoint != endpoint) {
|
||||
log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]",
|
||||
deviceId, getEndpointAddress(oldEndpoint));
|
||||
oldEndpoint.close();
|
||||
// 清理旧连接的映射
|
||||
// 先清理映射,再关闭连接(避免旧连接处理器干扰)
|
||||
connectionMap.remove(oldEndpoint);
|
||||
oldEndpoint.close();
|
||||
}
|
||||
|
||||
// 注册新连接
|
||||
connectionMap.put(endpoint, connectionInfo);
|
||||
deviceEndpointMap.put(deviceId, endpoint);
|
||||
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]",
|
||||
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},productKey: {},deviceName: {}]",
|
||||
deviceId, getEndpointAddress(endpoint), connectionInfo.getProductKey(), connectionInfo.getDeviceName());
|
||||
}
|
||||
|
||||
|
||||
@@ -47,8 +47,8 @@ public class IotTcpConnectionManager {
|
||||
* @param deviceId 设备 ID
|
||||
* @param connectionInfo 连接信息
|
||||
*/
|
||||
public void registerConnection(NetSocket socket, Long deviceId, ConnectionInfo connectionInfo) {
|
||||
// 检查连接数是否已达上限
|
||||
public synchronized void registerConnection(NetSocket socket, Long deviceId, ConnectionInfo connectionInfo) {
|
||||
// 检查连接数是否已达上限(同步方法确保检查和注册的原子性)
|
||||
if (connectionMap.size() >= maxConnections) {
|
||||
throw new IllegalStateException("连接数已达上限: " + maxConnections);
|
||||
}
|
||||
@@ -57,9 +57,9 @@ public class IotTcpConnectionManager {
|
||||
if (oldSocket != null && oldSocket != socket) {
|
||||
log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]",
|
||||
deviceId, oldSocket.remoteAddress());
|
||||
oldSocket.close();
|
||||
// 清理旧连接的映射
|
||||
// 先清理映射,再关闭连接
|
||||
connectionMap.remove(oldSocket);
|
||||
oldSocket.close();
|
||||
}
|
||||
|
||||
// 注册新连接
|
||||
|
||||
@@ -9,6 +9,7 @@ import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
@@ -47,8 +48,8 @@ public class IotUdpSessionManager {
|
||||
* @param deviceId 设备 ID
|
||||
* @param sessionInfo 会话信息
|
||||
*/
|
||||
public void registerSession(Long deviceId, SessionInfo sessionInfo) {
|
||||
// 检查是否为新设备,且会话数已达上限
|
||||
public synchronized void registerSession(Long deviceId, SessionInfo sessionInfo) {
|
||||
// 检查是否为新设备,且会话数已达上限(同步方法确保检查和注册的原子性)
|
||||
if (deviceSessionCache.getIfPresent(deviceId) == null
|
||||
&& deviceSessionCache.size() >= maxSessions) {
|
||||
throw new IllegalStateException("会话数已达上限: " + maxSessions);
|
||||
@@ -113,16 +114,21 @@ public class IotUdpSessionManager {
|
||||
}
|
||||
InetSocketAddress address = sessionInfo.getAddress();
|
||||
try {
|
||||
// 使用 CompletableFuture 同步等待发送结果
|
||||
CompletableFuture<Boolean> future = new CompletableFuture<>();
|
||||
socket.send(Buffer.buffer(data), address.getPort(), address.getHostString(), result -> {
|
||||
if (result.succeeded()) {
|
||||
log.debug("[sendToDevice][发送消息成功,设备 ID: {},地址: {},数据长度: {} 字节]",
|
||||
deviceId, buildAddressKey(address), data.length);
|
||||
return;
|
||||
future.complete(true);
|
||||
} else {
|
||||
log.error("[sendToDevice][发送消息失败,设备 ID: {},地址: {}]",
|
||||
deviceId, buildAddressKey(address), result.cause());
|
||||
future.complete(false);
|
||||
}
|
||||
log.error("[sendToDevice][发送消息失败,设备 ID: {},地址: {}]",
|
||||
deviceId, buildAddressKey(address), result.cause());
|
||||
});
|
||||
return true;
|
||||
// 同步等待结果,超时 5 秒
|
||||
return future.get(5, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendToDevice][发送消息异常,设备 ID: {}]", deviceId, e);
|
||||
return false;
|
||||
|
||||
@@ -63,4 +63,26 @@ public final class IotMqttTopicUtils {
|
||||
return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/" + topicSuffix;
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验主题是否允许订阅
|
||||
* <p>
|
||||
* 规则:主题必须以 /sys/{productKey}/{deviceName}/ 开头,
|
||||
* 或者是通配符形式 /sys/{productKey}/{deviceName}/#
|
||||
*
|
||||
* @param topic 订阅的主题
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @return 是否允许订阅
|
||||
*/
|
||||
public static boolean isTopicSubscribeAllowed(String topic, String productKey, String deviceName) {
|
||||
if (!StrUtil.isAllNotBlank(topic, productKey, deviceName)) {
|
||||
return false;
|
||||
}
|
||||
// 构建设备主题前缀
|
||||
String deviceTopicPrefix = SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/";
|
||||
// 主题必须以设备前缀开头,或者是设备前缀的通配符形式
|
||||
return topic.startsWith(deviceTopicPrefix)
|
||||
|| topic.equals(SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/#");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -124,7 +124,6 @@ yudao:
|
||||
mqtt:
|
||||
max-message-size: 8192 # 最大消息大小(字节)
|
||||
connect-timeout-seconds: 60 # 连接超时时间(秒)
|
||||
keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒)
|
||||
ssl-enabled: false # 是否启用 SSL
|
||||
|
||||
# 协议配置(旧版,保持兼容)
|
||||
|
||||
Reference in New Issue
Block a user