feat(iot):【协议改造】modbus-tcp:改造新的协议方式

This commit is contained in:
YunaiV
2026-02-06 00:18:42 +08:00
parent e275cb57e9
commit 4319220750
11 changed files with 300 additions and 323 deletions

View File

@@ -1,48 +1,16 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.*;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpPollScheduler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpPollScheduler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* IoT 网关配置类
*
* @author 芋道源码
*/
@Configuration
@EnableConfigurationProperties(IotGatewayProperties.class)
public class IotGatewayConfiguration {
@@ -57,86 +25,4 @@ public class IotGatewayConfiguration {
return new IotProtocolManager(gatewayProperties);
}
/**
* IoT 网关 Modbus TCP 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.modbus-tcp", name = "enabled", havingValue = "true")
@Slf4j
public static class ModbusTcpProtocolConfiguration {
@Bean(name = "modbusTcpVertx", destroyMethod = "close")
public Vertx modbusTcpVertx() {
return Vertx.vertx();
}
@Bean
public IotModbusDataConverter iotModbusDataConverter() {
return new IotModbusDataConverter();
}
@Bean
public IotModbusTcpClient iotModbusTcpClient() {
return new IotModbusTcpClient();
}
@Bean
public IotModbusTcpConnectionManager iotModbusTcpConnectionManager(
RedissonClient redissonClient,
@Qualifier("modbusTcpVertx") Vertx modbusTcpVertx) {
return new IotModbusTcpConnectionManager(redissonClient, modbusTcpVertx);
}
@Bean
public IotModbusTcpConfigCacheService iotModbusTcpConfigCacheService(
cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi deviceApi) {
return new IotModbusTcpConfigCacheService(deviceApi);
}
@Bean
public IotModbusTcpUpstreamHandler iotModbusTcpUpstreamHandler(
IotDeviceMessageService messageService,
IotModbusDataConverter dataConverter) {
return new IotModbusTcpUpstreamHandler(messageService, dataConverter);
}
@Bean
public IotModbusTcpPollScheduler iotModbusTcpPollScheduler(
@Qualifier("modbusTcpVertx") Vertx modbusTcpVertx,
IotModbusTcpConnectionManager connectionManager,
IotModbusTcpClient modbusClient,
IotModbusTcpUpstreamHandler upstreamHandler) {
return new IotModbusTcpPollScheduler(modbusTcpVertx, connectionManager, modbusClient, upstreamHandler);
}
@Bean
public IotModbusTcpDownstreamHandler iotModbusTcpDownstreamHandler(
IotModbusTcpConnectionManager connectionManager,
IotModbusTcpClient modbusClient,
IotModbusDataConverter dataConverter,
IotModbusTcpConfigCacheService configCacheService) {
return new IotModbusTcpDownstreamHandler(connectionManager, modbusClient, dataConverter, configCacheService);
}
@Bean
public IotModbusTcpUpstreamProtocol iotModbusTcpUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceMessageService messageService,
IotModbusTcpConnectionManager connectionManager,
IotModbusTcpPollScheduler pollScheduler,
IotModbusTcpConfigCacheService configCacheService,
IotModbusTcpUpstreamHandler upstreamHandler,
@Qualifier("modbusTcpVertx") Vertx modbusTcpVertx) {
return new IotModbusTcpUpstreamProtocol(gatewayProperties.getProtocol().getModbusTcp(),
messageService, connectionManager, pollScheduler, configCacheService, upstreamHandler, modbusTcpVertx);
}
@Bean
public IotModbusTcpDownstreamSubscriber iotModbusTcpDownstreamSubscriber(IotModbusTcpUpstreamProtocol upstreamProtocol,
IotModbusTcpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotModbusTcpDownstreamSubscriber(upstreamProtocol, downstreamHandler, messageBus);
}
}
}

View File

@@ -4,6 +4,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpConfig;
@@ -166,6 +167,12 @@ public class IotGatewayProperties {
@Valid
private IotEmqxConfig emqx;
/**
* Modbus TCP 协议配置
*/
@Valid
private IotModbusTcpConfig modbusTcp;
}
/**
@@ -216,31 +223,4 @@ public class IotGatewayProperties {
}
@Data
public static class ProtocolProperties {
/**
* Modbus TCP 组件配置
*/
private ModbusTcpProperties modbusTcp;
}
@Data
public static class ModbusTcpProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 配置刷新间隔(秒)
*/
@NotNull(message = "配置刷新间隔不能为空")
private Integer configRefreshInterval = 30;
}
}

View File

@@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol;
@@ -112,6 +113,8 @@ public class IotProtocolManager implements SmartLifecycle {
return createMqttProtocol(config);
case EMQX:
return createEmqxProtocol(config);
case MODBUS_TCP:
return createModbusTcpProtocol(config);
default:
throw new IllegalArgumentException(String.format(
"[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType));
@@ -188,4 +191,14 @@ public class IotProtocolManager implements SmartLifecycle {
return new IotEmqxProtocol(config);
}
/**
* 创建 Modbus TCP 协议实例
*
* @param config 协议实例配置
* @return Modbus TCP 协议实例
*/
private IotModbusTcpProtocol createModbusTcpProtocol(IotGatewayProperties.ProtocolProperties config) {
return new IotModbusTcpProtocol(config);
}
}

View File

@@ -0,0 +1,22 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* IoT Modbus TCP 协议配置
*
* @author 芋道源码
*/
@Data
public class IotModbusTcpConfig {
/**
* 配置刷新间隔(秒)
*/
@NotNull(message = "配置刷新间隔不能为空")
@Min(value = 1, message = "配置刷新间隔不能小于 1 秒")
private Integer configRefreshInterval = 30;
}

View File

@@ -0,0 +1,215 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp;
import cn.hutool.core.lang.Assert;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.downstream.IotModbusTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.downstream.IotModbusTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.upstream.IotModbusTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpPollScheduler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* IoT 网关 Modbus TCP 协议:主动轮询 Modbus 从站设备数据
*
* @author 芋道源码
*/
@Slf4j
public class IotModbusTcpProtocol implements IotProtocol {
/**
* 协议配置
*/
private final ProtocolProperties properties;
/**
* 服务器 ID用于消息追踪全局唯一
*/
@Getter
private final String serverId;
/**
* 运行状态
*/
@Getter
private volatile boolean running = false;
/**
* Vert.x 实例
*/
private final Vertx vertx;
/**
* 配置刷新定时器 ID
*/
private Long configRefreshTimerId;
/**
* 连接管理器
*/
private final IotModbusTcpConnectionManager connectionManager;
/**
* 下行消息订阅者
*/
private final IotModbusTcpDownstreamSubscriber downstreamSubscriber;
private final IotModbusTcpConfigCacheService configCacheService;
private final IotModbusTcpPollScheduler pollScheduler;
public IotModbusTcpProtocol(ProtocolProperties properties) {
IotModbusTcpConfig modbusTcpConfig = properties.getModbusTcp();
Assert.notNull(modbusTcpConfig, "Modbus TCP 协议配置modbusTcp不能为空");
this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// 初始化 Vertx
this.vertx = Vertx.vertx();
// 初始化 Manager
RedissonClient redissonClient = SpringUtil.getBean(RedissonClient.class);
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.connectionManager = new IotModbusTcpConnectionManager(redissonClient, vertx);
this.configCacheService = new IotModbusTcpConfigCacheService(deviceApi);
// 初始化 Handler
IotModbusDataConverter dataConverter = new IotModbusDataConverter();
IotModbusTcpClient modbusClient = new IotModbusTcpClient();
IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotModbusTcpUpstreamHandler upstreamHandler = new IotModbusTcpUpstreamHandler(messageService, dataConverter, serverId);
IotModbusTcpDownstreamHandler downstreamHandler = new IotModbusTcpDownstreamHandler(connectionManager,
modbusClient, dataConverter, configCacheService);
// 初始化轮询调度器
this.pollScheduler = new IotModbusTcpPollScheduler(vertx, connectionManager, modbusClient, upstreamHandler);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotModbusTcpDownstreamSubscriber(this, downstreamHandler, messageBus);
}
@Override
public String getId() {
return properties.getId();
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.MODBUS_TCP;
}
@Override
public void start() {
if (running) {
log.warn("[start][IoT Modbus TCP 协议 {} 已经在运行中]", getId());
return;
}
try {
// 1.1 首次加载配置
refreshConfig();
// 1.2 启动配置刷新定时器
int refreshInterval = properties.getModbusTcp().getConfigRefreshInterval();
configRefreshTimerId = vertx.setPeriodic(
TimeUnit.SECONDS.toMillis(refreshInterval),
id -> refreshConfig()
);
running = true;
log.info("[start][IoT Modbus TCP 协议 {} 启动成功serverId={}]", getId(), serverId);
// 2. 启动下行消息订阅者
this.downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT Modbus TCP 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭资源
if (vertx != null) {
vertx.close();
}
throw e;
}
}
@Override
public void stop() {
if (!running) {
return;
}
// 1. 停止下行消息订阅者
try {
downstreamSubscriber.stop();
log.info("[stop][IoT Modbus TCP 协议 {} 下行消息订阅者已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT Modbus TCP 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
// 2.1 取消配置刷新定时器
if (configRefreshTimerId != null) {
vertx.cancelTimer(configRefreshTimerId);
configRefreshTimerId = null;
}
// 2.2 停止轮询调度器
pollScheduler.stopAll();
log.info("[stop][IoT Modbus TCP 协议 {} 轮询调度器已停止]", getId());
// 2.3 关闭所有连接
connectionManager.closeAll();
log.info("[stop][IoT Modbus TCP 协议 {} 连接管理器已关闭]", getId());
// 3. 关闭 Vert.x 实例
if (vertx != null) {
try {
vertx.close().result();
log.info("[stop][IoT Modbus TCP 协议 {} Vertx 已关闭]", getId());
} catch (Exception e) {
log.error("[stop][IoT Modbus TCP 协议 {} Vertx 关闭失败]", getId(), e);
}
}
running = false;
log.info("[stop][IoT Modbus TCP 协议 {} 已停止]", getId());
}
/**
* 刷新配置
*/
private synchronized void refreshConfig() {
try {
// 1. 从 biz 拉取最新配置
List<IotModbusDeviceConfigRespDTO> configs = configCacheService.refreshConfig();
log.debug("[refreshConfig][获取到 {} 个 Modbus 设备配置]", configs.size());
// 2. 更新连接和轮询任务
for (IotModbusDeviceConfigRespDTO config : configs) {
try {
// 2.1 确保连接存在
connectionManager.ensureConnection(config);
// 2.2 更新轮询任务
pollScheduler.updatePolling(config);
} catch (Exception e) {
log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e);
}
}
// 3. 清理已删除设备的资源
configCacheService.cleanupRemovedDevices(configs, deviceId -> {
pollScheduler.stopPolling(deviceId);
connectionManager.removeDevice(deviceId);
});
} catch (Exception e) {
log.error("[refreshConfig][刷新配置失败]", e);
}
}
}

View File

@@ -1,115 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpPollScheduler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* IoT Modbus TCP 上行协议:定时拉取配置、管理连接、调度轮询任务
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpUpstreamProtocol {
private final IotGatewayProperties.ModbusTcpProperties modbusTcpProperties;
private final IotDeviceMessageService messageService;
private final IotModbusTcpConnectionManager connectionManager;
private final IotModbusTcpPollScheduler pollScheduler;
private final IotModbusTcpConfigCacheService configCacheService;
private final IotModbusTcpUpstreamHandler upstreamHandler;
private final Vertx vertx;
/**
* 服务器 ID用于标识当前网关实例
*/
@Getter
private final String serverId = UUID.randomUUID().toString();
/**
* 配置刷新定时器 ID
*/
private Long configRefreshTimerId;
@PostConstruct
public void start() {
log.info("[start][Modbus TCP 协议启动, serverId={}]", serverId);
// 0. 设置 serverId 到上行处理器
upstreamHandler.setServerId(serverId);
// 1. 首次加载配置
refreshConfig();
// 2. 启动配置定时刷新
int refreshInterval = modbusTcpProperties.getConfigRefreshInterval();
configRefreshTimerId = vertx.setPeriodic(
TimeUnit.SECONDS.toMillis(refreshInterval),
id -> refreshConfig()
);
log.info("[start][配置刷新定时器已启动, 间隔={}秒]", refreshInterval);
}
@PreDestroy
public void stop() {
log.info("[stop][Modbus TCP 协议停止]");
// 1. 取消配置刷新定时器
if (configRefreshTimerId != null) {
vertx.cancelTimer(configRefreshTimerId);
}
// 2. 停止轮询调度器
pollScheduler.stopAll();
// 3. 关闭所有连接
connectionManager.closeAll();
}
/**
* 刷新配置
*/
private void refreshConfig() {
try {
// 1. 从 biz 拉取最新配置
List<IotModbusDeviceConfigRespDTO> configs = configCacheService.refreshConfig();
log.debug("[refreshConfig][获取到 {} 个 Modbus 设备配置]", configs.size());
// 2. 更新连接和轮询任务
for (IotModbusDeviceConfigRespDTO config : configs) {
try {
// 2.1 确保连接存在
connectionManager.ensureConnection(config);
// 2.2 更新轮询任务
pollScheduler.updatePolling(config);
} catch (Exception e) {
log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e);
}
}
// 3. 清理已删除设备的资源
configCacheService.cleanupRemovedDevices(configs, deviceId -> {
pollScheduler.stopPolling(deviceId);
connectionManager.removeDevice(deviceId);
});
} catch (Exception e) {
log.error("[refreshConfig][刷新配置失败]", e);
}
}
}

View File

@@ -1,12 +1,13 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.downstream;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager.IotModbusTcpConnectionManager;
import lombok.RequiredArgsConstructor;
@@ -16,7 +17,7 @@ import java.util.Map;
/**
* IoT Modbus TCP 下行消息处理器
*
* <p>
* 负责
* 1. 处理下行消息如属性设置 thing.service.property.set
* 2. 执行 Modbus 写入操作
@@ -38,8 +39,7 @@ public class IotModbusTcpDownstreamHandler {
@SuppressWarnings("unchecked")
public void handle(IotDeviceMessage message) {
// 1.1 检查是否是属性设置消息
// TODO @AI要使用枚举
if (!"thing.service.property.set".equals(message.getMethod())) {
if (!IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod().equals(message.getMethod())) {
log.debug("[handle][忽略非属性设置消息: {}]", message.getMethod());
return;
}

View File

@@ -1,11 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp;
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpDownstreamHandler;
import jakarta.annotation.PostConstruct;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.IotModbusTcpProtocol;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -18,19 +17,29 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class IotModbusTcpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotModbusTcpUpstreamProtocol upstreamProtocol;
private final IotModbusTcpProtocol protocol;
private final IotModbusTcpDownstreamHandler downstreamHandler;
private final IotMessageBus messageBus;
@PostConstruct
public void subscribe() {
/**
* 启动订阅
*/
public void start() {
messageBus.register(this);
log.info("[subscribe][Modbus TCP 下行消息订阅器已启动, topic={}]", getTopic());
log.info("[start][Modbus TCP 下行消息订阅器已启动, topic={}]", getTopic());
}
/**
* 停止订阅
*/
public void stop() {
messageBus.unregister(this);
log.info("[stop][Modbus TCP 下行消息订阅器已停止]");
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(upstreamProtocol.getServerId());
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.upstream;
import cn.hutool.core.map.MapUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
@@ -7,7 +7,6 @@ import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.codec.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@@ -22,14 +21,14 @@ public class IotModbusTcpUpstreamHandler {
private final IotDeviceMessageService messageService;
private final IotModbusDataConverter dataConverter;
@Setter
private String serverId;
private final String serverId;
public IotModbusTcpUpstreamHandler(IotDeviceMessageService messageService,
IotModbusDataConverter dataConverter) {
IotModbusDataConverter dataConverter,
String serverId) {
this.messageService = messageService;
this.dataConverter = dataConverter;
this.serverId = serverId;
}
/**

View File

@@ -3,8 +3,8 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.manager;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.router.IotModbusTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.client.IotModbusTcpClient;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp.handler.upstream.IotModbusTcpUpstreamHandler;
import io.vertx.core.Vertx;
import lombok.AllArgsConstructor;
import lombok.Data;

View File

@@ -51,8 +51,6 @@ yudao:
protocol: http
port: 8092
enabled: false
http:
ssl-enabled: false
# ====================================
# 针对引入的 TCP 组件的配置
# ====================================
@@ -64,7 +62,6 @@ yudao:
tcp:
max-connections: 1000
keep-alive-timeout-ms: 30000
ssl-enabled: false
codec:
type: delimiter # 拆包类型length_field / delimiter / fixed_length
delimiter: "\\n" # 分隔符(支持转义:\\n=换行, \\r=回车, \\t=制表符)
@@ -101,7 +98,6 @@ yudao:
max-message-size: 65536 # 最大消息大小(字节,默认 64KB
max-frame-size: 65536 # 最大帧大小(字节,默认 64KB
idle-timeout-seconds: 60 # 空闲超时时间(秒,默认 60
ssl-enabled: false # 是否启用 SSLwss://
# ====================================
# 针对引入的 CoAP 组件的配置
# ====================================
@@ -117,14 +113,13 @@ yudao:
# 针对引入的 MQTT 组件的配置
# ====================================
- id: mqtt-json
enabled: true
enabled: false
protocol: mqtt
port: 1883
serialize: json
mqtt:
max-message-size: 8192 # 最大消息大小(字节)
connect-timeout-seconds: 60 # 连接超时时间(秒)
ssl-enabled: false # 是否启用 SSL
# ====================================
# 针对引入的 EMQX 组件的配置
# ====================================
@@ -168,42 +163,14 @@ yudao:
key-store-password: "your-keystore-password" # 客户端证书库密码
trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径
trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码
http-port: 8090 # MQTT HTTP 服务端口
mqtt-host: 127.0.0.1 # MQTT Broker 地址
mqtt-port: 1883 # MQTT Broker 端口
mqtt-username: admin # MQTT 用户名
mqtt-password: public # MQTT 密码
mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID
mqtt-ssl: false # 是否开启 SSL
mqtt-topics:
- "/sys/#" # 系统主题
clean-session: true # 是否启用 Clean Session (默认: true)
keep-alive-interval-seconds: 60 # 心跳间隔,单位秒 (默认: 60)
max-inflight-queue: 10000 # 最大飞行消息队列,单位:条
connect-timeout-seconds: 10 # 连接超时,单位:秒
# 是否信任所有 SSL 证书 (默认: false)。警告:生产环境必须为 false
# 仅在开发环境或内网测试时,如果使用了自签名证书,可以临时设置为 true
trust-all: true # 在 dev 环境可以设为 true
# 遗嘱消息配置 (用于网关异常下线时通知其他系统)
will:
enabled: true # 生产环境强烈建议开启
topic: "gateway/status/${yudao.iot.gateway.emqx.mqtt-client-id}" # 遗嘱消息主题
payload: "offline" # 遗嘱消息负载
qos: 1 # 遗嘱消息 QoS
retain: true # 遗嘱消息是否保留
# 高级 SSL/TLS 配置 (当 trust-all: false 且 mqtt-ssl: true 时生效)
ssl-options:
key-store-path: "classpath:certs/client.jks" # 客户端证书库路径
key-store-password: "your-keystore-password" # 客户端证书库密码
trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径
trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码
# ====================================
# 针对引入的 MQTT 组件的配置
# 针对引入的 Modbus TCP 组件的配置
# ====================================
# 协议配置(旧版,保持兼容)
protocol:
modbus-tcp:
- id: modbus-tcp-1
enabled: true
protocol: modbus_tcp
port: 502
modbus-tcp:
config-refresh-interval: 30 # 配置刷新间隔(秒)
--- #################### 日志相关配置 ####################
@@ -226,6 +193,7 @@ logging:
cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.coap: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.websocket: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp: DEBUG
# 根日志级别
root: INFO