feat(iot):支持 MQTT 设备回复消息的 _reply 方法标准化,并精简上行处理器异常处理

This commit is contained in:
YunaiV
2026-02-09 13:10:08 +08:00
parent fcca74ac7d
commit b800d274a4
8 changed files with 68 additions and 40 deletions

View File

@@ -22,6 +22,7 @@ import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import jakarta.annotation.Resource;
import jakarta.annotation.security.PermitAll;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
@@ -51,8 +52,10 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi {
@Resource
private IotProductService productService;
@Resource
@Lazy // 延迟加载,解决循环依赖
private IotDeviceModbusConfigService modbusConfigService;
@Resource
@Lazy // 延迟加载,解决循环依赖
private IotDeviceModbusPointService modbusPointService;
@Override

View File

@@ -17,6 +17,7 @@ import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import jakarta.annotation.Resource;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
@@ -40,10 +41,11 @@ public class IotProductServiceImpl implements IotProductService {
@Resource
private IotProductMapper productMapper;
@Resource
private IotDevicePropertyService devicePropertyDataService;
@Resource
private IotDeviceService deviceService;
@Resource
@Lazy // 延迟加载,避免循环依赖
private IotDevicePropertyService devicePropertyDataService;
@Override
public Long createProduct(IotProductSaveReqVO createReqVO) {

View File

@@ -51,8 +51,8 @@ public class IotThingModelServiceImpl implements IotThingModelService {
@Resource
@Lazy // 延迟加载,解决循环依赖
private IotProductService productService;
@Resource
@Lazy // 延迟加载,解决循环依赖
private IotDeviceModbusPointService deviceModbusPointService;
@Override

View File

@@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
@@ -42,12 +43,14 @@ public class IotEmqxUpstreamHandler {
return;
}
// 2. 反序列化消息
// 2.1 反序列化消息
IotDeviceMessage message = deviceMessageService.deserializeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[handle][topic({}) payload({}) 消息解码失败]", topic, new String(payload));
return;
}
// 2.2 标准化回复消息的 methodMQTT 协议中,设备回复消息的 method 会携带 _reply 后缀)
IotMqttTopicUtils.normalizeReplyMethod(message);
// 3. 发送消息到队列
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);

View File

@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
@@ -10,8 +9,6 @@ import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 网关 MQTT 上行消息处理器:处理业务消息(属性上报、事件上报等)
@@ -39,10 +36,6 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
*/
public void handleBusinessRequest(MqttEndpoint endpoint, String topic, byte[] payload) {
String clientId = endpoint.clientIdentifier();
IotDeviceMessage message = null;
String productKey = null;
String deviceName = null;
try {
// 1.1 基础检查
if (ArrayUtil.isEmpty(payload)) {
@@ -50,8 +43,8 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
}
// 1.2 解析主题,获取 productKey 和 deviceName
String[] topicParts = topic.split("/");
productKey = ArrayUtil.get(topicParts, 2);
deviceName = ArrayUtil.get(topicParts, 3);
String productKey = ArrayUtil.get(topicParts, 2);
String deviceName = ArrayUtil.get(topicParts, 3);
Assert.notBlank(productKey, "产品 Key 不能为空");
Assert.notBlank(deviceName, "设备名称不能为空");
// 1.3 校验设备信息,防止伪造设备消息
@@ -65,38 +58,21 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
return;
}
// 2. 反序列化消息
message = deviceMessageService.deserializeDeviceMessage(payload, productKey, deviceName);
// 2.1 反序列化消息
IotDeviceMessage message = deviceMessageService.deserializeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[handleBusinessRequest][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
sendErrorResponse(endpoint, productKey, deviceName, null, null,
BAD_REQUEST.getCode(), "消息解码失败");
return;
}
// 2.2 标准化回复消息的 methodMQTT 协议中,设备回复消息的 method 会携带 _reply 后缀)
IotMqttTopicUtils.normalizeReplyMethod(message);
// 3. 处理业务消息
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
log.debug("[handleBusinessRequest][消息处理成功,客户端 ID: {},主题: {}]", clientId, topic);
} catch (ServiceException e) {
log.warn("[handleBusinessRequest][业务异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(endpoint, productKey, deviceName, requestId, method, e.getCode(), e.getMessage());
} catch (IllegalArgumentException e) {
log.warn("[handleBusinessRequest][参数校验失败,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(endpoint, productKey, deviceName, requestId, method,
BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
log.error("[handleBusinessRequest][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(endpoint, productKey, deviceName, requestId, method,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
}
}

View File

@@ -1,6 +1,8 @@
package cn.iocoder.yudao.module.iot.gateway.util;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
/**
* IoT 网关 MQTT 主题工具类
@@ -44,6 +46,32 @@ public final class IotMqttTopicUtils {
*/
public static final String MQTT_ACL_PATH = "/mqtt/acl";
// ========== 消息方法标准化 ==========
/**
* 标准化设备回复消息的 method
* <p>
* MQTT 协议中设备回复下行指令时topic 和 method 会携带 _reply 后缀
* (如 thing.service.invoke_reply。平台内部统一使用基础 method如 thing.service.invoke
* 通过 {@link IotDeviceMessage#getCode()} 非空来识别回复消息。
* <p>
* 此方法剥离 _reply 后缀,并确保 code 字段被设置。
*
* @param message 设备消息
*/
public static void normalizeReplyMethod(IotDeviceMessage message) {
String method = message.getMethod();
if (!StrUtil.endWith(method, REPLY_TOPIC_SUFFIX)) {
return;
}
// 1. 剥离 _reply 后缀
message.setMethod(method.substring(0, method.length() - REPLY_TOPIC_SUFFIX.length()));
// 2. 确保 code 被设置,使 isReplyMessage() 能正确识别
if (message.getCode() == null) {
message.setCode(GlobalErrorCodeConstants.SUCCESS.getCode());
}
}
// ========== 工具方法 ==========
/**

View File

@@ -239,12 +239,28 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
log.info("[testSubscribe][连接认证成功]");
try {
// 2. 设置消息处理器
client.publishHandler(message -> log.info("[testSubscribe][收到消息: topic={}, payload={}]",
message.topicName(), message.payload().toString()));
// 2. 设置消息处理器:收到属性设置时,回复 _reply 消息
client.publishHandler(message -> {
log.info("[testSubscribe][收到消息: topic={}, payload={}]",
message.topicName(), message.payload().toString());
// 收到属性设置消息时,回复 _reply
if (message.topicName().endsWith("/thing/property/set")) {
try {
IotDeviceMessage received = SERIALIZER.deserialize(message.payload().getBytes());
IotDeviceMessage reply = IotDeviceMessage.replyOf(
received.getRequestId(), "thing.property.set_reply", null, 0, null);
String replyTopic = String.format("/sys/%s/%s/thing/property/set_reply", PRODUCT_KEY, DEVICE_NAME);
byte[] replyPayload = SERIALIZER.serialize(reply);
client.publish(replyTopic, Buffer.buffer(replyPayload), MqttQoS.AT_LEAST_ONCE, false, false);
log.info("[testSubscribe][已回复属性设置: topic={}]", replyTopic);
} catch (Exception e) {
log.error("[testSubscribe][回复属性设置异常]", e);
}
}
});
// 3. 订阅下行主题
String topic = String.format("/sys/%s/%s/thing/service/#", PRODUCT_KEY, DEVICE_NAME);
// 3. 订阅下行主题(属性设置 + 服务调用)
String topic = String.format("/sys/%s/%s/#", PRODUCT_KEY, DEVICE_NAME);
log.info("[testSubscribe][订阅主题: {}]", topic);
subscribe(client, topic);
log.info("[testSubscribe][订阅成功,等待下行消息... (30秒后自动断开)]");

View File

@@ -60,7 +60,7 @@ public class MpAccountServiceImpl implements MpAccountService {
private MpAccountMapper mpAccountMapper;
@Resource
@Lazy // 延迟加载,解决循环依赖的问题
@Lazy // 延迟加载,解决循环依赖
private MpServiceFactory mpServiceFactory;
@Override