diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java index a80471b9e8..ca317ec3fe 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java @@ -4,7 +4,6 @@ import cn.hutool.core.lang.Assert; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; -import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; @@ -12,9 +11,8 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusRecordParserFactory; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.downstream.IotModbusTcpSlaveDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.downstream.IotModbusTcpSlaveDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.upstream.IotModbusTcpSlaveUpstreamHandler; @@ -23,6 +21,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotM import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePollScheduler; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; import io.vertx.core.net.NetServer; @@ -33,19 +32,15 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -// TODO @AI:不用主动上报! +// DONE @AI:不用主动上报! /** * IoT 网关 Modbus TCP Slave 协议 *

* 作为 TCP Server 接收设备主动连接: * 1. 设备通过自定义功能码(FC 65)发送认证请求 - * 2. 认证成功后,根据设备配置的 mode 决定工作模式: - * - mode=1(云端轮询):网关主动发送 Modbus 读请求,设备响应 - * - mode=2(主动上报):设备主动上报数据,网关透传 + * 2. 认证成功后,网关主动发送 Modbus 读请求,设备响应(云端轮询模式) * * @author 芋道源码 */ @@ -85,20 +80,16 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { */ private Long requestCleanupTimerId; - /** - * 未认证连接的帧格式缓存:socket → 检测到的帧格式 - */ - private final Map pendingFrameFormats = new ConcurrentHashMap<>(); - // ========== 各组件 ========== + // TODO @芋艿:稍后排序下,有点小乱; private final IotModbusTcpSlaveConfig slaveConfig; - private final IotModbusFrameCodec frameCodec; + private final IotModbusFrameDecoder frameDecoder; + private final IotModbusFrameEncoder frameEncoder; private final IotModbusTcpSlaveConnectionManager connectionManager; private final IotModbusTcpSlaveConfigCacheService configCacheService; private final IotModbusTcpSlavePendingRequestManager pendingRequestManager; private final IotModbusTcpSlaveUpstreamHandler upstreamHandler; - private final IotModbusTcpSlaveDownstreamHandler downstreamHandler; private final IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber; private final IotModbusTcpSlavePollScheduler pollScheduler; @@ -118,33 +109,27 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { this.pendingRequestManager = new IotModbusTcpSlavePendingRequestManager(); // 初始化帧编解码器 - this.frameCodec = new IotModbusFrameCodec(slaveConfig.getCustomFunctionCode()); + this.frameDecoder = new IotModbusFrameDecoder(slaveConfig.getCustomFunctionCode()); + this.frameEncoder = new IotModbusFrameEncoder(slaveConfig.getCustomFunctionCode()); + + // 初始化轮询调度器 + this.pollScheduler = new IotModbusTcpSlavePollScheduler( + vertx, connectionManager, frameEncoder, pendingRequestManager, + slaveConfig.getRequestTimeout()); // 初始化 Handler IotModbusDataConverter dataConverter = new IotModbusDataConverter(); IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class); + IotDeviceService deviceService = SpringUtil.getBean(IotDeviceService.class); this.upstreamHandler = new IotModbusTcpSlaveUpstreamHandler( - deviceApi, messageService, dataConverter, frameCodec, - connectionManager, configCacheService, pendingRequestManager, serverId); - this.downstreamHandler = new IotModbusTcpSlaveDownstreamHandler( - connectionManager, configCacheService, dataConverter, frameCodec); - - // 初始化轮询调度器 - this.pollScheduler = new IotModbusTcpSlavePollScheduler( - vertx, connectionManager, frameCodec, pendingRequestManager, - slaveConfig.getRequestTimeout()); - - // 设置认证成功回调:启动轮询 - // TODO @AI:感觉直接去调用,不用注册回调了(更简洁) - this.upstreamHandler.setOnAuthSuccess((deviceId, config) -> { - if (config.getMode() != null - && config.getMode().equals(IotModbusModeEnum.POLLING.getMode())) { - pollScheduler.updatePolling(config); - } - }); + deviceApi, messageService, dataConverter, frameEncoder, + connectionManager, configCacheService, pendingRequestManager, + pollScheduler, deviceService, serverId); // 初始化下行消息订阅者 IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + IotModbusTcpSlaveDownstreamHandler downstreamHandler = new IotModbusTcpSlaveDownstreamHandler( + connectionManager, configCacheService, dataConverter, frameEncoder); this.downstreamSubscriber = new IotModbusTcpSlaveDownstreamSubscriber( this, downstreamHandler, messageBus); } @@ -168,6 +153,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { try { // 1.1 首次加载配置 + // TODO @AI:可能首次不用加载;你在想想; refreshConfig(); // 1.2 启动配置刷新定时器 int refreshInterval = slaveConfig.getConfigRefreshInterval(); @@ -190,10 +176,21 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT Modbus TCP Slave 协议 {} 启动失败]", getId(), e); + if (configRefreshTimerId != null) { + vertx.cancelTimer(configRefreshTimerId); + configRefreshTimerId = null; + } + if (requestCleanupTimerId != null) { + vertx.cancelTimer(requestCleanupTimerId); + requestCleanupTimerId = null; + } + connectionManager.closeAll(); + if (netServer != null) { + netServer.close(); + } if (vertx != null) { vertx.close(); } - // TODO @AI:其它相关的 close; throw e; } } @@ -252,20 +249,19 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { * 启动 TCP Server */ private void startTcpServer() { - // TODO @AI:host 一定要设置么? // 1. 创建 TCP Server NetServerOptions options = new NetServerOptions() - .setPort(properties.getPort()) - .setHost("0.0.0.0"); + .setPort(properties.getPort()); netServer = vertx.createNetServer(options); // 2. 设置连接处理器 netServer.connectHandler(this::handleConnection); - // TODO @AI:是不是 sync 就好,不用 onSuccess/onFailure 了?感觉更简洁。失败,肯定就要抛出异常,结束初始化了! - netServer.listen() - .onSuccess(server -> log.info("[startTcpServer][TCP Server 启动成功, port={}]", - server.actualPort())) - .onFailure(e -> log.error("[startTcpServer][TCP Server 启动失败]", e)); + try { + netServer.listen().toCompletionStage().toCompletableFuture().get(); + log.info("[startTcpServer][TCP Server 启动成功, port={}]", properties.getPort()); + } catch (Exception e) { + throw new RuntimeException("[startTcpServer][TCP Server 启动失败]", e); + } } /** @@ -274,58 +270,24 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { private void handleConnection(NetSocket socket) { log.info("[handleConnection][新连接, remoteAddress={}]", socket.remoteAddress()); - // 1.1 创建带帧格式检测的 RecordParser - // TODO @AI:看看怎么从这个类里面,拿出去;让这个类的职责更单一; - RecordParser parser = IotModbusRecordParserFactory.create( - slaveConfig.getCustomFunctionCode(), - // 完整帧回调 - // TODO @AI:感觉搞个独立的类,稍微好点?! - frameBuffer -> { - byte[] frameBytes = frameBuffer.getBytes(); - // 获取该连接的帧格式 - ConnectionInfo connInfo = connectionManager.getConnectionInfo(socket); - IotModbusFrameFormatEnum frameFormat = connInfo != null ? connInfo.getFrameFormat() : null; - if (frameFormat == null) { - // 未认证的连接,使用首帧检测到的帧格式 - frameFormat = pendingFrameFormats.get(socket); - } - if (frameFormat == null) { - log.warn("[handleConnection][帧格式未检测到, remoteAddress={}]", socket.remoteAddress()); - return; - } - - // 解码帧 - IotModbusFrame frame = frameCodec.decodeResponse(frameBytes, frameFormat); - // 交给 UpstreamHandler 处理 - upstreamHandler.handleFrame(socket, frame, frameFormat); - }, - // 帧格式检测回调:保存到未认证缓存 - detectedFormat -> { - // TODO @AI:是不是不用缓存,每次都探测;因为一般 auth 首包后,基本也没探测的诉求了! - pendingFrameFormats.put(socket, detectedFormat); - // 如果连接已注册(不太可能在检测阶段),也更新 - // TODO @AI:是否非必须?! - connectionManager.setFrameFormat(socket, detectedFormat); - log.debug("[handleConnection][帧格式检测: {}, remoteAddress={}]", - detectedFormat, socket.remoteAddress()); - } - ); - // 1.2 设置数据处理器 - socket.handler(parser); + // 1. 创建 RecordParser 并设置为数据处理器 + RecordParser recordParser = frameDecoder.createRecordParser((frame, frameFormat) -> { + // 【重要】帧处理分发,即消息处理 + upstreamHandler.handleFrame(socket, frame, frameFormat); + }); + socket.handler(recordParser); // 2.1 连接关闭处理 socket.closeHandler(v -> { - pendingFrameFormats.remove(socket); ConnectionInfo info = connectionManager.removeConnection(socket); - // TODO @AI:if return 简化下; - if (info != null && info.getDeviceId() != null) { - pollScheduler.stopPolling(info.getDeviceId()); - pendingRequestManager.removeDevice(info.getDeviceId()); - log.info("[handleConnection][连接关闭, deviceId={}, remoteAddress={}]", - info.getDeviceId(), socket.remoteAddress()); - } else { + if (info == null || info.getDeviceId() == null) { log.info("[handleConnection][未认证连接关闭, remoteAddress={}]", socket.remoteAddress()); + return; } + pollScheduler.stopPolling(info.getDeviceId()); + pendingRequestManager.removeDevice(info.getDeviceId()); + log.info("[handleConnection][连接关闭, deviceId={}, remoteAddress={}]", + info.getDeviceId(), socket.remoteAddress()); }); // 2.2 异常处理 socket.exceptionHandler(e -> { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusRecordParserFactory.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java similarity index 52% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusRecordParserFactory.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java index 07d98d3d55..8ea473738f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusRecordParserFactory.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java @@ -4,116 +4,205 @@ import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.parsetools.RecordParser; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.util.function.Consumer; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.function.BiConsumer; -// TODO @AI:看看是不是不要搞成 factory,而是直接 new;(可以一起讨论下) /** - * IoT Modbus RecordParser 工厂 + * IoT Modbus 帧解码器:集成 TCP 拆包 + 帧格式探测 + 帧解码,一条龙完成从 TCP 字节流到 IotModbusFrame 的转换。 *

- * 创建带自动帧格式检测的 RecordParser: + * 流程: * 1. 首帧检测:读前 6 字节,判断 MODBUS_TCP(ProtocolId==0x0000 且 Length 合理)或 MODBUS_RTU - * 2. 检测后自动切换到对应的拆包模式 + * 2. 检测后切换到对应的拆包 Handler,并将首包 6 字节通过 handleFirstBytes() 交给新 Handler 处理 + * 3. 拆包完成后解码为 IotModbusFrame,通过回调返回 * - MODBUS_TCP:两阶段 RecordParser(MBAP length 字段驱动) * - MODBUS_RTU:功能码驱动的状态机 * * @author 芋道源码 */ @Slf4j -public class IotModbusRecordParserFactory { +public class IotModbusFrameDecoder { + + private final int customFunctionCode; + + public IotModbusFrameDecoder(int customFunctionCode) { + this.customFunctionCode = customFunctionCode; + } /** * 创建带自动帧格式检测的 RecordParser * - * @param customFunctionCode 自定义功能码 - * @param frameHandler 完整帧回调 - * @param onFormatDetected 帧格式检测回调 + * @param frameHandler 完整帧回调(解码后的 IotModbusFrame + 检测到的帧格式) * @return RecordParser 实例 */ - public static RecordParser create(int customFunctionCode, - Handler frameHandler, - Consumer onFormatDetected) { - // 先创建一个 RecordParser,使用 fixedSizeMode(6) 读取首帧前 6 字节进行帧格式检测 - // TODO @AI:最小需要 6 个字节么?有可能更小的情况下,就探测出来?! + public RecordParser createRecordParser(BiConsumer frameHandler) { + // 先创建一个 RecordParser:使用 fixedSizeMode(6) 读取首帧前 6 字节进行帧格式检测 RecordParser parser = RecordParser.newFixed(6); - parser.handler(new DetectPhaseHandler(parser, customFunctionCode, frameHandler, onFormatDetected)); + parser.handler(new DetectPhaseHandler(parser, customFunctionCode, frameHandler)); return parser; } + // ==================== 帧解码 ==================== + /** - * 帧格式检测阶段 Handler + * 解码响应帧(拆包后的完整帧 byte[]) + * + * @param data 完整帧字节数组 + * @param format 帧格式 + * @return 解码后的 IotModbusFrame + */ + private IotModbusFrame decodeResponse(byte[] data, IotModbusFrameFormatEnum format) { + if (format == IotModbusFrameFormatEnum.MODBUS_TCP) { + return decodeTcpResponse(data); + } else { + return decodeRtuResponse(data); + } + } + + /** + * 解码 MODBUS_TCP 响应 + * 格式:[TransactionId(2)] [ProtocolId(2)] [Length(2)] [UnitId(1)] [FC(1)] [Data...] + */ + private IotModbusFrame decodeTcpResponse(byte[] data) { + if (data.length < 8) { + log.warn("[decodeTcpResponse][数据长度不足: {}]", data.length); + return null; + } + ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN); + int transactionId = buf.getShort() & 0xFFFF; + buf.getShort(); // protocolId:固定 0x0000,Modbus 协议标识 + buf.getShort(); // length:后续字节数(UnitId + PDU),拆包阶段已使用 + int slaveId = buf.get() & 0xFF; + int functionCode = buf.get() & 0xFF; + // 提取 PDU 数据(从 functionCode 之后到末尾) + byte[] pdu = new byte[data.length - 8]; + System.arraycopy(data, 8, pdu, 0, pdu.length); + + return buildFrame(slaveId, functionCode, pdu, transactionId); + } + + /** + * 解码 MODBUS_RTU 响应 + * 格式:[SlaveId(1)] [FC(1)] [Data...] [CRC(2)] + */ + private IotModbusFrame decodeRtuResponse(byte[] data) { + if (data.length < 4) { + log.warn("[decodeRtuResponse][数据长度不足: {}]", data.length); + return null; + } + // 校验 CRC + if (!IotModbusUtils.verifyCrc16(data)) { + log.warn("[decodeRtuResponse][CRC 校验失败]"); + return null; + } + int slaveId = data[0] & 0xFF; + int functionCode = data[1] & 0xFF; + // PDU 数据(不含 slaveId、functionCode、CRC) + byte[] pdu = new byte[data.length - 4]; + System.arraycopy(data, 2, pdu, 0, pdu.length); + + return buildFrame(slaveId, functionCode, pdu, null); + } + + /** + * 构建 IotModbusFrame + */ + private IotModbusFrame buildFrame(int slaveId, int functionCode, byte[] pdu, Integer transactionId) { + IotModbusFrame frame = new IotModbusFrame() + .setSlaveId(slaveId) + .setFunctionCode(functionCode) + .setPdu(pdu) + .setTransactionId(transactionId); + // 异常响应 + // TODO @AI:0x80 看看是不是要枚举; + if ((functionCode & 0x80) != 0) { + frame.setException(true); + // TODO @AI:0x7f 看看是不是要枚举; + frame.setFunctionCode(functionCode & 0x7F); + if (pdu.length >= 1) { + frame.setExceptionCode(pdu[0] & 0xFF); + } + return frame; + } + // 自定义功能码 + if (functionCode == customFunctionCode) { + // data 区格式:[byteCount(1)] [JSON data(N)] + if (pdu.length >= 1) { + int byteCount = pdu[0] & 0xFF; + if (pdu.length >= 1 + byteCount) { + frame.setCustomData(new String(pdu, 1, byteCount, StandardCharsets.UTF_8)); + } + } + } + return frame; + } + + // ==================== 拆包 Handler ==================== + + /** + * 帧格式检测阶段 Handler(仅处理首包,探测后切换到对应的拆包 Handler) */ @SuppressWarnings("ClassCanBeRecord") - private static class DetectPhaseHandler implements Handler { + @RequiredArgsConstructor + private class DetectPhaseHandler implements Handler { private final RecordParser parser; private final int customFunctionCode; - private final Handler frameHandler; - private final Consumer onFormatDetected; - - // TODO @AI:简化构造方法,使用 lombok; - DetectPhaseHandler(RecordParser parser, int customFunctionCode, - Handler frameHandler, - Consumer onFormatDetected) { - this.parser = parser; - this.customFunctionCode = customFunctionCode; - this.frameHandler = frameHandler; - this.onFormatDetected = onFormatDetected; - } + private final BiConsumer frameHandler; @Override public void handle(Buffer buffer) { - byte[] header = buffer.getBytes(); - // 检测:byte[2]==0x00 && byte[3]==0x00 && 1<=length<=253 - int protocolId = ((header[2] & 0xFF) << 8) | (header[3] & 0xFF); - int length = ((header[4] & 0xFF) << 8) | (header[5] & 0xFF); + // 检测帧格式:protocolId==0x0000 且 length 合法 → MODBUS_TCP,否则 → MODBUS_RTU + byte[] bytes = buffer.getBytes(); + int protocolId = ((bytes[2] & 0xFF) << 8) | (bytes[3] & 0xFF); + int length = ((bytes[4] & 0xFF) << 8) | (bytes[5] & 0xFF); + // 分别处理 MODBUS_TCP、MODBUS_RTU 两种情况 if (protocolId == 0x0000 && length >= 1 && length <= 253) { - // MODBUS_TCP + // MODBUS_TCP:切换到 TCP 拆包 Handler log.debug("[DetectPhaseHandler][检测到 MODBUS_TCP 帧格式]"); - onFormatDetected.accept(IotModbusFrameFormatEnum.MODBUS_TCP); - // 切换到 TCP 拆包模式,处理当前首帧 TcpFrameHandler tcpHandler = new TcpFrameHandler(parser, frameHandler); parser.handler(tcpHandler); - // 当前 header 是 MBAP 的前 6 字节,需要继续读 length 字节 - tcpHandler.handleMbapHeader(header, length); + // 当前 bytes 就是 MBAP 的前 6 字节,直接交给 tcpHandler 处理 + tcpHandler.handleFirstBytes(bytes); } else { - // MODBUS_RTU + // MODBUS_RTU:切换到 RTU 拆包 Handler log.debug("[DetectPhaseHandler][检测到 MODBUS_RTU 帧格式]"); - onFormatDetected.accept(IotModbusFrameFormatEnum.MODBUS_RTU); - // 切换到 RTU 拆包模式,处理当前首帧 - RtuFrameHandler rtuHandler = new RtuFrameHandler(parser, customFunctionCode, frameHandler); + RtuFrameHandler rtuHandler = new RtuFrameHandler(parser, frameHandler, customFunctionCode); parser.handler(rtuHandler); - // 当前 header 包含前 6 字节(slaveId + FC + 部分数据),需要拼接处理 - rtuHandler.handleInitialBytes(header); + // 当前 bytes 包含前 6 字节(slaveId + FC + 部分数据),交给 rtuHandler 处理 + rtuHandler.handleFirstBytes(bytes); } } } /** * MODBUS_TCP 拆包 Handler(两阶段 RecordParser) + *

* Phase 1: fixedSizeMode(6) → 读 MBAP 前 6 字节,提取 length * Phase 2: fixedSizeMode(length) → 读 unitId + PDU */ - private static class TcpFrameHandler implements Handler { + @RequiredArgsConstructor + private class TcpFrameHandler implements Handler { private final RecordParser parser; - private final Handler frameHandler; + private final BiConsumer frameHandler; + private byte[] mbapHeader; private boolean waitingForBody = false; - // TODO @AI:lombok - TcpFrameHandler(RecordParser parser, Handler frameHandler) { - this.parser = parser; - this.frameHandler = frameHandler; - } - /** - * 处理首帧的 MBAP 头 + * 处理探测阶段传来的首帧 6 字节(即 MBAP 头) + * + * @param bytes 探测阶段消费的 6 字节 */ - void handleMbapHeader(byte[] header, int length) { - this.mbapHeader = header; + void handleFirstBytes(byte[] bytes) { + int length = ((bytes[4] & 0xFF) << 8) | (bytes[5] & 0xFF); + this.mbapHeader = bytes; this.waitingForBody = true; parser.fixedSizeMode(length); } @@ -124,10 +213,14 @@ public class IotModbusRecordParserFactory { // Phase 2: 收到 body(unitId + PDU) byte[] body = buffer.getBytes(); // 拼接完整帧:MBAP(6) + body - Buffer frame = Buffer.buffer(mbapHeader.length + body.length); - frame.appendBytes(mbapHeader); - frame.appendBytes(body); - frameHandler.handle(frame); + byte[] fullFrame = new byte[mbapHeader.length + body.length]; + System.arraycopy(mbapHeader, 0, fullFrame, 0, mbapHeader.length); + System.arraycopy(body, 0, fullFrame, mbapHeader.length, body.length); + // 解码并回调 + IotModbusFrame frame = decodeResponse(fullFrame, IotModbusFrameFormatEnum.MODBUS_TCP); + if (frame != null) { + frameHandler.accept(frame, IotModbusFrameFormatEnum.MODBUS_TCP); + } // 切回 Phase 1 waitingForBody = false; mbapHeader = null; @@ -159,7 +252,8 @@ public class IotModbusRecordParserFactory { * - FC05/06 响应:fixedSizeMode(6) → addr(2) + value(2) + CRC(2) * - FC15/16 响应:fixedSizeMode(6) → addr(2) + quantity(2) + CRC(2) */ - private static class RtuFrameHandler implements Handler { + @RequiredArgsConstructor + private class RtuFrameHandler implements Handler { private static final int STATE_HEADER = 0; private static final int STATE_EXCEPTION_BODY = 1; @@ -168,50 +262,41 @@ public class IotModbusRecordParserFactory { private static final int STATE_WRITE_BODY = 4; private final RecordParser parser; + private final BiConsumer frameHandler; private final int customFunctionCode; - private final Handler frameHandler; private int state = STATE_HEADER; private byte slaveId; private byte functionCode; private byte byteCount; - - // TODO @AI:lombok - RtuFrameHandler(RecordParser parser, int customFunctionCode, Handler frameHandler) { - this.parser = parser; - this.customFunctionCode = customFunctionCode; - this.frameHandler = frameHandler; - } + private Buffer pendingData; + private int expectedDataLen; /** - * 处理首帧检测阶段传来的初始 6 字节 - * 由于 RTU 首帧跳过了格式检测,我们需要拼接处理 + * 处理探测阶段传来的首帧 6 字节 + *

+ * 由于 RTU 首帧被探测阶段消费了 6 字节,这里需要从中提取 slaveId + FC 并根据 FC 处理剩余数据 + * + * @param bytes 探测阶段消费的 6 字节:[slaveId][FC][...4 bytes...] */ - void handleInitialBytes(byte[] initialBytes) { - // initialBytes 包含 6 字节:[slaveId][FC][...4 bytes...] - this.slaveId = initialBytes[0]; - this.functionCode = initialBytes[1]; + void handleFirstBytes(byte[] bytes) { + this.slaveId = bytes[0]; + this.functionCode = bytes[1]; int fc = functionCode & 0xFF; - - // 根据功能码,确定还需要多少字节 if ((fc & 0x80) != 0) { - // 异常响应:还需要 exceptionCode(1) + CRC(2) = 3 字节 - // 我们已经有 4 字节剩余(initialBytes[2..5]),足够 - // 拼接完整帧并交付 - // 完整帧 = slaveId(1) + FC(1) + exceptionCode(1) + CRC(2) = 5 + // 异常响应:完整帧 = slaveId(1) + FC(1) + exceptionCode(1) + CRC(2) = 5 字节 + // 已有 6 字节(多 1 字节),取前 5 字节组装 Buffer frame = Buffer.buffer(5); frame.appendByte(slaveId); frame.appendByte(functionCode); - frame.appendBytes(initialBytes, 2, 3); // exceptionCode + CRC - frameHandler.handle(frame); - // 剩余 1 字节需要留给下一帧,但 RecordParser 不支持回推 - // 简化处理:重置状态,开始读下一帧 + frame.appendBytes(bytes, 2, 3); // exceptionCode + CRC + emitFrame(frame); resetToHeader(); } else if (isReadResponse(fc) || fc == customFunctionCode) { - // 读响应或自定义 FC:initialBytes[2] = byteCount - this.byteCount = initialBytes[2]; + // 读响应或自定义 FC:bytes[2] = byteCount + this.byteCount = bytes[2]; int bc = byteCount & 0xFF; - // 已有数据:initialBytes[3..5] = 3 字节 + // 已有数据:bytes[3..5] = 3 字节 // 还需:byteCount + CRC(2) - 3 字节已有 int remaining = bc + 2 - 3; if (remaining <= 0) { @@ -221,36 +306,30 @@ public class IotModbusRecordParserFactory { frame.appendByte(slaveId); frame.appendByte(functionCode); frame.appendByte(byteCount); - frame.appendBytes(initialBytes, 3, bc + 2); // data + CRC - frameHandler.handle(frame); + frame.appendBytes(bytes, 3, bc + 2); // data + CRC + emitFrame(frame); resetToHeader(); } else { // 需要继续读 state = STATE_READ_DATA; - // 保存已有数据片段 - parser.fixedSizeMode(remaining); - // 在 handle() 中需要拼接 initialBytes[3..5] + 新读取的数据 - // 为了简化,我们用一个 Buffer 暂存 this.pendingData = Buffer.buffer(); - this.pendingData.appendBytes(initialBytes, 3, 3); + this.pendingData.appendBytes(bytes, 3, 3); // 暂存已有的 3 字节 this.expectedDataLen = bc + 2; // byteCount 个数据 + 2 CRC + parser.fixedSizeMode(remaining); } } else if (isWriteResponse(fc)) { - // 写响应:FC05/06/15/16,总长 = slaveId(1) + FC(1) + addr(2) + value/qty(2) + CRC(2) = 8 + // 写响应:总长 = slaveId(1) + FC(1) + addr(2) + value/qty(2) + CRC(2) = 8 字节 // 已有 6 字节,还需 2 字节 state = STATE_WRITE_BODY; this.pendingData = Buffer.buffer(); - this.pendingData.appendBytes(initialBytes, 2, 4); // 4 bytes already - parser.fixedSizeMode(2); // need 2 more bytes (CRC) + this.pendingData.appendBytes(bytes, 2, 4); // 暂存已有的 4 字节 + parser.fixedSizeMode(2); // 还需 2 字节(CRC) } else { log.warn("[RtuFrameHandler][未知功能码: 0x{}]", Integer.toHexString(fc)); resetToHeader(); } } - private Buffer pendingData; - private int expectedDataLen; - @Override public void handle(Buffer buffer) { switch (state) { @@ -279,7 +358,6 @@ public class IotModbusRecordParserFactory { this.slaveId = header[0]; this.functionCode = header[1]; int fc = functionCode & 0xFF; - if ((fc & 0x80) != 0) { // 异常响应 state = STATE_EXCEPTION_BODY; @@ -305,7 +383,7 @@ public class IotModbusRecordParserFactory { frame.appendByte(slaveId); frame.appendByte(functionCode); frame.appendBuffer(buffer); - frameHandler.handle(frame); + emitFrame(frame); resetToHeader(); } @@ -327,7 +405,7 @@ public class IotModbusRecordParserFactory { frame.appendByte(functionCode); frame.appendByte(byteCount); frame.appendBuffer(pendingData); - frameHandler.handle(frame); + emitFrame(frame); resetToHeader(); } // 否则继续等待(不应该发生,因为我们精确设置了 fixedSizeMode) @@ -340,16 +418,27 @@ public class IotModbusRecordParserFactory { frame.appendByte(slaveId); frame.appendByte(functionCode); frame.appendBuffer(pendingData); - frameHandler.handle(frame); + emitFrame(frame); resetToHeader(); } + /** + * 发射完整帧:解码并回调 + */ + private void emitFrame(Buffer frameBuffer) { + IotModbusFrame frame = decodeResponse(frameBuffer.getBytes(), IotModbusFrameFormatEnum.MODBUS_RTU); + if (frame != null) { + frameHandler.accept(frame, IotModbusFrameFormatEnum.MODBUS_RTU); + } + } + private void resetToHeader() { state = STATE_HEADER; pendingData = null; parser.fixedSizeMode(2); // slaveId + FC } + // TODO @AI:可以抽到 IotModbusUtils 里? private boolean isReadResponse(int fc) { return fc >= 1 && fc <= 4; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java similarity index 54% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameCodec.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java index 8d65b30bc1..0790ab3b36 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameCodec.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java @@ -1,128 +1,24 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; /** - * IoT Modbus 帧编解码器 + * IoT Modbus 帧编码器 *

- * 纯 Modbus 协议编解码,不处理 TCP 粘包(由 RecordParser 处理)。 - * 支持 MODBUS_TCP(MBAP)和 MODBUS_RTU(CRC16)两种帧格式,以及自定义功能码扩展。 + * 负责将 Modbus 请求/响应编码为字节数组,支持 MODBUS_TCP(MBAP)和 MODBUS_RTU(CRC16)两种帧格式。 * * @author 芋道源码 */ +@RequiredArgsConstructor @Slf4j -public class IotModbusFrameCodec { +public class IotModbusFrameEncoder { private final int customFunctionCode; - public IotModbusFrameCodec(int customFunctionCode) { - this.customFunctionCode = customFunctionCode; - } - - // ==================== 解码 ==================== - - /** - * 解码响应帧(拆包后的完整帧 byte[]) - * - * @param data 完整帧字节数组 - * @param format 帧格式 - * @return 解码后的 IotModbusFrame - */ - public IotModbusFrame decodeResponse(byte[] data, IotModbusFrameFormatEnum format) { - if (format == IotModbusFrameFormatEnum.MODBUS_TCP) { - return decodeTcpResponse(data); - } else { - return decodeRtuResponse(data); - } - } - - /** - * 解码 MODBUS_TCP 响应 - * 格式:[TransactionId(2)] [ProtocolId(2)] [Length(2)] [UnitId(1)] [FC(1)] [Data...] - */ - private IotModbusFrame decodeTcpResponse(byte[] data) { - if (data.length < 8) { - log.warn("[decodeTcpResponse][数据长度不足: {}]", data.length); - return null; - } - ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN); - int transactionId = buf.getShort() & 0xFFFF; - buf.getShort(); // protocolId(跳过)// TODO @AI:跳过原因,最好写下; - buf.getShort(); // length(跳过)// TODO @AI:跳过原因,最好写下; - int slaveId = buf.get() & 0xFF; - int functionCode = buf.get() & 0xFF; - // 提取 PDU 数据(从 functionCode 之后到末尾) - byte[] pdu = new byte[data.length - 8]; - System.arraycopy(data, 8, pdu, 0, pdu.length); - - // 构建 IotModbusFrame - return buildFrame(slaveId, functionCode, pdu, transactionId); - } - - /** - * 解码 MODBUS_RTU 响应 - * 格式:[SlaveId(1)] [FC(1)] [Data...] [CRC(2)] - */ - private IotModbusFrame decodeRtuResponse(byte[] data) { - if (data.length < 4) { - log.warn("[decodeRtuResponse][数据长度不足: {}]", data.length); - return null; - } - // 校验 CRC - if (!verifyCrc16(data)) { - log.warn("[decodeRtuResponse][CRC 校验失败]"); - return null; - } - int slaveId = data[0] & 0xFF; - int functionCode = data[1] & 0xFF; - // PDU 数据(不含 slaveId、functionCode、CRC) - byte[] pdu = new byte[data.length - 4]; - System.arraycopy(data, 2, pdu, 0, pdu.length); - - // 构建 IotModbusFrame - return buildFrame(slaveId, functionCode, pdu, null); - } - - /** - * 构建 IotModbusFrame - */ - private IotModbusFrame buildFrame(int slaveId, int functionCode, byte[] pdu, Integer transactionId) { - IotModbusFrame frame = new IotModbusFrame() - .setSlaveId(slaveId) - .setFunctionCode(functionCode) - .setPdu(pdu) - .setTransactionId(transactionId); - - // 异常响应 - // TODO @AI:0x80 看看是不是要枚举; - if ((functionCode & 0x80) != 0) { - frame.setException(true); - // TODO @AI:0x7f 看看是不是要枚举; - frame.setFunctionCode(functionCode & 0x7F); - if (pdu.length >= 1) { - frame.setExceptionCode(pdu[0] & 0xFF); - } - return frame; - } - - // 自定义功能码 - if (functionCode == customFunctionCode) { - // data 区格式:[byteCount(1)] [JSON data(N)] - if (pdu.length >= 1) { - int byteCount = pdu[0] & 0xFF; - if (pdu.length >= 1 + byteCount) { - frame.setCustomData(new String(pdu, 1, byteCount, StandardCharsets.UTF_8)); - } - } - } - return frame; - } - // ==================== 编码 ==================== /** @@ -269,49 +165,10 @@ public class IotModbusFrameCodec { frame[0] = (byte) slaveId; System.arraycopy(pdu, 0, frame, 1, pdu.length); // 计算并追加 CRC16 - int crc = calculateCrc16(frame, frame.length - 2); + int crc = IotModbusUtils.calculateCrc16(frame, frame.length - 2); frame[frame.length - 2] = (byte) (crc & 0xFF); // CRC Low frame[frame.length - 1] = (byte) ((crc >> 8) & 0xFF); // CRC High return frame; } - // ==================== CRC16 工具 ==================== - - // TODO @AI:hutool 等,有没工具类可以用 - /** - * 计算 CRC-16/MODBUS - * - * @param data 数据 - * @param length 计算长度 - * @return CRC16 值 - */ - public static int calculateCrc16(byte[] data, int length) { - int crc = 0xFFFF; - for (int i = 0; i < length; i++) { - crc ^= (data[i] & 0xFF); - for (int j = 0; j < 8; j++) { - if ((crc & 0x0001) != 0) { - crc >>= 1; - crc ^= 0xA001; - } else { - crc >>= 1; - } - } - } - return crc; - } - - // TODO @AI:hutool 等,有没工具类可以用 - /** - * 校验 CRC16 - */ - private boolean verifyCrc16(byte[] data) { - if (data.length < 3) { - return false; - } - int computed = calculateCrc16(data, data.length - 2); - int received = (data[data.length - 2] & 0xFF) | ((data[data.length - 1] & 0xFF) << 8); - return computed == received; - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusResponseParser.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusUtils.java similarity index 61% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusResponseParser.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusUtils.java index 6fa363db0b..f93f55cf9a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusResponseParser.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusUtils.java @@ -3,14 +3,61 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec; import lombok.extern.slf4j.Slf4j; /** - * IoT Modbus 响应值提取器 + * IoT Modbus 工具类 *

- * 从解码后的 IotModbusFrame 中提取寄存器值,用于后续的点位翻译。 + * 提供: + * 1. CRC-16/MODBUS 计算和校验 + * 2. 从解码后的 IotModbusFrame 中提取寄存器值(用于后续的点位翻译) * * @author 芋道源码 */ @Slf4j -public class IotModbusResponseParser { +public class IotModbusUtils { + + // TODO @AI:可以把 1、2、3、4、5 这些 fucntion code 在这里枚举下。 + // TODO @AI:一些枚举 0x80 这些可以这里枚举; + + // ==================== CRC16 工具 ==================== + + /** + * 计算 CRC-16/MODBUS + * + * @param data 数据 + * @param length 计算长度 + * @return CRC16 值 + */ + public static int calculateCrc16(byte[] data, int length) { + int crc = 0xFFFF; + for (int i = 0; i < length; i++) { + crc ^= (data[i] & 0xFF); + for (int j = 0; j < 8; j++) { + if ((crc & 0x0001) != 0) { + crc >>= 1; + crc ^= 0xA001; + } else { + crc >>= 1; + } + } + } + return crc; + } + + /** + * 校验 CRC16 + * + * @param data 包含 CRC 的完整数据 + * @return 校验是否通过 + */ + public static boolean verifyCrc16(byte[] data) { + if (data.length < 3) { + return false; + } + int computed = calculateCrc16(data, data.length - 2); + int received = (data[data.length - 2] & 0xFF) | ((data[data.length - 1] & 0xFF) << 8); + return computed == received; + } + + // ==================== 响应值提取 ==================== /** * 从帧中提取寄存器值(FC01-04 读响应) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java index 76e904ee7b..5a47a09c16 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java @@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo; @@ -35,7 +35,7 @@ public class IotModbusTcpSlaveDownstreamHandler { private final IotModbusTcpSlaveConnectionManager connectionManager; private final IotModbusTcpSlaveConfigCacheService configCacheService; private final IotModbusDataConverter dataConverter; - private final IotModbusFrameCodec frameCodec; + private final IotModbusFrameEncoder frameEncoder; /** * TCP 事务 ID 自增器 @@ -117,11 +117,11 @@ public class IotModbusTcpSlaveDownstreamHandler { } if (rawValues.length == 1 && fcEnum.getWriteSingleCode() != null) { // 单个值:使用单写功能码(FC05/FC06) - data = frameCodec.encodeWriteSingleRequest(slaveId, fcEnum.getWriteSingleCode(), + data = frameEncoder.encodeWriteSingleRequest(slaveId, fcEnum.getWriteSingleCode(), point.getRegisterAddress(), rawValues[0], frameFormat, transactionId); } else if (fcEnum.getWriteMultipleCode() != null) { // 多个值:使用多写功能码(FC15/FC16) - data = frameCodec.encodeWriteMultipleRegistersRequest(slaveId, + data = frameEncoder.encodeWriteMultipleRegistersRequest(slaveId, point.getRegisterAddress(), rawValues, frameFormat, transactionId); } else { log.warn("[writeProperty][点位 {} 不支持写操作]", point.getIdentifier()); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java index 4d91b636bf..15ae18f996 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java @@ -1,11 +1,13 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.upstream; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; @@ -19,69 +21,76 @@ import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusResponseParser; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager.PendingRequest; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePollScheduler; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.net.NetSocket; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.util.Map; -import java.util.function.BiConsumer; -// TODO @AI:逻辑有点多,看看是不是分区域! +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST; +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException; + +// DONE @AI:逻辑有点多,看看是不是分区域! => 已按区域划分:认证 / 轮询响应 /** * IoT Modbus TCP Slave 上行数据处理器 *

* 处理: * 1. 自定义 FC 认证 * 2. 轮询响应(mode=1)→ 点位翻译 → thing.property.post - * 3. 主动上报(mode=2)→ 透传 property.report TODO @AI:这种模式,应该不用支持;因为主动上报,都走标准的 tcp 即可; + * // DONE @AI:不用主动上报;主动上报走标准 tcp 即可 * * @author 芋道源码 */ @Slf4j public class IotModbusTcpSlaveUpstreamHandler { + private static final String METHOD_AUTH = "auth"; + private final IotDeviceCommonApi deviceApi; private final IotDeviceMessageService messageService; private final IotModbusDataConverter dataConverter; - private final IotModbusFrameCodec frameCodec; + private final IotModbusFrameEncoder frameEncoder; private final IotModbusTcpSlaveConnectionManager connectionManager; private final IotModbusTcpSlaveConfigCacheService configCacheService; private final IotModbusTcpSlavePendingRequestManager pendingRequestManager; + private final IotModbusTcpSlavePollScheduler pollScheduler; + private final IotDeviceService deviceService; private final String serverId; - /** - * 认证成功回调:(deviceId, config) → 启动轮询等 - */ - @Setter - private BiConsumer onAuthSuccess; - public IotModbusTcpSlaveUpstreamHandler(IotDeviceCommonApi deviceApi, IotDeviceMessageService messageService, IotModbusDataConverter dataConverter, - IotModbusFrameCodec frameCodec, + IotModbusFrameEncoder frameEncoder, IotModbusTcpSlaveConnectionManager connectionManager, IotModbusTcpSlaveConfigCacheService configCacheService, IotModbusTcpSlavePendingRequestManager pendingRequestManager, + IotModbusTcpSlavePollScheduler pollScheduler, + IotDeviceService deviceService, String serverId) { this.deviceApi = deviceApi; this.messageService = messageService; this.dataConverter = dataConverter; - this.frameCodec = frameCodec; + this.frameEncoder = frameEncoder; this.connectionManager = connectionManager; this.configCacheService = configCacheService; this.pendingRequestManager = pendingRequestManager; + this.pollScheduler = pollScheduler; + this.deviceService = deviceService; this.serverId = serverId; } + // ========== 帧处理入口 ========== + /** * 处理帧 */ @@ -89,159 +98,158 @@ public class IotModbusTcpSlaveUpstreamHandler { if (frame == null) { return; } - // 1.1 自定义功能码(认证等扩展) - if (StrUtil.isNotEmpty(frame.getCustomData())) { - handleCustomFrame(socket, frame, frameFormat); - return; - } - // 1.2 异常响应 + // 1. 异常响应 if (frame.isException()) { - // TODO @AI:这种需要返回一个结果给 modbus client? log.warn("[handleFrame][设备异常响应, slaveId={}, FC={}, exceptionCode={}]", frame.getSlaveId(), frame.getFunctionCode(), frame.getExceptionCode()); return; } - // 1.3 未认证连接,丢弃 + + // 2. 自定义功能码(认证等扩展) + if (StrUtil.isNotEmpty(frame.getCustomData())) { + handleCustomFrame(socket, frame, frameFormat); + return; + } + + // 1.2 未认证连接,丢弃 + // TODO @AI:把 1.2、1.3 拿到 handlePollingResponse 里;是否需要登录,自己知道! if (!connectionManager.isAuthenticated(socket)) { - // TODO @AI:这种需要返回一个结果给 modbus client? log.warn("[handleFrame][未认证连接, 丢弃数据, remoteAddress={}]", socket.remoteAddress()); return; } - // TODO @AI:获取不到,看看要不要也打个告警;然后 - // 2. 标准 Modbus 响应 + // 3. DONE @AI:断言必须是云端轮询(不再支持主动上报) + // TODO @AI:貌似只能轮询到一次?! + // 1.3 标准 Modbus 响应(只支持云端轮询模式) + // TODO @AI:可以把 ConnectionInfo info = connectionManager.getConnectionInfo(socket); if (info == null) { + log.warn("[handleFrame][已认证但连接信息为空, remoteAddress={}]", socket.remoteAddress()); return; } - // TODO @AI:可以断言下,必须是云端轮询; - if (info.getMode() != null && info.getMode().equals(IotModbusModeEnum.ACTIVE_REPORT.getMode())) { - // mode=2:主动上报,透传 - handleActiveReport(info, frame); - } else { - // mode=1:云端轮询,匹配 PendingRequest - handlePollingResponse(info, frame, frameFormat); - } + handlePollingResponse(info, frame, frameFormat); } + // ========== 自定义 FC 处理(认证等) ========== + /** * 处理自定义功能码帧 + *

+ * 异常分层翻译,参考 {@link cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpAbstractHandler} */ private void handleCustomFrame(NetSocket socket, IotModbusFrame frame, IotModbusFrameFormatEnum frameFormat) { try { - // TODO @AI:直接使用 JsonUtils 去解析出 IotDeviceMessage JSONObject json = JSONUtil.parseObj(frame.getCustomData()); String method = json.getStr("method"); - // TODO @AI: method 枚举下; - if ("auth".equals(method)) { + if (METHOD_AUTH.equals(method)) { handleAuth(socket, frame, json, frameFormat); return; } - // TODO @AI:把 frame 都打印下; - log.warn("[handleCustomFrame][未知 method: {}]", method); + log.warn("[handleCustomFrame][未知 method: {}, frame: slaveId={}, FC={}, customData={}]", + method, frame.getSlaveId(), frame.getFunctionCode(), frame.getCustomData()); + } catch (ServiceException e) { + // 已知业务异常,返回对应的错误码和错误信息 + sendCustomResponse(socket, frame, frameFormat, e.getCode(), e.getMessage()); + } catch (IllegalArgumentException e) { + // 参数校验异常,返回 400 错误 + sendCustomResponse(socket, frame, frameFormat, BAD_REQUEST.getCode(), e.getMessage()); } catch (Exception e) { - // TODO @AI:各种情况的翻译;看看怎么弄比较合适;是不是要用 fc 自定义的 callback 下? - log.error("[handleCustomFrame][解析自定义 FC 数据失败]", e); + // 其他未知异常,返回 500 错误 + log.error("[handleCustomFrame][解析自定义 FC 数据失败, frame: slaveId={}, FC={}, customData={}]", + frame.getSlaveId(), frame.getFunctionCode(), frame.getCustomData(), e); + sendCustomResponse(socket, frame, frameFormat, + INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg()); } } + // TODO @芋艿:在 review 下这个类; + // TODO @AI:不传递 json,直接在 frame /** * 处理认证请求 */ private void handleAuth(NetSocket socket, IotModbusFrame frame, JSONObject json, IotModbusFrameFormatEnum frameFormat) { - // TODO @AI:参数为空的校验; + // TODO @AI:是不是可以 JsonUtils.convert(json, IotDeviceAuthReqDTO.class); JSONObject params = json.getJSONObject("params"); if (params == null) { - sendAuthResponse(socket, frame, frameFormat, 1, "params 为空"); - return; + throw invalidParamException("params 不能为空"); } - // TODO @AI:参数判空; + // DONE @AI:参数判空 String clientId = params.getStr("clientId"); String username = params.getStr("username"); String password = params.getStr("password"); + // TODO @AI:逐个判空; + if (StrUtil.hasBlank(clientId, username, password)) { + throw invalidParamException("clientId、username、password 不能为空"); + } - try { - // 1. 调用认证 API - IotDeviceAuthReqDTO authReq = new IotDeviceAuthReqDTO() - .setClientId(clientId).setUsername(username).setPassword(password); - CommonResult authResult = deviceApi.authDevice(authReq); - // TODO @AI:应该不用 close 吧?! - // TODO @AI:BooleanUtils.isFalse - if (authResult == null || !authResult.isSuccess() || !Boolean.TRUE.equals(authResult.getData())) { - log.warn("[handleAuth][认证失败, clientId={}, username={}]", clientId, username); - sendAuthResponse(socket, frame, frameFormat, 1, "认证失败"); - socket.close(); - return; - } + // 1. 调用认证 API + IotDeviceAuthReqDTO authReq = new IotDeviceAuthReqDTO() + .setClientId(clientId).setUsername(username).setPassword(password); + CommonResult authResult = deviceApi.authDevice(authReq); + authResult.checkError(); + if (BooleanUtil.isFalse(authResult.getData())) { + log.warn("[handleAuth][认证失败, clientId={}, username={}]", clientId, username); + sendCustomResponse(socket, frame, frameFormat, 1, "认证失败"); + return; + } - // 2. 认证成功,查找设备配置(通过 username 作为 deviceName 查找) - // TODO 根据实际的认证模型优化查找逻辑 - // TODO @AI:通过 device - IotModbusDeviceConfigRespDTO config = configCacheService.findConfigByAuth(clientId, username, password); - if (config == null) { - // 退而求其次,遍历缓存查找 - log.info("[handleAuth][认证成功但未找到设备配置, clientId={}, username={}]", clientId, username); - } - // 2.2 解析设备信息 - IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); - Assert.notNull(deviceInfo, "解析设备信息失败"); - // 2.3 获取设备信息 - // TODO @AI:这里要优化下,不要通过 spring 这样注入; - IotDeviceService deviceService = SpringUtil.getBean(IotDeviceService.class); - IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); - Assert.notNull(device, "设备不存在"); - // TODO @AI:校验 frameFormat 是否一致;不一致,连接也失败; + // 2.1 认证成功,查找设备配置 + IotModbusDeviceConfigRespDTO config = configCacheService.findConfigByAuth(clientId, username, password); + if (config == null) { + log.info("[handleAuth][认证成功但未找到设备配置, clientId={}, username={}]", clientId, username); + } + // 2.2 解析设备信息 + IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); + Assert.notNull(deviceInfo, "解析设备信息失败"); + // 2.3 获取设备信息 + // DONE @AI:IotDeviceService 作为构造参数传入,不通过 SpringUtil.getBean + IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + Assert.notNull(device, "设备不存在"); - // 3. 注册连接 - ConnectionInfo connectionInfo = new ConnectionInfo() - .setDeviceId(device.getId()) - .setSlaveId(frame.getSlaveId()) - .setFrameFormat(frameFormat) - .setMode(config != null ? config.getMode() : IotModbusModeEnum.POLLING.getMode()); + // 3. 注册连接 + ConnectionInfo connectionInfo = new ConnectionInfo() + .setDeviceId(device.getId()) + .setSlaveId(frame.getSlaveId()) + .setFrameFormat(frameFormat) + .setMode(config != null ? config.getMode() : IotModbusModeEnum.POLLING.getMode()); + if (config != null) { + connectionInfo.setDeviceId(config.getDeviceId()) + .setProductKey(config.getProductKey()) + .setDeviceName(config.getDeviceName()); + } + connectionManager.registerConnection(socket, connectionInfo); - if (config != null) { - connectionInfo.setDeviceId(config.getDeviceId()) - .setProductKey(config.getProductKey()) - .setDeviceName(config.getDeviceName()); - } - connectionManager.registerConnection(socket, connectionInfo); + // 4. 发送认证成功响应 + sendCustomResponse(socket, frame, frameFormat, 0, "success"); + log.info("[handleAuth][认证成功, clientId={}, deviceId={}]", clientId, + config != null ? config.getDeviceId() : device.getId()); - // 4. 发送认证成功响应 - sendAuthResponse(socket, frame, frameFormat, 0, "success"); - log.info("[handleAuth][认证成功, clientId={}, deviceId={}]", clientId, - config != null ? config.getDeviceId() : "unknown"); - - // 5. 回调:启动轮询等 - // TODO @AI:是不是不要 callback,而是主动调用! - if (onAuthSuccess != null && config != null) { - onAuthSuccess.accept(config.getDeviceId(), config); - } - } catch (Exception e) { - log.error("[handleAuth][认证异常]", e); - sendAuthResponse(socket, frame, frameFormat, 1, "认证异常"); - socket.close(); + // 5. 直接启动轮询 + if (config != null) { + pollScheduler.updatePolling(config); } } /** - * 发送认证响应 + * 发送自定义功能码响应 */ - private void sendAuthResponse(NetSocket socket, IotModbusFrame frame, - IotModbusFrameFormatEnum frameFormat, - int code, String message) { - // TODO @AI:不一定用 auth response;而是 custom? + private void sendCustomResponse(NetSocket socket, IotModbusFrame frame, + IotModbusFrameFormatEnum frameFormat, + int code, String message) { JSONObject resp = new JSONObject(); - resp.set("method", "auth"); + resp.set("method", METHOD_AUTH); resp.set("code", code); resp.set("message", message); - byte[] data = frameCodec.encodeCustomFrame(frame.getSlaveId(), resp.toString(), + byte[] data = frameEncoder.encodeCustomFrame(frame.getSlaveId(), resp.toString(), frameFormat, frame.getTransactionId() != null ? frame.getTransactionId() : 0); connectionManager.sendToSocket(socket, data); } + // ========== 轮询响应处理 ========== + /** - * 处理轮询响应(mode=1) + * 处理轮询响应(云端轮询模式) */ private void handlePollingResponse(ConnectionInfo info, IotModbusFrame frame, IotModbusFrameFormatEnum frameFormat) { @@ -254,7 +262,7 @@ public class IotModbusTcpSlaveUpstreamHandler { return; } // 1.2 提取寄存器值 - int[] rawValues = IotModbusResponseParser.extractValues(frame); + int[] rawValues = IotModbusUtils.extractValues(frame); if (rawValues == null) { log.warn("[handlePollingResponse][提取寄存器值失败, deviceId={}, identifier={}]", info.getDeviceId(), request.getIdentifier()); @@ -262,20 +270,17 @@ public class IotModbusTcpSlaveUpstreamHandler { } // 1.3 查找点位配置 IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(info.getDeviceId()); - if (config == null || config.getPoints() == null) { + if (config == null || CollUtil.isEmpty(config.getPoints())) { return; } - // TODO @AI:findone arrayUtil; - var point = config.getPoints().stream() - .filter(p -> p.getId().equals(request.getPointId())) - .findFirst().orElse(null); + var point = CollUtil.findOne(config.getPoints(), p -> p.getId().equals(request.getPointId())); if (point == null) { return; } - // TODO @AI:拆成 2.1、2.2 - // 4. 点位翻译 → 上报 + // 2.1 点位翻译 Object convertedValue = dataConverter.convertToPropertyValue(rawValues, point); + // 2.2 上报属性 Map params = MapUtil.of(request.getIdentifier(), convertedValue); IotDeviceMessage message = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params); @@ -284,29 +289,4 @@ public class IotModbusTcpSlaveUpstreamHandler { info.getDeviceId(), request.getIdentifier(), rawValues, convertedValue); } - // TODO @AI:不需要这个逻辑; - /** - * 处理主动上报(mode=2) - * 设备直接上报 property.report 格式:{propertyId: value},不做点位翻译 - */ - @SuppressWarnings("unchecked") - private void handleActiveReport(ConnectionInfo info, IotModbusFrame frame) { - // mode=2 下设备上报标准 Modbus 帧,但由于没有点位翻译, - // 这里暂时将原始寄存器值以 FC+地址 为 key 上报 - int[] rawValues = IotModbusResponseParser.extractValues(frame); - if (rawValues == null) { - return; - } - - // 简单上报:以 "register_FC{fc}" 作为属性名 - String propertyKey = "register_FC" + frame.getFunctionCode(); - Map params = MapUtil.of(propertyKey, rawValues); - IotDeviceMessage message = IotDeviceMessage.requestOf( - IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params); - messageService.sendDeviceMessage(message, info.getProductKey(), info.getDeviceName(), serverId); - - log.debug("[handleActiveReport][设备={}, FC={}, 原始值={}]", - info.getDeviceId(), frame.getFunctionCode(), rawValues); - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java index 51246105c6..0c1ef64f48 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java @@ -4,7 +4,7 @@ import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager.PendingRequest; import io.vertx.core.Vertx; @@ -37,7 +37,7 @@ public class IotModbusTcpSlavePollScheduler { private final Vertx vertx; private final IotModbusTcpSlaveConnectionManager connectionManager; - private final IotModbusFrameCodec frameCodec; + private final IotModbusFrameEncoder frameEncoder; private final IotModbusTcpSlavePendingRequestManager pendingRequestManager; private final int requestTimeout; @@ -163,7 +163,7 @@ public class IotModbusTcpSlavePollScheduler { int transactionId = transactionIdCounter.incrementAndGet() & 0xFFFF; int slaveId = connInfo.getSlaveId() != null ? connInfo.getSlaveId() : 1; // 2.2 编码读请求 - byte[] data = frameCodec.encodeReadRequest(slaveId, point.getFunctionCode(), + byte[] data = frameEncoder.encodeReadRequest(slaveId, point.getFunctionCode(), point.getRegisterAddress(), point.getRegisterCount(), frameFormat, transactionId); // 2.3 注册 PendingRequest PendingRequest pendingRequest = new PendingRequest( diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java index 0580909d05..f9cb923893 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java @@ -6,8 +6,9 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusRecordParserFactory; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusUtils; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClient; @@ -59,7 +60,8 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { // ===================== 编解码器 ===================== - private static final IotModbusFrameCodec FRAME_CODEC = new IotModbusFrameCodec(CUSTOM_FC); + private static final IotModbusFrameDecoder FRAME_DECODER = new IotModbusFrameDecoder(CUSTOM_FC); + private static final IotModbusFrameEncoder FRAME_ENCODER = new IotModbusFrameEncoder(CUSTOM_FC); // ===================== 设备信息(根据实际情况修改,从 iot_device 表查询) ===================== @@ -111,7 +113,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { // ===================== 轮询响应测试 ===================== /** - * 轮询响应测试:认证后等待网关下发 FC03 读请求(RTU 格式),构造读响应帧发回 + * 轮询响应测试:认证后持续监听网关下发的读请求(RTU 格式),每次收到都自动构造读响应帧发回 */ @Test public void testPollingResponse() throws Exception { @@ -121,30 +123,32 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { IotModbusFrame authResponse = authenticate(socket); log.info("[testPollingResponse][认证响应: {}]", authResponse.getCustomData()); - // 2. 等待网关下发读请求 - log.info("[testPollingResponse][等待网关下发读请求...]"); - IotModbusFrame readRequest = waitForRequest(socket); - log.info("[testPollingResponse][收到读请求: slaveId={}, FC={}]", - readRequest.getSlaveId(), readRequest.getFunctionCode()); + // 2. 设置持续监听:每收到一个读请求,自动回复 + log.info("[testPollingResponse][开始持续监听网关下发的读请求...]"); + CompletableFuture done = new CompletableFuture<>(); + RecordParser parser = FRAME_DECODER.createRecordParser((frame, frameFormat) -> { + log.info("[testPollingResponse][收到请求: slaveId={}, FC={}]", + frame.getSlaveId(), frame.getFunctionCode()); + // 解析读请求中的起始地址和数量 + byte[] pdu = frame.getPdu(); + int startAddress = ((pdu[0] & 0xFF) << 8) | (pdu[1] & 0xFF); + int quantity = ((pdu[2] & 0xFF) << 8) | (pdu[3] & 0xFF); + log.info("[testPollingResponse][读请求参数: startAddress={}, quantity={}]", startAddress, quantity); - // 3. 解析读请求中的起始地址和数量 - byte[] pdu = readRequest.getPdu(); - int startAddress = ((pdu[0] & 0xFF) << 8) | (pdu[1] & 0xFF); - int quantity = ((pdu[2] & 0xFF) << 8) | (pdu[3] & 0xFF); - log.info("[testPollingResponse][读请求参数: startAddress={}, quantity={}]", startAddress, quantity); + // 构造读响应帧(模拟寄存器数据,RTU 格式) + int[] registerValues = new int[quantity]; + for (int i = 0; i < quantity; i++) { + registerValues[i] = 100 + i * 100; // 模拟值: 100, 200, 300, ... + } + byte[] responseData = buildReadResponse(frame.getSlaveId(), + frame.getFunctionCode(), registerValues); + socket.write(Buffer.buffer(responseData)); + log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues); + }); + socket.handler(parser); - // 4. 构造读响应帧(模拟寄存器数据,RTU 格式) - int[] registerValues = new int[quantity]; - for (int i = 0; i < quantity; i++) { - registerValues[i] = 100 + i * 100; // 模拟值: 100, 200, 300, ... - } - byte[] responseData = buildReadResponse(readRequest.getSlaveId(), - readRequest.getFunctionCode(), registerValues); - socket.write(Buffer.buffer(responseData)); - log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues); - - // 5. 等待一段时间让网关处理 - Thread.sleep(20000); + // 3. 持续等待(200 秒),期间会自动回复所有收到的读请求 + Thread.sleep(200000); } finally { socket.close(); } @@ -199,23 +203,20 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { } /** - * 发送帧并等待响应(使用 IotModbusRecordParserFactory 自动检测帧格式) + * 发送帧并等待响应(使用 IotModbusFrameDecoder 自动检测帧格式并解码) */ private IotModbusFrame sendAndReceive(NetSocket socket, byte[] frameData) throws Exception { CompletableFuture responseFuture = new CompletableFuture<>(); - // 使用 RecordParserFactory 创建拆包器(自动检测帧格式) - RecordParser parser = IotModbusRecordParserFactory.create(CUSTOM_FC, - buffer -> { + // 使用 FrameDecoder 创建拆包器(自动检测帧格式 + 解码,直接回调 IotModbusFrame) + RecordParser parser = FRAME_DECODER.createRecordParser( + (frame, frameFormat) -> { try { - // 检测到的帧格式应该是 RTU,使用 RTU 格式解码 - IotModbusFrame frame = FRAME_CODEC.decodeResponse( - buffer.getBytes(), IotModbusFrameFormatEnum.MODBUS_RTU); + log.info("[sendAndReceive][检测到帧格式: {}]", frameFormat); responseFuture.complete(frame); } catch (Exception e) { responseFuture.completeExceptionally(e); } - }, - format -> log.info("[sendAndReceive][检测到帧格式: {}]", format)); + }); socket.handler(parser); // 发送请求 @@ -231,18 +232,16 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { */ private IotModbusFrame waitForRequest(NetSocket socket) throws Exception { CompletableFuture requestFuture = new CompletableFuture<>(); - // 使用 RecordParserFactory 创建拆包器 - RecordParser parser = IotModbusRecordParserFactory.create(CUSTOM_FC, - buffer -> { + // 使用 FrameDecoder 创建拆包器(直接回调 IotModbusFrame) + RecordParser parser = FRAME_DECODER.createRecordParser( + (frame, frameFormat) -> { try { - IotModbusFrame frame = FRAME_CODEC.decodeResponse( - buffer.getBytes(), IotModbusFrameFormatEnum.MODBUS_RTU); + log.info("[waitForRequest][检测到帧格式: {}]", frameFormat); requestFuture.complete(frame); } catch (Exception e) { requestFuture.completeExceptionally(e); } - }, - format -> log.info("[waitForRequest][检测到帧格式: {}]", format)); + }); socket.handler(parser); // 等待(超时 30 秒,因为轮询间隔可能比较长) @@ -264,7 +263,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { JSONObject json = new JSONObject(); json.set("method", "auth"); json.set("params", params); - return FRAME_CODEC.encodeCustomFrame(SLAVE_ID, json.toString(), + return FRAME_ENCODER.encodeCustomFrame(SLAVE_ID, json.toString(), IotModbusFrameFormatEnum.MODBUS_RTU, 0); } @@ -286,7 +285,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { frame[3 + i * 2 + 1] = (byte) (registerValues[i] & 0xFF); } // 计算 CRC16 - int crc = IotModbusFrameCodec.calculateCrc16(frame, totalLength - 2); + int crc = IotModbusUtils.calculateCrc16(frame, totalLength - 2); frame[totalLength - 2] = (byte) (crc & 0xFF); // CRC Low frame[totalLength - 1] = (byte) ((crc >> 8) & 0xFF); // CRC High return frame; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java index 4d249927e7..c6c7ff28c3 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java @@ -6,8 +6,8 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec; -import io.vertx.core.Handler; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClient; @@ -61,7 +61,8 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { // ===================== 编解码器 ===================== - private static final IotModbusFrameCodec FRAME_CODEC = new IotModbusFrameCodec(CUSTOM_FC); + private static final IotModbusFrameDecoder FRAME_DECODER = new IotModbusFrameDecoder(CUSTOM_FC); + private static final IotModbusFrameEncoder FRAME_ENCODER = new IotModbusFrameEncoder(CUSTOM_FC); // ===================== 设备信息(根据实际情况修改,从 iot_device 表查询) ===================== @@ -113,7 +114,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { // ===================== 轮询响应测试 ===================== /** - * 轮询响应测试:认证后等待网关下发 FC03 读请求,构造读响应帧发回 + * 轮询响应测试:认证后持续监听网关下发的读请求,每次收到都自动构造读响应帧发回 */ @Test public void testPollingResponse() throws Exception { @@ -123,29 +124,31 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { IotModbusFrame authResponse = authenticate(socket); log.info("[testPollingResponse][认证响应: {}]", authResponse.getCustomData()); - // 2. 等待网关下发读请求 - log.info("[testPollingResponse][等待网关下发读请求...]"); - IotModbusFrame readRequest = waitForRequest(socket); - log.info("[testPollingResponse][收到读请求: slaveId={}, FC={}, transactionId={}]", - readRequest.getSlaveId(), readRequest.getFunctionCode(), readRequest.getTransactionId()); + // 2. 设置持续监听:每收到一个读请求,自动回复 + log.info("[testPollingResponse][开始持续监听网关下发的读请求...]"); + CompletableFuture done = new CompletableFuture<>(); + RecordParser parser = FRAME_DECODER.createRecordParser((frame, frameFormat) -> { + log.info("[testPollingResponse][收到请求: slaveId={}, FC={}, transactionId={}]", + frame.getSlaveId(), frame.getFunctionCode(), frame.getTransactionId()); + // 解析读请求中的起始地址和数量 + byte[] pdu = frame.getPdu(); + int startAddress = ((pdu[0] & 0xFF) << 8) | (pdu[1] & 0xFF); + int quantity = ((pdu[2] & 0xFF) << 8) | (pdu[3] & 0xFF); + log.info("[testPollingResponse][读请求参数: startAddress={}, quantity={}]", startAddress, quantity); - // 3. 解析读请求中的起始地址和数量 - byte[] pdu = readRequest.getPdu(); - int startAddress = ((pdu[0] & 0xFF) << 8) | (pdu[1] & 0xFF); - int quantity = ((pdu[2] & 0xFF) << 8) | (pdu[3] & 0xFF); - log.info("[testPollingResponse][读请求参数: startAddress={}, quantity={}]", startAddress, quantity); + // 构造读响应帧(模拟寄存器数据) + int[] registerValues = new int[quantity]; + for (int i = 0; i < quantity; i++) { + registerValues[i] = 100 + i * 100; // 模拟值: 100, 200, 300, ... + } + byte[] responseData = buildReadResponse(frame.getTransactionId(), + frame.getSlaveId(), frame.getFunctionCode(), registerValues); + socket.write(Buffer.buffer(responseData)); + log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues); + }); + socket.handler(parser); - // 4. 构造读响应帧(模拟寄存器数据) - int[] registerValues = new int[quantity]; - for (int i = 0; i < quantity; i++) { - registerValues[i] = 100 + i * 100; // 模拟值: 100, 200, 300, ... - } - byte[] responseData = buildReadResponse(readRequest.getTransactionId(), - readRequest.getSlaveId(), readRequest.getFunctionCode(), registerValues); - socket.write(Buffer.buffer(responseData)); - log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues); - - // 5. 等待一段时间让网关处理 + // 3. 持续等待(200 秒),期间会自动回复所有收到的读请求 Thread.sleep(200000); } finally { socket.close(); @@ -201,15 +204,20 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { } /** - * 发送帧并等待响应(MODBUS_TCP 格式) - *

- * 使用两阶段 RecordParser 拆包:fixedSizeMode(6) 读 MBAP 头 → fixedSizeMode(length) 读 body + * 发送帧并等待响应(使用 IotModbusFrameDecoder 自动检测帧格式并解码) */ private IotModbusFrame sendAndReceive(NetSocket socket, byte[] frameData) throws Exception { CompletableFuture responseFuture = new CompletableFuture<>(); - // 创建 TCP 两阶段拆包 RecordParser - RecordParser parser = RecordParser.newFixed(6); - parser.handler(new TcpRecordParserHandler(parser, responseFuture)); + // 使用 FrameDecoder 创建拆包器(自动检测帧格式 + 解码,直接回调 IotModbusFrame) + RecordParser parser = FRAME_DECODER.createRecordParser( + (frame, frameFormat) -> { + try { + log.info("[sendAndReceive][检测到帧格式: {}]", frameFormat); + responseFuture.complete(frame); + } catch (Exception e) { + responseFuture.completeExceptionally(e); + } + }); socket.handler(parser); // 发送请求 @@ -225,54 +233,22 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { */ private IotModbusFrame waitForRequest(NetSocket socket) throws Exception { CompletableFuture requestFuture = new CompletableFuture<>(); - RecordParser parser = RecordParser.newFixed(6); - parser.handler(new TcpRecordParserHandler(parser, requestFuture)); + // 使用 FrameDecoder 创建拆包器(直接回调 IotModbusFrame) + RecordParser parser = FRAME_DECODER.createRecordParser( + (frame, frameFormat) -> { + try { + log.info("[waitForRequest][检测到帧格式: {}]", frameFormat); + requestFuture.complete(frame); + } catch (Exception e) { + requestFuture.completeExceptionally(e); + } + }); socket.handler(parser); // 等待(超时 30 秒,因为轮询间隔可能比较长) return requestFuture.get(30000, TimeUnit.MILLISECONDS); } - /** - * MODBUS_TCP 两阶段拆包 Handler - */ - private class TcpRecordParserHandler implements Handler { - - private final RecordParser parser; - private final CompletableFuture future; - private byte[] mbapHeader; - private boolean waitingForBody = false; - - TcpRecordParserHandler(RecordParser parser, CompletableFuture future) { - this.parser = parser; - this.future = future; - } - - @Override - public void handle(Buffer buffer) { - try { - if (waitingForBody) { - // Phase 2: 收到 body(unitId + PDU) - byte[] body = buffer.getBytes(); - byte[] fullFrame = new byte[mbapHeader.length + body.length]; - System.arraycopy(mbapHeader, 0, fullFrame, 0, mbapHeader.length); - System.arraycopy(body, 0, fullFrame, mbapHeader.length, body.length); - - IotModbusFrame frame = FRAME_CODEC.decodeResponse(fullFrame, IotModbusFrameFormatEnum.MODBUS_TCP); - future.complete(frame); - } else { - // Phase 1: 收到 MBAP 头 6 字节 - this.mbapHeader = buffer.getBytes(); - int length = ((mbapHeader[4] & 0xFF) << 8) | (mbapHeader[5] & 0xFF); - this.waitingForBody = true; - parser.fixedSizeMode(length); - } - } catch (Exception e) { - future.completeExceptionally(e); - } - } - } - /** * 构造认证帧(MODBUS_TCP 格式) *

@@ -286,7 +262,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { JSONObject json = new JSONObject(); json.set("method", "auth"); json.set("params", params); - return FRAME_CODEC.encodeCustomFrame(SLAVE_ID, json.toString(), + return FRAME_ENCODER.encodeCustomFrame(SLAVE_ID, json.toString(), IotModbusFrameFormatEnum.MODBUS_TCP, 1); }