diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 2f865d983b..5c2fd860e9 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -1,48 +1,16 @@ package cn.iocoder.yudao.module.iot.gateway.config; -import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.*; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpPollScheduler; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpDownstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpUpstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpPollScheduler; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpDownstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpUpstreamHandler; import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.Vertx; -import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RedissonClient; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.Vertx; -import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RedissonClient; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +/** + * IoT 网关配置类 + * + * @author 芋道源码 + */ @Configuration @EnableConfigurationProperties(IotGatewayProperties.class) public class IotGatewayConfiguration { @@ -57,86 +25,4 @@ public class IotGatewayConfiguration { return new IotProtocolManager(gatewayProperties); } - /** - * IoT 网关 Modbus TCP 协议配置类 - */ - @Configuration - @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.modbus-tcp", name = "enabled", havingValue = "true") - @Slf4j - public static class ModbusTcpProtocolConfiguration { - - @Bean(name = "modbusTcpVertx", destroyMethod = "close") - public Vertx modbusTcpVertx() { - return Vertx.vertx(); - } - - @Bean - public IotModbusDataConverter iotModbusDataConverter() { - return new IotModbusDataConverter(); - } - - @Bean - public IotModbusTcpClient iotModbusTcpClient() { - return new IotModbusTcpClient(); - } - - @Bean - public IotModbusTcpConnectionManager iotModbusTcpConnectionManager( - RedissonClient redissonClient, - @Qualifier("modbusTcpVertx") Vertx modbusTcpVertx) { - return new IotModbusTcpConnectionManager(redissonClient, modbusTcpVertx); - } - - @Bean - public IotModbusTcpConfigCacheService iotModbusTcpConfigCacheService( - cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi deviceApi) { - return new IotModbusTcpConfigCacheService(deviceApi); - } - - @Bean - public IotModbusTcpUpstreamHandler iotModbusTcpUpstreamHandler( - IotDeviceMessageService messageService, - IotModbusDataConverter dataConverter) { - return new IotModbusTcpUpstreamHandler(messageService, dataConverter); - } - - @Bean - public IotModbusTcpPollScheduler iotModbusTcpPollScheduler( - @Qualifier("modbusTcpVertx") Vertx modbusTcpVertx, - IotModbusTcpConnectionManager connectionManager, - IotModbusTcpClient modbusClient, - IotModbusTcpUpstreamHandler upstreamHandler) { - return new IotModbusTcpPollScheduler(modbusTcpVertx, connectionManager, modbusClient, upstreamHandler); - } - - @Bean - public IotModbusTcpDownstreamHandler iotModbusTcpDownstreamHandler( - IotModbusTcpConnectionManager connectionManager, - IotModbusTcpClient modbusClient, - IotModbusDataConverter dataConverter, - IotModbusTcpConfigCacheService configCacheService) { - return new IotModbusTcpDownstreamHandler(connectionManager, modbusClient, dataConverter, configCacheService); - } - - @Bean - public IotModbusTcpUpstreamProtocol iotModbusTcpUpstreamProtocol(IotGatewayProperties gatewayProperties, - IotDeviceMessageService messageService, - IotModbusTcpConnectionManager connectionManager, - IotModbusTcpPollScheduler pollScheduler, - IotModbusTcpConfigCacheService configCacheService, - IotModbusTcpUpstreamHandler upstreamHandler, - @Qualifier("modbusTcpVertx") Vertx modbusTcpVertx) { - return new IotModbusTcpUpstreamProtocol(gatewayProperties.getProtocol().getModbusTcp(), - messageService, connectionManager, pollScheduler, configCacheService, upstreamHandler, modbusTcpVertx); - } - - @Bean - public IotModbusTcpDownstreamSubscriber iotModbusTcpDownstreamSubscriber(IotModbusTcpUpstreamProtocol upstreamProtocol, - IotModbusTcpDownstreamHandler downstreamHandler, - IotMessageBus messageBus) { - return new IotModbusTcpDownstreamSubscriber(upstreamProtocol, downstreamHandler, messageBus); - } - - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index ce0222c980..8636bbd061 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -4,6 +4,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpConfig; @@ -166,6 +167,12 @@ public class IotGatewayProperties { @Valid private IotEmqxConfig emqx; + /** + * Modbus TCP 协议配置 + */ + @Valid + private IotModbusTcpConfig modbusTcp; + } /** @@ -216,31 +223,4 @@ public class IotGatewayProperties { } - @Data - public static class ProtocolProperties { - - /** - * Modbus TCP 组件配置 - */ - private ModbusTcpProperties modbusTcp; - - } - - @Data - public static class ModbusTcpProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * 配置刷新间隔(秒) - */ - @NotNull(message = "配置刷新间隔不能为空") - private Integer configRefreshInterval = 30; - - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java index 3cd00c7573..1549d7d23d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java @@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol; @@ -112,6 +113,8 @@ public class IotProtocolManager implements SmartLifecycle { return createMqttProtocol(config); case EMQX: return createEmqxProtocol(config); + case MODBUS_TCP: + return createModbusTcpProtocol(config); default: throw new IllegalArgumentException(String.format( "[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType)); @@ -188,4 +191,14 @@ public class IotProtocolManager implements SmartLifecycle { return new IotEmqxProtocol(config); } + /** + * 创建 Modbus TCP 协议实例 + * + * @param config 协议实例配置 + * @return Modbus TCP 协议实例 + */ + private IotModbusTcpProtocol createModbusTcpProtocol(IotGatewayProperties.ProtocolProperties config) { + return new IotModbusTcpProtocol(config); + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpConfig.java new file mode 100644 index 0000000000..5893869e27 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpConfig.java @@ -0,0 +1,22 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * IoT Modbus TCP 协议配置 + * + * @author 芋道源码 + */ +@Data +public class IotModbusTcpConfig { + + /** + * 配置刷新间隔(秒) + */ + @NotNull(message = "配置刷新间隔不能为空") + @Min(value = 1, message = "配置刷新间隔不能小于 1 秒") + private Integer configRefreshInterval = 30; + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpProtocol.java new file mode 100644 index 0000000000..163e1e5fb7 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpProtocol.java @@ -0,0 +1,215 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp; + +import cn.hutool.core.lang.Assert; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.downstream.IotModbusTcpDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.downstream.IotModbusTcpDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.upstream.IotModbusTcpUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpPollScheduler; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Vertx; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RedissonClient; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * IoT 网关 Modbus TCP 协议:主动轮询 Modbus 从站设备数据 + * + * @author 芋道源码 + */ +@Slf4j +public class IotModbusTcpProtocol implements IotProtocol { + + /** + * 协议配置 + */ + private final ProtocolProperties properties; + /** + * 服务器 ID(用于消息追踪,全局唯一) + */ + @Getter + private final String serverId; + + /** + * 运行状态 + */ + @Getter + private volatile boolean running = false; + + /** + * Vert.x 实例 + */ + private final Vertx vertx; + /** + * 配置刷新定时器 ID + */ + private Long configRefreshTimerId; + + /** + * 连接管理器 + */ + private final IotModbusTcpConnectionManager connectionManager; + /** + * 下行消息订阅者 + */ + private final IotModbusTcpDownstreamSubscriber downstreamSubscriber; + + private final IotModbusTcpConfigCacheService configCacheService; + private final IotModbusTcpPollScheduler pollScheduler; + + public IotModbusTcpProtocol(ProtocolProperties properties) { + IotModbusTcpConfig modbusTcpConfig = properties.getModbusTcp(); + Assert.notNull(modbusTcpConfig, "Modbus TCP 协议配置(modbusTcp)不能为空"); + this.properties = properties; + this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); + + // 初始化 Vertx + this.vertx = Vertx.vertx(); + + // 初始化 Manager + RedissonClient redissonClient = SpringUtil.getBean(RedissonClient.class); + IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.connectionManager = new IotModbusTcpConnectionManager(redissonClient, vertx); + this.configCacheService = new IotModbusTcpConfigCacheService(deviceApi); + + // 初始化 Handler + IotModbusDataConverter dataConverter = new IotModbusDataConverter(); + IotModbusTcpClient modbusClient = new IotModbusTcpClient(); + IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class); + IotModbusTcpUpstreamHandler upstreamHandler = new IotModbusTcpUpstreamHandler(messageService, dataConverter, serverId); + IotModbusTcpDownstreamHandler downstreamHandler = new IotModbusTcpDownstreamHandler(connectionManager, + modbusClient, dataConverter, configCacheService); + + // 初始化轮询调度器 + this.pollScheduler = new IotModbusTcpPollScheduler(vertx, connectionManager, modbusClient, upstreamHandler); + + // 初始化下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + this.downstreamSubscriber = new IotModbusTcpDownstreamSubscriber(this, downstreamHandler, messageBus); + } + + @Override + public String getId() { + return properties.getId(); + } + + @Override + public IotProtocolTypeEnum getType() { + return IotProtocolTypeEnum.MODBUS_TCP; + } + + @Override + public void start() { + if (running) { + log.warn("[start][IoT Modbus TCP 协议 {} 已经在运行中]", getId()); + return; + } + + try { + // 1.1 首次加载配置 + refreshConfig(); + // 1.2 启动配置刷新定时器 + int refreshInterval = properties.getModbusTcp().getConfigRefreshInterval(); + configRefreshTimerId = vertx.setPeriodic( + TimeUnit.SECONDS.toMillis(refreshInterval), + id -> refreshConfig() + ); + running = true; + log.info("[start][IoT Modbus TCP 协议 {} 启动成功,serverId={}]", getId(), serverId); + + // 2. 启动下行消息订阅者 + this.downstreamSubscriber.start(); + } catch (Exception e) { + log.error("[start][IoT Modbus TCP 协议 {} 启动失败]", getId(), e); + // 启动失败时关闭资源 + if (vertx != null) { + vertx.close(); + } + throw e; + } + } + + @Override + public void stop() { + if (!running) { + return; + } + // 1. 停止下行消息订阅者 + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT Modbus TCP 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT Modbus TCP 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + + // 2.1 取消配置刷新定时器 + if (configRefreshTimerId != null) { + vertx.cancelTimer(configRefreshTimerId); + configRefreshTimerId = null; + } + // 2.2 停止轮询调度器 + pollScheduler.stopAll(); + log.info("[stop][IoT Modbus TCP 协议 {} 轮询调度器已停止]", getId()); + // 2.3 关闭所有连接 + connectionManager.closeAll(); + log.info("[stop][IoT Modbus TCP 协议 {} 连接管理器已关闭]", getId()); + + // 3. 关闭 Vert.x 实例 + if (vertx != null) { + try { + vertx.close().result(); + log.info("[stop][IoT Modbus TCP 协议 {} Vertx 已关闭]", getId()); + } catch (Exception e) { + log.error("[stop][IoT Modbus TCP 协议 {} Vertx 关闭失败]", getId(), e); + } + } + running = false; + log.info("[stop][IoT Modbus TCP 协议 {} 已停止]", getId()); + } + + /** + * 刷新配置 + */ + private synchronized void refreshConfig() { + try { + // 1. 从 biz 拉取最新配置 + List configs = configCacheService.refreshConfig(); + log.debug("[refreshConfig][获取到 {} 个 Modbus 设备配置]", configs.size()); + + // 2. 更新连接和轮询任务 + for (IotModbusDeviceConfigRespDTO config : configs) { + try { + // 2.1 确保连接存在 + connectionManager.ensureConnection(config); + // 2.2 更新轮询任务 + pollScheduler.updatePolling(config); + } catch (Exception e) { + log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e); + } + } + + // 3. 清理已删除设备的资源 + configCacheService.cleanupRemovedDevices(configs, deviceId -> { + pollScheduler.stopPolling(deviceId); + connectionManager.removeDevice(deviceId); + }); + } catch (Exception e) { + log.error("[refreshConfig][刷新配置失败]", e); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpUpstreamProtocol.java deleted file mode 100644 index 1d47d30de2..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpUpstreamProtocol.java +++ /dev/null @@ -1,115 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp; - -import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpPollScheduler; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpUpstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.Vertx; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -/** - * IoT Modbus TCP 上行协议:定时拉取配置、管理连接、调度轮询任务 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -@Slf4j -public class IotModbusTcpUpstreamProtocol { - - private final IotGatewayProperties.ModbusTcpProperties modbusTcpProperties; - private final IotDeviceMessageService messageService; - private final IotModbusTcpConnectionManager connectionManager; - private final IotModbusTcpPollScheduler pollScheduler; - private final IotModbusTcpConfigCacheService configCacheService; - private final IotModbusTcpUpstreamHandler upstreamHandler; - private final Vertx vertx; - - /** - * 服务器 ID,用于标识当前网关实例 - */ - @Getter - private final String serverId = UUID.randomUUID().toString(); - - /** - * 配置刷新定时器 ID - */ - private Long configRefreshTimerId; - - @PostConstruct - public void start() { - log.info("[start][Modbus TCP 协议启动, serverId={}]", serverId); - - // 0. 设置 serverId 到上行处理器 - upstreamHandler.setServerId(serverId); - - // 1. 首次加载配置 - refreshConfig(); - - // 2. 启动配置定时刷新 - int refreshInterval = modbusTcpProperties.getConfigRefreshInterval(); - configRefreshTimerId = vertx.setPeriodic( - TimeUnit.SECONDS.toMillis(refreshInterval), - id -> refreshConfig() - ); - log.info("[start][配置刷新定时器已启动, 间隔={}秒]", refreshInterval); - } - - @PreDestroy - public void stop() { - log.info("[stop][Modbus TCP 协议停止]"); - - // 1. 取消配置刷新定时器 - if (configRefreshTimerId != null) { - vertx.cancelTimer(configRefreshTimerId); - } - - // 2. 停止轮询调度器 - pollScheduler.stopAll(); - - // 3. 关闭所有连接 - connectionManager.closeAll(); - } - - /** - * 刷新配置 - */ - private void refreshConfig() { - try { - // 1. 从 biz 拉取最新配置 - List configs = configCacheService.refreshConfig(); - log.debug("[refreshConfig][获取到 {} 个 Modbus 设备配置]", configs.size()); - - // 2. 更新连接和轮询任务 - for (IotModbusDeviceConfigRespDTO config : configs) { - try { - // 2.1 确保连接存在 - connectionManager.ensureConnection(config); - // 2.2 更新轮询任务 - pollScheduler.updatePolling(config); - } catch (Exception e) { - log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e); - } - } - - // 3. 清理已删除设备的资源 - configCacheService.cleanupRemovedDevices(configs, deviceId -> { - pollScheduler.stopPolling(deviceId); - connectionManager.removeDevice(deviceId); - }); - } catch (Exception e) { - log.error("[refreshConfig][刷新配置失败]", e); - } - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/router/IotModbusTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/handler/downstream/IotModbusTcpDownstreamHandler.java similarity index 94% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/router/IotModbusTcpDownstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/handler/downstream/IotModbusTcpDownstreamHandler.java index 2355a3d8ec..49c3931fed 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/router/IotModbusTcpDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/handler/downstream/IotModbusTcpDownstreamHandler.java @@ -1,12 +1,13 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.downstream; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter; import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter; import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService; import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager; import lombok.RequiredArgsConstructor; @@ -16,7 +17,7 @@ import java.util.Map; /** * IoT Modbus TCP 下行消息处理器 - * + *

* 负责: * 1. 处理下行消息(如属性设置 thing.service.property.set) * 2. 执行 Modbus 写入操作 @@ -38,8 +39,7 @@ public class IotModbusTcpDownstreamHandler { @SuppressWarnings("unchecked") public void handle(IotDeviceMessage message) { // 1.1 检查是否是属性设置消息 - // TODO @AI:要使用枚举 - if (!"thing.service.property.set".equals(message.getMethod())) { + if (!IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod().equals(message.getMethod())) { log.debug("[handle][忽略非属性设置消息: {}]", message.getMethod()); return; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/handler/downstream/IotModbusTcpDownstreamSubscriber.java similarity index 69% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpDownstreamSubscriber.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/handler/downstream/IotModbusTcpDownstreamSubscriber.java index a05101329c..bea82919ef 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/handler/downstream/IotModbusTcpDownstreamSubscriber.java @@ -1,11 +1,10 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp; +package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpDownstreamHandler; -import jakarta.annotation.PostConstruct; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpProtocol; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -18,19 +17,29 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class IotModbusTcpDownstreamSubscriber implements IotMessageSubscriber { - private final IotModbusTcpUpstreamProtocol upstreamProtocol; + private final IotModbusTcpProtocol protocol; private final IotModbusTcpDownstreamHandler downstreamHandler; private final IotMessageBus messageBus; - @PostConstruct - public void subscribe() { + /** + * 启动订阅 + */ + public void start() { messageBus.register(this); - log.info("[subscribe][Modbus TCP 下行消息订阅器已启动, topic={}]", getTopic()); + log.info("[start][Modbus TCP 下行消息订阅器已启动, topic={}]", getTopic()); + } + + /** + * 停止订阅 + */ + public void stop() { + messageBus.unregister(this); + log.info("[stop][Modbus TCP 下行消息订阅器已停止]"); } @Override public String getTopic() { - return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(upstreamProtocol.getServerId()); + return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId()); } @Override diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/router/IotModbusTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/handler/upstream/IotModbusTcpUpstreamHandler.java similarity index 85% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/router/IotModbusTcpUpstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/handler/upstream/IotModbusTcpUpstreamHandler.java index 5da669e46f..adffc3dcc2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/router/IotModbusTcpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/handler/upstream/IotModbusTcpUpstreamHandler.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.upstream; import cn.hutool.core.map.MapUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; @@ -7,7 +7,6 @@ import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -22,14 +21,14 @@ public class IotModbusTcpUpstreamHandler { private final IotDeviceMessageService messageService; private final IotModbusDataConverter dataConverter; - - @Setter - private String serverId; + private final String serverId; public IotModbusTcpUpstreamHandler(IotDeviceMessageService messageService, - IotModbusDataConverter dataConverter) { + IotModbusDataConverter dataConverter, + String serverId) { this.messageService = messageService; this.dataConverter = dataConverter; + this.serverId = serverId; } /** @@ -40,8 +39,8 @@ public class IotModbusTcpUpstreamHandler { * @param rawValue 原始值(int 数组) */ public void handleReadResult(IotModbusDeviceConfigRespDTO config, - IotModbusPointRespDTO point, - int[] rawValue) { + IotModbusPointRespDTO point, + int[] rawValue) { try { // 1.1 转换原始值为物模型属性值 Object convertedValue = dataConverter.convertToPropertyValue(rawValue, point); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/manager/IotModbusTcpPollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/manager/IotModbusTcpPollScheduler.java index 3caf030ddf..f12ff1de2a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/manager/IotModbusTcpPollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/manager/IotModbusTcpPollScheduler.java @@ -3,8 +3,8 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpUpstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.upstream.IotModbusTcpUpstreamHandler; import io.vertx.core.Vertx; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index b4f4226b1a..77323e03b1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -51,8 +51,6 @@ yudao: protocol: http port: 8092 enabled: false - http: - ssl-enabled: false # ==================================== # 针对引入的 TCP 组件的配置 # ==================================== @@ -64,7 +62,6 @@ yudao: tcp: max-connections: 1000 keep-alive-timeout-ms: 30000 - ssl-enabled: false codec: type: delimiter # 拆包类型:length_field / delimiter / fixed_length delimiter: "\\n" # 分隔符(支持转义:\\n=换行, \\r=回车, \\t=制表符) @@ -101,7 +98,6 @@ yudao: max-message-size: 65536 # 最大消息大小(字节,默认 64KB) max-frame-size: 65536 # 最大帧大小(字节,默认 64KB) idle-timeout-seconds: 60 # 空闲超时时间(秒,默认 60) - ssl-enabled: false # 是否启用 SSL(wss://) # ==================================== # 针对引入的 CoAP 组件的配置 # ==================================== @@ -117,14 +113,13 @@ yudao: # 针对引入的 MQTT 组件的配置 # ==================================== - id: mqtt-json - enabled: true + enabled: false protocol: mqtt port: 1883 serialize: json mqtt: max-message-size: 8192 # 最大消息大小(字节) connect-timeout-seconds: 60 # 连接超时时间(秒) - ssl-enabled: false # 是否启用 SSL # ==================================== # 针对引入的 EMQX 组件的配置 # ==================================== @@ -168,43 +163,15 @@ yudao: key-store-password: "your-keystore-password" # 客户端证书库密码 trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径 trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码 - http-port: 8090 # MQTT HTTP 服务端口 - mqtt-host: 127.0.0.1 # MQTT Broker 地址 - mqtt-port: 1883 # MQTT Broker 端口 - mqtt-username: admin # MQTT 用户名 - mqtt-password: public # MQTT 密码 - mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID - mqtt-ssl: false # 是否开启 SSL - mqtt-topics: - - "/sys/#" # 系统主题 - clean-session: true # 是否启用 Clean Session (默认: true) - keep-alive-interval-seconds: 60 # 心跳间隔,单位秒 (默认: 60) - max-inflight-queue: 10000 # 最大飞行消息队列,单位:条 - connect-timeout-seconds: 10 # 连接超时,单位:秒 - # 是否信任所有 SSL 证书 (默认: false)。警告:生产环境必须为 false! - # 仅在开发环境或内网测试时,如果使用了自签名证书,可以临时设置为 true - trust-all: true # 在 dev 环境可以设为 true - # 遗嘱消息配置 (用于网关异常下线时通知其他系统) - will: - enabled: true # 生产环境强烈建议开启 - topic: "gateway/status/${yudao.iot.gateway.emqx.mqtt-client-id}" # 遗嘱消息主题 - payload: "offline" # 遗嘱消息负载 - qos: 1 # 遗嘱消息 QoS - retain: true # 遗嘱消息是否保留 - # 高级 SSL/TLS 配置 (当 trust-all: false 且 mqtt-ssl: true 时生效) - ssl-options: - key-store-path: "classpath:certs/client.jks" # 客户端证书库路径 - key-store-password: "your-keystore-password" # 客户端证书库密码 - trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径 - trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码 # ==================================== - # 针对引入的 MQTT 组件的配置 + # 针对引入的 Modbus TCP 组件的配置 # ==================================== - # 协议配置(旧版,保持兼容) - protocol: - modbus-tcp: + - id: modbus-tcp-1 enabled: true - config-refresh-interval: 30 # 配置刷新间隔(秒) + protocol: modbus_tcp + port: 502 + modbus-tcp: + config-refresh-interval: 30 # 配置刷新间隔(秒) --- #################### 日志相关配置 #################### @@ -226,6 +193,7 @@ logging: cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.coap: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.websocket: DEBUG + cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp: DEBUG # 根日志级别 root: INFO