feat(iot):统一优化网关协议的资源清理逻辑,主要是 stop0

This commit is contained in:
YunaiV
2026-02-09 08:02:41 +08:00
parent 88f090b66f
commit fcca74ac7d
18 changed files with 244 additions and 185 deletions

View File

@@ -70,7 +70,6 @@ public class IotDeviceModbusConfigServiceImpl implements IotDeviceModbusConfigSe
return modbusConfigMapper.selectList(listReqDTO); return modbusConfigMapper.selectList(listReqDTO);
} }
// TODO @AI还是希望在 IotDeviceModbusConfigSaveReqVO 里,通过 validator 校验?!或者通过 group 来处理?
private void validateModbusConfigByProtocolType(IotDeviceModbusConfigSaveReqVO saveReqVO, String protocolType) { private void validateModbusConfigByProtocolType(IotDeviceModbusConfigSaveReqVO saveReqVO, String protocolType) {
IotProtocolTypeEnum protocolTypeEnum = IotProtocolTypeEnum.of(protocolType); IotProtocolTypeEnum protocolTypeEnum = IotProtocolTypeEnum.of(protocolType);
if (protocolTypeEnum == null) { if (protocolTypeEnum == null) {

View File

@@ -64,17 +64,13 @@ public class IotCoapProtocol implements IotProtocol {
/** /**
* 下行消息订阅者 * 下行消息订阅者
*/ */
private final IotCoapDownstreamSubscriber downstreamSubscriber; private IotCoapDownstreamSubscriber downstreamSubscriber;
public IotCoapProtocol(ProtocolProperties properties) { public IotCoapProtocol(ProtocolProperties properties) {
IotCoapConfig coapConfig = properties.getCoap(); IotCoapConfig coapConfig = properties.getCoap();
Assert.notNull(coapConfig, "CoAP 协议配置coap不能为空"); Assert.notNull(coapConfig, "CoAP 协议配置coap不能为空");
this.properties = properties; this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotCoapDownstreamSubscriber(this, messageBus);
} }
@Override @Override
@@ -94,9 +90,9 @@ public class IotCoapProtocol implements IotProtocol {
return; return;
} }
IotCoapConfig coapConfig = properties.getCoap();
try { try {
// 1.1 创建 CoAP 配置 // 1.1 创建 CoAP 配置
IotCoapConfig coapConfig = properties.getCoap();
Configuration config = Configuration.createStandardWithoutFile(); Configuration config = Configuration.createStandardWithoutFile();
config.set(CoapConfig.COAP_PORT, properties.getPort()); config.set(CoapConfig.COAP_PORT, properties.getPort());
config.set(CoapConfig.MAX_MESSAGE_SIZE, coapConfig.getMaxMessageSize()); config.set(CoapConfig.MAX_MESSAGE_SIZE, coapConfig.getMaxMessageSize());
@@ -131,13 +127,12 @@ public class IotCoapProtocol implements IotProtocol {
getId(), properties.getPort(), serverId); getId(), properties.getPort(), serverId);
// 4. 启动下行消息订阅者 // 4. 启动下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotCoapDownstreamSubscriber(this, messageBus);
this.downstreamSubscriber.start(); this.downstreamSubscriber.start();
} catch (Exception e) { } catch (Exception e) {
log.error("[start][IoT CoAP 协议 {} 启动失败]", getId(), e); log.error("[start][IoT CoAP 协议 {} 启动失败]", getId(), e);
if (coapServer != null) { stop0();
coapServer.destroy();
coapServer = null;
}
throw e; throw e;
} }
} }
@@ -147,12 +142,19 @@ public class IotCoapProtocol implements IotProtocol {
if (!running) { if (!running) {
return; return;
} }
stop0();
}
private void stop0() {
// 1. 停止下行消息订阅者 // 1. 停止下行消息订阅者
try { if (downstreamSubscriber != null) {
downstreamSubscriber.stop(); try {
log.info("[stop][IoT CoAP 协议 {} 下行消息订阅者已停止]", getId()); downstreamSubscriber.stop();
} catch (Exception e) { log.info("[stop][IoT CoAP 协议 {} 下行消息订阅者已停止]", getId());
log.error("[stop][IoT CoAP 协议 {} 下行消息订阅者停止失败]", getId(), e); } catch (Exception e) {
log.error("[stop][IoT CoAP 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
} }
// 2. 关闭 CoAP 服务器 // 2. 关闭 CoAP 服务器

View File

@@ -90,7 +90,7 @@ public class IotEmqxProtocol implements IotProtocol {
/** /**
* 下行消息订阅者 * 下行消息订阅者
*/ */
private final IotEmqxDownstreamSubscriber downstreamSubscriber; private IotEmqxDownstreamSubscriber downstreamSubscriber;
public IotEmqxProtocol(ProtocolProperties properties) { public IotEmqxProtocol(ProtocolProperties properties) {
Assert.notNull(properties, "协议实例配置不能为空"); Assert.notNull(properties, "协议实例配置不能为空");
@@ -101,10 +101,6 @@ public class IotEmqxProtocol implements IotProtocol {
"MQTT 连接超时时间(emqx.connect-timeout-seconds)不能为空"); "MQTT 连接超时时间(emqx.connect-timeout-seconds)不能为空");
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
this.upstreamHandler = new IotEmqxUpstreamHandler(serverId); this.upstreamHandler = new IotEmqxUpstreamHandler(serverId);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotEmqxDownstreamSubscriber(this, messageBus);
} }
@Override @Override
@@ -124,7 +120,7 @@ public class IotEmqxProtocol implements IotProtocol {
return; return;
} }
// 1.1 创建 Vertx 实例 // 1.1 创建 Vertx 实例 和 下行消息订阅者
this.vertx = Vertx.vertx(); this.vertx = Vertx.vertx();
try { try {
@@ -138,6 +134,8 @@ public class IotEmqxProtocol implements IotProtocol {
getId(), properties.getPort(), serverId); getId(), properties.getPort(), serverId);
// 2. 启动下行消息订阅者 // 2. 启动下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotEmqxDownstreamSubscriber(this, messageBus);
this.downstreamSubscriber.start(); this.downstreamSubscriber.start();
} catch (Exception e) { } catch (Exception e) {
log.error("[start][IoT EMQX 协议 {} 启动失败]", getId(), e); log.error("[start][IoT EMQX 协议 {} 启动失败]", getId(), e);
@@ -157,11 +155,14 @@ public class IotEmqxProtocol implements IotProtocol {
private void stop0() { private void stop0() {
// 1. 停止下行消息订阅者 // 1. 停止下行消息订阅者
try { if (downstreamSubscriber != null) {
downstreamSubscriber.stop(); try {
log.info("[stop][IoT EMQX 协议 {} 下行消息订阅者已停止]", getId()); downstreamSubscriber.stop();
} catch (Exception e) { log.info("[stop][IoT EMQX 协议 {} 下行消息订阅者已停止]", getId());
log.error("[stop][IoT EMQX 协议 {} 下行消息订阅者停止失败]", getId(), e); } catch (Exception e) {
log.error("[stop][IoT EMQX 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
} }
// 2.1 先置为 false避免 closeHandler 触发重连 // 2.1 先置为 false避免 closeHandler 触发重连

View File

@@ -64,10 +64,6 @@ public class IotHttpProtocol implements IotProtocol {
public IotHttpProtocol(ProtocolProperties properties) { public IotHttpProtocol(ProtocolProperties properties) {
this.properties = properties; this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus);
} }
@Override @Override
@@ -87,7 +83,7 @@ public class IotHttpProtocol implements IotProtocol {
return; return;
} }
// 1.1 创建 Vertx 实例(每个 Protocol 独立管理) // 1.1 创建 Vertx 实例
this.vertx = Vertx.vertx(); this.vertx = Vertx.vertx();
// 1.2 创建路由 // 1.2 创建路由
@@ -123,18 +119,12 @@ public class IotHttpProtocol implements IotProtocol {
getId(), properties.getPort(), serverId); getId(), properties.getPort(), serverId);
// 2. 启动下行消息订阅者 // 2. 启动下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus);
this.downstreamSubscriber.start(); this.downstreamSubscriber.start();
} catch (Exception e) { } catch (Exception e) {
log.error("[start][IoT HTTP 协议 {} 启动失败]", getId(), e); log.error("[start][IoT HTTP 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭资源 stop0();
if (httpServer != null) {
httpServer.close();
httpServer = null;
}
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e; throw e;
} }
} }
@@ -144,6 +134,10 @@ public class IotHttpProtocol implements IotProtocol {
if (!running) { if (!running) {
return; return;
} }
stop0();
}
private void stop0() {
// 1. 停止下行消息订阅者 // 1. 停止下行消息订阅者
if (downstreamSubscriber != null) { if (downstreamSubscriber != null) {
try { try {

View File

@@ -381,8 +381,11 @@ public class IotModbusCommonUtils {
case UINT16: case UINT16:
return new int[]{rawValue.intValue() & 0xFFFF}; return new int[]{rawValue.intValue() & 0xFFFF};
case INT32: case INT32:
case UINT32:
return encodeInt32(rawValue.intValue(), byteOrder); return encodeInt32(rawValue.intValue(), byteOrder);
case UINT32:
// 使用 longValue() 避免超过 Integer.MAX_VALUE 时溢出,
// 强转 int 保留低 32 位 bit pattern写入寄存器的字节是正确的无符号值
return encodeInt32((int) rawValue.longValue(), byteOrder);
case FLOAT: case FLOAT:
return encodeFloat(rawValue.floatValue(), byteOrder); return encodeFloat(rawValue.floatValue(), byteOrder);
case DOUBLE: case DOUBLE:

View File

@@ -65,7 +65,7 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
/** /**
* 下行消息订阅者 * 下行消息订阅者
*/ */
private final IotModbusTcpMasterDownstreamSubscriber downstreamSubscriber; private IotModbusTcpMasterDownstreamSubscriber downstreamSubscriber;
private final IotModbusTcpMasterConfigCacheService configCacheService; private final IotModbusTcpMasterConfigCacheService configCacheService;
private final IotModbusTcpMasterPollScheduler pollScheduler; private final IotModbusTcpMasterPollScheduler pollScheduler;
@@ -89,15 +89,9 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
// 初始化 Handler // 初始化 Handler
IotModbusTcpMasterUpstreamHandler upstreamHandler = new IotModbusTcpMasterUpstreamHandler(messageService, serverId); IotModbusTcpMasterUpstreamHandler upstreamHandler = new IotModbusTcpMasterUpstreamHandler(messageService, serverId);
IotModbusTcpMasterDownstreamHandler downstreamHandler = new IotModbusTcpMasterDownstreamHandler(connectionManager,
configCacheService);
// 初始化轮询调度器 // 初始化轮询调度器
this.pollScheduler = new IotModbusTcpMasterPollScheduler(vertx, connectionManager, upstreamHandler, configCacheService); this.pollScheduler = new IotModbusTcpMasterPollScheduler(vertx, connectionManager, upstreamHandler, configCacheService);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotModbusTcpMasterDownstreamSubscriber(this, downstreamHandler, messageBus);
} }
@Override @Override
@@ -130,13 +124,14 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
log.info("[start][IoT Modbus TCP Master 协议 {} 启动成功serverId={}]", getId(), serverId); log.info("[start][IoT Modbus TCP Master 协议 {} 启动成功serverId={}]", getId(), serverId);
// 2. 启动下行消息订阅者 // 2. 启动下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotModbusTcpMasterDownstreamHandler downstreamHandler = new IotModbusTcpMasterDownstreamHandler(connectionManager,
configCacheService);
this.downstreamSubscriber = new IotModbusTcpMasterDownstreamSubscriber(this, downstreamHandler, messageBus);
this.downstreamSubscriber.start(); this.downstreamSubscriber.start();
} catch (Exception e) { } catch (Exception e) {
log.error("[start][IoT Modbus TCP Master 协议 {} 启动失败]", getId(), e); log.error("[start][IoT Modbus TCP Master 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭资源 stop0();
if (vertx != null) {
vertx.close();
}
throw e; throw e;
} }
} }
@@ -146,12 +141,19 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
if (!running) { if (!running) {
return; return;
} }
stop0();
}
private void stop0() {
// 1. 停止下行消息订阅者 // 1. 停止下行消息订阅者
try { if (downstreamSubscriber != null) {
downstreamSubscriber.stop(); try {
log.info("[stop][IoT Modbus TCP Master 协议 {} 下行消息订阅者已停止]", getId()); downstreamSubscriber.stop();
} catch (Exception e) { log.info("[stop][IoT Modbus TCP Master 协议 {} 下行消息订阅者已停止]", getId());
log.error("[stop][IoT Modbus TCP Master 协议 {} 下行消息订阅者停止失败]", getId(), e); } catch (Exception e) {
log.error("[stop][IoT Modbus TCP Master 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
} }
// 2.1 取消配置刷新定时器 // 2.1 取消配置刷新定时器
@@ -161,10 +163,8 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
} }
// 2.2 停止轮询调度器 // 2.2 停止轮询调度器
pollScheduler.stopAll(); pollScheduler.stopAll();
log.info("[stop][IoT Modbus TCP Master 协议 {} 轮询调度器已停止]", getId());
// 2.3 关闭所有连接 // 2.3 关闭所有连接
connectionManager.closeAll(); connectionManager.closeAll();
log.info("[stop][IoT Modbus TCP Master 协议 {} 连接管理器已关闭]", getId());
// 3. 关闭 Vert.x 实例 // 3. 关闭 Vert.x 实例
if (vertx != null) { if (vertx != null) {

View File

@@ -14,6 +14,8 @@ import org.redisson.api.RLock;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -185,8 +187,11 @@ public class IotModbusTcpMasterConnectionManager {
*/ */
private void addDeviceAndOnline(ModbusConnection connection, private void addDeviceAndOnline(ModbusConnection connection,
IotModbusDeviceConfigRespDTO config) { IotModbusDeviceConfigRespDTO config) {
connection.addDevice(config.getDeviceId(), config.getSlaveId()); Integer previous = connection.addDevice(config.getDeviceId(), config.getSlaveId());
sendOnlineMessage(config); // 首次注册,发送上线消息
if (previous == null) {
sendOnlineMessage(config);
}
} }
/** /**
@@ -247,7 +252,9 @@ public class IotModbusTcpMasterConnectionManager {
* 关闭所有连接 * 关闭所有连接
*/ */
public void closeAll() { public void closeAll() {
for (String connectionKey : connectionPool.keySet()) { // 先复制再遍历,避免 closeConnection 中 remove 导致并发修改
List<String> connectionKeys = new ArrayList<>(connectionPool.keySet());
for (String connectionKey : connectionKeys) {
closeConnection(connectionKey); closeConnection(connectionKey);
} }
deviceConnectionMap.clear(); deviceConnectionMap.clear();
@@ -282,8 +289,8 @@ public class IotModbusTcpMasterConnectionManager {
*/ */
private Context context; private Context context;
public void addDevice(Long deviceId, Integer slaveId) { public Integer addDevice(Long deviceId, Integer slaveId) {
deviceSlaveMap.put(deviceId, slaveId); return deviceSlaveMap.putIfAbsent(deviceId, slaveId);
} }
public void removeDevice(Long deviceId) { public void removeDevice(Long deviceId) {

View File

@@ -88,7 +88,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
/** /**
* 下行消息订阅者 * 下行消息订阅者
*/ */
private final IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber; private IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber;
private final IotModbusFrameDecoder frameDecoder; private final IotModbusFrameDecoder frameDecoder;
@SuppressWarnings("FieldCanBeLocal") @SuppressWarnings("FieldCanBeLocal")
@@ -121,7 +121,6 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
// 初始化共享事务 ID 自增器PollScheduler 和 DownstreamHandler 共用,避免 transactionId 冲突) // 初始化共享事务 ID 自增器PollScheduler 和 DownstreamHandler 共用,避免 transactionId 冲突)
AtomicInteger transactionIdCounter = new AtomicInteger(0); AtomicInteger transactionIdCounter = new AtomicInteger(0);
// 初始化轮询调度器 // 初始化轮询调度器
this.pollScheduler = new IotModbusTcpSlavePollScheduler( this.pollScheduler = new IotModbusTcpSlavePollScheduler(
vertx, connectionManager, frameEncoder, pendingRequestManager, vertx, connectionManager, frameEncoder, pendingRequestManager,
@@ -134,13 +133,6 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
deviceApi, this.messageService, frameEncoder, deviceApi, this.messageService, frameEncoder,
connectionManager, configCacheService, pendingRequestManager, connectionManager, configCacheService, pendingRequestManager,
pollScheduler, deviceService, serverId); pollScheduler, deviceService, serverId);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotModbusTcpSlaveDownstreamHandler downstreamHandler = new IotModbusTcpSlaveDownstreamHandler(
connectionManager, configCacheService, frameEncoder, transactionIdCounter);
this.downstreamSubscriber = new IotModbusTcpSlaveDownstreamSubscriber(
this, downstreamHandler, messageBus);
} }
@Override @Override
@@ -179,25 +171,15 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
getId(), serverId, properties.getPort()); getId(), serverId, properties.getPort());
// 3. 启动下行消息订阅 // 3. 启动下行消息订阅
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotModbusTcpSlaveDownstreamHandler downstreamHandler = new IotModbusTcpSlaveDownstreamHandler(
connectionManager, configCacheService, frameEncoder, this.pollScheduler.getTransactionIdCounter());
this.downstreamSubscriber = new IotModbusTcpSlaveDownstreamSubscriber(
this, downstreamHandler, messageBus);
downstreamSubscriber.start(); downstreamSubscriber.start();
} catch (Exception e) { } catch (Exception e) {
log.error("[start][IoT Modbus TCP Slave 协议 {} 启动失败]", getId(), e); log.error("[start][IoT Modbus TCP Slave 协议 {} 启动失败]", getId(), e);
// TODO @芋艿:后续统一优化 stop 逻辑; stop0();
if (configRefreshTimerId != null) {
vertx.cancelTimer(configRefreshTimerId);
configRefreshTimerId = null;
}
if (requestCleanupTimerId != null) {
vertx.cancelTimer(requestCleanupTimerId);
requestCleanupTimerId = null;
}
connectionManager.closeAll();
if (netServer != null) {
netServer.close();
}
if (vertx != null) {
vertx.close();
}
throw e; throw e;
} }
} }
@@ -207,12 +189,18 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
if (!running) { if (!running) {
return; return;
} }
stop0();
}
private void stop0() {
// 1. 停止下行消息订阅 // 1. 停止下行消息订阅
try { if (downstreamSubscriber != null) {
downstreamSubscriber.stop(); try {
} catch (Exception e) { downstreamSubscriber.stop();
log.error("[stop][下行消息订阅器停止失败]", e); } catch (Exception e) {
log.error("[stop][下行消息订阅器停止失败]", e);
}
downstreamSubscriber = null;
} }
// 2.1 取消定时器 // 2.1 取消定时器
@@ -238,6 +226,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
} catch (Exception e) { } catch (Exception e) {
log.error("[stop][TCP Server 关闭失败]", e); log.error("[stop][TCP Server 关闭失败]", e);
} }
netServer = null;
} }
// 3. 关闭 Vertx // 3. 关闭 Vertx

View File

@@ -8,6 +8,8 @@ import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -155,15 +157,18 @@ public class IotModbusTcpSlaveConnectionManager {
* 关闭所有连接 * 关闭所有连接
*/ */
public void closeAll() { public void closeAll() {
for (NetSocket socket : connectionMap.keySet()) { // 1. 先复制再清空,避免 closeHandler 回调时并发修改
List<NetSocket> sockets = new ArrayList<>(connectionMap.keySet());
connectionMap.clear();
deviceSocketMap.clear();
// 2. 关闭所有 socketcloseHandler 中 removeConnection 发现 map 为空会安全跳过)
for (NetSocket socket : sockets) {
try { try {
socket.close(); socket.close();
} catch (Exception e) { } catch (Exception e) {
log.error("[closeAll][关闭连接失败]", e); log.error("[closeAll][关闭连接失败]", e);
} }
} }
connectionMap.clear();
deviceSocketMap.clear();
} }
} }

View File

@@ -11,6 +11,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotMod
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager.PendingRequest; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager.PendingRequest;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -31,6 +32,7 @@ public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollSchedul
/** /**
* TCP 事务 ID 自增器(与 DownstreamHandler 共享) * TCP 事务 ID 自增器(与 DownstreamHandler 共享)
*/ */
@Getter
private final AtomicInteger transactionIdCounter; private final AtomicInteger transactionIdCounter;
public IotModbusTcpSlavePollScheduler(Vertx vertx, public IotModbusTcpSlavePollScheduler(Vertx vertx,

View File

@@ -81,7 +81,7 @@ public class IotMqttProtocol implements IotProtocol {
/** /**
* 下行消息订阅者 * 下行消息订阅者
*/ */
private final IotMqttDownstreamSubscriber downstreamSubscriber; private IotMqttDownstreamSubscriber downstreamSubscriber;
private final IotDeviceMessageService deviceMessageService; private final IotDeviceMessageService deviceMessageService;
@@ -104,11 +104,6 @@ public class IotMqttProtocol implements IotProtocol {
this.authHandler = new IotMqttAuthHandler(connectionManager, deviceMessageService, deviceApi, serverId); this.authHandler = new IotMqttAuthHandler(connectionManager, deviceMessageService, deviceApi, serverId);
this.registerHandler = new IotMqttRegisterHandler(connectionManager, deviceMessageService); this.registerHandler = new IotMqttRegisterHandler(connectionManager, deviceMessageService);
this.upstreamHandler = new IotMqttUpstreamHandler(connectionManager, deviceMessageService, serverId); this.upstreamHandler = new IotMqttUpstreamHandler(connectionManager, deviceMessageService, serverId);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler(deviceMessageService, connectionManager);
this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus);
} }
@Override @Override
@@ -157,18 +152,13 @@ public class IotMqttProtocol implements IotProtocol {
getId(), properties.getPort(), serverId); getId(), properties.getPort(), serverId);
// 2. 启动下行消息订阅者 // 2. 启动下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler(deviceMessageService, connectionManager);
this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus);
this.downstreamSubscriber.start(); this.downstreamSubscriber.start();
} catch (Exception e) { } catch (Exception e) {
log.error("[start][IoT MQTT 协议 {} 启动失败]", getId(), e); log.error("[start][IoT MQTT 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭资源 stop0();
if (mqttServer != null) {
mqttServer.close();
mqttServer = null;
}
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e; throw e;
} }
} }
@@ -178,15 +168,24 @@ public class IotMqttProtocol implements IotProtocol {
if (!running) { if (!running) {
return; return;
} }
stop0();
}
private void stop0() {
// 1. 停止下行消息订阅者 // 1. 停止下行消息订阅者
try { if (downstreamSubscriber != null) {
downstreamSubscriber.stop(); try {
log.info("[stop][IoT MQTT 协议 {} 下行消息订阅者已停止]", getId()); downstreamSubscriber.stop();
} catch (Exception e) { log.info("[stop][IoT MQTT 协议 {} 下行消息订阅者已停止]", getId());
log.error("[stop][IoT MQTT 协议 {} 下行消息订阅者停止失败]", getId(), e); } catch (Exception e) {
log.error("[stop][IoT MQTT 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
} }
// 2.1 关闭 MQTT 服务器 // 2.1 关闭所有连接
connectionManager.closeAll();
// 2.2 关闭 MQTT 服务器
if (mqttServer != null) { if (mqttServer != null) {
try { try {
mqttServer.close().result(); mqttServer.close().result();
@@ -196,7 +195,7 @@ public class IotMqttProtocol implements IotProtocol {
} }
mqttServer = null; mqttServer = null;
} }
// 2.2 关闭 Vertx 实例 // 2.3 关闭 Vertx 实例
if (vertx != null) { if (vertx != null) {
try { try {
vertx.close().result(); vertx.close().result();

View File

@@ -8,6 +8,8 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -166,6 +168,24 @@ public class IotMqttConnectionManager {
return deviceEndpointMap.get(deviceId); return deviceEndpointMap.get(deviceId);
} }
/**
* 关闭所有连接
*/
public void closeAll() {
// 1. 先复制再清空,避免 closeHandler 回调时并发修改
List<MqttEndpoint> endpoints = new ArrayList<>(connectionMap.keySet());
connectionMap.clear();
deviceEndpointMap.clear();
// 2. 关闭所有连接closeHandler 中 unregisterConnection 发现 map 为空会安全跳过)
for (MqttEndpoint endpoint : endpoints) {
try {
endpoint.close();
} catch (Exception ignored) {
// 连接可能已关闭,忽略异常
}
}
}
/** /**
* 连接信息 * 连接信息
*/ */

View File

@@ -66,7 +66,7 @@ public class IotTcpProtocol implements IotProtocol {
/** /**
* 下行消息订阅者 * 下行消息订阅者
*/ */
private final IotTcpDownstreamSubscriber downstreamSubscriber; private IotTcpDownstreamSubscriber downstreamSubscriber;
/** /**
* 消息序列化器 * 消息序列化器
@@ -94,11 +94,6 @@ public class IotTcpProtocol implements IotProtocol {
// 初始化连接管理器 // 初始化连接管理器
this.connectionManager = new IotTcpConnectionManager(tcpConfig.getMaxConnections()); this.connectionManager = new IotTcpConnectionManager(tcpConfig.getMaxConnections());
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotTcpDownstreamHandler downstreamHandler = new IotTcpDownstreamHandler(connectionManager, frameCodec, serializer);
this.downstreamSubscriber = new IotTcpDownstreamSubscriber(this, downstreamHandler, messageBus);
} }
@Override @Override
@@ -152,18 +147,13 @@ public class IotTcpProtocol implements IotProtocol {
getId(), properties.getPort(), serverId); getId(), properties.getPort(), serverId);
// 2. 启动下行消息订阅者 // 2. 启动下行消息订阅者
IotTcpDownstreamHandler downstreamHandler = new IotTcpDownstreamHandler(connectionManager, frameCodec, serializer);
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotTcpDownstreamSubscriber(this, downstreamHandler, messageBus);
this.downstreamSubscriber.start(); this.downstreamSubscriber.start();
} catch (Exception e) { } catch (Exception e) {
log.error("[start][IoT TCP 协议 {} 启动失败]", getId(), e); log.error("[start][IoT TCP 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭资源 stop0();
if (tcpServer != null) {
tcpServer.close();
tcpServer = null;
}
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e; throw e;
} }
} }
@@ -173,15 +163,24 @@ public class IotTcpProtocol implements IotProtocol {
if (!running) { if (!running) {
return; return;
} }
stop0();
}
private void stop0() {
// 1. 停止下行消息订阅者 // 1. 停止下行消息订阅者
try { if (downstreamSubscriber != null) {
downstreamSubscriber.stop(); try {
log.info("[stop][IoT TCP 协议 {} 下行消息订阅者已停止]", getId()); downstreamSubscriber.stop();
} catch (Exception e) { log.info("[stop][IoT TCP 协议 {} 下行消息订阅者已停止]", getId());
log.error("[stop][IoT TCP 协议 {} 下行消息订阅者停止失败]", getId(), e); } catch (Exception e) {
log.error("[stop][IoT TCP 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
} }
// 2.1 关闭 TCP 服务器 // 2.1 关闭所有连接
connectionManager.closeAll();
// 2.2 关闭 TCP 服务器
if (tcpServer != null) { if (tcpServer != null) {
try { try {
tcpServer.close().result(); tcpServer.close().result();
@@ -191,7 +190,7 @@ public class IotTcpProtocol implements IotProtocol {
} }
tcpServer = null; tcpServer = null;
} }
// 2.2 关闭 Vertx 实例 // 2.3 关闭 Vertx 实例
if (vertx != null) { if (vertx != null) {
try { try {
vertx.close().result(); vertx.close().result();

View File

@@ -5,6 +5,8 @@ import io.vertx.core.net.NetSocket;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -122,6 +124,24 @@ public class IotTcpConnectionManager {
} }
} }
/**
* 关闭所有连接
*/
public void closeAll() {
// 1. 先复制再清空,避免 closeHandler 回调时并发修改
List<NetSocket> sockets = new ArrayList<>(connectionMap.keySet());
connectionMap.clear();
deviceSocketMap.clear();
// 2. 关闭所有连接closeHandler 中 unregisterConnection 发现 map 为空会安全跳过)
for (NetSocket socket : sockets) {
try {
socket.close();
} catch (Exception ignored) {
// 连接可能已关闭,忽略异常
}
}
}
/** /**
* 连接信息(包含认证信息) * 连接信息(包含认证信息)
*/ */

View File

@@ -63,7 +63,7 @@ public class IotUdpProtocol implements IotProtocol {
/** /**
* 下行消息订阅者 * 下行消息订阅者
*/ */
private final IotUdpDownstreamSubscriber downstreamSubscriber; private IotUdpDownstreamSubscriber downstreamSubscriber;
/** /**
* 消息序列化器 * 消息序列化器
@@ -85,10 +85,6 @@ public class IotUdpProtocol implements IotProtocol {
// 初始化会话管理器 // 初始化会话管理器
this.sessionManager = new IotUdpSessionManager(udpConfig.getMaxSessions(), udpConfig.getSessionTimeoutMs()); this.sessionManager = new IotUdpSessionManager(udpConfig.getMaxSessions(), udpConfig.getSessionTimeoutMs());
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotUdpDownstreamHandler downstreamHandler = new IotUdpDownstreamHandler(this, sessionManager, serializer);
this.downstreamSubscriber = new IotUdpDownstreamSubscriber(this, downstreamHandler, messageBus);
} }
@Override @Override
@@ -108,8 +104,11 @@ public class IotUdpProtocol implements IotProtocol {
return; return;
} }
// 1.1 创建 Vertx 实例 // 1.1 创建 Vertx 实例 和 下行消息订阅者
this.vertx = Vertx.vertx(); this.vertx = Vertx.vertx();
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotUdpDownstreamHandler downstreamHandler = new IotUdpDownstreamHandler(this, sessionManager, serializer);
this.downstreamSubscriber = new IotUdpDownstreamSubscriber(this, downstreamHandler, messageBus);
// 1.2 创建 UDP Socket 选项 // 1.2 创建 UDP Socket 选项
IotUdpConfig udpConfig = properties.getUdp(); IotUdpConfig udpConfig = properties.getUdp();
@@ -137,15 +136,7 @@ public class IotUdpProtocol implements IotProtocol {
this.downstreamSubscriber.start(); this.downstreamSubscriber.start();
} catch (Exception e) { } catch (Exception e) {
log.error("[start][IoT UDP 协议 {} 启动失败]", getId(), e); log.error("[start][IoT UDP 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭资源 stop0();
if (udpSocket != null) {
udpSocket.close();
udpSocket = null;
}
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e; throw e;
} }
} }
@@ -155,12 +146,19 @@ public class IotUdpProtocol implements IotProtocol {
if (!running) { if (!running) {
return; return;
} }
stop0();
}
private void stop0() {
// 1. 停止下行消息订阅者 // 1. 停止下行消息订阅者
try { if (downstreamSubscriber != null) {
downstreamSubscriber.stop(); try {
log.info("[stop][IoT UDP 协议 {} 下行消息订阅者已停止]", getId()); downstreamSubscriber.stop();
} catch (Exception e) { log.info("[stop][IoT UDP 协议 {} 下行消息订阅者已停止]", getId());
log.error("[stop][IoT UDP 协议 {} 下行消息订阅者停止失败]", getId(), e); } catch (Exception e) {
log.error("[stop][IoT UDP 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
} }
// 2.1 关闭 UDP Socket // 2.1 关闭 UDP Socket

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.ObjUtil;
import cn.hutool.extra.spring.SpringUtil; import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
@@ -9,8 +10,8 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.upstream.IotWebSocketUpstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.upstream.IotWebSocketUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
@@ -21,7 +22,6 @@ import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.PemKeyCertOptions; import io.vertx.core.net.PemKeyCertOptions;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import cn.hutool.core.lang.Assert;
/** /**
* IoT WebSocket 协议实现 * IoT WebSocket 协议实现
@@ -65,7 +65,7 @@ public class IotWebSocketProtocol implements IotProtocol {
/** /**
* 下行消息订阅者 * 下行消息订阅者
*/ */
private final IotWebSocketDownstreamSubscriber downstreamSubscriber; private IotWebSocketDownstreamSubscriber downstreamSubscriber;
/** /**
* 消息序列化器 * 消息序列化器
@@ -87,10 +87,6 @@ public class IotWebSocketProtocol implements IotProtocol {
// 初始化连接管理器 // 初始化连接管理器
this.connectionManager = new IotWebSocketConnectionManager(); this.connectionManager = new IotWebSocketConnectionManager();
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotWebSocketDownstreamHandler downstreamHandler = new IotWebSocketDownstreamHandler(serializer, connectionManager);
this.downstreamSubscriber = new IotWebSocketDownstreamSubscriber(this, downstreamHandler, messageBus);
} }
@Override @Override
@@ -152,17 +148,13 @@ public class IotWebSocketProtocol implements IotProtocol {
getId(), properties.getPort(), wsConfig.getPath(), serverId); getId(), properties.getPort(), wsConfig.getPath(), serverId);
// 2. 启动下行消息订阅者 // 2. 启动下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotWebSocketDownstreamHandler downstreamHandler = new IotWebSocketDownstreamHandler(serializer, connectionManager);
this.downstreamSubscriber = new IotWebSocketDownstreamSubscriber(this, downstreamHandler, messageBus);
downstreamSubscriber.start(); downstreamSubscriber.start();
} catch (Exception e) { } catch (Exception e) {
log.error("[start][IoT WebSocket 协议 {} 启动失败]", getId(), e); log.error("[start][IoT WebSocket 协议 {} 启动失败]", getId(), e);
if (httpServer != null) { stop0();
httpServer.close();
httpServer = null;
}
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e; throw e;
} }
} }
@@ -172,15 +164,24 @@ public class IotWebSocketProtocol implements IotProtocol {
if (!running) { if (!running) {
return; return;
} }
stop0();
}
private void stop0() {
// 1. 停止下行消息订阅者 // 1. 停止下行消息订阅者
try { if (downstreamSubscriber != null) {
downstreamSubscriber.stop(); try {
log.info("[stop][IoT WebSocket 协议 {} 下行消息订阅者已停止]", getId()); downstreamSubscriber.stop();
} catch (Exception e) { log.info("[stop][IoT WebSocket 协议 {} 下行消息订阅者已停止]", getId());
log.error("[stop][IoT WebSocket 协议 {} 下行消息订阅者停止失败]", getId(), e); } catch (Exception e) {
log.error("[stop][IoT WebSocket 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
} }
// 2.1 关闭 WebSocket 服务器 // 2.1 关闭所有连接
connectionManager.closeAll();
// 2.2 关闭 WebSocket 服务器
if (httpServer != null) { if (httpServer != null) {
try { try {
httpServer.close().result(); httpServer.close().result();
@@ -190,7 +191,7 @@ public class IotWebSocketProtocol implements IotProtocol {
} }
httpServer = null; httpServer = null;
} }
// 2.2 关闭 Vertx 实例 // 2.3 关闭 Vertx 实例
if (vertx != null) { if (vertx != null) {
try { try {
vertx.close().result(); vertx.close().result();

View File

@@ -5,6 +5,8 @@ import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -114,6 +116,24 @@ public class IotWebSocketConnectionManager {
} }
} }
/**
* 关闭所有连接
*/
public void closeAll() {
// 1. 先复制再清空,避免 closeHandler 回调时并发修改
List<ServerWebSocket> sockets = new ArrayList<>(connectionMap.keySet());
connectionMap.clear();
deviceSocketMap.clear();
// 2. 关闭所有连接closeHandler 中 unregisterConnection 发现 map 为空会安全跳过)
for (ServerWebSocket socket : sockets) {
try {
socket.close();
} catch (Exception ignored) {
// 连接可能已关闭,忽略异常
}
}
}
/** /**
* 连接信息(包含认证信息) * 连接信息(包含认证信息)
*/ */

View File

@@ -167,7 +167,7 @@ yudao:
# 针对引入的 Modbus TCP Master 组件的配置 # 针对引入的 Modbus TCP Master 组件的配置
# ==================================== # ====================================
- id: modbus-tcp-master-1 - id: modbus-tcp-master-1
enabled: true enabled: false
protocol: modbus_tcp_master protocol: modbus_tcp_master
port: 502 port: 502
modbus-tcp-master: modbus-tcp-master:
@@ -176,7 +176,7 @@ yudao:
# 针对引入的 Modbus TCP Slave 组件的配置 # 针对引入的 Modbus TCP Slave 组件的配置
# ==================================== # ====================================
- id: modbus-tcp-slave-1 - id: modbus-tcp-slave-1
enabled: true enabled: false
protocol: modbus_tcp_slave protocol: modbus_tcp_slave
port: 503 port: 503
modbus-tcp-slave: modbus-tcp-slave: