From 7ec541e5bbb07c3a08fd78ce5b3af6ec056d1ec2 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sun, 25 Jan 2026 18:24:43 +0800 Subject: [PATCH] =?UTF-8?q?perf(iot):=E3=80=90=E5=9C=BA=E6=99=AF=E8=81=94?= =?UTF-8?q?=E5=8A=A8=E3=80=91WebSocket=20=E9=87=8D=E8=BF=9E=E9=94=81?= =?UTF-8?q?=E4=BB=8E=20Redisson=20=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81?= =?UTF-8?q?=E6=94=B9=E4=B8=BA=20ReentrantLock=20=E5=8D=95=E6=9C=BA?= =?UTF-8?q?=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-dependencies/pom.xml | 1 - .../redis/rule/IotWebSocketLockRedisDAO.java | 67 ------------------- .../action/IotWebSocketDataRuleAction.java | 44 ++++++++---- 3 files changed, 32 insertions(+), 80 deletions(-) delete mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index 3ff1534cee..ae72b994ad 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -76,7 +76,6 @@ 2.3.0 4.7.9-20251224.161447 4.40.607.ALL - 4.12.0 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java deleted file mode 100644 index d50dc548af..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java +++ /dev/null @@ -1,67 +0,0 @@ -package cn.iocoder.yudao.module.iot.dal.redis.rule; - -import jakarta.annotation.Resource; -import org.redisson.api.RLock; -import org.redisson.api.RedissonClient; -import org.springframework.stereotype.Repository; - -import java.util.concurrent.TimeUnit; - -import static cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants.WEBSOCKET_CONNECT_LOCK; - -/** - * IoT WebSocket 连接锁 Redis DAO - *

- * 用于保证 WebSocket 重连操作的线程安全,避免多线程同时重连导致的资源竞争 - * - * @author HUIHUI - */ -@Repository -public class IotWebSocketLockRedisDAO { - - /** - * 锁等待超时时间(毫秒) - */ - private static final long LOCK_WAIT_TIME_MS = 5000; - - /** - * 锁持有超时时间(毫秒) - */ - private static final long LOCK_LEASE_TIME_MS = 10000; - - @Resource - private RedissonClient redissonClient; - - /** - * 在分布式锁保护下执行操作 - * - * @param serverUrl WebSocket 服务器地址 - * @param runnable 需要执行的操作 - * @throws Exception 如果获取锁超时或执行操作时发生异常 - */ - public void lock(String serverUrl, Runnable runnable) throws Exception { - String lockKey = formatKey(serverUrl); - RLock lock = redissonClient.getLock(lockKey); - - try { - // 尝试获取分布式锁 - boolean acquired = lock.tryLock(LOCK_WAIT_TIME_MS, LOCK_LEASE_TIME_MS, TimeUnit.MILLISECONDS); - if (!acquired) { - throw new RuntimeException("获取 WebSocket 连接锁超时,服务器: " + serverUrl); - } - - // 执行操作 - runnable.run(); - } finally { - // 释放锁 - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - private static String formatKey(String serverUrl) { - return String.format(WEBSOCKET_CONNECT_LOCK, serverUrl); - } - -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java index 651562987a..ebfe1f8c10 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java @@ -3,13 +3,15 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; -import cn.iocoder.yudao.module.iot.dal.redis.rule.IotWebSocketLockRedisDAO; import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum; import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + /** * WebSocket 的 {@link IotDataRuleAction} 实现类 *

@@ -24,8 +26,17 @@ import org.springframework.stereotype.Component; public class IotWebSocketDataRuleAction extends IotDataRuleCacheableAction { - @Resource - private IotWebSocketLockRedisDAO webSocketLockRedisDAO; + /** + * 锁等待超时时间(毫秒) + */ + private static final long LOCK_WAIT_TIME_MS = 5000; + + /** + * 重连锁,key 为 WebSocket 服务器地址 + *

+ * WebSocket 连接是与特定服务器实例绑定的,使用单机锁即可保证重连的线程安全 + */ + private final ConcurrentHashMap reconnectLocks = new ConcurrentHashMap<>(); @Override public Integer getType() { @@ -87,23 +98,32 @@ public class IotWebSocketDataRuleAction extends } /** - * 使用分布式锁进行重连 + * 使用锁进行重连,保证同一服务器地址的重连操作线程安全 * * @param webSocketClient WebSocket 客户端 * @param config 配置信息 */ private void reconnectWithLock(IotWebSocketClient webSocketClient, IotDataSinkWebSocketConfig config) throws Exception { - webSocketLockRedisDAO.lock(config.getServerUrl(), () -> { + ReentrantLock lock = reconnectLocks.computeIfAbsent(config.getServerUrl(), k -> new ReentrantLock()); + boolean acquired = false; + try { + acquired = lock.tryLock(LOCK_WAIT_TIME_MS, TimeUnit.MILLISECONDS); + if (!acquired) { + throw new RuntimeException("获取 WebSocket 重连锁超时,服务器: " + config.getServerUrl()); + } // 双重检查:获取锁后再次检查连接状态,避免重复连接 if (!webSocketClient.isConnected()) { log.warn("[reconnectWithLock][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl()); - try { - webSocketClient.connect(); - } catch (Exception e) { - throw new RuntimeException("WebSocket 重连失败,服务器: " + config.getServerUrl(), e); - } + webSocketClient.connect(); } - }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("获取 WebSocket 重连锁被中断,服务器: " + config.getServerUrl(), e); + } finally { + if (acquired && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } } }