From fcca74ac7d1335b805a07f191cefa43445306eda Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 9 Feb 2026 08:02:41 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88iot=EF=BC=89=EF=BC=9A=E7=BB=9F?= =?UTF-8?q?=E4=B8=80=E4=BC=98=E5=8C=96=E7=BD=91=E5=85=B3=E5=8D=8F=E8=AE=AE?= =?UTF-8?q?=E7=9A=84=E8=B5=84=E6=BA=90=E6=B8=85=E7=90=86=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E4=B8=BB=E8=A6=81=E6=98=AF=20stop0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotDeviceModbusConfigServiceImpl.java | 1 - .../protocol/coap/IotCoapProtocol.java | 32 +++++++------ .../protocol/emqx/IotEmqxProtocol.java | 23 ++++----- .../protocol/http/IotHttpProtocol.java | 22 ++++----- .../common/utils/IotModbusCommonUtils.java | 5 +- .../tcpmaster/IotModbusTcpMasterProtocol.java | 36 +++++++------- .../IotModbusTcpMasterConnectionManager.java | 17 +++++-- .../tcpslave/IotModbusTcpSlaveProtocol.java | 47 +++++++------------ .../IotModbusTcpSlaveConnectionManager.java | 11 +++-- .../IotModbusTcpSlavePollScheduler.java | 2 + .../protocol/mqtt/IotMqttProtocol.java | 43 +++++++++-------- .../manager/IotMqttConnectionManager.java | 20 ++++++++ .../gateway/protocol/tcp/IotTcpProtocol.java | 43 +++++++++-------- .../tcp/manager/IotTcpConnectionManager.java | 20 ++++++++ .../gateway/protocol/udp/IotUdpProtocol.java | 38 +++++++-------- .../websocket/IotWebSocketProtocol.java | 45 +++++++++--------- .../IotWebSocketConnectionManager.java | 20 ++++++++ .../src/main/resources/application.yaml | 4 +- 18 files changed, 244 insertions(+), 185 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigServiceImpl.java index 265d9770f5..a97bcb14d2 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigServiceImpl.java @@ -70,7 +70,6 @@ public class IotDeviceModbusConfigServiceImpl implements IotDeviceModbusConfigSe return modbusConfigMapper.selectList(listReqDTO); } - // TODO @AI:还是希望在 IotDeviceModbusConfigSaveReqVO 里,通过 validator 校验?!或者通过 group 来处理? private void validateModbusConfigByProtocolType(IotDeviceModbusConfigSaveReqVO saveReqVO, String protocolType) { IotProtocolTypeEnum protocolTypeEnum = IotProtocolTypeEnum.of(protocolType); if (protocolTypeEnum == null) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java index 14fe10dcd8..4bc8cdbe28 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java @@ -64,17 +64,13 @@ public class IotCoapProtocol implements IotProtocol { /** * 下行消息订阅者 */ - private final IotCoapDownstreamSubscriber downstreamSubscriber; + private IotCoapDownstreamSubscriber downstreamSubscriber; public IotCoapProtocol(ProtocolProperties properties) { IotCoapConfig coapConfig = properties.getCoap(); Assert.notNull(coapConfig, "CoAP 协议配置(coap)不能为空"); this.properties = properties; this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); - - // 初始化下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - this.downstreamSubscriber = new IotCoapDownstreamSubscriber(this, messageBus); } @Override @@ -94,9 +90,9 @@ public class IotCoapProtocol implements IotProtocol { return; } - IotCoapConfig coapConfig = properties.getCoap(); try { // 1.1 创建 CoAP 配置 + IotCoapConfig coapConfig = properties.getCoap(); Configuration config = Configuration.createStandardWithoutFile(); config.set(CoapConfig.COAP_PORT, properties.getPort()); config.set(CoapConfig.MAX_MESSAGE_SIZE, coapConfig.getMaxMessageSize()); @@ -131,13 +127,12 @@ public class IotCoapProtocol implements IotProtocol { getId(), properties.getPort(), serverId); // 4. 启动下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + this.downstreamSubscriber = new IotCoapDownstreamSubscriber(this, messageBus); this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT CoAP 协议 {} 启动失败]", getId(), e); - if (coapServer != null) { - coapServer.destroy(); - coapServer = null; - } + stop0(); throw e; } } @@ -147,12 +142,19 @@ public class IotCoapProtocol implements IotProtocol { if (!running) { return; } + stop0(); + } + + private void stop0() { // 1. 停止下行消息订阅者 - try { - downstreamSubscriber.stop(); - log.info("[stop][IoT CoAP 协议 {} 下行消息订阅者已停止]", getId()); - } catch (Exception e) { - log.error("[stop][IoT CoAP 协议 {} 下行消息订阅者停止失败]", getId(), e); + if (downstreamSubscriber != null) { + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT CoAP 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT CoAP 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + downstreamSubscriber = null; } // 2. 关闭 CoAP 服务器 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java index a9e251736f..f110f64b4d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java @@ -90,7 +90,7 @@ public class IotEmqxProtocol implements IotProtocol { /** * 下行消息订阅者 */ - private final IotEmqxDownstreamSubscriber downstreamSubscriber; + private IotEmqxDownstreamSubscriber downstreamSubscriber; public IotEmqxProtocol(ProtocolProperties properties) { Assert.notNull(properties, "协议实例配置不能为空"); @@ -101,10 +101,6 @@ public class IotEmqxProtocol implements IotProtocol { "MQTT 连接超时时间(emqx.connect-timeout-seconds)不能为空"); this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); this.upstreamHandler = new IotEmqxUpstreamHandler(serverId); - - // 初始化下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - this.downstreamSubscriber = new IotEmqxDownstreamSubscriber(this, messageBus); } @Override @@ -124,7 +120,7 @@ public class IotEmqxProtocol implements IotProtocol { return; } - // 1.1 创建 Vertx 实例 + // 1.1 创建 Vertx 实例 和 下行消息订阅者 this.vertx = Vertx.vertx(); try { @@ -138,6 +134,8 @@ public class IotEmqxProtocol implements IotProtocol { getId(), properties.getPort(), serverId); // 2. 启动下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + this.downstreamSubscriber = new IotEmqxDownstreamSubscriber(this, messageBus); this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT EMQX 协议 {} 启动失败]", getId(), e); @@ -157,11 +155,14 @@ public class IotEmqxProtocol implements IotProtocol { private void stop0() { // 1. 停止下行消息订阅者 - try { - downstreamSubscriber.stop(); - log.info("[stop][IoT EMQX 协议 {} 下行消息订阅者已停止]", getId()); - } catch (Exception e) { - log.error("[stop][IoT EMQX 协议 {} 下行消息订阅者停止失败]", getId(), e); + if (downstreamSubscriber != null) { + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT EMQX 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT EMQX 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + downstreamSubscriber = null; } // 2.1 先置为 false:避免 closeHandler 触发重连 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java index 2f92419161..f6c9bdc900 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java @@ -64,10 +64,6 @@ public class IotHttpProtocol implements IotProtocol { public IotHttpProtocol(ProtocolProperties properties) { this.properties = properties; this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); - - // 初始化下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus); } @Override @@ -87,7 +83,7 @@ public class IotHttpProtocol implements IotProtocol { return; } - // 1.1 创建 Vertx 实例(每个 Protocol 独立管理) + // 1.1 创建 Vertx 实例 this.vertx = Vertx.vertx(); // 1.2 创建路由 @@ -123,18 +119,12 @@ public class IotHttpProtocol implements IotProtocol { getId(), properties.getPort(), serverId); // 2. 启动下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus); this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT HTTP 协议 {} 启动失败]", getId(), e); - // 启动失败时关闭资源 - if (httpServer != null) { - httpServer.close(); - httpServer = null; - } - if (vertx != null) { - vertx.close(); - vertx = null; - } + stop0(); throw e; } } @@ -144,6 +134,10 @@ public class IotHttpProtocol implements IotProtocol { if (!running) { return; } + stop0(); + } + + private void stop0() { // 1. 停止下行消息订阅者 if (downstreamSubscriber != null) { try { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java index 32e7292964..28889c321a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java @@ -381,8 +381,11 @@ public class IotModbusCommonUtils { case UINT16: return new int[]{rawValue.intValue() & 0xFFFF}; case INT32: - case UINT32: return encodeInt32(rawValue.intValue(), byteOrder); + case UINT32: + // 使用 longValue() 避免超过 Integer.MAX_VALUE 时溢出, + // 强转 int 保留低 32 位 bit pattern,写入寄存器的字节是正确的无符号值 + return encodeInt32((int) rawValue.longValue(), byteOrder); case FLOAT: return encodeFloat(rawValue.floatValue(), byteOrder); case DOUBLE: diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java index 1b284e4e4c..31271b6350 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java @@ -65,7 +65,7 @@ public class IotModbusTcpMasterProtocol implements IotProtocol { /** * 下行消息订阅者 */ - private final IotModbusTcpMasterDownstreamSubscriber downstreamSubscriber; + private IotModbusTcpMasterDownstreamSubscriber downstreamSubscriber; private final IotModbusTcpMasterConfigCacheService configCacheService; private final IotModbusTcpMasterPollScheduler pollScheduler; @@ -89,15 +89,9 @@ public class IotModbusTcpMasterProtocol implements IotProtocol { // 初始化 Handler IotModbusTcpMasterUpstreamHandler upstreamHandler = new IotModbusTcpMasterUpstreamHandler(messageService, serverId); - IotModbusTcpMasterDownstreamHandler downstreamHandler = new IotModbusTcpMasterDownstreamHandler(connectionManager, - configCacheService); // 初始化轮询调度器 this.pollScheduler = new IotModbusTcpMasterPollScheduler(vertx, connectionManager, upstreamHandler, configCacheService); - - // 初始化下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - this.downstreamSubscriber = new IotModbusTcpMasterDownstreamSubscriber(this, downstreamHandler, messageBus); } @Override @@ -130,13 +124,14 @@ public class IotModbusTcpMasterProtocol implements IotProtocol { log.info("[start][IoT Modbus TCP Master 协议 {} 启动成功,serverId={}]", getId(), serverId); // 2. 启动下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + IotModbusTcpMasterDownstreamHandler downstreamHandler = new IotModbusTcpMasterDownstreamHandler(connectionManager, + configCacheService); + this.downstreamSubscriber = new IotModbusTcpMasterDownstreamSubscriber(this, downstreamHandler, messageBus); this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT Modbus TCP Master 协议 {} 启动失败]", getId(), e); - // 启动失败时关闭资源 - if (vertx != null) { - vertx.close(); - } + stop0(); throw e; } } @@ -146,12 +141,19 @@ public class IotModbusTcpMasterProtocol implements IotProtocol { if (!running) { return; } + stop0(); + } + + private void stop0() { // 1. 停止下行消息订阅者 - try { - downstreamSubscriber.stop(); - log.info("[stop][IoT Modbus TCP Master 协议 {} 下行消息订阅者已停止]", getId()); - } catch (Exception e) { - log.error("[stop][IoT Modbus TCP Master 协议 {} 下行消息订阅者停止失败]", getId(), e); + if (downstreamSubscriber != null) { + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT Modbus TCP Master 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT Modbus TCP Master 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + downstreamSubscriber = null; } // 2.1 取消配置刷新定时器 @@ -161,10 +163,8 @@ public class IotModbusTcpMasterProtocol implements IotProtocol { } // 2.2 停止轮询调度器 pollScheduler.stopAll(); - log.info("[stop][IoT Modbus TCP Master 协议 {} 轮询调度器已停止]", getId()); // 2.3 关闭所有连接 connectionManager.closeAll(); - log.info("[stop][IoT Modbus TCP Master 协议 {} 连接管理器已关闭]", getId()); // 3. 关闭 Vert.x 实例 if (vertx != null) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java index ebbca8e2a0..4fb605a932 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java @@ -14,6 +14,8 @@ import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -185,8 +187,11 @@ public class IotModbusTcpMasterConnectionManager { */ private void addDeviceAndOnline(ModbusConnection connection, IotModbusDeviceConfigRespDTO config) { - connection.addDevice(config.getDeviceId(), config.getSlaveId()); - sendOnlineMessage(config); + Integer previous = connection.addDevice(config.getDeviceId(), config.getSlaveId()); + // 首次注册,发送上线消息 + if (previous == null) { + sendOnlineMessage(config); + } } /** @@ -247,7 +252,9 @@ public class IotModbusTcpMasterConnectionManager { * 关闭所有连接 */ public void closeAll() { - for (String connectionKey : connectionPool.keySet()) { + // 先复制再遍历,避免 closeConnection 中 remove 导致并发修改 + List connectionKeys = new ArrayList<>(connectionPool.keySet()); + for (String connectionKey : connectionKeys) { closeConnection(connectionKey); } deviceConnectionMap.clear(); @@ -282,8 +289,8 @@ public class IotModbusTcpMasterConnectionManager { */ private Context context; - public void addDevice(Long deviceId, Integer slaveId) { - deviceSlaveMap.put(deviceId, slaveId); + public Integer addDevice(Long deviceId, Integer slaveId) { + return deviceSlaveMap.putIfAbsent(deviceId, slaveId); } public void removeDevice(Long deviceId) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java index 3a44a189fe..feb89811db 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java @@ -88,7 +88,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { /** * 下行消息订阅者 */ - private final IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber; + private IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber; private final IotModbusFrameDecoder frameDecoder; @SuppressWarnings("FieldCanBeLocal") @@ -121,7 +121,6 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { // 初始化共享事务 ID 自增器(PollScheduler 和 DownstreamHandler 共用,避免 transactionId 冲突) AtomicInteger transactionIdCounter = new AtomicInteger(0); - // 初始化轮询调度器 this.pollScheduler = new IotModbusTcpSlavePollScheduler( vertx, connectionManager, frameEncoder, pendingRequestManager, @@ -134,13 +133,6 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { deviceApi, this.messageService, frameEncoder, connectionManager, configCacheService, pendingRequestManager, pollScheduler, deviceService, serverId); - - // 初始化下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - IotModbusTcpSlaveDownstreamHandler downstreamHandler = new IotModbusTcpSlaveDownstreamHandler( - connectionManager, configCacheService, frameEncoder, transactionIdCounter); - this.downstreamSubscriber = new IotModbusTcpSlaveDownstreamSubscriber( - this, downstreamHandler, messageBus); } @Override @@ -179,25 +171,15 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { getId(), serverId, properties.getPort()); // 3. 启动下行消息订阅 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + IotModbusTcpSlaveDownstreamHandler downstreamHandler = new IotModbusTcpSlaveDownstreamHandler( + connectionManager, configCacheService, frameEncoder, this.pollScheduler.getTransactionIdCounter()); + this.downstreamSubscriber = new IotModbusTcpSlaveDownstreamSubscriber( + this, downstreamHandler, messageBus); downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT Modbus TCP Slave 协议 {} 启动失败]", getId(), e); - // TODO @芋艿:后续统一优化 stop 逻辑; - if (configRefreshTimerId != null) { - vertx.cancelTimer(configRefreshTimerId); - configRefreshTimerId = null; - } - if (requestCleanupTimerId != null) { - vertx.cancelTimer(requestCleanupTimerId); - requestCleanupTimerId = null; - } - connectionManager.closeAll(); - if (netServer != null) { - netServer.close(); - } - if (vertx != null) { - vertx.close(); - } + stop0(); throw e; } } @@ -207,12 +189,18 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { if (!running) { return; } + stop0(); + } + private void stop0() { // 1. 停止下行消息订阅 - try { - downstreamSubscriber.stop(); - } catch (Exception e) { - log.error("[stop][下行消息订阅器停止失败]", e); + if (downstreamSubscriber != null) { + try { + downstreamSubscriber.stop(); + } catch (Exception e) { + log.error("[stop][下行消息订阅器停止失败]", e); + } + downstreamSubscriber = null; } // 2.1 取消定时器 @@ -238,6 +226,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { } catch (Exception e) { log.error("[stop][TCP Server 关闭失败]", e); } + netServer = null; } // 3. 关闭 Vertx diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java index 04434910aa..16899c08fe 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java @@ -8,6 +8,8 @@ import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -155,15 +157,18 @@ public class IotModbusTcpSlaveConnectionManager { * 关闭所有连接 */ public void closeAll() { - for (NetSocket socket : connectionMap.keySet()) { + // 1. 先复制再清空,避免 closeHandler 回调时并发修改 + List sockets = new ArrayList<>(connectionMap.keySet()); + connectionMap.clear(); + deviceSocketMap.clear(); + // 2. 关闭所有 socket(closeHandler 中 removeConnection 发现 map 为空会安全跳过) + for (NetSocket socket : sockets) { try { socket.close(); } catch (Exception e) { log.error("[closeAll][关闭连接失败]", e); } } - connectionMap.clear(); - deviceSocketMap.clear(); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java index f1276eaa52..80bef684db 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java @@ -11,6 +11,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotMod import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager.PendingRequest; import io.vertx.core.Vertx; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicInteger; @@ -31,6 +32,7 @@ public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollSchedul /** * TCP 事务 ID 自增器(与 DownstreamHandler 共享) */ + @Getter private final AtomicInteger transactionIdCounter; public IotModbusTcpSlavePollScheduler(Vertx vertx, diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java index 1201fd1a42..354cd1b452 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java @@ -81,7 +81,7 @@ public class IotMqttProtocol implements IotProtocol { /** * 下行消息订阅者 */ - private final IotMqttDownstreamSubscriber downstreamSubscriber; + private IotMqttDownstreamSubscriber downstreamSubscriber; private final IotDeviceMessageService deviceMessageService; @@ -104,11 +104,6 @@ public class IotMqttProtocol implements IotProtocol { this.authHandler = new IotMqttAuthHandler(connectionManager, deviceMessageService, deviceApi, serverId); this.registerHandler = new IotMqttRegisterHandler(connectionManager, deviceMessageService); this.upstreamHandler = new IotMqttUpstreamHandler(connectionManager, deviceMessageService, serverId); - - // 初始化下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler(deviceMessageService, connectionManager); - this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus); } @Override @@ -157,18 +152,13 @@ public class IotMqttProtocol implements IotProtocol { getId(), properties.getPort(), serverId); // 2. 启动下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler(deviceMessageService, connectionManager); + this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus); this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT MQTT 协议 {} 启动失败]", getId(), e); - // 启动失败时关闭资源 - if (mqttServer != null) { - mqttServer.close(); - mqttServer = null; - } - if (vertx != null) { - vertx.close(); - vertx = null; - } + stop0(); throw e; } } @@ -178,15 +168,24 @@ public class IotMqttProtocol implements IotProtocol { if (!running) { return; } + stop0(); + } + + private void stop0() { // 1. 停止下行消息订阅者 - try { - downstreamSubscriber.stop(); - log.info("[stop][IoT MQTT 协议 {} 下行消息订阅者已停止]", getId()); - } catch (Exception e) { - log.error("[stop][IoT MQTT 协议 {} 下行消息订阅者停止失败]", getId(), e); + if (downstreamSubscriber != null) { + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT MQTT 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT MQTT 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + downstreamSubscriber = null; } - // 2.1 关闭 MQTT 服务器 + // 2.1 关闭所有连接 + connectionManager.closeAll(); + // 2.2 关闭 MQTT 服务器 if (mqttServer != null) { try { mqttServer.close().result(); @@ -196,7 +195,7 @@ public class IotMqttProtocol implements IotProtocol { } mqttServer = null; } - // 2.2 关闭 Vertx 实例 + // 2.3 关闭 Vertx 实例 if (vertx != null) { try { vertx.close().result(); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java index ccb9fa5a60..9bd3ec4934 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java @@ -8,6 +8,8 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -166,6 +168,24 @@ public class IotMqttConnectionManager { return deviceEndpointMap.get(deviceId); } + /** + * 关闭所有连接 + */ + public void closeAll() { + // 1. 先复制再清空,避免 closeHandler 回调时并发修改 + List endpoints = new ArrayList<>(connectionMap.keySet()); + connectionMap.clear(); + deviceEndpointMap.clear(); + // 2. 关闭所有连接(closeHandler 中 unregisterConnection 发现 map 为空会安全跳过) + for (MqttEndpoint endpoint : endpoints) { + try { + endpoint.close(); + } catch (Exception ignored) { + // 连接可能已关闭,忽略异常 + } + } + } + /** * 连接信息 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java index e864df543e..8660e87f7e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpProtocol.java @@ -66,7 +66,7 @@ public class IotTcpProtocol implements IotProtocol { /** * 下行消息订阅者 */ - private final IotTcpDownstreamSubscriber downstreamSubscriber; + private IotTcpDownstreamSubscriber downstreamSubscriber; /** * 消息序列化器 @@ -94,11 +94,6 @@ public class IotTcpProtocol implements IotProtocol { // 初始化连接管理器 this.connectionManager = new IotTcpConnectionManager(tcpConfig.getMaxConnections()); - - // 初始化下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - IotTcpDownstreamHandler downstreamHandler = new IotTcpDownstreamHandler(connectionManager, frameCodec, serializer); - this.downstreamSubscriber = new IotTcpDownstreamSubscriber(this, downstreamHandler, messageBus); } @Override @@ -152,18 +147,13 @@ public class IotTcpProtocol implements IotProtocol { getId(), properties.getPort(), serverId); // 2. 启动下行消息订阅者 + IotTcpDownstreamHandler downstreamHandler = new IotTcpDownstreamHandler(connectionManager, frameCodec, serializer); + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + this.downstreamSubscriber = new IotTcpDownstreamSubscriber(this, downstreamHandler, messageBus); this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT TCP 协议 {} 启动失败]", getId(), e); - // 启动失败时关闭资源 - if (tcpServer != null) { - tcpServer.close(); - tcpServer = null; - } - if (vertx != null) { - vertx.close(); - vertx = null; - } + stop0(); throw e; } } @@ -173,15 +163,24 @@ public class IotTcpProtocol implements IotProtocol { if (!running) { return; } + stop0(); + } + + private void stop0() { // 1. 停止下行消息订阅者 - try { - downstreamSubscriber.stop(); - log.info("[stop][IoT TCP 协议 {} 下行消息订阅者已停止]", getId()); - } catch (Exception e) { - log.error("[stop][IoT TCP 协议 {} 下行消息订阅者停止失败]", getId(), e); + if (downstreamSubscriber != null) { + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT TCP 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT TCP 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + downstreamSubscriber = null; } - // 2.1 关闭 TCP 服务器 + // 2.1 关闭所有连接 + connectionManager.closeAll(); + // 2.2 关闭 TCP 服务器 if (tcpServer != null) { try { tcpServer.close().result(); @@ -191,7 +190,7 @@ public class IotTcpProtocol implements IotProtocol { } tcpServer = null; } - // 2.2 关闭 Vertx 实例 + // 2.3 关闭 Vertx 实例 if (vertx != null) { try { vertx.close().result(); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java index b7b72a370b..20065f1b40 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java @@ -5,6 +5,8 @@ import io.vertx.core.net.NetSocket; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -122,6 +124,24 @@ public class IotTcpConnectionManager { } } + /** + * 关闭所有连接 + */ + public void closeAll() { + // 1. 先复制再清空,避免 closeHandler 回调时并发修改 + List sockets = new ArrayList<>(connectionMap.keySet()); + connectionMap.clear(); + deviceSocketMap.clear(); + // 2. 关闭所有连接(closeHandler 中 unregisterConnection 发现 map 为空会安全跳过) + for (NetSocket socket : sockets) { + try { + socket.close(); + } catch (Exception ignored) { + // 连接可能已关闭,忽略异常 + } + } + } + /** * 连接信息(包含认证信息) */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocol.java index bfed2d9c32..9df3c8bce7 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotUdpProtocol.java @@ -63,7 +63,7 @@ public class IotUdpProtocol implements IotProtocol { /** * 下行消息订阅者 */ - private final IotUdpDownstreamSubscriber downstreamSubscriber; + private IotUdpDownstreamSubscriber downstreamSubscriber; /** * 消息序列化器 @@ -85,10 +85,6 @@ public class IotUdpProtocol implements IotProtocol { // 初始化会话管理器 this.sessionManager = new IotUdpSessionManager(udpConfig.getMaxSessions(), udpConfig.getSessionTimeoutMs()); - // 初始化下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - IotUdpDownstreamHandler downstreamHandler = new IotUdpDownstreamHandler(this, sessionManager, serializer); - this.downstreamSubscriber = new IotUdpDownstreamSubscriber(this, downstreamHandler, messageBus); } @Override @@ -108,8 +104,11 @@ public class IotUdpProtocol implements IotProtocol { return; } - // 1.1 创建 Vertx 实例 + // 1.1 创建 Vertx 实例 和 下行消息订阅者 this.vertx = Vertx.vertx(); + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + IotUdpDownstreamHandler downstreamHandler = new IotUdpDownstreamHandler(this, sessionManager, serializer); + this.downstreamSubscriber = new IotUdpDownstreamSubscriber(this, downstreamHandler, messageBus); // 1.2 创建 UDP Socket 选项 IotUdpConfig udpConfig = properties.getUdp(); @@ -137,15 +136,7 @@ public class IotUdpProtocol implements IotProtocol { this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT UDP 协议 {} 启动失败]", getId(), e); - // 启动失败时关闭资源 - if (udpSocket != null) { - udpSocket.close(); - udpSocket = null; - } - if (vertx != null) { - vertx.close(); - vertx = null; - } + stop0(); throw e; } } @@ -155,12 +146,19 @@ public class IotUdpProtocol implements IotProtocol { if (!running) { return; } + stop0(); + } + + private void stop0() { // 1. 停止下行消息订阅者 - try { - downstreamSubscriber.stop(); - log.info("[stop][IoT UDP 协议 {} 下行消息订阅者已停止]", getId()); - } catch (Exception e) { - log.error("[stop][IoT UDP 协议 {} 下行消息订阅者停止失败]", getId(), e); + if (downstreamSubscriber != null) { + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT UDP 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT UDP 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + downstreamSubscriber = null; } // 2.1 关闭 UDP Socket diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketProtocol.java index b416900db5..083dc32369 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketProtocol.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; +import cn.hutool.core.lang.Assert; import cn.hutool.core.util.ObjUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; @@ -9,8 +10,8 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.upstream.IotWebSocketUpstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; @@ -21,7 +22,6 @@ import io.vertx.core.http.HttpServerOptions; import io.vertx.core.net.PemKeyCertOptions; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import cn.hutool.core.lang.Assert; /** * IoT WebSocket 协议实现 @@ -65,7 +65,7 @@ public class IotWebSocketProtocol implements IotProtocol { /** * 下行消息订阅者 */ - private final IotWebSocketDownstreamSubscriber downstreamSubscriber; + private IotWebSocketDownstreamSubscriber downstreamSubscriber; /** * 消息序列化器 @@ -87,10 +87,6 @@ public class IotWebSocketProtocol implements IotProtocol { // 初始化连接管理器 this.connectionManager = new IotWebSocketConnectionManager(); - // 初始化下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - IotWebSocketDownstreamHandler downstreamHandler = new IotWebSocketDownstreamHandler(serializer, connectionManager); - this.downstreamSubscriber = new IotWebSocketDownstreamSubscriber(this, downstreamHandler, messageBus); } @Override @@ -152,17 +148,13 @@ public class IotWebSocketProtocol implements IotProtocol { getId(), properties.getPort(), wsConfig.getPath(), serverId); // 2. 启动下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + IotWebSocketDownstreamHandler downstreamHandler = new IotWebSocketDownstreamHandler(serializer, connectionManager); + this.downstreamSubscriber = new IotWebSocketDownstreamSubscriber(this, downstreamHandler, messageBus); downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT WebSocket 协议 {} 启动失败]", getId(), e); - if (httpServer != null) { - httpServer.close(); - httpServer = null; - } - if (vertx != null) { - vertx.close(); - vertx = null; - } + stop0(); throw e; } } @@ -172,15 +164,24 @@ public class IotWebSocketProtocol implements IotProtocol { if (!running) { return; } + stop0(); + } + + private void stop0() { // 1. 停止下行消息订阅者 - try { - downstreamSubscriber.stop(); - log.info("[stop][IoT WebSocket 协议 {} 下行消息订阅者已停止]", getId()); - } catch (Exception e) { - log.error("[stop][IoT WebSocket 协议 {} 下行消息订阅者停止失败]", getId(), e); + if (downstreamSubscriber != null) { + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT WebSocket 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT WebSocket 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + downstreamSubscriber = null; } - // 2.1 关闭 WebSocket 服务器 + // 2.1 关闭所有连接 + connectionManager.closeAll(); + // 2.2 关闭 WebSocket 服务器 if (httpServer != null) { try { httpServer.close().result(); @@ -190,7 +191,7 @@ public class IotWebSocketProtocol implements IotProtocol { } httpServer = null; } - // 2.2 关闭 Vertx 实例 + // 2.3 关闭 Vertx 实例 if (vertx != null) { try { vertx.close().result(); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java index 8b09da0f98..92019ffadd 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java @@ -5,6 +5,8 @@ import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -114,6 +116,24 @@ public class IotWebSocketConnectionManager { } } + /** + * 关闭所有连接 + */ + public void closeAll() { + // 1. 先复制再清空,避免 closeHandler 回调时并发修改 + List sockets = new ArrayList<>(connectionMap.keySet()); + connectionMap.clear(); + deviceSocketMap.clear(); + // 2. 关闭所有连接(closeHandler 中 unregisterConnection 发现 map 为空会安全跳过) + for (ServerWebSocket socket : sockets) { + try { + socket.close(); + } catch (Exception ignored) { + // 连接可能已关闭,忽略异常 + } + } + } + /** * 连接信息(包含认证信息) */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index 1ccd9f37b3..f5c77dbea0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -167,7 +167,7 @@ yudao: # 针对引入的 Modbus TCP Master 组件的配置 # ==================================== - id: modbus-tcp-master-1 - enabled: true + enabled: false protocol: modbus_tcp_master port: 502 modbus-tcp-master: @@ -176,7 +176,7 @@ yudao: # 针对引入的 Modbus TCP Slave 组件的配置 # ==================================== - id: modbus-tcp-slave-1 - enabled: true + enabled: false protocol: modbus_tcp_slave port: 503 modbus-tcp-slave: