diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java
index c8041a673c..95d210252f 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java
@@ -84,4 +84,12 @@ public interface RedisKeyConstants {
*/
String SCENE_RULE_LIST = "iot:scene_rule_list";
+ /**
+ * WebSocket 连接分布式锁
+ *
+ * KEY 格式:websocket_connect_lock:${serverUrl}
+ * 用于保证 WebSocket 重连操作的线程安全
+ */
+ String WEBSOCKET_CONNECT_LOCK = "iot:websocket_connect_lock:%s";
+
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java
new file mode 100644
index 0000000000..d50dc548af
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java
@@ -0,0 +1,67 @@
+package cn.iocoder.yudao.module.iot.dal.redis.rule;
+
+import jakarta.annotation.Resource;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import org.springframework.stereotype.Repository;
+
+import java.util.concurrent.TimeUnit;
+
+import static cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants.WEBSOCKET_CONNECT_LOCK;
+
+/**
+ * IoT WebSocket 连接锁 Redis DAO
+ *
+ * 用于保证 WebSocket 重连操作的线程安全,避免多线程同时重连导致的资源竞争
+ *
+ * @author HUIHUI
+ */
+@Repository
+public class IotWebSocketLockRedisDAO {
+
+ /**
+ * 锁等待超时时间(毫秒)
+ */
+ private static final long LOCK_WAIT_TIME_MS = 5000;
+
+ /**
+ * 锁持有超时时间(毫秒)
+ */
+ private static final long LOCK_LEASE_TIME_MS = 10000;
+
+ @Resource
+ private RedissonClient redissonClient;
+
+ /**
+ * 在分布式锁保护下执行操作
+ *
+ * @param serverUrl WebSocket 服务器地址
+ * @param runnable 需要执行的操作
+ * @throws Exception 如果获取锁超时或执行操作时发生异常
+ */
+ public void lock(String serverUrl, Runnable runnable) throws Exception {
+ String lockKey = formatKey(serverUrl);
+ RLock lock = redissonClient.getLock(lockKey);
+
+ try {
+ // 尝试获取分布式锁
+ boolean acquired = lock.tryLock(LOCK_WAIT_TIME_MS, LOCK_LEASE_TIME_MS, TimeUnit.MILLISECONDS);
+ if (!acquired) {
+ throw new RuntimeException("获取 WebSocket 连接锁超时,服务器: " + serverUrl);
+ }
+
+ // 执行操作
+ runnable.run();
+ } finally {
+ // 释放锁
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ private static String formatKey(String serverUrl) {
+ return String.format(WEBSOCKET_CONNECT_LOCK, serverUrl);
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java
index 53a3b71480..74385d08dd 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java
@@ -43,7 +43,6 @@ public class IotTcpDataRuleAction extends
config.getConnectTimeoutMs(),
config.getReadTimeoutMs(),
config.getSsl(),
- config.getSslCertPath(),
config.getDataFormat()
);
// 2.2 连接服务器
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java
index c0445df906..651562987a 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java
@@ -3,8 +3,10 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
+import cn.iocoder.yudao.module.iot.dal.redis.rule.IotWebSocketLockRedisDAO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient;
+import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -22,6 +24,9 @@ import org.springframework.stereotype.Component;
public class IotWebSocketDataRuleAction extends
IotDataRuleCacheableAction {
+ @Resource
+ private IotWebSocketLockRedisDAO webSocketLockRedisDAO;
+
@Override
public Integer getType() {
return IotDataSinkTypeEnum.WEBSOCKET.getType();
@@ -62,12 +67,11 @@ public class IotWebSocketDataRuleAction extends
protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception {
try {
// 1.1 获取或创建 WebSocket 客户端
- // TODO @puhui999:需要加锁,保证必须连接上;
IotWebSocketClient webSocketClient = getProducer(config);
- // 1.2 检查连接状态,如果断开则重新连接
+
+ // 1.2 检查连接状态,如果断开则使用分布式锁保证重连的线程安全
if (!webSocketClient.isConnected()) {
- log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
- webSocketClient.connect();
+ reconnectWithLock(webSocketClient, config);
}
// 2.1 发送消息
@@ -82,4 +86,24 @@ public class IotWebSocketDataRuleAction extends
}
}
+ /**
+ * 使用分布式锁进行重连
+ *
+ * @param webSocketClient WebSocket 客户端
+ * @param config 配置信息
+ */
+ private void reconnectWithLock(IotWebSocketClient webSocketClient, IotDataSinkWebSocketConfig config) throws Exception {
+ webSocketLockRedisDAO.lock(config.getServerUrl(), () -> {
+ // 双重检查:获取锁后再次检查连接状态,避免重复连接
+ if (!webSocketClient.isConnected()) {
+ log.warn("[reconnectWithLock][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
+ try {
+ webSocketClient.connect();
+ } catch (Exception e) {
+ throw new RuntimeException("WebSocket 重连失败,服务器: " + config.getServerUrl(), e);
+ }
+ }
+ });
+ }
+
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java
index 15b57b5405..faf59d3fbc 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java
@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp;
+import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig;
@@ -31,8 +32,6 @@ public class IotTcpClient {
private final Integer connectTimeoutMs;
private final Integer readTimeoutMs;
private final Boolean ssl;
- // TODO @puhui999:sslCertPath 是不是没在用?
- private final String sslCertPath;
private final String dataFormat;
private Socket socket;
@@ -41,15 +40,13 @@ public class IotTcpClient {
private final AtomicBoolean connected = new AtomicBoolean(false);
public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs,
- Boolean ssl, String sslCertPath, String dataFormat) {
+ Boolean ssl, String dataFormat) {
this.host = host;
this.port = port;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS;
this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS;
this.ssl = ssl != null ? ssl : IotDataSinkTcpConfig.DEFAULT_SSL;
- this.sslCertPath = sslCertPath;
- // TODO @puhui999:可以使用 StrUtil.defaultIfBlank 方法简化
- this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT;
+ this.dataFormat = ObjUtil.defaultIfBlank(dataFormat, IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT);
}
/**
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java
index 2f55d6ee74..e898f61cb8 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java
@@ -4,13 +4,9 @@ import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import lombok.extern.slf4j.Slf4j;
+import okhttp3.*;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.WebSocket;
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -19,21 +15,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* 负责与外部 WebSocket 服务器建立连接并发送设备消息
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
- * 基于 Java 11+ 内置的 java.net.http.WebSocket 实现
+ * 基于 OkHttp WebSocket 实现,兼容 JDK 8+
+ *
+ * 注意:该类的线程安全由调用方(IotWebSocketDataRuleAction)通过分布式锁保证
*
* @author HUIHUI
*/
@Slf4j
-public class IotWebSocketClient implements WebSocket.Listener {
+public class IotWebSocketClient {
private final String serverUrl;
private final Integer connectTimeoutMs;
private final Integer sendTimeoutMs;
private final String dataFormat;
- private WebSocket webSocket;
+ private OkHttpClient okHttpClient;
+ private volatile WebSocket webSocket;
private final AtomicBoolean connected = new AtomicBoolean(false);
- private final StringBuilder messageBuffer = new StringBuilder();
public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) {
this.serverUrl = serverUrl;
@@ -44,8 +42,9 @@ public class IotWebSocketClient implements WebSocket.Listener {
/**
* 连接到 WebSocket 服务器
+ *
+ * 注意:调用方需要通过分布式锁保证并发安全
*/
- @SuppressWarnings("resource")
public void connect() throws Exception {
if (connected.get()) {
log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]");
@@ -53,17 +52,32 @@ public class IotWebSocketClient implements WebSocket.Listener {
}
try {
- HttpClient httpClient = HttpClient.newBuilder()
- .connectTimeout(Duration.ofMillis(connectTimeoutMs))
+ // 创建 OkHttpClient
+ okHttpClient = new OkHttpClient.Builder()
+ .connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
+ .readTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS)
+ .writeTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS)
.build();
- CompletableFuture future = httpClient.newWebSocketBuilder()
- .connectTimeout(Duration.ofMillis(connectTimeoutMs))
- .buildAsync(URI.create(serverUrl), this);
+ // 创建 WebSocket 请求
+ Request request = new Request.Builder()
+ .url(serverUrl)
+ .build();
+
+ // 使用 CountDownLatch 等待连接完成
+ CountDownLatch connectLatch = new CountDownLatch(1);
+ AtomicBoolean connectSuccess = new AtomicBoolean(false);
+
+ // 创建 WebSocket 连接
+ webSocket = okHttpClient.newWebSocket(request, new IotWebSocketListener(connectLatch, connectSuccess));
// 等待连接完成
- webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
- connected.set(true);
+ boolean await = connectLatch.await(connectTimeoutMs, TimeUnit.MILLISECONDS);
+ if (!await || !connectSuccess.get()) {
+ close();
+ throw new Exception("WebSocket 连接超时或失败,服务器地址: " + serverUrl);
+ }
+
log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl);
} catch (Exception e) {
close();
@@ -72,36 +86,6 @@ public class IotWebSocketClient implements WebSocket.Listener {
}
}
- @Override
- public void onOpen(WebSocket webSocket) {
- log.debug("[onOpen][WebSocket 连接已打开]");
- webSocket.request(1);
- }
-
- @Override
- public CompletionStage> onText(WebSocket webSocket, CharSequence data, boolean last) {
- messageBuffer.append(data);
- if (last) {
- log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer);
- messageBuffer.setLength(0);
- }
- webSocket.request(1);
- return null;
- }
-
- @Override
- public CompletionStage> onClose(WebSocket webSocket, int statusCode, String reason) {
- connected.set(false);
- log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason);
- return null;
- }
-
- @Override
- public void onError(WebSocket webSocket, Throwable error) {
- connected.set(false);
- log.error("[onError][WebSocket 发生错误]", error);
- }
-
/**
* 发送设备消息
*
@@ -109,7 +93,8 @@ public class IotWebSocketClient implements WebSocket.Listener {
* @throws Exception 发送异常
*/
public void sendMessage(IotDeviceMessage message) throws Exception {
- if (!connected.get() || webSocket == null) {
+ WebSocket ws = this.webSocket;
+ if (!connected.get() || ws == null) {
throw new IllegalStateException("WebSocket 客户端未连接");
}
@@ -121,9 +106,11 @@ public class IotWebSocketClient implements WebSocket.Listener {
messageData = message.toString();
}
- // 发送消息并等待完成
- CompletableFuture future = webSocket.sendText(messageData, true);
- future.get(sendTimeoutMs, TimeUnit.MILLISECONDS);
+ // 发送消息
+ boolean success = ws.send(messageData);
+ if (!success) {
+ throw new Exception("WebSocket 发送消息失败,消息队列已满或连接已关闭");
+ }
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
message.getDeviceId(), messageData.length());
} catch (Exception e) {
@@ -136,18 +123,17 @@ public class IotWebSocketClient implements WebSocket.Listener {
* 关闭连接
*/
public void close() {
- if (!connected.get() && webSocket == null) {
- return;
- }
-
try {
if (webSocket != null) {
- webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭")
- .orTimeout(5, TimeUnit.SECONDS)
- .exceptionally(e -> {
- log.warn("[close][发送关闭帧失败]", e);
- return null;
- });
+ // 发送正常关闭帧,状态码 1000 表示正常关闭
+ webSocket.close(1000, "客户端主动关闭");
+ webSocket = null;
+ }
+ if (okHttpClient != null) {
+ // 关闭连接池和调度器
+ okHttpClient.dispatcher().executorService().shutdown();
+ okHttpClient.connectionPool().evictAll();
+ okHttpClient = null;
}
connected.set(false);
log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl);
@@ -174,4 +160,50 @@ public class IotWebSocketClient implements WebSocket.Listener {
'}';
}
+ /**
+ * OkHttp WebSocket 监听器
+ */
+ private class IotWebSocketListener extends WebSocketListener {
+
+ private final CountDownLatch connectLatch;
+ private final AtomicBoolean connectSuccess;
+
+ public IotWebSocketListener(CountDownLatch connectLatch, AtomicBoolean connectSuccess) {
+ this.connectLatch = connectLatch;
+ this.connectSuccess = connectSuccess;
+ }
+
+ @Override
+ public void onOpen(WebSocket webSocket, Response response) {
+ connected.set(true);
+ connectSuccess.set(true);
+ connectLatch.countDown();
+ log.info("[onOpen][WebSocket 连接已打开,服务器: {}]", serverUrl);
+ }
+
+ @Override
+ public void onMessage(WebSocket webSocket, String text) {
+ log.debug("[onMessage][收到消息: {}]", text);
+ }
+
+ @Override
+ public void onClosing(WebSocket webSocket, int code, String reason) {
+ connected.set(false);
+ log.info("[onClosing][WebSocket 正在关闭,code: {}, reason: {}]", code, reason);
+ }
+
+ @Override
+ public void onClosed(WebSocket webSocket, int code, String reason) {
+ connected.set(false);
+ log.info("[onClosed][WebSocket 已关闭,code: {}, reason: {}]", code, reason);
+ }
+
+ @Override
+ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
+ connected.set(false);
+ connectLatch.countDown(); // 确保连接失败时也释放等待
+ log.error("[onFailure][WebSocket 连接失败]", t);
+ }
+ }
+
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java
index d653c9c42e..1f019b5761 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java
@@ -38,8 +38,7 @@ public class IotDevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerM
// 1.3 检查消息中是否包含触发器指定的属性标识符
// 注意:属性上报可能同时上报多个属性,所以需要判断 trigger.getIdentifier() 是否在 message 的 params 中
- // TODO @puhui999:可以考虑 notXXX 方法,简化代码(尽量取反)
- if (!IotDeviceMessageUtils.containsIdentifier(message, trigger.getIdentifier())) {
+ if (IotDeviceMessageUtils.notContainsIdentifier(message, trigger.getIdentifier())) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中不包含属性: " +
trigger.getIdentifier());
return false;
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java
index b5fa0330dc..ba3190068d 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java
@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.matcher.trigger;
+import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
@@ -8,6 +9,8 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherHelper;
import org.springframework.stereotype.Component;
+import java.util.Map;
+
/**
* 设备服务调用触发器匹配器:处理设备服务调用的触发器匹配逻辑
*
@@ -42,13 +45,58 @@ public class IotDeviceServiceInvokeTriggerMatcher implements IotSceneRuleTrigger
return false;
}
- // 2. 对于服务调用触发器,通常只需要匹配服务标识符即可
- // 不需要检查操作符和值,因为服务调用本身就是触发条件
- // TODO @puhui999: 服务调用时校验输入参数是否匹配条件?
+ // 2. 检查是否配置了参数条件
+ if (hasParameterCondition(trigger)) {
+ return matchParameterCondition(message, trigger);
+ }
+
+ // 3. 无参数条件时,标识符匹配即成功
IotSceneRuleMatcherHelper.logTriggerMatchSuccess(message, trigger);
return true;
}
+ /**
+ * 判断触发器是否配置了参数条件
+ *
+ * @param trigger 触发器配置
+ * @return 是否配置了参数条件
+ */
+ private boolean hasParameterCondition(IotSceneRuleDO.Trigger trigger) {
+ return StrUtil.isNotBlank(trigger.getOperator()) && StrUtil.isNotBlank(trigger.getValue());
+ }
+
+ /**
+ * 匹配参数条件
+ *
+ * @param message 设备消息
+ * @param trigger 触发器配置
+ * @return 是否匹配
+ */
+ private boolean matchParameterCondition(IotDeviceMessage message, IotSceneRuleDO.Trigger trigger) {
+ // 从消息中提取服务调用的输入参数
+ Map inputParams = IotDeviceMessageUtils.extractServiceInputParams(message);
+ if (inputParams == null) {
+ IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中缺少服务输入参数");
+ return false;
+ }
+
+ // 获取要匹配的参数值(使用 identifier 作为参数名)
+ Object paramValue = inputParams.get(trigger.getIdentifier());
+ if (paramValue == null) {
+ IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "服务输入参数中缺少指定参数: " + trigger.getIdentifier());
+ return false;
+ }
+
+ // 使用条件评估器进行匹配
+ boolean matched = IotSceneRuleMatcherHelper.evaluateCondition(paramValue, trigger.getOperator(), trigger.getValue());
+ if (matched) {
+ IotSceneRuleMatcherHelper.logTriggerMatchSuccess(message, trigger);
+ } else {
+ IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "服务输入参数条件不匹配");
+ }
+ return matched;
+ }
+
@Override
public int getPriority() {
return 40; // 较低优先级
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java
new file mode 100644
index 0000000000..cd28f8f54e
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java
@@ -0,0 +1,151 @@
+package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp;
+
+import cn.hutool.core.util.ReflectUtil;
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link IotTcpClient} 的单元测试
+ *
+ * 测试 dataFormat 默认值行为
+ * Property 1: TCP 客户端 dataFormat 默认值行为
+ * Validates: Requirements 1.1, 1.2
+ *
+ * @author HUIHUI
+ */
+class IotTcpClientTest {
+
+ @Test
+ public void testConstructor_dataFormatNull() {
+ // 准备参数
+ String host = "localhost";
+ Integer port = 8080;
+
+ // 调用
+ IotTcpClient client = new IotTcpClient(host, port, null, null, null, null);
+
+ // 断言:dataFormat 为 null 时应使用默认值
+ assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT,
+ ReflectUtil.getFieldValue(client, "dataFormat"));
+ }
+
+ @Test
+ public void testConstructor_dataFormatEmpty() {
+ // 准备参数
+ String host = "localhost";
+ Integer port = 8080;
+
+ // 调用
+ IotTcpClient client = new IotTcpClient(host, port, null, null, null, "");
+
+ // 断言:dataFormat 为空字符串时应使用默认值
+ assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT,
+ ReflectUtil.getFieldValue(client, "dataFormat"));
+ }
+
+ @Test
+ public void testConstructor_dataFormatBlank() {
+ // 准备参数
+ String host = "localhost";
+ Integer port = 8080;
+
+ // 调用
+ IotTcpClient client = new IotTcpClient(host, port, null, null, null, " ");
+
+ // 断言:dataFormat 为纯空白字符串时应使用默认值
+ assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT,
+ ReflectUtil.getFieldValue(client, "dataFormat"));
+ }
+
+ @Test
+ public void testConstructor_dataFormatValid() {
+ // 准备参数
+ String host = "localhost";
+ Integer port = 8080;
+ String dataFormat = "BINARY";
+
+ // 调用
+ IotTcpClient client = new IotTcpClient(host, port, null, null, null, dataFormat);
+
+ // 断言:dataFormat 为有效值时应保持原值
+ assertEquals(dataFormat, ReflectUtil.getFieldValue(client, "dataFormat"));
+ }
+
+ @Test
+ public void testConstructor_defaultValues() {
+ // 准备参数
+ String host = "localhost";
+ Integer port = 8080;
+
+ // 调用
+ IotTcpClient client = new IotTcpClient(host, port, null, null, null, null);
+
+ // 断言:验证所有默认值
+ assertEquals(host, ReflectUtil.getFieldValue(client, "host"));
+ assertEquals(port, ReflectUtil.getFieldValue(client, "port"));
+ assertEquals(IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS,
+ ReflectUtil.getFieldValue(client, "connectTimeoutMs"));
+ assertEquals(IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS,
+ ReflectUtil.getFieldValue(client, "readTimeoutMs"));
+ assertEquals(IotDataSinkTcpConfig.DEFAULT_SSL,
+ ReflectUtil.getFieldValue(client, "ssl"));
+ assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT,
+ ReflectUtil.getFieldValue(client, "dataFormat"));
+ }
+
+ @Test
+ public void testConstructor_customValues() {
+ // 准备参数
+ String host = "192.168.1.100";
+ Integer port = 9090;
+ Integer connectTimeoutMs = 3000;
+ Integer readTimeoutMs = 8000;
+ Boolean ssl = true;
+ String dataFormat = "BINARY";
+
+ // 调用
+ IotTcpClient client = new IotTcpClient(host, port, connectTimeoutMs, readTimeoutMs, ssl, dataFormat);
+
+ // 断言:验证自定义值
+ assertEquals(host, ReflectUtil.getFieldValue(client, "host"));
+ assertEquals(port, ReflectUtil.getFieldValue(client, "port"));
+ assertEquals(connectTimeoutMs, ReflectUtil.getFieldValue(client, "connectTimeoutMs"));
+ assertEquals(readTimeoutMs, ReflectUtil.getFieldValue(client, "readTimeoutMs"));
+ assertEquals(ssl, ReflectUtil.getFieldValue(client, "ssl"));
+ assertEquals(dataFormat, ReflectUtil.getFieldValue(client, "dataFormat"));
+ }
+
+ @Test
+ public void testIsConnected_initialState() {
+ // 准备参数
+ String host = "localhost";
+ Integer port = 8080;
+
+ // 调用
+ IotTcpClient client = new IotTcpClient(host, port, null, null, null, null);
+
+ // 断言:初始状态应为未连接
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testToString() {
+ // 准备参数
+ String host = "localhost";
+ Integer port = 8080;
+
+ // 调用
+ IotTcpClient client = new IotTcpClient(host, port, null, null, null, null);
+ String result = client.toString();
+
+ // 断言
+ assertNotNull(result);
+ assertTrue(result.contains("host='localhost'"));
+ assertTrue(result.contains("port=8080"));
+ assertTrue(result.contains("dataFormat='JSON'"));
+ assertTrue(result.contains("connected=false"));
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java
new file mode 100644
index 0000000000..d3568db8b9
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java
@@ -0,0 +1,257 @@
+package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket;
+
+import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
+import okhttp3.Response;
+import okhttp3.WebSocket;
+import okhttp3.WebSocketListener;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link IotWebSocketClient} 的单元测试
+ *
+ * @author HUIHUI
+ */
+class IotWebSocketClientTest {
+
+ private MockWebServer mockWebServer;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ mockWebServer = new MockWebServer();
+ mockWebServer.start();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (mockWebServer != null) {
+ mockWebServer.shutdown();
+ }
+ }
+
+ /**
+ * 简单的 WebSocket 监听器,用于测试
+ */
+ private static class TestWebSocketListener extends WebSocketListener {
+ @Override
+ public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
+ // 连接打开
+ }
+
+ @Override
+ public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
+ // 收到消息
+ }
+
+ @Override
+ public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
+ webSocket.close(code, reason);
+ }
+
+ @Override
+ public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
+ // 连接失败
+ }
+ }
+
+ @Test
+ public void testConstructor_defaultValues() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+
+ // 调用
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, null, null, null);
+
+ // 断言:验证默认值被正确设置
+ assertNotNull(client);
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testConstructor_customValues() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ Integer connectTimeoutMs = 3000;
+ Integer sendTimeoutMs = 5000;
+ String dataFormat = "TEXT";
+
+ // 调用
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, connectTimeoutMs, sendTimeoutMs, dataFormat);
+
+ // 断言
+ assertNotNull(client);
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testConnect_success() throws Exception {
+ // 准备参数:使用 MockWebServer 的 WebSocket 端点
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // mock:设置 MockWebServer 响应 WebSocket 升级请求
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用
+ client.connect();
+
+ // 断言
+ assertTrue(client.isConnected());
+
+ // 清理
+ client.close();
+ }
+
+ @Test
+ public void testConnect_alreadyConnected() throws Exception {
+ // 准备参数
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // mock
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用:第一次连接
+ client.connect();
+ assertTrue(client.isConnected());
+
+ // 调用:第二次连接(应该不会重复连接)
+ client.connect();
+ assertTrue(client.isConnected());
+
+ // 清理
+ client.close();
+ }
+
+ @Test
+ public void testSendMessage_success() throws Exception {
+ // 准备参数
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ IotDeviceMessage message = IotDeviceMessage.builder()
+ .deviceId(123L)
+ .method("thing.property.report")
+ .params("{\"temperature\": 25.5}")
+ .build();
+
+ // mock
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用
+ client.connect();
+ client.sendMessage(message);
+
+ // 断言:消息发送成功不抛异常
+ assertTrue(client.isConnected());
+
+ // 清理
+ client.close();
+ }
+
+ @Test
+ public void testSendMessage_notConnected() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ IotDeviceMessage message = IotDeviceMessage.builder()
+ .deviceId(123L)
+ .method("thing.property.report")
+ .params("{\"temperature\": 25.5}")
+ .build();
+
+ // 调用 & 断言:未连接时发送消息应抛出异常
+ assertThrows(IllegalStateException.class, () -> client.sendMessage(message));
+ }
+
+ @Test
+ public void testClose_success() throws Exception {
+ // 准备参数
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // mock
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用
+ client.connect();
+ assertTrue(client.isConnected());
+
+ client.close();
+
+ // 断言
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testClose_notConnected() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // 调用:关闭未连接的客户端不应抛异常
+ assertDoesNotThrow(client::close);
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testIsConnected_initialState() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // 断言:初始状态应为未连接
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testToString() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // 调用
+ String result = client.toString();
+
+ // 断言
+ assertNotNull(result);
+ assertTrue(result.contains("serverUrl='ws://localhost:8080'"));
+ assertTrue(result.contains("dataFormat='JSON'"));
+ assertTrue(result.contains("connected=false"));
+ }
+
+ @Test
+ public void testSendMessage_textFormat() throws Exception {
+ // 准备参数
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "TEXT");
+
+ IotDeviceMessage message = IotDeviceMessage.builder()
+ .deviceId(123L)
+ .method("thing.property.report")
+ .params("{\"temperature\": 25.5}")
+ .build();
+
+ // mock
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用
+ client.connect();
+ client.sendMessage(message);
+
+ // 断言:消息发送成功不抛异常
+ assertTrue(client.isConnected());
+
+ // 清理
+ client.close();
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java
index 3d75b19b37..a6b2b0ae0e 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java
@@ -7,7 +7,6 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@@ -23,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author HUIHUI
*/
-@Disabled // TODO @puhui999:单测有报错,先屏蔽
public class IotDeviceServiceInvokeTriggerMatcherTest extends IotBaseConditionMatcherTest {
private IotDeviceServiceInvokeTriggerMatcher matcher;
@@ -378,6 +376,268 @@ public class IotDeviceServiceInvokeTriggerMatcherTest extends IotBaseConditionMa
assertFalse(result);
}
+
+ // ========== 参数条件匹配测试 ==========
+
+ /**
+ * 测试无参数条件时的匹配逻辑 - 只要标识符匹配就返回 true
+ * **Property 4: 服务调用触发器参数匹配逻辑**
+ * **Validates: Requirements 5.2**
+ */
+ @Test
+ public void testMatches_noParameterCondition_success() {
+ // 准备参数
+ String serviceIdentifier = "testService";
+ Map serviceParams = MapUtil.builder(new HashMap())
+ .put("identifier", serviceIdentifier)
+ .put("inputData", MapUtil.builder(new HashMap())
+ .put("level", 5)
+ .build())
+ .build();
+ IotDeviceMessage message = createServiceInvokeMessage(serviceParams);
+ IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger();
+ trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType());
+ trigger.setIdentifier(serviceIdentifier);
+ trigger.setOperator(null); // 无参数条件
+ trigger.setValue(null);
+
+ // 调用
+ boolean result = matcher.matches(message, trigger);
+
+ // 断言
+ assertTrue(result);
+ }
+
+ /**
+ * 测试有参数条件时的匹配逻辑 - 参数条件匹配成功
+ * **Property 4: 服务调用触发器参数匹配逻辑**
+ * **Validates: Requirements 5.1**
+ */
+ @Test
+ public void testMatches_withParameterCondition_greaterThan_success() {
+ // 准备参数
+ String serviceIdentifier = "level";
+ Map serviceParams = MapUtil.builder(new HashMap())
+ .put("identifier", serviceIdentifier)
+ .put("inputData", MapUtil.builder(new HashMap())
+ .put("level", 5)
+ .build())
+ .build();
+ IotDeviceMessage message = createServiceInvokeMessage(serviceParams);
+ IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger();
+ trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType());
+ trigger.setIdentifier(serviceIdentifier);
+ trigger.setOperator(">"); // 大于操作符
+ trigger.setValue("3");
+
+ // 调用
+ boolean result = matcher.matches(message, trigger);
+
+ // 断言
+ assertTrue(result);
+ }
+
+ /**
+ * 测试有参数条件时的匹配逻辑 - 参数条件匹配失败
+ * **Property 4: 服务调用触发器参数匹配逻辑**
+ * **Validates: Requirements 5.1**
+ */
+ @Test
+ public void testMatches_withParameterCondition_greaterThan_failure() {
+ // 准备参数
+ String serviceIdentifier = "level";
+ Map serviceParams = MapUtil.builder(new HashMap())
+ .put("identifier", serviceIdentifier)
+ .put("inputData", MapUtil.builder(new HashMap())
+ .put("level", 2)
+ .build())
+ .build();
+ IotDeviceMessage message = createServiceInvokeMessage(serviceParams);
+ IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger();
+ trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType());
+ trigger.setIdentifier(serviceIdentifier);
+ trigger.setOperator(">"); // 大于操作符
+ trigger.setValue("3");
+
+ // 调用
+ boolean result = matcher.matches(message, trigger);
+
+ // 断言
+ assertFalse(result);
+ }
+
+ /**
+ * 测试有参数条件时的匹配逻辑 - 等于操作符
+ * **Property 4: 服务调用触发器参数匹配逻辑**
+ * **Validates: Requirements 5.1**
+ */
+ @Test
+ public void testMatches_withParameterCondition_equals_success() {
+ // 准备参数
+ String serviceIdentifier = "mode";
+ Map serviceParams = MapUtil.builder(new HashMap())
+ .put("identifier", serviceIdentifier)
+ .put("inputData", MapUtil.builder(new HashMap())
+ .put("mode", "auto")
+ .build())
+ .build();
+ IotDeviceMessage message = createServiceInvokeMessage(serviceParams);
+ IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger();
+ trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType());
+ trigger.setIdentifier(serviceIdentifier);
+ trigger.setOperator("=="); // 等于操作符
+ trigger.setValue("auto");
+
+ // 调用
+ boolean result = matcher.matches(message, trigger);
+
+ // 断言
+ assertTrue(result);
+ }
+
+ /**
+ * 测试参数缺失时的处理 - 消息中缺少 inputData
+ * **Property 4: 服务调用触发器参数匹配逻辑**
+ * **Validates: Requirements 5.3**
+ */
+ @Test
+ public void testMatches_withParameterCondition_missingInputData() {
+ // 准备参数
+ String serviceIdentifier = "testService";
+ Map serviceParams = MapUtil.builder(new HashMap())
+ .put("identifier", serviceIdentifier)
+ // 缺少 inputData 字段
+ .build();
+ IotDeviceMessage message = createServiceInvokeMessage(serviceParams);
+ IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger();
+ trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType());
+ trigger.setIdentifier(serviceIdentifier);
+ trigger.setOperator(">"); // 配置了参数条件
+ trigger.setValue("3");
+
+ // 调用
+ boolean result = matcher.matches(message, trigger);
+
+ // 断言
+ assertFalse(result);
+ }
+
+ /**
+ * 测试参数缺失时的处理 - inputData 中缺少指定参数
+ * **Property 4: 服务调用触发器参数匹配逻辑**
+ * **Validates: Requirements 5.3**
+ */
+ @Test
+ public void testMatches_withParameterCondition_missingParam() {
+ // 准备参数
+ String serviceIdentifier = "level";
+ Map serviceParams = MapUtil.builder(new HashMap())
+ .put("identifier", serviceIdentifier)
+ .put("inputData", MapUtil.builder(new HashMap())
+ .put("otherParam", 5) // 不是 level 参数
+ .build())
+ .build();
+ IotDeviceMessage message = createServiceInvokeMessage(serviceParams);
+ IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger();
+ trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType());
+ trigger.setIdentifier(serviceIdentifier);
+ trigger.setOperator(">"); // 配置了参数条件
+ trigger.setValue("3");
+
+ // 调用
+ boolean result = matcher.matches(message, trigger);
+
+ // 断言
+ assertFalse(result);
+ }
+
+ /**
+ * 测试只有 operator 没有 value 时不触发参数条件匹配
+ * **Property 4: 服务调用触发器参数匹配逻辑**
+ * **Validates: Requirements 5.2**
+ */
+ @Test
+ public void testMatches_onlyOperator_noValue() {
+ // 准备参数
+ String serviceIdentifier = "testService";
+ Map serviceParams = MapUtil.builder(new HashMap())
+ .put("identifier", serviceIdentifier)
+ .put("inputData", MapUtil.builder(new HashMap())
+ .put("level", 5)
+ .build())
+ .build();
+ IotDeviceMessage message = createServiceInvokeMessage(serviceParams);
+ IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger();
+ trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType());
+ trigger.setIdentifier(serviceIdentifier);
+ trigger.setOperator(">"); // 只有 operator
+ trigger.setValue(null); // 没有 value
+
+ // 调用
+ boolean result = matcher.matches(message, trigger);
+
+ // 断言:只有 operator 没有 value 时,不触发参数条件匹配,标识符匹配即成功
+ assertTrue(result);
+ }
+
+ /**
+ * 测试只有 value 没有 operator 时不触发参数条件匹配
+ * **Property 4: 服务调用触发器参数匹配逻辑**
+ * **Validates: Requirements 5.2**
+ */
+ @Test
+ public void testMatches_onlyValue_noOperator() {
+ // 准备参数
+ String serviceIdentifier = "testService";
+ Map serviceParams = MapUtil.builder(new HashMap())
+ .put("identifier", serviceIdentifier)
+ .put("inputData", MapUtil.builder(new HashMap())
+ .put("level", 5)
+ .build())
+ .build();
+ IotDeviceMessage message = createServiceInvokeMessage(serviceParams);
+ IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger();
+ trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType());
+ trigger.setIdentifier(serviceIdentifier);
+ trigger.setOperator(null); // 没有 operator
+ trigger.setValue("3"); // 只有 value
+
+ // 调用
+ boolean result = matcher.matches(message, trigger);
+
+ // 断言:只有 value 没有 operator 时,不触发参数条件匹配,标识符匹配即成功
+ assertTrue(result);
+ }
+
+ /**
+ * 测试使用 inputParams 字段(替代 inputData)
+ * **Property 4: 服务调用触发器参数匹配逻辑**
+ * **Validates: Requirements 5.1**
+ */
+ @Test
+ public void testMatches_withInputParams_success() {
+ // 准备参数
+ String serviceIdentifier = "level";
+ Map serviceParams = MapUtil.builder(new HashMap())
+ .put("identifier", serviceIdentifier)
+ .put("inputParams", MapUtil.builder(new HashMap()) // 使用 inputParams 而不是 inputData
+ .put("level", 5)
+ .build())
+ .build();
+ IotDeviceMessage message = createServiceInvokeMessage(serviceParams);
+ IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger();
+ trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType());
+ trigger.setIdentifier(serviceIdentifier);
+ trigger.setOperator(">"); // 大于操作符
+ trigger.setValue("3");
+
+ // 调用
+ boolean result = matcher.matches(message, trigger);
+
+ // 断言
+ assertTrue(result);
+ }
+
// ========== 辅助方法 ==========
/**
diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java
index 5c1ac26005..3def053602 100644
--- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java
+++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java
@@ -99,6 +99,17 @@ public class IotDeviceMessageUtils {
return false;
}
+ /**
+ * 判断消息中是否不包含指定的标识符
+ *
+ * @param message 消息
+ * @param identifier 要检查的标识符
+ * @return 是否不包含
+ */
+ public static boolean notContainsIdentifier(IotDeviceMessage message, String identifier) {
+ return !containsIdentifier(message, identifier);
+ }
+
/**
* 将 params 解析为 Map
*
@@ -196,6 +207,44 @@ public class IotDeviceMessageUtils {
return null;
}
+ /**
+ * 从服务调用消息中提取输入参数
+ *
+ * 服务调用消息的 params 结构通常为:
+ * {
+ * "identifier": "serviceIdentifier",
+ * "inputData": { ... } 或 "inputParams": { ... }
+ * }
+ *
+ * @param message 设备消息
+ * @return 输入参数 Map,如果未找到则返回 null
+ */
+ @SuppressWarnings("unchecked")
+ public static Map extractServiceInputParams(IotDeviceMessage message) {
+ Object params = message.getParams();
+ if (params == null) {
+ return null;
+ }
+ if (!(params instanceof Map)) {
+ return null;
+ }
+ Map paramsMap = (Map) params;
+
+ // 尝试从 inputData 字段获取
+ Object inputData = paramsMap.get("inputData");
+ if (inputData instanceof Map) {
+ return (Map) inputData;
+ }
+
+ // 尝试从 inputParams 字段获取
+ Object inputParams = paramsMap.get("inputParams");
+ if (inputParams instanceof Map) {
+ return (Map) inputParams;
+ }
+
+ return null;
+ }
+
// ========== Topic 相关 ==========
public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {
diff --git a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java
index a6d669d170..b0d39be519 100644
--- a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java
+++ b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java
@@ -1,13 +1,13 @@
package cn.iocoder.yudao.module.iot.core.util;
+import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.*;
/**
* {@link IotDeviceMessageUtils} 的单元测试
@@ -138,4 +138,72 @@ public class IotDeviceMessageUtilsTest {
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertEquals(25.5, result); // 应该返回直接标识符的值
}
+
+ // ========== notContainsIdentifier 测试 ==========
+
+ /**
+ * 测试 notContainsIdentifier 与 containsIdentifier 的互补性
+ * **Property 2: notContainsIdentifier 与 containsIdentifier 互补性**
+ * **Validates: Requirements 4.1**
+ */
+ @Test
+ public void testNotContainsIdentifier_complementary_whenContains() {
+ // 准备参数:消息包含指定标识符
+ IotDeviceMessage message = new IotDeviceMessage();
+ message.setMethod(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod());
+ Map params = new HashMap<>();
+ params.put("temperature", 25);
+ message.setParams(params);
+ String identifier = "temperature";
+
+ // 调用 & 断言:验证互补性
+ boolean containsResult = IotDeviceMessageUtils.containsIdentifier(message, identifier);
+ boolean notContainsResult = IotDeviceMessageUtils.notContainsIdentifier(message, identifier);
+ assertTrue(containsResult);
+ assertFalse(notContainsResult);
+ assertEquals(!containsResult, notContainsResult);
+ }
+
+ /**
+ * 测试 notContainsIdentifier 与 containsIdentifier 的互补性
+ * **Property 2: notContainsIdentifier 与 containsIdentifier 互补性**
+ * **Validates: Requirements 4.1**
+ */
+ @Test
+ public void testNotContainsIdentifier_complementary_whenNotContains() {
+ // 准备参数:消息不包含指定标识符
+ IotDeviceMessage message = new IotDeviceMessage();
+ message.setMethod(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod());
+ Map params = new HashMap<>();
+ params.put("temperature", 25);
+ message.setParams(params);
+ String identifier = "humidity";
+
+ // 调用 & 断言:验证互补性
+ boolean containsResult = IotDeviceMessageUtils.containsIdentifier(message, identifier);
+ boolean notContainsResult = IotDeviceMessageUtils.notContainsIdentifier(message, identifier);
+ assertFalse(containsResult);
+ assertTrue(notContainsResult);
+ assertEquals(!containsResult, notContainsResult);
+ }
+
+ /**
+ * 测试 notContainsIdentifier 与 containsIdentifier 的互补性 - 空参数场景
+ * **Property 2: notContainsIdentifier 与 containsIdentifier 互补性**
+ * **Validates: Requirements 4.1**
+ */
+ @Test
+ public void testNotContainsIdentifier_complementary_nullParams() {
+ // 准备参数:params 为 null
+ IotDeviceMessage message = new IotDeviceMessage();
+ message.setParams(null);
+ String identifier = "temperature";
+
+ // 调用 & 断言:验证互补性
+ boolean containsResult = IotDeviceMessageUtils.containsIdentifier(message, identifier);
+ boolean notContainsResult = IotDeviceMessageUtils.notContainsIdentifier(message, identifier);
+ assertFalse(containsResult);
+ assertTrue(notContainsResult);
+ assertEquals(!containsResult, notContainsResult);
+ }
}