From 44838510c950098e14a86fc47795cba72fe0c0ba Mon Sep 17 00:00:00 2001 From: puhui999 Date: Wed, 21 Jan 2026 18:12:28 +0800 Subject: [PATCH 1/4] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91WebSocket?= =?UTF-8?q?=20=E8=BF=9E=E6=8E=A5=E7=BA=BF=E7=A8=8B=E5=AE=89=E5=85=A8?= =?UTF-8?q?=E4=B8=8E=20JDK=208=20=E5=85=BC=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-dependencies/pom.xml | 15 + yudao-module-iot/yudao-module-iot-biz/pom.xml | 11 + .../iot/dal/redis/RedisKeyConstants.java | 8 + .../redis/rule/IotWebSocketLockRedisDAO.java | 67 +++++ .../action/IotWebSocketDataRuleAction.java | 32 ++- .../action/websocket/IotWebSocketClient.java | 156 ++++++----- .../websocket/IotWebSocketClientTest.java | 257 ++++++++++++++++++ 7 files changed, 480 insertions(+), 66 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index 0257eb3109..3ff1534cee 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -76,6 +76,8 @@ 2.3.0 4.7.9-20251224.161447 4.40.607.ALL + + 4.12.0 @@ -653,6 +655,19 @@ org.eclipse.paho.client.mqttv3 ${mqtt.version} + + + + com.squareup.okhttp3 + okhttp + ${okhttp.version} + + + com.squareup.okhttp3 + mockwebserver + ${okhttp.version} + test + diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml index 1f83a7acb2..a0fe16de48 100644 --- a/yudao-module-iot/yudao-module-iot-biz/pom.xml +++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml @@ -73,6 +73,17 @@ yudao-spring-boot-starter-excel + + + com.squareup.okhttp3 + okhttp + + + com.squareup.okhttp3 + mockwebserver + test + + diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java index c8041a673c..95d210252f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java @@ -84,4 +84,12 @@ public interface RedisKeyConstants { */ String SCENE_RULE_LIST = "iot:scene_rule_list"; + /** + * WebSocket 连接分布式锁 + *

+ * KEY 格式:websocket_connect_lock:${serverUrl} + * 用于保证 WebSocket 重连操作的线程安全 + */ + String WEBSOCKET_CONNECT_LOCK = "iot:websocket_connect_lock:%s"; + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java new file mode 100644 index 0000000000..d50dc548af --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java @@ -0,0 +1,67 @@ +package cn.iocoder.yudao.module.iot.dal.redis.rule; + +import jakarta.annotation.Resource; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.stereotype.Repository; + +import java.util.concurrent.TimeUnit; + +import static cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants.WEBSOCKET_CONNECT_LOCK; + +/** + * IoT WebSocket 连接锁 Redis DAO + *

+ * 用于保证 WebSocket 重连操作的线程安全,避免多线程同时重连导致的资源竞争 + * + * @author HUIHUI + */ +@Repository +public class IotWebSocketLockRedisDAO { + + /** + * 锁等待超时时间(毫秒) + */ + private static final long LOCK_WAIT_TIME_MS = 5000; + + /** + * 锁持有超时时间(毫秒) + */ + private static final long LOCK_LEASE_TIME_MS = 10000; + + @Resource + private RedissonClient redissonClient; + + /** + * 在分布式锁保护下执行操作 + * + * @param serverUrl WebSocket 服务器地址 + * @param runnable 需要执行的操作 + * @throws Exception 如果获取锁超时或执行操作时发生异常 + */ + public void lock(String serverUrl, Runnable runnable) throws Exception { + String lockKey = formatKey(serverUrl); + RLock lock = redissonClient.getLock(lockKey); + + try { + // 尝试获取分布式锁 + boolean acquired = lock.tryLock(LOCK_WAIT_TIME_MS, LOCK_LEASE_TIME_MS, TimeUnit.MILLISECONDS); + if (!acquired) { + throw new RuntimeException("获取 WebSocket 连接锁超时,服务器: " + serverUrl); + } + + // 执行操作 + runnable.run(); + } finally { + // 释放锁 + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + private static String formatKey(String serverUrl) { + return String.format(WEBSOCKET_CONNECT_LOCK, serverUrl); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java index c0445df906..651562987a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java @@ -3,8 +3,10 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; +import cn.iocoder.yudao.module.iot.dal.redis.rule.IotWebSocketLockRedisDAO; import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum; import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient; +import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -22,6 +24,9 @@ import org.springframework.stereotype.Component; public class IotWebSocketDataRuleAction extends IotDataRuleCacheableAction { + @Resource + private IotWebSocketLockRedisDAO webSocketLockRedisDAO; + @Override public Integer getType() { return IotDataSinkTypeEnum.WEBSOCKET.getType(); @@ -62,12 +67,11 @@ public class IotWebSocketDataRuleAction extends protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception { try { // 1.1 获取或创建 WebSocket 客户端 - // TODO @puhui999:需要加锁,保证必须连接上; IotWebSocketClient webSocketClient = getProducer(config); - // 1.2 检查连接状态,如果断开则重新连接 + + // 1.2 检查连接状态,如果断开则使用分布式锁保证重连的线程安全 if (!webSocketClient.isConnected()) { - log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl()); - webSocketClient.connect(); + reconnectWithLock(webSocketClient, config); } // 2.1 发送消息 @@ -82,4 +86,24 @@ public class IotWebSocketDataRuleAction extends } } + /** + * 使用分布式锁进行重连 + * + * @param webSocketClient WebSocket 客户端 + * @param config 配置信息 + */ + private void reconnectWithLock(IotWebSocketClient webSocketClient, IotDataSinkWebSocketConfig config) throws Exception { + webSocketLockRedisDAO.lock(config.getServerUrl(), () -> { + // 双重检查:获取锁后再次检查连接状态,避免重复连接 + if (!webSocketClient.isConnected()) { + log.warn("[reconnectWithLock][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl()); + try { + webSocketClient.connect(); + } catch (Exception e) { + throw new RuntimeException("WebSocket 重连失败,服务器: " + config.getServerUrl(), e); + } + } + }); + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java index 2f55d6ee74..e898f61cb8 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java @@ -4,13 +4,9 @@ import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; import lombok.extern.slf4j.Slf4j; +import okhttp3.*; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.WebSocket; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -19,21 +15,23 @@ import java.util.concurrent.atomic.AtomicBoolean; *

* 负责与外部 WebSocket 服务器建立连接并发送设备消息 * 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式 - * 基于 Java 11+ 内置的 java.net.http.WebSocket 实现 + * 基于 OkHttp WebSocket 实现,兼容 JDK 8+ + *

+ * 注意:该类的线程安全由调用方(IotWebSocketDataRuleAction)通过分布式锁保证 * * @author HUIHUI */ @Slf4j -public class IotWebSocketClient implements WebSocket.Listener { +public class IotWebSocketClient { private final String serverUrl; private final Integer connectTimeoutMs; private final Integer sendTimeoutMs; private final String dataFormat; - private WebSocket webSocket; + private OkHttpClient okHttpClient; + private volatile WebSocket webSocket; private final AtomicBoolean connected = new AtomicBoolean(false); - private final StringBuilder messageBuffer = new StringBuilder(); public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) { this.serverUrl = serverUrl; @@ -44,8 +42,9 @@ public class IotWebSocketClient implements WebSocket.Listener { /** * 连接到 WebSocket 服务器 + *

+ * 注意:调用方需要通过分布式锁保证并发安全 */ - @SuppressWarnings("resource") public void connect() throws Exception { if (connected.get()) { log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]"); @@ -53,17 +52,32 @@ public class IotWebSocketClient implements WebSocket.Listener { } try { - HttpClient httpClient = HttpClient.newBuilder() - .connectTimeout(Duration.ofMillis(connectTimeoutMs)) + // 创建 OkHttpClient + okHttpClient = new OkHttpClient.Builder() + .connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS) + .readTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS) + .writeTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS) .build(); - CompletableFuture future = httpClient.newWebSocketBuilder() - .connectTimeout(Duration.ofMillis(connectTimeoutMs)) - .buildAsync(URI.create(serverUrl), this); + // 创建 WebSocket 请求 + Request request = new Request.Builder() + .url(serverUrl) + .build(); + + // 使用 CountDownLatch 等待连接完成 + CountDownLatch connectLatch = new CountDownLatch(1); + AtomicBoolean connectSuccess = new AtomicBoolean(false); + + // 创建 WebSocket 连接 + webSocket = okHttpClient.newWebSocket(request, new IotWebSocketListener(connectLatch, connectSuccess)); // 等待连接完成 - webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS); - connected.set(true); + boolean await = connectLatch.await(connectTimeoutMs, TimeUnit.MILLISECONDS); + if (!await || !connectSuccess.get()) { + close(); + throw new Exception("WebSocket 连接超时或失败,服务器地址: " + serverUrl); + } + log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl); } catch (Exception e) { close(); @@ -72,36 +86,6 @@ public class IotWebSocketClient implements WebSocket.Listener { } } - @Override - public void onOpen(WebSocket webSocket) { - log.debug("[onOpen][WebSocket 连接已打开]"); - webSocket.request(1); - } - - @Override - public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { - messageBuffer.append(data); - if (last) { - log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer); - messageBuffer.setLength(0); - } - webSocket.request(1); - return null; - } - - @Override - public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { - connected.set(false); - log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason); - return null; - } - - @Override - public void onError(WebSocket webSocket, Throwable error) { - connected.set(false); - log.error("[onError][WebSocket 发生错误]", error); - } - /** * 发送设备消息 * @@ -109,7 +93,8 @@ public class IotWebSocketClient implements WebSocket.Listener { * @throws Exception 发送异常 */ public void sendMessage(IotDeviceMessage message) throws Exception { - if (!connected.get() || webSocket == null) { + WebSocket ws = this.webSocket; + if (!connected.get() || ws == null) { throw new IllegalStateException("WebSocket 客户端未连接"); } @@ -121,9 +106,11 @@ public class IotWebSocketClient implements WebSocket.Listener { messageData = message.toString(); } - // 发送消息并等待完成 - CompletableFuture future = webSocket.sendText(messageData, true); - future.get(sendTimeoutMs, TimeUnit.MILLISECONDS); + // 发送消息 + boolean success = ws.send(messageData); + if (!success) { + throw new Exception("WebSocket 发送消息失败,消息队列已满或连接已关闭"); + } log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]", message.getDeviceId(), messageData.length()); } catch (Exception e) { @@ -136,18 +123,17 @@ public class IotWebSocketClient implements WebSocket.Listener { * 关闭连接 */ public void close() { - if (!connected.get() && webSocket == null) { - return; - } - try { if (webSocket != null) { - webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭") - .orTimeout(5, TimeUnit.SECONDS) - .exceptionally(e -> { - log.warn("[close][发送关闭帧失败]", e); - return null; - }); + // 发送正常关闭帧,状态码 1000 表示正常关闭 + webSocket.close(1000, "客户端主动关闭"); + webSocket = null; + } + if (okHttpClient != null) { + // 关闭连接池和调度器 + okHttpClient.dispatcher().executorService().shutdown(); + okHttpClient.connectionPool().evictAll(); + okHttpClient = null; } connected.set(false); log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl); @@ -174,4 +160,50 @@ public class IotWebSocketClient implements WebSocket.Listener { '}'; } + /** + * OkHttp WebSocket 监听器 + */ + private class IotWebSocketListener extends WebSocketListener { + + private final CountDownLatch connectLatch; + private final AtomicBoolean connectSuccess; + + public IotWebSocketListener(CountDownLatch connectLatch, AtomicBoolean connectSuccess) { + this.connectLatch = connectLatch; + this.connectSuccess = connectSuccess; + } + + @Override + public void onOpen(WebSocket webSocket, Response response) { + connected.set(true); + connectSuccess.set(true); + connectLatch.countDown(); + log.info("[onOpen][WebSocket 连接已打开,服务器: {}]", serverUrl); + } + + @Override + public void onMessage(WebSocket webSocket, String text) { + log.debug("[onMessage][收到消息: {}]", text); + } + + @Override + public void onClosing(WebSocket webSocket, int code, String reason) { + connected.set(false); + log.info("[onClosing][WebSocket 正在关闭,code: {}, reason: {}]", code, reason); + } + + @Override + public void onClosed(WebSocket webSocket, int code, String reason) { + connected.set(false); + log.info("[onClosed][WebSocket 已关闭,code: {}, reason: {}]", code, reason); + } + + @Override + public void onFailure(WebSocket webSocket, Throwable t, Response response) { + connected.set(false); + connectLatch.countDown(); // 确保连接失败时也释放等待 + log.error("[onFailure][WebSocket 连接失败]", t); + } + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java new file mode 100644 index 0000000000..d3568db8b9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java @@ -0,0 +1,257 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket; + +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import okhttp3.Response; +import okhttp3.WebSocket; +import okhttp3.WebSocketListener; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link IotWebSocketClient} 的单元测试 + * + * @author HUIHUI + */ +class IotWebSocketClientTest { + + private MockWebServer mockWebServer; + + @BeforeEach + public void setUp() throws Exception { + mockWebServer = new MockWebServer(); + mockWebServer.start(); + } + + @AfterEach + public void tearDown() throws Exception { + if (mockWebServer != null) { + mockWebServer.shutdown(); + } + } + + /** + * 简单的 WebSocket 监听器,用于测试 + */ + private static class TestWebSocketListener extends WebSocketListener { + @Override + public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) { + // 连接打开 + } + + @Override + public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) { + // 收到消息 + } + + @Override + public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) { + webSocket.close(code, reason); + } + + @Override + public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) { + // 连接失败 + } + } + + @Test + public void testConstructor_defaultValues() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + + // 调用 + IotWebSocketClient client = new IotWebSocketClient(serverUrl, null, null, null); + + // 断言:验证默认值被正确设置 + assertNotNull(client); + assertFalse(client.isConnected()); + } + + @Test + public void testConstructor_customValues() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + Integer connectTimeoutMs = 3000; + Integer sendTimeoutMs = 5000; + String dataFormat = "TEXT"; + + // 调用 + IotWebSocketClient client = new IotWebSocketClient(serverUrl, connectTimeoutMs, sendTimeoutMs, dataFormat); + + // 断言 + assertNotNull(client); + assertFalse(client.isConnected()); + } + + @Test + public void testConnect_success() throws Exception { + // 准备参数:使用 MockWebServer 的 WebSocket 端点 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // mock:设置 MockWebServer 响应 WebSocket 升级请求 + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用 + client.connect(); + + // 断言 + assertTrue(client.isConnected()); + + // 清理 + client.close(); + } + + @Test + public void testConnect_alreadyConnected() throws Exception { + // 准备参数 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // mock + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用:第一次连接 + client.connect(); + assertTrue(client.isConnected()); + + // 调用:第二次连接(应该不会重复连接) + client.connect(); + assertTrue(client.isConnected()); + + // 清理 + client.close(); + } + + @Test + public void testSendMessage_success() throws Exception { + // 准备参数 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + IotDeviceMessage message = IotDeviceMessage.builder() + .deviceId(123L) + .method("thing.property.report") + .params("{\"temperature\": 25.5}") + .build(); + + // mock + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用 + client.connect(); + client.sendMessage(message); + + // 断言:消息发送成功不抛异常 + assertTrue(client.isConnected()); + + // 清理 + client.close(); + } + + @Test + public void testSendMessage_notConnected() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + IotDeviceMessage message = IotDeviceMessage.builder() + .deviceId(123L) + .method("thing.property.report") + .params("{\"temperature\": 25.5}") + .build(); + + // 调用 & 断言:未连接时发送消息应抛出异常 + assertThrows(IllegalStateException.class, () -> client.sendMessage(message)); + } + + @Test + public void testClose_success() throws Exception { + // 准备参数 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // mock + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用 + client.connect(); + assertTrue(client.isConnected()); + + client.close(); + + // 断言 + assertFalse(client.isConnected()); + } + + @Test + public void testClose_notConnected() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // 调用:关闭未连接的客户端不应抛异常 + assertDoesNotThrow(client::close); + assertFalse(client.isConnected()); + } + + @Test + public void testIsConnected_initialState() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // 断言:初始状态应为未连接 + assertFalse(client.isConnected()); + } + + @Test + public void testToString() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // 调用 + String result = client.toString(); + + // 断言 + assertNotNull(result); + assertTrue(result.contains("serverUrl='ws://localhost:8080'")); + assertTrue(result.contains("dataFormat='JSON'")); + assertTrue(result.contains("connected=false")); + } + + @Test + public void testSendMessage_textFormat() throws Exception { + // 准备参数 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "TEXT"); + + IotDeviceMessage message = IotDeviceMessage.builder() + .deviceId(123L) + .method("thing.property.report") + .params("{\"temperature\": 25.5}") + .build(); + + // mock + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用 + client.connect(); + client.sendMessage(message); + + // 断言:消息发送成功不抛异常 + assertTrue(client.isConnected()); + + // 清理 + client.close(); + } + +} From f320569f2c46ebca8ea88c4bbbe23d9b63539e94 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Wed, 21 Jan 2026 21:21:07 +0800 Subject: [PATCH 2/4] =?UTF-8?q?perf=EF=BC=9A=E3=80=90iot=E3=80=91=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20IotTcpClient=20=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/rule/data/action/IotTcpDataRuleAction.java | 1 - .../iot/service/rule/data/action/tcp/IotTcpClient.java | 9 +++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java index 53a3b71480..74385d08dd 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java @@ -43,7 +43,6 @@ public class IotTcpDataRuleAction extends config.getConnectTimeoutMs(), config.getReadTimeoutMs(), config.getSsl(), - config.getSslCertPath(), config.getDataFormat() ); // 2.2 连接服务器 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java index 15b57b5405..faf59d3fbc 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp; +import cn.hutool.core.util.ObjUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig; @@ -31,8 +32,6 @@ public class IotTcpClient { private final Integer connectTimeoutMs; private final Integer readTimeoutMs; private final Boolean ssl; - // TODO @puhui999:sslCertPath 是不是没在用? - private final String sslCertPath; private final String dataFormat; private Socket socket; @@ -41,15 +40,13 @@ public class IotTcpClient { private final AtomicBoolean connected = new AtomicBoolean(false); public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs, - Boolean ssl, String sslCertPath, String dataFormat) { + Boolean ssl, String dataFormat) { this.host = host; this.port = port; this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS; this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS; this.ssl = ssl != null ? ssl : IotDataSinkTcpConfig.DEFAULT_SSL; - this.sslCertPath = sslCertPath; - // TODO @puhui999:可以使用 StrUtil.defaultIfBlank 方法简化 - this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT; + this.dataFormat = ObjUtil.defaultIfBlank(dataFormat, IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT); } /** From 5bc8a4e487be9d63c27c63901975579d8f46fa0b Mon Sep 17 00:00:00 2001 From: puhui999 Date: Wed, 21 Jan 2026 21:37:39 +0800 Subject: [PATCH 3/4] =?UTF-8?q?perf=EF=BC=9A=E3=80=90iot=E3=80=91=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=20IotDeviceMessageUtils.notContainsIdentifier=20?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotDevicePropertyPostTriggerMatcher.java | 3 +- .../data/action/tcp/IotTcpClientTest.java | 151 ++++++++++++++++++ .../iot/core/util/IotDeviceMessageUtils.java | 11 ++ .../core/util/IotDeviceMessageUtilsTest.java | 72 ++++++++- 4 files changed, 233 insertions(+), 4 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java index d653c9c42e..1f019b5761 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java @@ -38,8 +38,7 @@ public class IotDevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerM // 1.3 检查消息中是否包含触发器指定的属性标识符 // 注意:属性上报可能同时上报多个属性,所以需要判断 trigger.getIdentifier() 是否在 message 的 params 中 - // TODO @puhui999:可以考虑 notXXX 方法,简化代码(尽量取反) - if (!IotDeviceMessageUtils.containsIdentifier(message, trigger.getIdentifier())) { + if (IotDeviceMessageUtils.notContainsIdentifier(message, trigger.getIdentifier())) { IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中不包含属性: " + trigger.getIdentifier()); return false; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java new file mode 100644 index 0000000000..cd28f8f54e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java @@ -0,0 +1,151 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp; + +import cn.hutool.core.util.ReflectUtil; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link IotTcpClient} 的单元测试 + *

+ * 测试 dataFormat 默认值行为 + * Property 1: TCP 客户端 dataFormat 默认值行为 + * Validates: Requirements 1.1, 1.2 + * + * @author HUIHUI + */ +class IotTcpClientTest { + + @Test + public void testConstructor_dataFormatNull() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, null); + + // 断言:dataFormat 为 null 时应使用默认值 + assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT, + ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_dataFormatEmpty() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, ""); + + // 断言:dataFormat 为空字符串时应使用默认值 + assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT, + ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_dataFormatBlank() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, " "); + + // 断言:dataFormat 为纯空白字符串时应使用默认值 + assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT, + ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_dataFormatValid() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + String dataFormat = "BINARY"; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, dataFormat); + + // 断言:dataFormat 为有效值时应保持原值 + assertEquals(dataFormat, ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_defaultValues() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, null); + + // 断言:验证所有默认值 + assertEquals(host, ReflectUtil.getFieldValue(client, "host")); + assertEquals(port, ReflectUtil.getFieldValue(client, "port")); + assertEquals(IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS, + ReflectUtil.getFieldValue(client, "connectTimeoutMs")); + assertEquals(IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS, + ReflectUtil.getFieldValue(client, "readTimeoutMs")); + assertEquals(IotDataSinkTcpConfig.DEFAULT_SSL, + ReflectUtil.getFieldValue(client, "ssl")); + assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT, + ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_customValues() { + // 准备参数 + String host = "192.168.1.100"; + Integer port = 9090; + Integer connectTimeoutMs = 3000; + Integer readTimeoutMs = 8000; + Boolean ssl = true; + String dataFormat = "BINARY"; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, connectTimeoutMs, readTimeoutMs, ssl, dataFormat); + + // 断言:验证自定义值 + assertEquals(host, ReflectUtil.getFieldValue(client, "host")); + assertEquals(port, ReflectUtil.getFieldValue(client, "port")); + assertEquals(connectTimeoutMs, ReflectUtil.getFieldValue(client, "connectTimeoutMs")); + assertEquals(readTimeoutMs, ReflectUtil.getFieldValue(client, "readTimeoutMs")); + assertEquals(ssl, ReflectUtil.getFieldValue(client, "ssl")); + assertEquals(dataFormat, ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testIsConnected_initialState() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, null); + + // 断言:初始状态应为未连接 + assertFalse(client.isConnected()); + } + + @Test + public void testToString() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, null); + String result = client.toString(); + + // 断言 + assertNotNull(result); + assertTrue(result.contains("host='localhost'")); + assertTrue(result.contains("port=8080")); + assertTrue(result.contains("dataFormat='JSON'")); + assertTrue(result.contains("connected=false")); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java index 5c1ac26005..b02a9b4c3a 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java @@ -99,6 +99,17 @@ public class IotDeviceMessageUtils { return false; } + /** + * 判断消息中是否不包含指定的标识符 + * + * @param message 消息 + * @param identifier 要检查的标识符 + * @return 是否不包含 + */ + public static boolean notContainsIdentifier(IotDeviceMessage message, String identifier) { + return !containsIdentifier(message, identifier); + } + /** * 将 params 解析为 Map * diff --git a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java index a6d669d170..b0d39be519 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java +++ b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java @@ -1,13 +1,13 @@ package cn.iocoder.yudao.module.iot.core.util; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.*; /** * {@link IotDeviceMessageUtils} 的单元测试 @@ -138,4 +138,72 @@ public class IotDeviceMessageUtilsTest { Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature"); assertEquals(25.5, result); // 应该返回直接标识符的值 } + + // ========== notContainsIdentifier 测试 ========== + + /** + * 测试 notContainsIdentifier 与 containsIdentifier 的互补性 + * **Property 2: notContainsIdentifier 与 containsIdentifier 互补性** + * **Validates: Requirements 4.1** + */ + @Test + public void testNotContainsIdentifier_complementary_whenContains() { + // 准备参数:消息包含指定标识符 + IotDeviceMessage message = new IotDeviceMessage(); + message.setMethod(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()); + Map params = new HashMap<>(); + params.put("temperature", 25); + message.setParams(params); + String identifier = "temperature"; + + // 调用 & 断言:验证互补性 + boolean containsResult = IotDeviceMessageUtils.containsIdentifier(message, identifier); + boolean notContainsResult = IotDeviceMessageUtils.notContainsIdentifier(message, identifier); + assertTrue(containsResult); + assertFalse(notContainsResult); + assertEquals(!containsResult, notContainsResult); + } + + /** + * 测试 notContainsIdentifier 与 containsIdentifier 的互补性 + * **Property 2: notContainsIdentifier 与 containsIdentifier 互补性** + * **Validates: Requirements 4.1** + */ + @Test + public void testNotContainsIdentifier_complementary_whenNotContains() { + // 准备参数:消息不包含指定标识符 + IotDeviceMessage message = new IotDeviceMessage(); + message.setMethod(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()); + Map params = new HashMap<>(); + params.put("temperature", 25); + message.setParams(params); + String identifier = "humidity"; + + // 调用 & 断言:验证互补性 + boolean containsResult = IotDeviceMessageUtils.containsIdentifier(message, identifier); + boolean notContainsResult = IotDeviceMessageUtils.notContainsIdentifier(message, identifier); + assertFalse(containsResult); + assertTrue(notContainsResult); + assertEquals(!containsResult, notContainsResult); + } + + /** + * 测试 notContainsIdentifier 与 containsIdentifier 的互补性 - 空参数场景 + * **Property 2: notContainsIdentifier 与 containsIdentifier 互补性** + * **Validates: Requirements 4.1** + */ + @Test + public void testNotContainsIdentifier_complementary_nullParams() { + // 准备参数:params 为 null + IotDeviceMessage message = new IotDeviceMessage(); + message.setParams(null); + String identifier = "temperature"; + + // 调用 & 断言:验证互补性 + boolean containsResult = IotDeviceMessageUtils.containsIdentifier(message, identifier); + boolean notContainsResult = IotDeviceMessageUtils.notContainsIdentifier(message, identifier); + assertFalse(containsResult); + assertTrue(notContainsResult); + assertEquals(!containsResult, notContainsResult); + } } From 4ad4fcf6cfca7c3f5911011b858944be28fd725a Mon Sep 17 00:00:00 2001 From: puhui999 Date: Wed, 21 Jan 2026 22:06:02 +0800 Subject: [PATCH 4/4] =?UTF-8?q?perf=EF=BC=9A=E3=80=90iot=E3=80=91=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=20IotDeviceServiceInvokeTriggerMatcher=20=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotDeviceServiceInvokeTriggerMatcher.java | 54 +++- ...DeviceServiceInvokeTriggerMatcherTest.java | 264 +++++++++++++++++- .../iot/core/util/IotDeviceMessageUtils.java | 38 +++ 3 files changed, 351 insertions(+), 5 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java index b5fa0330dc..ba3190068d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.service.rule.scene.matcher.trigger; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; @@ -8,6 +9,8 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum; import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherHelper; import org.springframework.stereotype.Component; +import java.util.Map; + /** * 设备服务调用触发器匹配器:处理设备服务调用的触发器匹配逻辑 * @@ -42,13 +45,58 @@ public class IotDeviceServiceInvokeTriggerMatcher implements IotSceneRuleTrigger return false; } - // 2. 对于服务调用触发器,通常只需要匹配服务标识符即可 - // 不需要检查操作符和值,因为服务调用本身就是触发条件 - // TODO @puhui999: 服务调用时校验输入参数是否匹配条件? + // 2. 检查是否配置了参数条件 + if (hasParameterCondition(trigger)) { + return matchParameterCondition(message, trigger); + } + + // 3. 无参数条件时,标识符匹配即成功 IotSceneRuleMatcherHelper.logTriggerMatchSuccess(message, trigger); return true; } + /** + * 判断触发器是否配置了参数条件 + * + * @param trigger 触发器配置 + * @return 是否配置了参数条件 + */ + private boolean hasParameterCondition(IotSceneRuleDO.Trigger trigger) { + return StrUtil.isNotBlank(trigger.getOperator()) && StrUtil.isNotBlank(trigger.getValue()); + } + + /** + * 匹配参数条件 + * + * @param message 设备消息 + * @param trigger 触发器配置 + * @return 是否匹配 + */ + private boolean matchParameterCondition(IotDeviceMessage message, IotSceneRuleDO.Trigger trigger) { + // 从消息中提取服务调用的输入参数 + Map inputParams = IotDeviceMessageUtils.extractServiceInputParams(message); + if (inputParams == null) { + IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中缺少服务输入参数"); + return false; + } + + // 获取要匹配的参数值(使用 identifier 作为参数名) + Object paramValue = inputParams.get(trigger.getIdentifier()); + if (paramValue == null) { + IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "服务输入参数中缺少指定参数: " + trigger.getIdentifier()); + return false; + } + + // 使用条件评估器进行匹配 + boolean matched = IotSceneRuleMatcherHelper.evaluateCondition(paramValue, trigger.getOperator(), trigger.getValue()); + if (matched) { + IotSceneRuleMatcherHelper.logTriggerMatchSuccess(message, trigger); + } else { + IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "服务输入参数条件不匹配"); + } + return matched; + } + @Override public int getPriority() { return 40; // 较低优先级 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java index 3d75b19b37..a6b2b0ae0e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java @@ -7,7 +7,6 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO; import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum; import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.HashMap; @@ -23,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.*; * * @author HUIHUI */ -@Disabled // TODO @puhui999:单测有报错,先屏蔽 public class IotDeviceServiceInvokeTriggerMatcherTest extends IotBaseConditionMatcherTest { private IotDeviceServiceInvokeTriggerMatcher matcher; @@ -378,6 +376,268 @@ public class IotDeviceServiceInvokeTriggerMatcherTest extends IotBaseConditionMa assertFalse(result); } + + // ========== 参数条件匹配测试 ========== + + /** + * 测试无参数条件时的匹配逻辑 - 只要标识符匹配就返回 true + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.2** + */ + @Test + public void testMatches_noParameterCondition_success() { + // 准备参数 + String serviceIdentifier = "testService"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(null); // 无参数条件 + trigger.setValue(null); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertTrue(result); + } + + /** + * 测试有参数条件时的匹配逻辑 - 参数条件匹配成功 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.1** + */ + @Test + public void testMatches_withParameterCondition_greaterThan_success() { + // 准备参数 + String serviceIdentifier = "level"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 大于操作符 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertTrue(result); + } + + /** + * 测试有参数条件时的匹配逻辑 - 参数条件匹配失败 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.1** + */ + @Test + public void testMatches_withParameterCondition_greaterThan_failure() { + // 准备参数 + String serviceIdentifier = "level"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 2) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 大于操作符 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertFalse(result); + } + + /** + * 测试有参数条件时的匹配逻辑 - 等于操作符 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.1** + */ + @Test + public void testMatches_withParameterCondition_equals_success() { + // 准备参数 + String serviceIdentifier = "mode"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("mode", "auto") + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator("=="); // 等于操作符 + trigger.setValue("auto"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertTrue(result); + } + + /** + * 测试参数缺失时的处理 - 消息中缺少 inputData + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.3** + */ + @Test + public void testMatches_withParameterCondition_missingInputData() { + // 准备参数 + String serviceIdentifier = "testService"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + // 缺少 inputData 字段 + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 配置了参数条件 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertFalse(result); + } + + /** + * 测试参数缺失时的处理 - inputData 中缺少指定参数 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.3** + */ + @Test + public void testMatches_withParameterCondition_missingParam() { + // 准备参数 + String serviceIdentifier = "level"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("otherParam", 5) // 不是 level 参数 + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 配置了参数条件 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertFalse(result); + } + + /** + * 测试只有 operator 没有 value 时不触发参数条件匹配 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.2** + */ + @Test + public void testMatches_onlyOperator_noValue() { + // 准备参数 + String serviceIdentifier = "testService"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 只有 operator + trigger.setValue(null); // 没有 value + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言:只有 operator 没有 value 时,不触发参数条件匹配,标识符匹配即成功 + assertTrue(result); + } + + /** + * 测试只有 value 没有 operator 时不触发参数条件匹配 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.2** + */ + @Test + public void testMatches_onlyValue_noOperator() { + // 准备参数 + String serviceIdentifier = "testService"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(null); // 没有 operator + trigger.setValue("3"); // 只有 value + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言:只有 value 没有 operator 时,不触发参数条件匹配,标识符匹配即成功 + assertTrue(result); + } + + /** + * 测试使用 inputParams 字段(替代 inputData) + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.1** + */ + @Test + public void testMatches_withInputParams_success() { + // 准备参数 + String serviceIdentifier = "level"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputParams", MapUtil.builder(new HashMap()) // 使用 inputParams 而不是 inputData + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 大于操作符 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertTrue(result); + } + // ========== 辅助方法 ========== /** diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java index b02a9b4c3a..3def053602 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java @@ -207,6 +207,44 @@ public class IotDeviceMessageUtils { return null; } + /** + * 从服务调用消息中提取输入参数 + *

+ * 服务调用消息的 params 结构通常为: + * { + * "identifier": "serviceIdentifier", + * "inputData": { ... } 或 "inputParams": { ... } + * } + * + * @param message 设备消息 + * @return 输入参数 Map,如果未找到则返回 null + */ + @SuppressWarnings("unchecked") + public static Map extractServiceInputParams(IotDeviceMessage message) { + Object params = message.getParams(); + if (params == null) { + return null; + } + if (!(params instanceof Map)) { + return null; + } + Map paramsMap = (Map) params; + + // 尝试从 inputData 字段获取 + Object inputData = paramsMap.get("inputData"); + if (inputData instanceof Map) { + return (Map) inputData; + } + + // 尝试从 inputParams 字段获取 + Object inputParams = paramsMap.get("inputParams"); + if (inputParams instanceof Map) { + return (Map) inputParams; + } + + return null; + } + // ========== Topic 相关 ========== public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {