feat(iot):modbus-tcp-slave 优化它的 code、decode 逻辑;

This commit is contained in:
YunaiV
2026-02-08 01:41:30 +08:00
parent a0db86848d
commit 346ae3ff48
9 changed files with 525 additions and 615 deletions

View File

@@ -4,7 +4,6 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
@@ -12,9 +11,8 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusRecordParserFactory;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.downstream.IotModbusTcpSlaveDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.downstream.IotModbusTcpSlaveDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.upstream.IotModbusTcpSlaveUpstreamHandler;
@@ -23,6 +21,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotM
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePollScheduler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServer;
@@ -33,19 +32,15 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
// TODO @AI不用主动上报
// DONE @AI不用主动上报
/**
* IoT 网关 Modbus TCP Slave 协议
* <p>
* 作为 TCP Server 接收设备主动连接:
* 1. 设备通过自定义功能码FC 65发送认证请求
* 2. 认证成功后,根据设备配置的 mode 决定工作模式
* - mode=1云端轮询网关主动发送 Modbus 读请求,设备响应
* - mode=2主动上报设备主动上报数据网关透传
* 2. 认证成功后,网关主动发送 Modbus 读请求,设备响应(云端轮询模式
*
* @author 芋道源码
*/
@@ -85,20 +80,16 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
*/
private Long requestCleanupTimerId;
/**
* 未认证连接的帧格式缓存socket → 检测到的帧格式
*/
private final Map<NetSocket, IotModbusFrameFormatEnum> pendingFrameFormats = new ConcurrentHashMap<>();
// ========== 各组件 ==========
// TODO @芋艿:稍后排序下,有点小乱;
private final IotModbusTcpSlaveConfig slaveConfig;
private final IotModbusFrameCodec frameCodec;
private final IotModbusFrameDecoder frameDecoder;
private final IotModbusFrameEncoder frameEncoder;
private final IotModbusTcpSlaveConnectionManager connectionManager;
private final IotModbusTcpSlaveConfigCacheService configCacheService;
private final IotModbusTcpSlavePendingRequestManager pendingRequestManager;
private final IotModbusTcpSlaveUpstreamHandler upstreamHandler;
private final IotModbusTcpSlaveDownstreamHandler downstreamHandler;
private final IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber;
private final IotModbusTcpSlavePollScheduler pollScheduler;
@@ -118,33 +109,27 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
this.pendingRequestManager = new IotModbusTcpSlavePendingRequestManager();
// 初始化帧编解码器
this.frameCodec = new IotModbusFrameCodec(slaveConfig.getCustomFunctionCode());
this.frameDecoder = new IotModbusFrameDecoder(slaveConfig.getCustomFunctionCode());
this.frameEncoder = new IotModbusFrameEncoder(slaveConfig.getCustomFunctionCode());
// 初始化轮询调度器
this.pollScheduler = new IotModbusTcpSlavePollScheduler(
vertx, connectionManager, frameEncoder, pendingRequestManager,
slaveConfig.getRequestTimeout());
// 初始化 Handler
IotModbusDataConverter dataConverter = new IotModbusDataConverter();
IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceService deviceService = SpringUtil.getBean(IotDeviceService.class);
this.upstreamHandler = new IotModbusTcpSlaveUpstreamHandler(
deviceApi, messageService, dataConverter, frameCodec,
connectionManager, configCacheService, pendingRequestManager, serverId);
this.downstreamHandler = new IotModbusTcpSlaveDownstreamHandler(
connectionManager, configCacheService, dataConverter, frameCodec);
// 初始化轮询调度器
this.pollScheduler = new IotModbusTcpSlavePollScheduler(
vertx, connectionManager, frameCodec, pendingRequestManager,
slaveConfig.getRequestTimeout());
// 设置认证成功回调:启动轮询
// TODO @AI感觉直接去调用不用注册回调了更简洁
this.upstreamHandler.setOnAuthSuccess((deviceId, config) -> {
if (config.getMode() != null
&& config.getMode().equals(IotModbusModeEnum.POLLING.getMode())) {
pollScheduler.updatePolling(config);
}
});
deviceApi, messageService, dataConverter, frameEncoder,
connectionManager, configCacheService, pendingRequestManager,
pollScheduler, deviceService, serverId);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotModbusTcpSlaveDownstreamHandler downstreamHandler = new IotModbusTcpSlaveDownstreamHandler(
connectionManager, configCacheService, dataConverter, frameEncoder);
this.downstreamSubscriber = new IotModbusTcpSlaveDownstreamSubscriber(
this, downstreamHandler, messageBus);
}
@@ -168,6 +153,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
try {
// 1.1 首次加载配置
// TODO @AI可能首次不用加载你在想想
refreshConfig();
// 1.2 启动配置刷新定时器
int refreshInterval = slaveConfig.getConfigRefreshInterval();
@@ -190,10 +176,21 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT Modbus TCP Slave 协议 {} 启动失败]", getId(), e);
if (configRefreshTimerId != null) {
vertx.cancelTimer(configRefreshTimerId);
configRefreshTimerId = null;
}
if (requestCleanupTimerId != null) {
vertx.cancelTimer(requestCleanupTimerId);
requestCleanupTimerId = null;
}
connectionManager.closeAll();
if (netServer != null) {
netServer.close();
}
if (vertx != null) {
vertx.close();
}
// TODO @AI其它相关的 close
throw e;
}
}
@@ -252,20 +249,19 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
* 启动 TCP Server
*/
private void startTcpServer() {
// TODO @AIhost 一定要设置么?
// 1. 创建 TCP Server
NetServerOptions options = new NetServerOptions()
.setPort(properties.getPort())
.setHost("0.0.0.0");
.setPort(properties.getPort());
netServer = vertx.createNetServer(options);
// 2. 设置连接处理器
netServer.connectHandler(this::handleConnection);
// TODO @AI是不是 sync 就好,不用 onSuccess/onFailure 了?感觉更简洁。失败,肯定就要抛出异常,结束初始化了!
netServer.listen()
.onSuccess(server -> log.info("[startTcpServer][TCP Server 启动成功, port={}]",
server.actualPort()))
.onFailure(e -> log.error("[startTcpServer][TCP Server 启动失败]", e));
try {
netServer.listen().toCompletionStage().toCompletableFuture().get();
log.info("[startTcpServer][TCP Server 启动成功, port={}]", properties.getPort());
} catch (Exception e) {
throw new RuntimeException("[startTcpServer][TCP Server 启动失败]", e);
}
}
/**
@@ -274,58 +270,24 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
private void handleConnection(NetSocket socket) {
log.info("[handleConnection][新连接, remoteAddress={}]", socket.remoteAddress());
// 1.1 创建带帧格式检测的 RecordParser
// TODO @AI看看怎么从这个类里面拿出去让这个类的职责更单一
RecordParser parser = IotModbusRecordParserFactory.create(
slaveConfig.getCustomFunctionCode(),
// 完整帧回调
// TODO @AI感觉搞个独立的类稍微好点
frameBuffer -> {
byte[] frameBytes = frameBuffer.getBytes();
// 获取该连接的帧格式
ConnectionInfo connInfo = connectionManager.getConnectionInfo(socket);
IotModbusFrameFormatEnum frameFormat = connInfo != null ? connInfo.getFrameFormat() : null;
if (frameFormat == null) {
// 未认证的连接,使用首帧检测到的帧格式
frameFormat = pendingFrameFormats.get(socket);
}
if (frameFormat == null) {
log.warn("[handleConnection][帧格式未检测到, remoteAddress={}]", socket.remoteAddress());
return;
}
// 解码帧
IotModbusFrame frame = frameCodec.decodeResponse(frameBytes, frameFormat);
// 交给 UpstreamHandler 处理
upstreamHandler.handleFrame(socket, frame, frameFormat);
},
// 帧格式检测回调:保存到未认证缓存
detectedFormat -> {
// TODO @AI是不是不用缓存每次都探测因为一般 auth 首包后,基本也没探测的诉求了!
pendingFrameFormats.put(socket, detectedFormat);
// 如果连接已注册(不太可能在检测阶段),也更新
// TODO @AI是否非必须
connectionManager.setFrameFormat(socket, detectedFormat);
log.debug("[handleConnection][帧格式检测: {}, remoteAddress={}]",
detectedFormat, socket.remoteAddress());
}
);
// 1.2 设置数据处理器
socket.handler(parser);
// 1. 创建 RecordParser 并设置为数据处理器
RecordParser recordParser = frameDecoder.createRecordParser((frame, frameFormat) -> {
// 【重要】帧处理分发,即消息处理
upstreamHandler.handleFrame(socket, frame, frameFormat);
});
socket.handler(recordParser);
// 2.1 连接关闭处理
socket.closeHandler(v -> {
pendingFrameFormats.remove(socket);
ConnectionInfo info = connectionManager.removeConnection(socket);
// TODO @AIif return 简化下;
if (info != null && info.getDeviceId() != null) {
pollScheduler.stopPolling(info.getDeviceId());
pendingRequestManager.removeDevice(info.getDeviceId());
log.info("[handleConnection][连接关闭, deviceId={}, remoteAddress={}]",
info.getDeviceId(), socket.remoteAddress());
} else {
if (info == null || info.getDeviceId() == null) {
log.info("[handleConnection][未认证连接关闭, remoteAddress={}]", socket.remoteAddress());
return;
}
pollScheduler.stopPolling(info.getDeviceId());
pendingRequestManager.removeDevice(info.getDeviceId());
log.info("[handleConnection][连接关闭, deviceId={}, remoteAddress={}]",
info.getDeviceId(), socket.remoteAddress());
});
// 2.2 异常处理
socket.exceptionHandler(e -> {

View File

@@ -4,116 +4,205 @@ import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
// TODO @AI看看是不是不要搞成 factory而是直接 new可以一起讨论下
/**
* IoT Modbus RecordParser 工厂
* IoT Modbus 帧解码器集成 TCP 拆包 + 帧格式探测 + 帧解码一条龙完成从 TCP 字节流到 IotModbusFrame 的转换
* <p>
* 创建带自动帧格式检测的 RecordParser
* 流程
* 1. 首帧检测读前 6 字节判断 MODBUS_TCPProtocolId==0x0000 Length 合理 MODBUS_RTU
* 2. 检测后自动切换到对应的拆包模式
* 2. 检测后切换到对应的拆包 Handler并将首包 6 字节通过 handleFirstBytes() 交给新 Handler 处理
* 3. 拆包完成后解码为 IotModbusFrame通过回调返回
* - MODBUS_TCP两阶段 RecordParserMBAP length 字段驱动
* - MODBUS_RTU功能码驱动的状态机
*
* @author 芋道源码
*/
@Slf4j
public class IotModbusRecordParserFactory {
public class IotModbusFrameDecoder {
private final int customFunctionCode;
public IotModbusFrameDecoder(int customFunctionCode) {
this.customFunctionCode = customFunctionCode;
}
/**
* 创建带自动帧格式检测的 RecordParser
*
* @param customFunctionCode 自定义功能码
* @param frameHandler 完整帧回调
* @param onFormatDetected 帧格式检测回调
* @param frameHandler 完整帧回调解码后的 IotModbusFrame + 检测到的帧格式
* @return RecordParser 实例
*/
public static RecordParser create(int customFunctionCode,
Handler<Buffer> frameHandler,
Consumer<IotModbusFrameFormatEnum> onFormatDetected) {
// 先创建一个 RecordParser使用 fixedSizeMode(6) 读取首帧前 6 字节进行帧格式检测
// TODO @AI最小需要 6 个字节么有可能更小的情况下就探测出来
public RecordParser createRecordParser(BiConsumer<IotModbusFrame, IotModbusFrameFormatEnum> frameHandler) {
// 先创建一个 RecordParser使用 fixedSizeMode(6) 读取首帧前 6 字节进行帧格式检测
RecordParser parser = RecordParser.newFixed(6);
parser.handler(new DetectPhaseHandler(parser, customFunctionCode, frameHandler, onFormatDetected));
parser.handler(new DetectPhaseHandler(parser, customFunctionCode, frameHandler));
return parser;
}
// ==================== 帧解码 ====================
/**
* 帧格式检测阶段 Handler
* 解码响应帧拆包后的完整帧 byte[]
*
* @param data 完整帧字节数组
* @param format 帧格式
* @return 解码后的 IotModbusFrame
*/
private IotModbusFrame decodeResponse(byte[] data, IotModbusFrameFormatEnum format) {
if (format == IotModbusFrameFormatEnum.MODBUS_TCP) {
return decodeTcpResponse(data);
} else {
return decodeRtuResponse(data);
}
}
/**
* 解码 MODBUS_TCP 响应
* 格式[TransactionId(2)] [ProtocolId(2)] [Length(2)] [UnitId(1)] [FC(1)] [Data...]
*/
private IotModbusFrame decodeTcpResponse(byte[] data) {
if (data.length < 8) {
log.warn("[decodeTcpResponse][数据长度不足: {}]", data.length);
return null;
}
ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN);
int transactionId = buf.getShort() & 0xFFFF;
buf.getShort(); // protocolId固定 0x0000Modbus 协议标识
buf.getShort(); // length后续字节数UnitId + PDU拆包阶段已使用
int slaveId = buf.get() & 0xFF;
int functionCode = buf.get() & 0xFF;
// 提取 PDU 数据 functionCode 之后到末尾
byte[] pdu = new byte[data.length - 8];
System.arraycopy(data, 8, pdu, 0, pdu.length);
return buildFrame(slaveId, functionCode, pdu, transactionId);
}
/**
* 解码 MODBUS_RTU 响应
* 格式[SlaveId(1)] [FC(1)] [Data...] [CRC(2)]
*/
private IotModbusFrame decodeRtuResponse(byte[] data) {
if (data.length < 4) {
log.warn("[decodeRtuResponse][数据长度不足: {}]", data.length);
return null;
}
// 校验 CRC
if (!IotModbusUtils.verifyCrc16(data)) {
log.warn("[decodeRtuResponse][CRC 校验失败]");
return null;
}
int slaveId = data[0] & 0xFF;
int functionCode = data[1] & 0xFF;
// PDU 数据不含 slaveIdfunctionCodeCRC
byte[] pdu = new byte[data.length - 4];
System.arraycopy(data, 2, pdu, 0, pdu.length);
return buildFrame(slaveId, functionCode, pdu, null);
}
/**
* 构建 IotModbusFrame
*/
private IotModbusFrame buildFrame(int slaveId, int functionCode, byte[] pdu, Integer transactionId) {
IotModbusFrame frame = new IotModbusFrame()
.setSlaveId(slaveId)
.setFunctionCode(functionCode)
.setPdu(pdu)
.setTransactionId(transactionId);
// 异常响应
// TODO @AI0x80 看看是不是要枚举
if ((functionCode & 0x80) != 0) {
frame.setException(true);
// TODO @AI0x7f 看看是不是要枚举
frame.setFunctionCode(functionCode & 0x7F);
if (pdu.length >= 1) {
frame.setExceptionCode(pdu[0] & 0xFF);
}
return frame;
}
// 自定义功能码
if (functionCode == customFunctionCode) {
// data 区格式[byteCount(1)] [JSON data(N)]
if (pdu.length >= 1) {
int byteCount = pdu[0] & 0xFF;
if (pdu.length >= 1 + byteCount) {
frame.setCustomData(new String(pdu, 1, byteCount, StandardCharsets.UTF_8));
}
}
}
return frame;
}
// ==================== 拆包 Handler ====================
/**
* 帧格式检测阶段 Handler仅处理首包探测后切换到对应的拆包 Handler
*/
@SuppressWarnings("ClassCanBeRecord")
private static class DetectPhaseHandler implements Handler<Buffer> {
@RequiredArgsConstructor
private class DetectPhaseHandler implements Handler<Buffer> {
private final RecordParser parser;
private final int customFunctionCode;
private final Handler<Buffer> frameHandler;
private final Consumer<IotModbusFrameFormatEnum> onFormatDetected;
// TODO @AI简化构造方法使用 lombok
DetectPhaseHandler(RecordParser parser, int customFunctionCode,
Handler<Buffer> frameHandler,
Consumer<IotModbusFrameFormatEnum> onFormatDetected) {
this.parser = parser;
this.customFunctionCode = customFunctionCode;
this.frameHandler = frameHandler;
this.onFormatDetected = onFormatDetected;
}
private final BiConsumer<IotModbusFrame, IotModbusFrameFormatEnum> frameHandler;
@Override
public void handle(Buffer buffer) {
byte[] header = buffer.getBytes();
// 检测byte[2]==0x00 && byte[3]==0x00 && 1<=length<=253
int protocolId = ((header[2] & 0xFF) << 8) | (header[3] & 0xFF);
int length = ((header[4] & 0xFF) << 8) | (header[5] & 0xFF);
// 检测帧格式protocolId==0x0000 length 合法 MODBUS_TCP否则 MODBUS_RTU
byte[] bytes = buffer.getBytes();
int protocolId = ((bytes[2] & 0xFF) << 8) | (bytes[3] & 0xFF);
int length = ((bytes[4] & 0xFF) << 8) | (bytes[5] & 0xFF);
// 分别处理 MODBUS_TCPMODBUS_RTU 两种情况
if (protocolId == 0x0000 && length >= 1 && length <= 253) {
// MODBUS_TCP
// MODBUS_TCP切换到 TCP 拆包 Handler
log.debug("[DetectPhaseHandler][检测到 MODBUS_TCP 帧格式]");
onFormatDetected.accept(IotModbusFrameFormatEnum.MODBUS_TCP);
// 切换到 TCP 拆包模式处理当前首帧
TcpFrameHandler tcpHandler = new TcpFrameHandler(parser, frameHandler);
parser.handler(tcpHandler);
// 当前 header MBAP 的前 6 字节需要继续读 length 字节
tcpHandler.handleMbapHeader(header, length);
// 当前 bytes MBAP 的前 6 字节直接交给 tcpHandler 处理
tcpHandler.handleFirstBytes(bytes);
} else {
// MODBUS_RTU
// MODBUS_RTU切换到 RTU 拆包 Handler
log.debug("[DetectPhaseHandler][检测到 MODBUS_RTU 帧格式]");
onFormatDetected.accept(IotModbusFrameFormatEnum.MODBUS_RTU);
// 切换到 RTU 拆包模式处理当前首帧
RtuFrameHandler rtuHandler = new RtuFrameHandler(parser, customFunctionCode, frameHandler);
RtuFrameHandler rtuHandler = new RtuFrameHandler(parser, frameHandler, customFunctionCode);
parser.handler(rtuHandler);
// 当前 header 包含前 6 字节slaveId + FC + 部分数据需要拼接处理
rtuHandler.handleInitialBytes(header);
// 当前 bytes 包含前 6 字节slaveId + FC + 部分数据交给 rtuHandler 处理
rtuHandler.handleFirstBytes(bytes);
}
}
}
/**
* MODBUS_TCP 拆包 Handler两阶段 RecordParser
* <p>
* Phase 1: fixedSizeMode(6) MBAP 6 字节提取 length
* Phase 2: fixedSizeMode(length) unitId + PDU
*/
private static class TcpFrameHandler implements Handler<Buffer> {
@RequiredArgsConstructor
private class TcpFrameHandler implements Handler<Buffer> {
private final RecordParser parser;
private final Handler<Buffer> frameHandler;
private final BiConsumer<IotModbusFrame, IotModbusFrameFormatEnum> frameHandler;
private byte[] mbapHeader;
private boolean waitingForBody = false;
// TODO @AIlombok
TcpFrameHandler(RecordParser parser, Handler<Buffer> frameHandler) {
this.parser = parser;
this.frameHandler = frameHandler;
}
/**
* 处理首帧的 MBAP
* 处理探测阶段传来的首帧 6 字节 MBAP
*
* @param bytes 探测阶段消费的 6 字节
*/
void handleMbapHeader(byte[] header, int length) {
this.mbapHeader = header;
void handleFirstBytes(byte[] bytes) {
int length = ((bytes[4] & 0xFF) << 8) | (bytes[5] & 0xFF);
this.mbapHeader = bytes;
this.waitingForBody = true;
parser.fixedSizeMode(length);
}
@@ -124,10 +213,14 @@ public class IotModbusRecordParserFactory {
// Phase 2: 收到 bodyunitId + PDU
byte[] body = buffer.getBytes();
// 拼接完整帧MBAP(6) + body
Buffer frame = Buffer.buffer(mbapHeader.length + body.length);
frame.appendBytes(mbapHeader);
frame.appendBytes(body);
frameHandler.handle(frame);
byte[] fullFrame = new byte[mbapHeader.length + body.length];
System.arraycopy(mbapHeader, 0, fullFrame, 0, mbapHeader.length);
System.arraycopy(body, 0, fullFrame, mbapHeader.length, body.length);
// 解码并回调
IotModbusFrame frame = decodeResponse(fullFrame, IotModbusFrameFormatEnum.MODBUS_TCP);
if (frame != null) {
frameHandler.accept(frame, IotModbusFrameFormatEnum.MODBUS_TCP);
}
// 切回 Phase 1
waitingForBody = false;
mbapHeader = null;
@@ -159,7 +252,8 @@ public class IotModbusRecordParserFactory {
* - FC05/06 响应fixedSizeMode(6) addr(2) + value(2) + CRC(2)
* - FC15/16 响应fixedSizeMode(6) addr(2) + quantity(2) + CRC(2)
*/
private static class RtuFrameHandler implements Handler<Buffer> {
@RequiredArgsConstructor
private class RtuFrameHandler implements Handler<Buffer> {
private static final int STATE_HEADER = 0;
private static final int STATE_EXCEPTION_BODY = 1;
@@ -168,50 +262,41 @@ public class IotModbusRecordParserFactory {
private static final int STATE_WRITE_BODY = 4;
private final RecordParser parser;
private final BiConsumer<IotModbusFrame, IotModbusFrameFormatEnum> frameHandler;
private final int customFunctionCode;
private final Handler<Buffer> frameHandler;
private int state = STATE_HEADER;
private byte slaveId;
private byte functionCode;
private byte byteCount;
// TODO @AIlombok
RtuFrameHandler(RecordParser parser, int customFunctionCode, Handler<Buffer> frameHandler) {
this.parser = parser;
this.customFunctionCode = customFunctionCode;
this.frameHandler = frameHandler;
}
private Buffer pendingData;
private int expectedDataLen;
/**
* 处理首帧检测阶段传来的初始 6 字节
* 由于 RTU 首帧跳过了格式检测我们需要拼接处理
* 处理测阶段传来的首帧 6 字节
* <p>
* 由于 RTU 首帧被探测阶段消费了 6 字节这里需要从中提取 slaveId + FC 并根据 FC 处理剩余数据
*
* @param bytes 探测阶段消费的 6 字节[slaveId][FC][...4 bytes...]
*/
void handleInitialBytes(byte[] initialBytes) {
// initialBytes 包含 6 字节[slaveId][FC][...4 bytes...]
this.slaveId = initialBytes[0];
this.functionCode = initialBytes[1];
void handleFirstBytes(byte[] bytes) {
this.slaveId = bytes[0];
this.functionCode = bytes[1];
int fc = functionCode & 0xFF;
// 根据功能码确定还需要多少字节
if ((fc & 0x80) != 0) {
// 异常响应还需要 exceptionCode(1) + CRC(2) = 3 字节
// 我们已经 4 字节剩余initialBytes[2..5]足够
// 拼接完整帧并交付
// 完整帧 = slaveId(1) + FC(1) + exceptionCode(1) + CRC(2) = 5
// 异常响应完整帧 = slaveId(1) + FC(1) + exceptionCode(1) + CRC(2) = 5 字节
// 6 字节 1 字节取前 5 字节组装
Buffer frame = Buffer.buffer(5);
frame.appendByte(slaveId);
frame.appendByte(functionCode);
frame.appendBytes(initialBytes, 2, 3); // exceptionCode + CRC
frameHandler.handle(frame);
// 剩余 1 字节需要留给下一帧 RecordParser 不支持回推
// 简化处理重置状态开始读下一帧
frame.appendBytes(bytes, 2, 3); // exceptionCode + CRC
emitFrame(frame);
resetToHeader();
} else if (isReadResponse(fc) || fc == customFunctionCode) {
// 读响应或自定义 FCinitialBytes[2] = byteCount
this.byteCount = initialBytes[2];
// 读响应或自定义 FCbytes[2] = byteCount
this.byteCount = bytes[2];
int bc = byteCount & 0xFF;
// 已有数据initialBytes[3..5] = 3 字节
// 已有数据bytes[3..5] = 3 字节
// 还需byteCount + CRC(2) - 3 字节已有
int remaining = bc + 2 - 3;
if (remaining <= 0) {
@@ -221,36 +306,30 @@ public class IotModbusRecordParserFactory {
frame.appendByte(slaveId);
frame.appendByte(functionCode);
frame.appendByte(byteCount);
frame.appendBytes(initialBytes, 3, bc + 2); // data + CRC
frameHandler.handle(frame);
frame.appendBytes(bytes, 3, bc + 2); // data + CRC
emitFrame(frame);
resetToHeader();
} else {
// 需要继续读
state = STATE_READ_DATA;
// 保存已有数据片段
parser.fixedSizeMode(remaining);
// handle() 中需要拼接 initialBytes[3..5] + 新读取的数据
// 为了简化我们用一个 Buffer 暂存
this.pendingData = Buffer.buffer();
this.pendingData.appendBytes(initialBytes, 3, 3);
this.pendingData.appendBytes(bytes, 3, 3); // 暂存已有的 3 字节
this.expectedDataLen = bc + 2; // byteCount 个数据 + 2 CRC
parser.fixedSizeMode(remaining);
}
} else if (isWriteResponse(fc)) {
// 写响应FC05/06/15/16总长 = slaveId(1) + FC(1) + addr(2) + value/qty(2) + CRC(2) = 8
// 写响应总长 = slaveId(1) + FC(1) + addr(2) + value/qty(2) + CRC(2) = 8 字节
// 已有 6 字节还需 2 字节
state = STATE_WRITE_BODY;
this.pendingData = Buffer.buffer();
this.pendingData.appendBytes(initialBytes, 2, 4); // 4 bytes already
parser.fixedSizeMode(2); // need 2 more bytes (CRC)
this.pendingData.appendBytes(bytes, 2, 4); // 暂存已有的 4 字节
parser.fixedSizeMode(2); // 还需 2 字节CRC
} else {
log.warn("[RtuFrameHandler][未知功能码: 0x{}]", Integer.toHexString(fc));
resetToHeader();
}
}
private Buffer pendingData;
private int expectedDataLen;
@Override
public void handle(Buffer buffer) {
switch (state) {
@@ -279,7 +358,6 @@ public class IotModbusRecordParserFactory {
this.slaveId = header[0];
this.functionCode = header[1];
int fc = functionCode & 0xFF;
if ((fc & 0x80) != 0) {
// 异常响应
state = STATE_EXCEPTION_BODY;
@@ -305,7 +383,7 @@ public class IotModbusRecordParserFactory {
frame.appendByte(slaveId);
frame.appendByte(functionCode);
frame.appendBuffer(buffer);
frameHandler.handle(frame);
emitFrame(frame);
resetToHeader();
}
@@ -327,7 +405,7 @@ public class IotModbusRecordParserFactory {
frame.appendByte(functionCode);
frame.appendByte(byteCount);
frame.appendBuffer(pendingData);
frameHandler.handle(frame);
emitFrame(frame);
resetToHeader();
}
// 否则继续等待不应该发生因为我们精确设置了 fixedSizeMode
@@ -340,16 +418,27 @@ public class IotModbusRecordParserFactory {
frame.appendByte(slaveId);
frame.appendByte(functionCode);
frame.appendBuffer(pendingData);
frameHandler.handle(frame);
emitFrame(frame);
resetToHeader();
}
/**
* 发射完整帧解码并回调
*/
private void emitFrame(Buffer frameBuffer) {
IotModbusFrame frame = decodeResponse(frameBuffer.getBytes(), IotModbusFrameFormatEnum.MODBUS_RTU);
if (frame != null) {
frameHandler.accept(frame, IotModbusFrameFormatEnum.MODBUS_RTU);
}
}
private void resetToHeader() {
state = STATE_HEADER;
pendingData = null;
parser.fixedSizeMode(2); // slaveId + FC
}
// TODO @AI可以抽到 IotModbusUtils
private boolean isReadResponse(int fc) {
return fc >= 1 && fc <= 4;
}

View File

@@ -1,128 +1,24 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
/**
* IoT Modbus 帧编码器
* IoT Modbus 帧编码器
* <p>
* Modbus 协议编解码不处理 TCP 粘包 RecordParser 处理
* 支持 MODBUS_TCPMBAP MODBUS_RTUCRC16两种帧格式以及自定义功能码扩展
* 负责将 Modbus 请求/响应编码为字节数组支持 MODBUS_TCPMBAP MODBUS_RTUCRC16两种帧格式
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusFrameCodec {
public class IotModbusFrameEncoder {
private final int customFunctionCode;
public IotModbusFrameCodec(int customFunctionCode) {
this.customFunctionCode = customFunctionCode;
}
// ==================== 解码 ====================
/**
* 解码响应帧拆包后的完整帧 byte[]
*
* @param data 完整帧字节数组
* @param format 帧格式
* @return 解码后的 IotModbusFrame
*/
public IotModbusFrame decodeResponse(byte[] data, IotModbusFrameFormatEnum format) {
if (format == IotModbusFrameFormatEnum.MODBUS_TCP) {
return decodeTcpResponse(data);
} else {
return decodeRtuResponse(data);
}
}
/**
* 解码 MODBUS_TCP 响应
* 格式[TransactionId(2)] [ProtocolId(2)] [Length(2)] [UnitId(1)] [FC(1)] [Data...]
*/
private IotModbusFrame decodeTcpResponse(byte[] data) {
if (data.length < 8) {
log.warn("[decodeTcpResponse][数据长度不足: {}]", data.length);
return null;
}
ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN);
int transactionId = buf.getShort() & 0xFFFF;
buf.getShort(); // protocolId跳过// TODO @AI跳过原因最好写下
buf.getShort(); // length跳过// TODO @AI跳过原因最好写下
int slaveId = buf.get() & 0xFF;
int functionCode = buf.get() & 0xFF;
// 提取 PDU 数据 functionCode 之后到末尾
byte[] pdu = new byte[data.length - 8];
System.arraycopy(data, 8, pdu, 0, pdu.length);
// 构建 IotModbusFrame
return buildFrame(slaveId, functionCode, pdu, transactionId);
}
/**
* 解码 MODBUS_RTU 响应
* 格式[SlaveId(1)] [FC(1)] [Data...] [CRC(2)]
*/
private IotModbusFrame decodeRtuResponse(byte[] data) {
if (data.length < 4) {
log.warn("[decodeRtuResponse][数据长度不足: {}]", data.length);
return null;
}
// 校验 CRC
if (!verifyCrc16(data)) {
log.warn("[decodeRtuResponse][CRC 校验失败]");
return null;
}
int slaveId = data[0] & 0xFF;
int functionCode = data[1] & 0xFF;
// PDU 数据不含 slaveIdfunctionCodeCRC
byte[] pdu = new byte[data.length - 4];
System.arraycopy(data, 2, pdu, 0, pdu.length);
// 构建 IotModbusFrame
return buildFrame(slaveId, functionCode, pdu, null);
}
/**
* 构建 IotModbusFrame
*/
private IotModbusFrame buildFrame(int slaveId, int functionCode, byte[] pdu, Integer transactionId) {
IotModbusFrame frame = new IotModbusFrame()
.setSlaveId(slaveId)
.setFunctionCode(functionCode)
.setPdu(pdu)
.setTransactionId(transactionId);
// 异常响应
// TODO @AI0x80 看看是不是要枚举
if ((functionCode & 0x80) != 0) {
frame.setException(true);
// TODO @AI0x7f 看看是不是要枚举
frame.setFunctionCode(functionCode & 0x7F);
if (pdu.length >= 1) {
frame.setExceptionCode(pdu[0] & 0xFF);
}
return frame;
}
// 自定义功能码
if (functionCode == customFunctionCode) {
// data 区格式[byteCount(1)] [JSON data(N)]
if (pdu.length >= 1) {
int byteCount = pdu[0] & 0xFF;
if (pdu.length >= 1 + byteCount) {
frame.setCustomData(new String(pdu, 1, byteCount, StandardCharsets.UTF_8));
}
}
}
return frame;
}
// ==================== 编码 ====================
/**
@@ -269,49 +165,10 @@ public class IotModbusFrameCodec {
frame[0] = (byte) slaveId;
System.arraycopy(pdu, 0, frame, 1, pdu.length);
// 计算并追加 CRC16
int crc = calculateCrc16(frame, frame.length - 2);
int crc = IotModbusUtils.calculateCrc16(frame, frame.length - 2);
frame[frame.length - 2] = (byte) (crc & 0xFF); // CRC Low
frame[frame.length - 1] = (byte) ((crc >> 8) & 0xFF); // CRC High
return frame;
}
// ==================== CRC16 工具 ====================
// TODO @AIhutool 有没工具类可以用
/**
* 计算 CRC-16/MODBUS
*
* @param data 数据
* @param length 计算长度
* @return CRC16
*/
public static int calculateCrc16(byte[] data, int length) {
int crc = 0xFFFF;
for (int i = 0; i < length; i++) {
crc ^= (data[i] & 0xFF);
for (int j = 0; j < 8; j++) {
if ((crc & 0x0001) != 0) {
crc >>= 1;
crc ^= 0xA001;
} else {
crc >>= 1;
}
}
}
return crc;
}
// TODO @AIhutool 有没工具类可以用
/**
* 校验 CRC16
*/
private boolean verifyCrc16(byte[] data) {
if (data.length < 3) {
return false;
}
int computed = calculateCrc16(data, data.length - 2);
int received = (data[data.length - 2] & 0xFF) | ((data[data.length - 1] & 0xFF) << 8);
return computed == received;
}
}

View File

@@ -3,14 +3,61 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec;
import lombok.extern.slf4j.Slf4j;
/**
* IoT Modbus 响应值提取器
* IoT Modbus 工具类
* <p>
* 从解码后的 IotModbusFrame 中提取寄存器值用于后续的点位翻译
* 提供
* 1. CRC-16/MODBUS 计算和校验
* 2. 从解码后的 IotModbusFrame 中提取寄存器值用于后续的点位翻译
*
* @author 芋道源码
*/
@Slf4j
public class IotModbusResponseParser {
public class IotModbusUtils {
// TODO @AI可以把 12345 这些 fucntion code 在这里枚举下
// TODO @AI一些枚举 0x80 这些可以这里枚举
// ==================== CRC16 工具 ====================
/**
* 计算 CRC-16/MODBUS
*
* @param data 数据
* @param length 计算长度
* @return CRC16
*/
public static int calculateCrc16(byte[] data, int length) {
int crc = 0xFFFF;
for (int i = 0; i < length; i++) {
crc ^= (data[i] & 0xFF);
for (int j = 0; j < 8; j++) {
if ((crc & 0x0001) != 0) {
crc >>= 1;
crc ^= 0xA001;
} else {
crc >>= 1;
}
}
}
return crc;
}
/**
* 校验 CRC16
*
* @param data 包含 CRC 的完整数据
* @return 校验是否通过
*/
public static boolean verifyCrc16(byte[] data) {
if (data.length < 3) {
return false;
}
int computed = calculateCrc16(data, data.length - 2);
int received = (data[data.length - 2] & 0xFF) | ((data[data.length - 1] & 0xFF) << 8);
return computed == received;
}
// ==================== 响应值提取 ====================
/**
* 从帧中提取寄存器值FC01-04 读响应

View File

@@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo;
@@ -35,7 +35,7 @@ public class IotModbusTcpSlaveDownstreamHandler {
private final IotModbusTcpSlaveConnectionManager connectionManager;
private final IotModbusTcpSlaveConfigCacheService configCacheService;
private final IotModbusDataConverter dataConverter;
private final IotModbusFrameCodec frameCodec;
private final IotModbusFrameEncoder frameEncoder;
/**
* TCP 事务 ID 自增器
@@ -117,11 +117,11 @@ public class IotModbusTcpSlaveDownstreamHandler {
}
if (rawValues.length == 1 && fcEnum.getWriteSingleCode() != null) {
// 单个值使用单写功能码FC05/FC06
data = frameCodec.encodeWriteSingleRequest(slaveId, fcEnum.getWriteSingleCode(),
data = frameEncoder.encodeWriteSingleRequest(slaveId, fcEnum.getWriteSingleCode(),
point.getRegisterAddress(), rawValues[0], frameFormat, transactionId);
} else if (fcEnum.getWriteMultipleCode() != null) {
// 多个值使用多写功能码FC15/FC16
data = frameCodec.encodeWriteMultipleRegistersRequest(slaveId,
data = frameEncoder.encodeWriteMultipleRegistersRequest(slaveId,
point.getRegisterAddress(), rawValues, frameFormat, transactionId);
} else {
log.warn("[writeProperty][点位 {} 不支持写操作]", point.getIdentifier());

View File

@@ -1,11 +1,13 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.upstream;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
@@ -19,69 +21,76 @@ import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusResponseParser;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager.PendingRequest;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePollScheduler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.net.NetSocket;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.function.BiConsumer;
// TODO @AI逻辑有点多看看是不是分区域
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException;
// DONE @AI逻辑有点多看看是不是分区域 => 已按区域划分:认证 / 轮询响应
/**
* IoT Modbus TCP Slave 上行数据处理器
* <p>
* 处理:
* 1. 自定义 FC 认证
* 2. 轮询响应mode=1→ 点位翻译 → thing.property.post
* 3. 主动上报mode=2→ 透传 property.report TODO @AI这种模式应该不用支持因为主动上报,都走标准 tcp 即可
* // DONE @AI不用主动上报主动上报走标准 tcp 即可
*
* @author 芋道源码
*/
@Slf4j
public class IotModbusTcpSlaveUpstreamHandler {
private static final String METHOD_AUTH = "auth";
private final IotDeviceCommonApi deviceApi;
private final IotDeviceMessageService messageService;
private final IotModbusDataConverter dataConverter;
private final IotModbusFrameCodec frameCodec;
private final IotModbusFrameEncoder frameEncoder;
private final IotModbusTcpSlaveConnectionManager connectionManager;
private final IotModbusTcpSlaveConfigCacheService configCacheService;
private final IotModbusTcpSlavePendingRequestManager pendingRequestManager;
private final IotModbusTcpSlavePollScheduler pollScheduler;
private final IotDeviceService deviceService;
private final String serverId;
/**
* 认证成功回调:(deviceId, config) → 启动轮询等
*/
@Setter
private BiConsumer<Long, IotModbusDeviceConfigRespDTO> onAuthSuccess;
public IotModbusTcpSlaveUpstreamHandler(IotDeviceCommonApi deviceApi,
IotDeviceMessageService messageService,
IotModbusDataConverter dataConverter,
IotModbusFrameCodec frameCodec,
IotModbusFrameEncoder frameEncoder,
IotModbusTcpSlaveConnectionManager connectionManager,
IotModbusTcpSlaveConfigCacheService configCacheService,
IotModbusTcpSlavePendingRequestManager pendingRequestManager,
IotModbusTcpSlavePollScheduler pollScheduler,
IotDeviceService deviceService,
String serverId) {
this.deviceApi = deviceApi;
this.messageService = messageService;
this.dataConverter = dataConverter;
this.frameCodec = frameCodec;
this.frameEncoder = frameEncoder;
this.connectionManager = connectionManager;
this.configCacheService = configCacheService;
this.pendingRequestManager = pendingRequestManager;
this.pollScheduler = pollScheduler;
this.deviceService = deviceService;
this.serverId = serverId;
}
// ========== 帧处理入口 ==========
/**
* 处理帧
*/
@@ -89,159 +98,158 @@ public class IotModbusTcpSlaveUpstreamHandler {
if (frame == null) {
return;
}
// 1.1 自定义功能码(认证等扩展)
if (StrUtil.isNotEmpty(frame.getCustomData())) {
handleCustomFrame(socket, frame, frameFormat);
return;
}
// 1.2 异常响应
// 1. 异常响应
if (frame.isException()) {
// TODO @AI这种需要返回一个结果给 modbus client
log.warn("[handleFrame][设备异常响应, slaveId={}, FC={}, exceptionCode={}]",
frame.getSlaveId(), frame.getFunctionCode(), frame.getExceptionCode());
return;
}
// 1.3 未认证连接,丢弃
// 2. 自定义功能码(认证等扩展)
if (StrUtil.isNotEmpty(frame.getCustomData())) {
handleCustomFrame(socket, frame, frameFormat);
return;
}
// 1.2 未认证连接,丢弃
// TODO @AI把 1.2、1.3 拿到 handlePollingResponse 里;是否需要登录,自己知道!
if (!connectionManager.isAuthenticated(socket)) {
// TODO @AI这种需要返回一个结果给 modbus client
log.warn("[handleFrame][未认证连接, 丢弃数据, remoteAddress={}]", socket.remoteAddress());
return;
}
// TODO @AI获取不到看看要不要也打个告警然后
// 2. 标准 Modbus 响应
// 3. DONE @AI断言必须是云端轮询不再支持主动上报
// TODO @AI貌似只能轮询到一次
// 1.3 标准 Modbus 响应(只支持云端轮询模式)
// TODO @AI可以把
ConnectionInfo info = connectionManager.getConnectionInfo(socket);
if (info == null) {
log.warn("[handleFrame][已认证但连接信息为空, remoteAddress={}]", socket.remoteAddress());
return;
}
// TODO @AI可以断言下必须是云端轮询
if (info.getMode() != null && info.getMode().equals(IotModbusModeEnum.ACTIVE_REPORT.getMode())) {
// mode=2主动上报透传
handleActiveReport(info, frame);
} else {
// mode=1云端轮询匹配 PendingRequest
handlePollingResponse(info, frame, frameFormat);
}
handlePollingResponse(info, frame, frameFormat);
}
// ========== 自定义 FC 处理(认证等) ==========
/**
* 处理自定义功能码帧
* <p>
* 异常分层翻译,参考 {@link cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpAbstractHandler}
*/
private void handleCustomFrame(NetSocket socket, IotModbusFrame frame, IotModbusFrameFormatEnum frameFormat) {
try {
// TODO @AI直接使用 JsonUtils 去解析出 IotDeviceMessage
JSONObject json = JSONUtil.parseObj(frame.getCustomData());
String method = json.getStr("method");
// TODO @AI method 枚举下;
if ("auth".equals(method)) {
if (METHOD_AUTH.equals(method)) {
handleAuth(socket, frame, json, frameFormat);
return;
}
// TODO @AI把 frame 都打印下;
log.warn("[handleCustomFrame][未知 method: {}]", method);
log.warn("[handleCustomFrame][未知 method: {}, frame: slaveId={}, FC={}, customData={}]",
method, frame.getSlaveId(), frame.getFunctionCode(), frame.getCustomData());
} catch (ServiceException e) {
// 已知业务异常,返回对应的错误码和错误信息
sendCustomResponse(socket, frame, frameFormat, e.getCode(), e.getMessage());
} catch (IllegalArgumentException e) {
// 参数校验异常,返回 400 错误
sendCustomResponse(socket, frame, frameFormat, BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
// TODO @AI各种情况的翻译看看怎么弄比较合适是不是要用 fc 自定义的 callback 下?
log.error("[handleCustomFrame][解析自定义 FC 数据失败]", e);
// 其他未知异常,返回 500 错误
log.error("[handleCustomFrame][解析自定义 FC 数据失败, frame: slaveId={}, FC={}, customData={}]",
frame.getSlaveId(), frame.getFunctionCode(), frame.getCustomData(), e);
sendCustomResponse(socket, frame, frameFormat,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
}
}
// TODO @芋艿:在 review 下这个类;
// TODO @AI不传递 json直接在 frame
/**
* 处理认证请求
*/
private void handleAuth(NetSocket socket, IotModbusFrame frame, JSONObject json,
IotModbusFrameFormatEnum frameFormat) {
// TODO @AI参数为空的校验
// TODO @AI是不是可以 JsonUtils.convert(json, IotDeviceAuthReqDTO.class)
JSONObject params = json.getJSONObject("params");
if (params == null) {
sendAuthResponse(socket, frame, frameFormat, 1, "params 为空");
return;
throw invalidParamException("params 不能为空");
}
// TODO @AI参数判空
// DONE @AI参数判空
String clientId = params.getStr("clientId");
String username = params.getStr("username");
String password = params.getStr("password");
// TODO @AI逐个判空
if (StrUtil.hasBlank(clientId, username, password)) {
throw invalidParamException("clientId、username、password 不能为空");
}
try {
// 1. 调用认证 API
IotDeviceAuthReqDTO authReq = new IotDeviceAuthReqDTO()
.setClientId(clientId).setUsername(username).setPassword(password);
CommonResult<Boolean> authResult = deviceApi.authDevice(authReq);
// TODO @AI应该不用 close 吧?!
// TODO @AIBooleanUtils.isFalse
if (authResult == null || !authResult.isSuccess() || !Boolean.TRUE.equals(authResult.getData())) {
log.warn("[handleAuth][认证失败, clientId={}, username={}]", clientId, username);
sendAuthResponse(socket, frame, frameFormat, 1, "认证失败");
socket.close();
return;
}
// 1. 调用认证 API
IotDeviceAuthReqDTO authReq = new IotDeviceAuthReqDTO()
.setClientId(clientId).setUsername(username).setPassword(password);
CommonResult<Boolean> authResult = deviceApi.authDevice(authReq);
authResult.checkError();
if (BooleanUtil.isFalse(authResult.getData())) {
log.warn("[handleAuth][认证失败, clientId={}, username={}]", clientId, username);
sendCustomResponse(socket, frame, frameFormat, 1, "认证失败");
return;
}
// 2. 认证成功,查找设备配置(通过 username 作为 deviceName 查找)
// TODO 根据实际的认证模型优化查找逻辑
// TODO @AI通过 device
IotModbusDeviceConfigRespDTO config = configCacheService.findConfigByAuth(clientId, username, password);
if (config == null) {
// 退而求其次,遍历缓存查找
log.info("[handleAuth][认证成功但未找到设备配置, clientId={}, username={}]", clientId, username);
}
// 2.2 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
Assert.notNull(deviceInfo, "解析设备信息失败");
// 2.3 获取设备信息
// TODO @AI这里要优化下不要通过 spring 这样注入;
IotDeviceService deviceService = SpringUtil.getBean(IotDeviceService.class);
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
Assert.notNull(device, "设备不存在");
// TODO @AI校验 frameFormat 是否一致;不一致,连接也失败;
// 2.1 认证成功,查找设备配置
IotModbusDeviceConfigRespDTO config = configCacheService.findConfigByAuth(clientId, username, password);
if (config == null) {
log.info("[handleAuth][认证成功但未找到设备配置, clientId={}, username={}]", clientId, username);
}
// 2.2 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
Assert.notNull(deviceInfo, "解析设备信息失败");
// 2.3 获取设备信息
// DONE @AIIotDeviceService 作为构造参数传入,不通过 SpringUtil.getBean
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
Assert.notNull(device, "设备不存在");
// 3. 注册连接
ConnectionInfo connectionInfo = new ConnectionInfo()
.setDeviceId(device.getId())
.setSlaveId(frame.getSlaveId())
.setFrameFormat(frameFormat)
.setMode(config != null ? config.getMode() : IotModbusModeEnum.POLLING.getMode());
// 3. 注册连接
ConnectionInfo connectionInfo = new ConnectionInfo()
.setDeviceId(device.getId())
.setSlaveId(frame.getSlaveId())
.setFrameFormat(frameFormat)
.setMode(config != null ? config.getMode() : IotModbusModeEnum.POLLING.getMode());
if (config != null) {
connectionInfo.setDeviceId(config.getDeviceId())
.setProductKey(config.getProductKey())
.setDeviceName(config.getDeviceName());
}
connectionManager.registerConnection(socket, connectionInfo);
if (config != null) {
connectionInfo.setDeviceId(config.getDeviceId())
.setProductKey(config.getProductKey())
.setDeviceName(config.getDeviceName());
}
connectionManager.registerConnection(socket, connectionInfo);
// 4. 发送认证成功响应
sendCustomResponse(socket, frame, frameFormat, 0, "success");
log.info("[handleAuth][认证成功, clientId={}, deviceId={}]", clientId,
config != null ? config.getDeviceId() : device.getId());
// 4. 发送认证成功响应
sendAuthResponse(socket, frame, frameFormat, 0, "success");
log.info("[handleAuth][认证成功, clientId={}, deviceId={}]", clientId,
config != null ? config.getDeviceId() : "unknown");
// 5. 回调:启动轮询等
// TODO @AI是不是不要 callback而是主动调用
if (onAuthSuccess != null && config != null) {
onAuthSuccess.accept(config.getDeviceId(), config);
}
} catch (Exception e) {
log.error("[handleAuth][认证异常]", e);
sendAuthResponse(socket, frame, frameFormat, 1, "认证异常");
socket.close();
// 5. 直接启动轮询
if (config != null) {
pollScheduler.updatePolling(config);
}
}
/**
* 发送认证响应
* 发送自定义功能码响应
*/
private void sendAuthResponse(NetSocket socket, IotModbusFrame frame,
IotModbusFrameFormatEnum frameFormat,
int code, String message) {
// TODO @AI不一定用 auth response而是 custom
private void sendCustomResponse(NetSocket socket, IotModbusFrame frame,
IotModbusFrameFormatEnum frameFormat,
int code, String message) {
JSONObject resp = new JSONObject();
resp.set("method", "auth");
resp.set("method", METHOD_AUTH);
resp.set("code", code);
resp.set("message", message);
byte[] data = frameCodec.encodeCustomFrame(frame.getSlaveId(), resp.toString(),
byte[] data = frameEncoder.encodeCustomFrame(frame.getSlaveId(), resp.toString(),
frameFormat, frame.getTransactionId() != null ? frame.getTransactionId() : 0);
connectionManager.sendToSocket(socket, data);
}
// ========== 轮询响应处理 ==========
/**
* 处理轮询响应(mode=1
* 处理轮询响应(云端轮询模式
*/
private void handlePollingResponse(ConnectionInfo info, IotModbusFrame frame,
IotModbusFrameFormatEnum frameFormat) {
@@ -254,7 +262,7 @@ public class IotModbusTcpSlaveUpstreamHandler {
return;
}
// 1.2 提取寄存器值
int[] rawValues = IotModbusResponseParser.extractValues(frame);
int[] rawValues = IotModbusUtils.extractValues(frame);
if (rawValues == null) {
log.warn("[handlePollingResponse][提取寄存器值失败, deviceId={}, identifier={}]",
info.getDeviceId(), request.getIdentifier());
@@ -262,20 +270,17 @@ public class IotModbusTcpSlaveUpstreamHandler {
}
// 1.3 查找点位配置
IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(info.getDeviceId());
if (config == null || config.getPoints() == null) {
if (config == null || CollUtil.isEmpty(config.getPoints())) {
return;
}
// TODO @AIfindone arrayUtil
var point = config.getPoints().stream()
.filter(p -> p.getId().equals(request.getPointId()))
.findFirst().orElse(null);
var point = CollUtil.findOne(config.getPoints(), p -> p.getId().equals(request.getPointId()));
if (point == null) {
return;
}
// TODO @AI拆成 2.1、2.2
// 4. 点位翻译 → 上报
// 2.1 点位翻译
Object convertedValue = dataConverter.convertToPropertyValue(rawValues, point);
// 2.2 上报属性
Map<String, Object> params = MapUtil.of(request.getIdentifier(), convertedValue);
IotDeviceMessage message = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params);
@@ -284,29 +289,4 @@ public class IotModbusTcpSlaveUpstreamHandler {
info.getDeviceId(), request.getIdentifier(), rawValues, convertedValue);
}
// TODO @AI不需要这个逻辑
/**
* 处理主动上报mode=2
* 设备直接上报 property.report 格式:{propertyId: value},不做点位翻译
*/
@SuppressWarnings("unchecked")
private void handleActiveReport(ConnectionInfo info, IotModbusFrame frame) {
// mode=2 下设备上报标准 Modbus 帧,但由于没有点位翻译,
// 这里暂时将原始寄存器值以 FC+地址 为 key 上报
int[] rawValues = IotModbusResponseParser.extractValues(frame);
if (rawValues == null) {
return;
}
// 简单上报:以 "register_FC{fc}" 作为属性名
String propertyKey = "register_FC" + frame.getFunctionCode();
Map<String, Object> params = MapUtil.of(propertyKey, rawValues);
IotDeviceMessage message = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params);
messageService.sendDeviceMessage(message, info.getProductKey(), info.getDeviceName(), serverId);
log.debug("[handleActiveReport][设备={}, FC={}, 原始值={}]",
info.getDeviceId(), frame.getFunctionCode(), rawValues);
}
}

View File

@@ -4,7 +4,7 @@ import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager.PendingRequest;
import io.vertx.core.Vertx;
@@ -37,7 +37,7 @@ public class IotModbusTcpSlavePollScheduler {
private final Vertx vertx;
private final IotModbusTcpSlaveConnectionManager connectionManager;
private final IotModbusFrameCodec frameCodec;
private final IotModbusFrameEncoder frameEncoder;
private final IotModbusTcpSlavePendingRequestManager pendingRequestManager;
private final int requestTimeout;
@@ -163,7 +163,7 @@ public class IotModbusTcpSlavePollScheduler {
int transactionId = transactionIdCounter.incrementAndGet() & 0xFFFF;
int slaveId = connInfo.getSlaveId() != null ? connInfo.getSlaveId() : 1;
// 2.2 编码读请求
byte[] data = frameCodec.encodeReadRequest(slaveId, point.getFunctionCode(),
byte[] data = frameEncoder.encodeReadRequest(slaveId, point.getFunctionCode(),
point.getRegisterAddress(), point.getRegisterCount(), frameFormat, transactionId);
// 2.3 注册 PendingRequest
PendingRequest pendingRequest = new PendingRequest(

View File

@@ -6,8 +6,9 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusRecordParserFactory;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusUtils;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
@@ -59,7 +60,8 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
// ===================== 编解码器 =====================
private static final IotModbusFrameCodec FRAME_CODEC = new IotModbusFrameCodec(CUSTOM_FC);
private static final IotModbusFrameDecoder FRAME_DECODER = new IotModbusFrameDecoder(CUSTOM_FC);
private static final IotModbusFrameEncoder FRAME_ENCODER = new IotModbusFrameEncoder(CUSTOM_FC);
// ===================== 设备信息(根据实际情况修改,从 iot_device 表查询) =====================
@@ -111,7 +113,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
// ===================== 轮询响应测试 =====================
/**
* 轮询响应测试:认证后等待网关下发 FC03 读请求RTU 格式),构造读响应帧发回
* 轮询响应测试:认证后持续监听网关下发的读请求RTU 格式),每次收到都自动构造读响应帧发回
*/
@Test
public void testPollingResponse() throws Exception {
@@ -121,30 +123,32 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
IotModbusFrame authResponse = authenticate(socket);
log.info("[testPollingResponse][认证响应: {}]", authResponse.getCustomData());
// 2. 等待网关下发读请求
log.info("[testPollingResponse][等待网关下发读请求...]");
IotModbusFrame readRequest = waitForRequest(socket);
log.info("[testPollingResponse][收到读请求: slaveId={}, FC={}]",
readRequest.getSlaveId(), readRequest.getFunctionCode());
// 2. 设置持续监听:每收到一个读请求,自动回复
log.info("[testPollingResponse][开始持续监听网关下发读请求...]");
CompletableFuture<Void> done = new CompletableFuture<>();
RecordParser parser = FRAME_DECODER.createRecordParser((frame, frameFormat) -> {
log.info("[testPollingResponse][收到请求: slaveId={}, FC={}]",
frame.getSlaveId(), frame.getFunctionCode());
// 解析读请求中的起始地址和数量
byte[] pdu = frame.getPdu();
int startAddress = ((pdu[0] & 0xFF) << 8) | (pdu[1] & 0xFF);
int quantity = ((pdu[2] & 0xFF) << 8) | (pdu[3] & 0xFF);
log.info("[testPollingResponse][读请求参数: startAddress={}, quantity={}]", startAddress, quantity);
// 3. 解析读请求中的起始地址和数量
byte[] pdu = readRequest.getPdu();
int startAddress = ((pdu[0] & 0xFF) << 8) | (pdu[1] & 0xFF);
int quantity = ((pdu[2] & 0xFF) << 8) | (pdu[3] & 0xFF);
log.info("[testPollingResponse][读请求参数: startAddress={}, quantity={}]", startAddress, quantity);
// 构造读响应帧模拟寄存器数据RTU 格式)
int[] registerValues = new int[quantity];
for (int i = 0; i < quantity; i++) {
registerValues[i] = 100 + i * 100; // 模拟值: 100, 200, 300, ...
}
byte[] responseData = buildReadResponse(frame.getSlaveId(),
frame.getFunctionCode(), registerValues);
socket.write(Buffer.buffer(responseData));
log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues);
});
socket.handler(parser);
// 4. 构造读响应帧模拟寄存器数据RTU 格式)
int[] registerValues = new int[quantity];
for (int i = 0; i < quantity; i++) {
registerValues[i] = 100 + i * 100; // 模拟值: 100, 200, 300, ...
}
byte[] responseData = buildReadResponse(readRequest.getSlaveId(),
readRequest.getFunctionCode(), registerValues);
socket.write(Buffer.buffer(responseData));
log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues);
// 5. 等待一段时间让网关处理
Thread.sleep(20000);
// 3. 持续等待200 秒),期间会自动回复所有收到的读请求
Thread.sleep(200000);
} finally {
socket.close();
}
@@ -199,23 +203,20 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
}
/**
* 发送帧并等待响应(使用 IotModbusRecordParserFactory 自动检测帧格式)
* 发送帧并等待响应(使用 IotModbusFrameDecoder 自动检测帧格式并解码
*/
private IotModbusFrame sendAndReceive(NetSocket socket, byte[] frameData) throws Exception {
CompletableFuture<IotModbusFrame> responseFuture = new CompletableFuture<>();
// 使用 RecordParserFactory 创建拆包器(自动检测帧格式
RecordParser parser = IotModbusRecordParserFactory.create(CUSTOM_FC,
buffer -> {
// 使用 FrameDecoder 创建拆包器(自动检测帧格式 + 解码,直接回调 IotModbusFrame
RecordParser parser = FRAME_DECODER.createRecordParser(
(frame, frameFormat) -> {
try {
// 检测到帧格式应该是 RTU使用 RTU 格式解码
IotModbusFrame frame = FRAME_CODEC.decodeResponse(
buffer.getBytes(), IotModbusFrameFormatEnum.MODBUS_RTU);
log.info("[sendAndReceive][检测到帧格式: {}]", frameFormat);
responseFuture.complete(frame);
} catch (Exception e) {
responseFuture.completeExceptionally(e);
}
},
format -> log.info("[sendAndReceive][检测到帧格式: {}]", format));
});
socket.handler(parser);
// 发送请求
@@ -231,18 +232,16 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
*/
private IotModbusFrame waitForRequest(NetSocket socket) throws Exception {
CompletableFuture<IotModbusFrame> requestFuture = new CompletableFuture<>();
// 使用 RecordParserFactory 创建拆包器
RecordParser parser = IotModbusRecordParserFactory.create(CUSTOM_FC,
buffer -> {
// 使用 FrameDecoder 创建拆包器(直接回调 IotModbusFrame
RecordParser parser = FRAME_DECODER.createRecordParser(
(frame, frameFormat) -> {
try {
IotModbusFrame frame = FRAME_CODEC.decodeResponse(
buffer.getBytes(), IotModbusFrameFormatEnum.MODBUS_RTU);
log.info("[waitForRequest][检测到帧格式: {}]", frameFormat);
requestFuture.complete(frame);
} catch (Exception e) {
requestFuture.completeExceptionally(e);
}
},
format -> log.info("[waitForRequest][检测到帧格式: {}]", format));
});
socket.handler(parser);
// 等待(超时 30 秒,因为轮询间隔可能比较长)
@@ -264,7 +263,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
JSONObject json = new JSONObject();
json.set("method", "auth");
json.set("params", params);
return FRAME_CODEC.encodeCustomFrame(SLAVE_ID, json.toString(),
return FRAME_ENCODER.encodeCustomFrame(SLAVE_ID, json.toString(),
IotModbusFrameFormatEnum.MODBUS_RTU, 0);
}
@@ -286,7 +285,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
frame[3 + i * 2 + 1] = (byte) (registerValues[i] & 0xFF);
}
// 计算 CRC16
int crc = IotModbusFrameCodec.calculateCrc16(frame, totalLength - 2);
int crc = IotModbusUtils.calculateCrc16(frame, totalLength - 2);
frame[totalLength - 2] = (byte) (crc & 0xFF); // CRC Low
frame[totalLength - 1] = (byte) ((crc >> 8) & 0xFF); // CRC High
return frame;

View File

@@ -6,8 +6,8 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameCodec;
import io.vertx.core.Handler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
@@ -61,7 +61,8 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
// ===================== 编解码器 =====================
private static final IotModbusFrameCodec FRAME_CODEC = new IotModbusFrameCodec(CUSTOM_FC);
private static final IotModbusFrameDecoder FRAME_DECODER = new IotModbusFrameDecoder(CUSTOM_FC);
private static final IotModbusFrameEncoder FRAME_ENCODER = new IotModbusFrameEncoder(CUSTOM_FC);
// ===================== 设备信息(根据实际情况修改,从 iot_device 表查询) =====================
@@ -113,7 +114,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
// ===================== 轮询响应测试 =====================
/**
* 轮询响应测试:认证后等待网关下发 FC03 读请求,构造读响应帧发回
* 轮询响应测试:认证后持续监听网关下发的读请求,每次收到都自动构造读响应帧发回
*/
@Test
public void testPollingResponse() throws Exception {
@@ -123,29 +124,31 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
IotModbusFrame authResponse = authenticate(socket);
log.info("[testPollingResponse][认证响应: {}]", authResponse.getCustomData());
// 2. 等待网关下发读请求
log.info("[testPollingResponse][等待网关下发读请求...]");
IotModbusFrame readRequest = waitForRequest(socket);
log.info("[testPollingResponse][收到读请求: slaveId={}, FC={}, transactionId={}]",
readRequest.getSlaveId(), readRequest.getFunctionCode(), readRequest.getTransactionId());
// 2. 设置持续监听:每收到一个读请求,自动回复
log.info("[testPollingResponse][开始持续监听网关下发读请求...]");
CompletableFuture<Void> done = new CompletableFuture<>();
RecordParser parser = FRAME_DECODER.createRecordParser((frame, frameFormat) -> {
log.info("[testPollingResponse][收到请求: slaveId={}, FC={}, transactionId={}]",
frame.getSlaveId(), frame.getFunctionCode(), frame.getTransactionId());
// 解析读请求中的起始地址和数量
byte[] pdu = frame.getPdu();
int startAddress = ((pdu[0] & 0xFF) << 8) | (pdu[1] & 0xFF);
int quantity = ((pdu[2] & 0xFF) << 8) | (pdu[3] & 0xFF);
log.info("[testPollingResponse][读请求参数: startAddress={}, quantity={}]", startAddress, quantity);
// 3. 解析读请求中的起始地址和数量
byte[] pdu = readRequest.getPdu();
int startAddress = ((pdu[0] & 0xFF) << 8) | (pdu[1] & 0xFF);
int quantity = ((pdu[2] & 0xFF) << 8) | (pdu[3] & 0xFF);
log.info("[testPollingResponse][读请求参数: startAddress={}, quantity={}]", startAddress, quantity);
// 构造读响应帧(模拟寄存器数据)
int[] registerValues = new int[quantity];
for (int i = 0; i < quantity; i++) {
registerValues[i] = 100 + i * 100; // 模拟值: 100, 200, 300, ...
}
byte[] responseData = buildReadResponse(frame.getTransactionId(),
frame.getSlaveId(), frame.getFunctionCode(), registerValues);
socket.write(Buffer.buffer(responseData));
log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues);
});
socket.handler(parser);
// 4. 构造读响应帧(模拟寄存器数据)
int[] registerValues = new int[quantity];
for (int i = 0; i < quantity; i++) {
registerValues[i] = 100 + i * 100; // 模拟值: 100, 200, 300, ...
}
byte[] responseData = buildReadResponse(readRequest.getTransactionId(),
readRequest.getSlaveId(), readRequest.getFunctionCode(), registerValues);
socket.write(Buffer.buffer(responseData));
log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues);
// 5. 等待一段时间让网关处理
// 3. 持续等待200 秒),期间会自动回复所有收到的读请求
Thread.sleep(200000);
} finally {
socket.close();
@@ -201,15 +204,20 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
}
/**
* 发送帧并等待响应(MODBUS_TCP 格式
* <p>
* 使用两阶段 RecordParser 拆包fixedSizeMode(6) 读 MBAP 头 → fixedSizeMode(length) 读 body
* 发送帧并等待响应(使用 IotModbusFrameDecoder 自动检测帧格式并解码
*/
private IotModbusFrame sendAndReceive(NetSocket socket, byte[] frameData) throws Exception {
CompletableFuture<IotModbusFrame> responseFuture = new CompletableFuture<>();
// 创建 TCP 两阶段拆包 RecordParser
RecordParser parser = RecordParser.newFixed(6);
parser.handler(new TcpRecordParserHandler(parser, responseFuture));
// 使用 FrameDecoder 创建拆包器(自动检测帧格式 + 解码,直接回调 IotModbusFrame
RecordParser parser = FRAME_DECODER.createRecordParser(
(frame, frameFormat) -> {
try {
log.info("[sendAndReceive][检测到帧格式: {}]", frameFormat);
responseFuture.complete(frame);
} catch (Exception e) {
responseFuture.completeExceptionally(e);
}
});
socket.handler(parser);
// 发送请求
@@ -225,54 +233,22 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
*/
private IotModbusFrame waitForRequest(NetSocket socket) throws Exception {
CompletableFuture<IotModbusFrame> requestFuture = new CompletableFuture<>();
RecordParser parser = RecordParser.newFixed(6);
parser.handler(new TcpRecordParserHandler(parser, requestFuture));
// 使用 FrameDecoder 创建拆包器(直接回调 IotModbusFrame
RecordParser parser = FRAME_DECODER.createRecordParser(
(frame, frameFormat) -> {
try {
log.info("[waitForRequest][检测到帧格式: {}]", frameFormat);
requestFuture.complete(frame);
} catch (Exception e) {
requestFuture.completeExceptionally(e);
}
});
socket.handler(parser);
// 等待(超时 30 秒,因为轮询间隔可能比较长)
return requestFuture.get(30000, TimeUnit.MILLISECONDS);
}
/**
* MODBUS_TCP 两阶段拆包 Handler
*/
private class TcpRecordParserHandler implements Handler<Buffer> {
private final RecordParser parser;
private final CompletableFuture<IotModbusFrame> future;
private byte[] mbapHeader;
private boolean waitingForBody = false;
TcpRecordParserHandler(RecordParser parser, CompletableFuture<IotModbusFrame> future) {
this.parser = parser;
this.future = future;
}
@Override
public void handle(Buffer buffer) {
try {
if (waitingForBody) {
// Phase 2: 收到 bodyunitId + PDU
byte[] body = buffer.getBytes();
byte[] fullFrame = new byte[mbapHeader.length + body.length];
System.arraycopy(mbapHeader, 0, fullFrame, 0, mbapHeader.length);
System.arraycopy(body, 0, fullFrame, mbapHeader.length, body.length);
IotModbusFrame frame = FRAME_CODEC.decodeResponse(fullFrame, IotModbusFrameFormatEnum.MODBUS_TCP);
future.complete(frame);
} else {
// Phase 1: 收到 MBAP 头 6 字节
this.mbapHeader = buffer.getBytes();
int length = ((mbapHeader[4] & 0xFF) << 8) | (mbapHeader[5] & 0xFF);
this.waitingForBody = true;
parser.fixedSizeMode(length);
}
} catch (Exception e) {
future.completeExceptionally(e);
}
}
}
/**
* 构造认证帧MODBUS_TCP 格式)
* <p>
@@ -286,7 +262,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
JSONObject json = new JSONObject();
json.set("method", "auth");
json.set("params", params);
return FRAME_CODEC.encodeCustomFrame(SLAVE_ID, json.toString(),
return FRAME_ENCODER.encodeCustomFrame(SLAVE_ID, json.toString(),
IotModbusFrameFormatEnum.MODBUS_TCP, 1);
}