feat(iot):modbus-tcp-slave 整体代码进一步优化

This commit is contained in:
YunaiV
2026-02-08 10:13:06 +08:00
parent 346ae3ff48
commit 3ab33527e3
17 changed files with 427 additions and 479 deletions

View File

@@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.device;
import cn.iocoder.yudao.framework.tenant.core.db.TenantBaseDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusByteOrderEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusRawDataTypeEnum;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableId;
@@ -62,7 +61,7 @@ public class IotDeviceModbusPointDO extends TenantBaseDO {
/**
* Modbus 功能码
*
* 枚举 {@link IotModbusFunctionCodeEnum}
* 取值范围FC01-04读线圈、读离散输入、读保持寄存器、读输入寄存器
*/
private Integer functionCode;
/**

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.core.biz.dto;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusByteOrderEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusRawDataTypeEnum;
import lombok.Data;
@@ -33,7 +32,7 @@ public class IotModbusPointRespDTO {
/**
* Modbus 功能码
*
* 枚举 {@link IotModbusFunctionCodeEnum}
* 取值范围FC01-04读线圈、读离散输入、读保持寄存器、读输入寄存器
*/
private Integer functionCode;
/**

View File

@@ -1,60 +0,0 @@
package cn.iocoder.yudao.module.iot.core.enums;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
/**
* IoT Modbus 功能码枚举
*
* @author 芋道源码
*/
@Getter
@RequiredArgsConstructor
public enum IotModbusFunctionCodeEnum implements ArrayValuable<Integer> {
READ_COILS(1, "读线圈", true, 5, 15),
READ_DISCRETE_INPUTS(2, "读离散输入", false, null, null),
READ_HOLDING_REGISTERS(3, "读保持寄存器", true, 6, 16),
READ_INPUT_REGISTERS(4, "读输入寄存器", false, null, null);
public static final Integer[] ARRAYS = Arrays.stream(values())
.map(IotModbusFunctionCodeEnum::getCode)
.toArray(Integer[]::new);
/**
* 功能码
*/
private final Integer code;
/**
* 名称
*/
private final String name;
/**
* 是否支持写操作
*/
private final Boolean writable;
/**
* 单个写功能码
*/
private final Integer writeSingleCode;
/**
* 多个写功能码
*/
private final Integer writeMultipleCode;
@Override
public Integer[] array() {
return ARRAYS;
}
public static IotModbusFunctionCodeEnum valueOf(Integer code) {
return Arrays.stream(values())
.filter(e -> e.getCode().equals(code))
.findFirst()
.orElse(null);
}
}

View File

@@ -0,0 +1,261 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import lombok.extern.slf4j.Slf4j;
/**
* IoT Modbus 工具类
* <p>
* 提供:
* 1. Modbus 协议常量(功能码、掩码等)
* 2. CRC-16/MODBUS 计算和校验
* 3. 功能码分类判断
* 4. 从解码后的 ${IotModbusFrame} 中提取寄存器值(用于后续的点位翻译)
*
* @author 芋道源码
*/
@Slf4j
public class IotModbusUtils {
/** FC01: 读线圈 */
public static final int FC_READ_COILS = 1;
/** FC02: 读离散输入 */
public static final int FC_READ_DISCRETE_INPUTS = 2;
/** FC03: 读保持寄存器 */
public static final int FC_READ_HOLDING_REGISTERS = 3;
/** FC04: 读输入寄存器 */
public static final int FC_READ_INPUT_REGISTERS = 4;
/** FC05: 写单个线圈 */
public static final int FC_WRITE_SINGLE_COIL = 5;
/** FC06: 写单个寄存器 */
public static final int FC_WRITE_SINGLE_REGISTER = 6;
/** FC15: 写多个线圈 */
public static final int FC_WRITE_MULTIPLE_COILS = 15;
/** FC16: 写多个寄存器 */
public static final int FC_WRITE_MULTIPLE_REGISTERS = 16;
/**
* 异常响应掩码:响应帧的功能码最高位为 1 时,表示异常响应
* 例如:请求 FC=0x03异常响应 FC=0x830x03 | 0x80
*/
public static final int FC_EXCEPTION_MASK = 0x80;
/**
* 功能码掩码:用于从异常响应中提取原始功能码
* 例如:异常 FC=0x83原始 FC = 0x83 & 0x7F = 0x03
*/
public static final int FC_MASK = 0x7F;
// ==================== 功能码分类判断 ====================
/**
* 判断是否为读响应FC01-04
*/
public static boolean isReadResponse(int functionCode) {
return functionCode >= FC_READ_COILS && functionCode <= FC_READ_INPUT_REGISTERS;
}
/**
* 判断是否为写响应FC05/06/15/16
*/
public static boolean isWriteResponse(int functionCode) {
return functionCode == FC_WRITE_SINGLE_COIL || functionCode == FC_WRITE_SINGLE_REGISTER
|| functionCode == FC_WRITE_MULTIPLE_COILS || functionCode == FC_WRITE_MULTIPLE_REGISTERS;
}
/**
* 判断是否为异常响应
*/
public static boolean isExceptionResponse(int functionCode) {
return (functionCode & FC_EXCEPTION_MASK) != 0;
}
/**
* 从异常响应中提取原始功能码
*/
public static int extractOriginalFunctionCode(int exceptionFunctionCode) {
return exceptionFunctionCode & FC_MASK;
}
/**
* 判断读功能码是否支持写操作
* <p>
* FC01读线圈和 FC03读保持寄存器支持写操作
* FC02读离散输入和 FC04读输入寄存器为只读。
*
* @param readFunctionCode 读功能码FC01-04
* @return 是否支持写操作
*/
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public static boolean isWritable(int readFunctionCode) {
return readFunctionCode == FC_READ_COILS || readFunctionCode == FC_READ_HOLDING_REGISTERS;
}
/**
* 获取单写功能码
* <p>
* FC01读线圈→ FC05写单个线圈
* FC03读保持寄存器→ FC06写单个寄存器
* 其他返回 null不支持写
*
* @param readFunctionCode 读功能码
* @return 单写功能码,不支持写时返回 null
*/
@SuppressWarnings("EnhancedSwitchMigration")
public static Integer getWriteSingleFunctionCode(int readFunctionCode) {
switch (readFunctionCode) {
case FC_READ_COILS:
return FC_WRITE_SINGLE_COIL;
case FC_READ_HOLDING_REGISTERS:
return FC_WRITE_SINGLE_REGISTER;
default:
return null;
}
}
/**
* 获取多写功能码
* <p>
* FC01读线圈→ FC15写多个线圈
* FC03读保持寄存器→ FC16写多个寄存器
* 其他返回 null不支持写
*
* @param readFunctionCode 读功能码
* @return 多写功能码,不支持写时返回 null
*/
@SuppressWarnings("EnhancedSwitchMigration")
public static Integer getWriteMultipleFunctionCode(int readFunctionCode) {
switch (readFunctionCode) {
case FC_READ_COILS:
return FC_WRITE_MULTIPLE_COILS;
case FC_READ_HOLDING_REGISTERS:
return FC_WRITE_MULTIPLE_REGISTERS;
default:
return null;
}
}
// ==================== 点位查找 ====================
/**
* 查找点位配置
*
* @param config 设备 Modbus 配置
* @param identifier 点位标识符
* @return 匹配的点位配置,未找到返回 null
*/
public static IotModbusPointRespDTO findPoint(IotModbusDeviceConfigRespDTO config, String identifier) {
return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier()));
}
// ==================== CRC16 工具 ====================
/**
* 计算 CRC-16/MODBUS
*
* @param data 数据
* @param length 计算长度
* @return CRC16 值
*/
public static int calculateCrc16(byte[] data, int length) {
int crc = 0xFFFF;
for (int i = 0; i < length; i++) {
crc ^= (data[i] & 0xFF);
for (int j = 0; j < 8; j++) {
if ((crc & 0x0001) != 0) {
crc >>= 1;
crc ^= 0xA001;
} else {
crc >>= 1;
}
}
}
return crc;
}
/**
* 校验 CRC16
*
* @param data 包含 CRC 的完整数据
* @return 校验是否通过
*/
public static boolean verifyCrc16(byte[] data) {
if (data.length < 3) {
return false;
}
int computed = calculateCrc16(data, data.length - 2);
int received = (data[data.length - 2] & 0xFF) | ((data[data.length - 1] & 0xFF) << 8);
return computed == received;
}
// ==================== 响应值提取 ====================
/**
* 从帧中提取寄存器值FC01-04 读响应)
*
* @param frame 解码后的 Modbus 帧
* @return 寄存器值数组int[]),失败返回 null
*/
@SuppressWarnings("EnhancedSwitchMigration")
public static int[] extractValues(IotModbusFrame frame) {
if (frame == null || frame.isException()) {
return null;
}
byte[] pdu = frame.getPdu();
if (pdu == null || pdu.length < 1) {
return null;
}
int functionCode = frame.getFunctionCode();
switch (functionCode) {
case FC_READ_COILS:
case FC_READ_DISCRETE_INPUTS:
return extractCoilValues(pdu);
case FC_READ_HOLDING_REGISTERS:
case FC_READ_INPUT_REGISTERS:
return extractRegisterValues(pdu);
default:
log.warn("[extractValues][不支持的功能码: {}]", functionCode);
return null;
}
}
/**
* 提取线圈/离散输入值
* PDU 格式FC01/02 响应):[ByteCount(1)] [CoilStatus(N)]
*/
private static int[] extractCoilValues(byte[] pdu) {
if (pdu.length < 2) {
return null;
}
int byteCount = pdu[0] & 0xFF;
int bitCount = byteCount * 8;
int[] values = new int[bitCount];
for (int i = 0; i < bitCount && (1 + i / 8) < pdu.length; i++) {
values[i] = ((pdu[1 + i / 8] >> (i % 8)) & 0x01);
}
return values;
}
/**
* 提取寄存器值
* PDU 格式FC03/04 响应):[ByteCount(1)] [RegisterData(N*2)]
*/
private static int[] extractRegisterValues(byte[] pdu) {
if (pdu.length < 2) {
return null;
}
int byteCount = pdu[0] & 0xFF;
int registerCount = byteCount / 2;
int[] values = new int[registerCount];
for (int i = 0; i < registerCount && (1 + i * 2 + 1) < pdu.length; i++) {
values[i] = ((pdu[1 + i * 2] & 0xFF) << 8) | (pdu[1 + i * 2 + 1] & 0xFF);
}
return values;
}
}

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager;
import com.ghgande.j2mod.modbus.io.ModbusTCPTransaction;
import com.ghgande.j2mod.modbus.msg.*;
@@ -12,6 +11,8 @@ import com.ghgande.j2mod.modbus.util.BitVector;
import io.vertx.core.Future;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils.*;
// TODO @AI感觉它更像一个工具类但是名字叫 client 很奇怪;
/**
* IoT Modbus TCP 客户端
@@ -98,18 +99,14 @@ public class IotModbusTcpClient {
*/
@SuppressWarnings("EnhancedSwitchMigration")
private ModbusRequest createReadRequest(Integer functionCode, Integer address, Integer count) {
IotModbusFunctionCodeEnum functionCodeEnum = IotModbusFunctionCodeEnum.valueOf(functionCode);
if (functionCodeEnum == null) {
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
}
switch (functionCodeEnum) {
case READ_COILS:
switch (functionCode) {
case FC_READ_COILS:
return new ReadCoilsRequest(address, count);
case READ_DISCRETE_INPUTS:
case FC_READ_DISCRETE_INPUTS:
return new ReadInputDiscretesRequest(address, count);
case READ_HOLDING_REGISTERS:
case FC_READ_HOLDING_REGISTERS:
return new ReadMultipleRegistersRequest(address, count);
case READ_INPUT_REGISTERS:
case FC_READ_INPUT_REGISTERS:
return new ReadInputRegistersRequest(address, count);
default:
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
@@ -119,13 +116,10 @@ public class IotModbusTcpClient {
/**
* 创建写入请求
*/
@SuppressWarnings("EnhancedSwitchMigration")
private ModbusRequest createWriteRequest(Integer functionCode, Integer address, Integer count, int[] values) {
IotModbusFunctionCodeEnum functionCodeEnum = IotModbusFunctionCodeEnum.valueOf(functionCode);
if (functionCodeEnum == null) {
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
}
switch (functionCodeEnum) {
case READ_COILS: // 写线圈(使用功能码 5 或 15
switch (functionCode) {
case FC_READ_COILS: // 写线圈(使用功能码 5 或 15
if (count == 1) {
return new WriteCoilRequest(address, values[0] != 0);
} else {
@@ -135,7 +129,7 @@ public class IotModbusTcpClient {
}
return new WriteMultipleCoilsRequest(address, bv);
}
case READ_HOLDING_REGISTERS: // 写保持寄存器(使用功能码 6 或 16
case FC_READ_HOLDING_REGISTERS: // 写保持寄存器(使用功能码 6 或 16
if (count == 1) {
return new WriteSingleRegisterRequest(address, new SimpleRegister(values[0]));
} else {
@@ -145,8 +139,8 @@ public class IotModbusTcpClient {
}
return new WriteMultipleRegistersRequest(address, registers);
}
case READ_DISCRETE_INPUTS: // 只读
case READ_INPUT_REGISTERS: // 只读
case FC_READ_DISCRETE_INPUTS: // 只读
case FC_READ_INPUT_REGISTERS: // 只读
return null;
default:
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
@@ -156,13 +150,10 @@ public class IotModbusTcpClient {
/**
* 从响应中提取值
*/
@SuppressWarnings("EnhancedSwitchMigration")
private int[] extractValues(ModbusResponse response, Integer functionCode) {
IotModbusFunctionCodeEnum functionCodeEnum = IotModbusFunctionCodeEnum.valueOf(functionCode);
if (functionCodeEnum == null) {
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
}
switch (functionCodeEnum) {
case READ_COILS:
switch (functionCode) {
case FC_READ_COILS:
ReadCoilsResponse coilsResponse = (ReadCoilsResponse) response;
int bitCount = coilsResponse.getBitCount();
int[] coilValues = new int[bitCount];
@@ -170,7 +161,7 @@ public class IotModbusTcpClient {
coilValues[i] = coilsResponse.getCoilStatus(i) ? 1 : 0;
}
return coilValues;
case READ_DISCRETE_INPUTS:
case FC_READ_DISCRETE_INPUTS:
ReadInputDiscretesResponse discretesResponse = (ReadInputDiscretesResponse) response;
int discreteCount = discretesResponse.getBitCount();
int[] discreteValues = new int[discreteCount];
@@ -178,7 +169,7 @@ public class IotModbusTcpClient {
discreteValues[i] = discretesResponse.getDiscreteStatus(i) ? 1 : 0;
}
return discreteValues;
case READ_HOLDING_REGISTERS:
case FC_READ_HOLDING_REGISTERS:
ReadMultipleRegistersResponse holdingResponse = (ReadMultipleRegistersResponse) response;
InputRegister[] holdingRegisters = holdingResponse.getRegisters();
int[] holdingValues = new int[holdingRegisters.length];
@@ -186,7 +177,7 @@ public class IotModbusTcpClient {
holdingValues[i] = holdingRegisters[i].getValue();
}
return holdingValues;
case READ_INPUT_REGISTERS:
case FC_READ_INPUT_REGISTERS:
ReadInputRegistersResponse inputResponse = (ReadInputRegistersResponse) response;
InputRegister[] inputRegisters = inputResponse.getRegisters();
int[] inputValues = new int[inputRegisters.length];

View File

@@ -1,12 +1,11 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClient;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager;
@@ -60,19 +59,19 @@ public class IotModbusTcpDownstreamHandler {
for (Map.Entry<String, Object> entry : propertyMap.entrySet()) {
String identifier = entry.getKey();
Object value = entry.getValue();
// 2.1.1 查找对应的点位配置
IotModbusPointRespDTO point = findPoint(config, identifier);
// 2.1 查找对应的点位配置
IotModbusPointRespDTO point = IotModbusUtils.findPoint(config, identifier);
if (point == null) {
log.warn("[handle][设备 {} 没有点位配置: {}]", message.getDeviceId(), identifier);
continue;
}
// 2.1.2 检查是否支持写操作
if (!isWritable(point.getFunctionCode())) {
// 2.2 检查是否支持写操作
if (!IotModbusUtils.isWritable(point.getFunctionCode())) {
log.warn("[handle][点位 {} 不支持写操作, 功能码={}]", identifier, point.getFunctionCode());
continue;
}
// 2.2 执行写入
// 2.3 执行写入
writeProperty(config, point, value);
}
}
@@ -104,19 +103,4 @@ public class IotModbusTcpDownstreamHandler {
config.getDeviceId(), point.getIdentifier(), e));
}
/**
* 查找点位配置
*/
private IotModbusPointRespDTO findPoint(IotModbusDeviceConfigRespDTO config, String identifier) {
return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier()));
}
/**
* 检查功能码是否支持写操作
*/
private boolean isWritable(Integer functionCode) {
IotModbusFunctionCodeEnum functionCodeEnum = IotModbusFunctionCodeEnum.valueOf(functionCode);
return functionCodeEnum != null && Boolean.TRUE.equals(functionCodeEnum.getWritable());
}
}

View File

@@ -1,61 +1,31 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.IotModbusTcpMasterProtocol;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
// TODO @AI是不是可以继承 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolDownstreamSubscriber.java
/**
* IoT Modbus TCP 下行消息订阅器:订阅消息总线的下行消息并转发给处理器
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
public class IotModbusTcpDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
private final IotModbusTcpMasterProtocol protocol;
private final IotModbusTcpDownstreamHandler downstreamHandler;
private final IotMessageBus messageBus;
/**
* 启动订阅
*/
public void start() {
messageBus.register(this);
log.info("[start][Modbus TCP Master 下行消息订阅器已启动, topic={}]", getTopic());
}
/**
* 停止订阅
*/
public void stop() {
messageBus.unregister(this);
log.info("[stop][Modbus TCP Master 下行消息订阅器已停止]");
public IotModbusTcpDownstreamSubscriber(IotModbusTcpMasterProtocol protocol,
IotModbusTcpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);
this.downstreamHandler = downstreamHandler;
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
return getTopic(); // 点对点消费
}
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][收到下行消息: {}]", message);
try {
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败]", e);
}
protected void handleMessage(IotDeviceMessage message) {
downstreamHandler.handle(message);
}
}

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@@ -25,7 +26,7 @@ public class IotModbusTcpSlaveConfig {
*/
@NotNull(message = "自定义功能码不能为空")
@Min(value = 65, message = "自定义功能码不能小于 65")
// TODO @AI搞个范围
@Max(value = 72, message = "自定义功能码不能大于 72")
private Integer customFunctionCode = 65;
/**

View File

@@ -1,5 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import lombok.Data;
import lombok.experimental.Accessors;
@@ -25,19 +27,19 @@ public class IotModbusFrame {
*/
private byte[] pdu;
/**
* 事务标识符TCP 模式特有)
*
* // TODO @AI最好是 @某个类型独有;
* 事务标识符
* <p>
* 仅 {@link IotModbusFrameFormatEnum#MODBUS_TCP} 格式有值,
*/
private Integer transactionId;
/**
* 是否异常响应
*/
private boolean exception;
// TODO @AI是不是要枚举一些异常另外是不是覆盖掉 exception因为只要判断有异常码是不是就可以了
/**
* 异常码(当 exception=true 时有效)
* 异常码
* <p>
* 当功能码最高位为 1 时(异常响应),此字段存储异常码。
* 为 null 表示非异常响应。
*
* @see IotModbusUtils#FC_EXCEPTION_MASK
*/
private Integer exceptionCode;
@@ -46,4 +48,11 @@ public class IotModbusFrame {
*/
private String customData;
/**
* 是否异常响应(基于 exceptionCode 是否有值判断)
*/
public boolean isException() {
return exceptionCode != null;
}
}

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
@@ -118,11 +119,8 @@ public class IotModbusFrameDecoder {
.setPdu(pdu)
.setTransactionId(transactionId);
// 异常响应
// TODO @AI0x80 看看是不是要枚举;
if ((functionCode & 0x80) != 0) {
frame.setException(true);
// TODO @AI0x7f 看看是不是要枚举;
frame.setFunctionCode(functionCode & 0x7F);
if (IotModbusUtils.isExceptionResponse(functionCode)) {
frame.setFunctionCode(IotModbusUtils.extractOriginalFunctionCode(functionCode));
if (pdu.length >= 1) {
frame.setExceptionCode(pdu[0] & 0xFF);
}
@@ -247,7 +245,7 @@ public class IotModbusFrameDecoder {
* 状态机流程:
* Phase 1: fixedSizeMode(2) → 读 slaveId + functionCode
* Phase 2: 根据 functionCode 确定剩余长度:
* - 异常响应 (FC & 0x80)fixedSizeMode(3) → exceptionCode(1) + CRC(2)
* - 异常响应 (FC & EXCEPTION_MASK)fixedSizeMode(3) → exceptionCode(1) + CRC(2)
* - 自定义 FC / FC01-04 响应fixedSizeMode(1) → 读 byteCount → fixedSizeMode(byteCount + 2)
* - FC05/06 响应fixedSizeMode(6) → addr(2) + value(2) + CRC(2)
* - FC15/16 响应fixedSizeMode(6) → addr(2) + quantity(2) + CRC(2)
@@ -283,7 +281,7 @@ public class IotModbusFrameDecoder {
this.slaveId = bytes[0];
this.functionCode = bytes[1];
int fc = functionCode & 0xFF;
if ((fc & 0x80) != 0) {
if (IotModbusUtils.isExceptionResponse(fc)) {
// 异常响应:完整帧 = slaveId(1) + FC(1) + exceptionCode(1) + CRC(2) = 5 字节
// 已有 6 字节(多 1 字节),取前 5 字节组装
Buffer frame = Buffer.buffer(5);
@@ -292,7 +290,7 @@ public class IotModbusFrameDecoder {
frame.appendBytes(bytes, 2, 3); // exceptionCode + CRC
emitFrame(frame);
resetToHeader();
} else if (isReadResponse(fc) || fc == customFunctionCode) {
} else if (IotModbusUtils.isReadResponse(fc) || fc == customFunctionCode) {
// 读响应或自定义 FCbytes[2] = byteCount
this.byteCount = bytes[2];
int bc = byteCount & 0xFF;
@@ -317,7 +315,7 @@ public class IotModbusFrameDecoder {
this.expectedDataLen = bc + 2; // byteCount 个数据 + 2 CRC
parser.fixedSizeMode(remaining);
}
} else if (isWriteResponse(fc)) {
} else if (IotModbusUtils.isWriteResponse(fc)) {
// 写响应:总长 = slaveId(1) + FC(1) + addr(2) + value/qty(2) + CRC(2) = 8 字节
// 已有 6 字节,还需 2 字节
state = STATE_WRITE_BODY;
@@ -358,15 +356,15 @@ public class IotModbusFrameDecoder {
this.slaveId = header[0];
this.functionCode = header[1];
int fc = functionCode & 0xFF;
if ((fc & 0x80) != 0) {
if (IotModbusUtils.isExceptionResponse(fc)) {
// 异常响应
state = STATE_EXCEPTION_BODY;
parser.fixedSizeMode(3); // exceptionCode(1) + CRC(2)
} else if (isReadResponse(fc) || fc == customFunctionCode) {
} else if (IotModbusUtils.isReadResponse(fc) || fc == customFunctionCode) {
// 读响应或自定义 FC
state = STATE_READ_BYTE_COUNT;
parser.fixedSizeMode(1); // byteCount
} else if (isWriteResponse(fc)) {
} else if (IotModbusUtils.isWriteResponse(fc)) {
// 写响应
state = STATE_WRITE_BODY;
pendingData = Buffer.buffer();
@@ -438,14 +436,6 @@ public class IotModbusFrameDecoder {
parser.fixedSizeMode(2); // slaveId + FC
}
// TODO @AI可以抽到 IotModbusUtils 里?
private boolean isReadResponse(int fc) {
return fc >= 1 && fc <= 4;
}
private boolean isWriteResponse(int fc) {
return fc == 5 || fc == 6 || fc == 15 || fc == 16;
}
}
}

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

View File

@@ -1,127 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec;
import lombok.extern.slf4j.Slf4j;
/**
* IoT Modbus 工具类
* <p>
* 提供:
* 1. CRC-16/MODBUS 计算和校验
* 2. 从解码后的 IotModbusFrame 中提取寄存器值(用于后续的点位翻译)
*
* @author 芋道源码
*/
@Slf4j
public class IotModbusUtils {
// TODO @AI可以把 1、2、3、4、5 这些 fucntion code 在这里枚举下。
// TODO @AI一些枚举 0x80 这些可以这里枚举;
// ==================== CRC16 工具 ====================
/**
* 计算 CRC-16/MODBUS
*
* @param data 数据
* @param length 计算长度
* @return CRC16 值
*/
public static int calculateCrc16(byte[] data, int length) {
int crc = 0xFFFF;
for (int i = 0; i < length; i++) {
crc ^= (data[i] & 0xFF);
for (int j = 0; j < 8; j++) {
if ((crc & 0x0001) != 0) {
crc >>= 1;
crc ^= 0xA001;
} else {
crc >>= 1;
}
}
}
return crc;
}
/**
* 校验 CRC16
*
* @param data 包含 CRC 的完整数据
* @return 校验是否通过
*/
public static boolean verifyCrc16(byte[] data) {
if (data.length < 3) {
return false;
}
int computed = calculateCrc16(data, data.length - 2);
int received = (data[data.length - 2] & 0xFF) | ((data[data.length - 1] & 0xFF) << 8);
return computed == received;
}
// ==================== 响应值提取 ====================
/**
* 从帧中提取寄存器值FC01-04 读响应)
*
* @param frame 解码后的 Modbus 帧
* @return 寄存器值数组int[]),失败返回 null
*/
@SuppressWarnings("EnhancedSwitchMigration")
public static int[] extractValues(IotModbusFrame frame) {
if (frame == null || frame.isException()) {
return null;
}
byte[] pdu = frame.getPdu();
if (pdu == null || pdu.length < 1) {
return null;
}
// TODO @AIjmodbus 看看有没可以复用的枚举类
int functionCode = frame.getFunctionCode();
switch (functionCode) {
case 1: // Read Coils
case 2: // Read Discrete Inputs
return extractCoilValues(pdu);
case 3: // Read Holding Registers
case 4: // Read Input Registers
return extractRegisterValues(pdu);
default:
log.warn("[extractValues][不支持的功能码: {}]", functionCode);
return null;
}
}
/**
* 提取线圈/离散输入值
* PDU 格式FC01/02 响应):[ByteCount(1)] [CoilStatus(N)]
*/
private static int[] extractCoilValues(byte[] pdu) {
if (pdu.length < 2) {
return null;
}
int byteCount = pdu[0] & 0xFF;
int bitCount = byteCount * 8;
int[] values = new int[bitCount];
for (int i = 0; i < bitCount && (1 + i / 8) < pdu.length; i++) {
values[i] = ((pdu[1 + i / 8] >> (i % 8)) & 0x01);
}
return values;
}
/**
* 提取寄存器值
* PDU 格式FC03/04 响应):[ByteCount(1)] [RegisterData(N*2)]
*/
private static int[] extractRegisterValues(byte[] pdu) {
if (pdu.length < 2) {
return null;
}
int byteCount = pdu[0] & 0xFF;
int registerCount = byteCount / 2;
int[] values = new int[registerCount];
for (int i = 0; i < registerCount && (1 + i * 2 + 1) < pdu.length; i++) {
values[i] = ((pdu[1 + i * 2] & 0xFF) << 8) | (pdu[1 + i * 2 + 1] & 0xFF);
}
return values;
}
}

View File

@@ -1,13 +1,14 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.downstream;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager;
@@ -18,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
// TODO @AI看看能不能和 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamHandler.java 有一些复用逻辑;
/**
* IoT Modbus TCP Slave 下行消息处理器
* <p>
@@ -48,7 +48,7 @@ public class IotModbusTcpSlaveDownstreamHandler {
@SuppressWarnings("unchecked")
public void handle(IotDeviceMessage message) {
// 1.1 检查是否是属性设置消息
if (!IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod().equals(message.getMethod())) {
if (ObjUtil.notEqual(IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod(), message.getMethod())) {
log.debug("[handle][忽略非属性设置消息: {}]", message.getMethod());
return;
}
@@ -76,13 +76,13 @@ public class IotModbusTcpSlaveDownstreamHandler {
String identifier = entry.getKey();
Object value = entry.getValue();
// 2.1 查找对应的点位配置
IotModbusPointRespDTO point = findPoint(config, identifier);
IotModbusPointRespDTO point = IotModbusUtils.findPoint(config, identifier);
if (point == null) {
log.warn("[handle][设备 {} 没有点位配置: {}]", message.getDeviceId(), identifier);
continue;
}
// 2.2 检查是否支持写操作
if (!isWritable(point.getFunctionCode())) {
if (!IotModbusUtils.isWritable(point.getFunctionCode())) {
log.warn("[handle][点位 {} 不支持写操作, 功能码={}]", identifier, point.getFunctionCode());
continue;
}
@@ -97,56 +97,36 @@ public class IotModbusTcpSlaveDownstreamHandler {
*/
private void writeProperty(Long deviceId, ConnectionInfo connInfo,
IotModbusPointRespDTO point, Object value) {
// 1. 转换属性值为原始值
// 1.1 转换属性值为原始值
int[] rawValues = dataConverter.convertToRawValues(value, point);
// 2. 确定帧格式和事务 ID
// 1.2 确定帧格式和事务 ID
IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat();
if (frameFormat == null) {
frameFormat = IotModbusFrameFormatEnum.MODBUS_TCP;
}
Assert.notNull(frameFormat, "连接帧格式不能为空");
int transactionId = transactionIdCounter.incrementAndGet() & 0xFFFF;
int slaveId = connInfo.getSlaveId() != null ? connInfo.getSlaveId() : 1;
// 3. 编码写请求
// 1.3 编码写请求
byte[] data;
IotModbusFunctionCodeEnum fcEnum = IotModbusFunctionCodeEnum.valueOf(point.getFunctionCode());
if (fcEnum == null) {
log.warn("[writeProperty][未知功能码: {}]", point.getFunctionCode());
return;
}
if (rawValues.length == 1 && fcEnum.getWriteSingleCode() != null) {
int readFunctionCode = point.getFunctionCode();
Integer writeSingleCode = IotModbusUtils.getWriteSingleFunctionCode(readFunctionCode);
Integer writeMultipleCode = IotModbusUtils.getWriteMultipleFunctionCode(readFunctionCode);
if (rawValues.length == 1 && writeSingleCode != null) {
// 单个值使用单写功能码FC05/FC06
data = frameEncoder.encodeWriteSingleRequest(slaveId, fcEnum.getWriteSingleCode(),
data = frameEncoder.encodeWriteSingleRequest(slaveId, writeSingleCode,
point.getRegisterAddress(), rawValues[0], frameFormat, transactionId);
} else if (fcEnum.getWriteMultipleCode() != null) {
} else if (writeMultipleCode != null) {
// 多个值使用多写功能码FC15/FC16
data = frameEncoder.encodeWriteMultipleRegistersRequest(slaveId,
point.getRegisterAddress(), rawValues, frameFormat, transactionId);
} else {
log.warn("[writeProperty][点位 {} 不支持写操作]", point.getIdentifier());
log.warn("[writeProperty][点位 {} 不支持写操作, 功能码={}]", point.getIdentifier(), readFunctionCode);
return;
}
// 4. 发送
// 2. 发送
connectionManager.sendToDevice(deviceId, data);
log.info("[writeProperty][写入成功, deviceId={}, identifier={}, value={}]",
deviceId, point.getIdentifier(), value);
}
/**
* 查找点位配置
*/
private IotModbusPointRespDTO findPoint(IotModbusDeviceConfigRespDTO config, String identifier) {
return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier()));
}
/**
* 检查功能码是否支持写操作
*/
private boolean isWritable(Integer functionCode) {
IotModbusFunctionCodeEnum functionCodeEnum = IotModbusFunctionCodeEnum.valueOf(functionCode);
return functionCodeEnum != null && Boolean.TRUE.equals(functionCodeEnum.getWritable());
}
}

View File

@@ -1,61 +1,31 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.IotModbusTcpSlaveProtocol;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
// TODO @AI是不是可以继承 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolDownstreamSubscriber.java
/**
* IoT Modbus TCP Slave 下行消息订阅器:订阅消息总线的下行消息并转发给处理器
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpSlaveDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
public class IotModbusTcpSlaveDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
private final IotModbusTcpSlaveProtocol protocol;
private final IotModbusTcpSlaveDownstreamHandler downstreamHandler;
private final IotMessageBus messageBus;
/**
* 启动订阅
*/
public void start() {
messageBus.register(this);
log.info("[start][Modbus TCP Slave 下行消息订阅器已启动, topic={}]", getTopic());
}
/**
* 停止订阅
*/
public void stop() {
messageBus.unregister(this);
log.info("[stop][Modbus TCP Slave 下行消息订阅器已停止]");
public IotModbusTcpSlaveDownstreamSubscriber(IotModbusTcpSlaveProtocol protocol,
IotModbusTcpSlaveDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);
this.downstreamHandler = downstreamHandler;
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
return getTopic(); // 点对点消费
}
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][收到下行消息: {}]", message);
try {
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败]", e);
}
protected void handleMessage(IotDeviceMessage message) {
downstreamHandler.handle(message);
}
}

View File

@@ -5,14 +5,15 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum;
@@ -20,9 +21,9 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo;
@@ -40,14 +41,12 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException;
// DONE @AI逻辑有点多看看是不是分区域 => 已按区域划分:认证 / 轮询响应
/**
* IoT Modbus TCP Slave 上行数据处理器
* <p>
* 处理:
* 1. 自定义 FC 认证
* 2. 轮询响应mode=1→ 点位翻译 → thing.property.post
* // DONE @AI不用主动上报主动上报走标准 tcp 即可
* 2. 轮询响应 → 点位翻译 → thing.property.post
*
* @author 芋道源码
*/
@@ -105,29 +104,14 @@ public class IotModbusTcpSlaveUpstreamHandler {
return;
}
// 2. 自定义功能码(认证等扩展)
// 2. 情况一:自定义功能码(认证等扩展)
if (StrUtil.isNotEmpty(frame.getCustomData())) {
handleCustomFrame(socket, frame, frameFormat);
return;
}
// 1.2 未认证连接,丢弃
// TODO @AI把 1.2、1.3 拿到 handlePollingResponse 里;是否需要登录,自己知道!
if (!connectionManager.isAuthenticated(socket)) {
log.warn("[handleFrame][未认证连接, 丢弃数据, remoteAddress={}]", socket.remoteAddress());
return;
}
// 3. DONE @AI断言必须是云端轮询不再支持主动上报
// TODO @AI貌似只能轮询到一次
// 1.3 标准 Modbus 响应(只支持云端轮询模式)
// TODO @AI可以把
ConnectionInfo info = connectionManager.getConnectionInfo(socket);
if (info == null) {
log.warn("[handleFrame][已认证但连接信息为空, remoteAddress={}]", socket.remoteAddress());
return;
}
handlePollingResponse(info, frame, frameFormat);
// 3. 情况二:标准 Modbus 响应 → 轮询响应处理
handlePollingResponse(socket, frame, frameFormat);
}
// ========== 自定义 FC 处理(认证等) ==========
@@ -138,96 +122,83 @@ public class IotModbusTcpSlaveUpstreamHandler {
* 异常分层翻译,参考 {@link cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpAbstractHandler}
*/
private void handleCustomFrame(NetSocket socket, IotModbusFrame frame, IotModbusFrameFormatEnum frameFormat) {
String method = null;
try {
JSONObject json = JSONUtil.parseObj(frame.getCustomData());
String method = json.getStr("method");
IotDeviceMessage message = JsonUtils.parseObject(frame.getCustomData(), IotDeviceMessage.class);
if (message == null) {
throw invalidParamException("自定义 FC 数据解析失败");
}
method = message.getMethod();
if (METHOD_AUTH.equals(method)) {
handleAuth(socket, frame, json, frameFormat);
handleAuth(socket, frame, frameFormat, message.getParams());
return;
}
log.warn("[handleCustomFrame][未知 method: {}, frame: slaveId={}, FC={}, customData={}]",
method, frame.getSlaveId(), frame.getFunctionCode(), frame.getCustomData());
} catch (ServiceException e) {
// 已知业务异常,返回对应的错误码和错误信息
sendCustomResponse(socket, frame, frameFormat, e.getCode(), e.getMessage());
sendCustomResponse(socket, frame, frameFormat, method, e.getCode(), e.getMessage());
} catch (IllegalArgumentException e) {
// 参数校验异常,返回 400 错误
sendCustomResponse(socket, frame, frameFormat, BAD_REQUEST.getCode(), e.getMessage());
sendCustomResponse(socket, frame, frameFormat, method, BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
// 其他未知异常,返回 500 错误
log.error("[handleCustomFrame][解析自定义 FC 数据失败, frame: slaveId={}, FC={}, customData={}]",
frame.getSlaveId(), frame.getFunctionCode(), frame.getCustomData(), e);
sendCustomResponse(socket, frame, frameFormat,
sendCustomResponse(socket, frame, frameFormat, method,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
}
}
// TODO @芋艿:在 review 下这个类;
// TODO @AI不传递 json直接在 frame
/**
* 处理认证请求
*/
private void handleAuth(NetSocket socket, IotModbusFrame frame, JSONObject json,
IotModbusFrameFormatEnum frameFormat) {
// TODO @AI是不是可以 JsonUtils.convert(json, IotDeviceAuthReqDTO.class)
JSONObject params = json.getJSONObject("params");
if (params == null) {
throw invalidParamException("params 不能为空");
}
// DONE @AI参数判空
String clientId = params.getStr("clientId");
String username = params.getStr("username");
String password = params.getStr("password");
// TODO @AI逐个判空
if (StrUtil.hasBlank(clientId, username, password)) {
throw invalidParamException("clientId、username、password 不能为空");
}
@SuppressWarnings("DataFlowIssue")
private void handleAuth(NetSocket socket, IotModbusFrame frame, IotModbusFrameFormatEnum frameFormat, Object params) {
// 1. 解析认证参数
IotDeviceAuthReqDTO request = JsonUtils.convertObject(params, IotDeviceAuthReqDTO.class);
Assert.notNull(request, "认证参数不能为空");
Assert.notBlank(request.getClientId(), "clientId 不能为空");
Assert.notBlank(request.getUsername(), "username 不能为空");
Assert.notBlank(request.getPassword(), "password 不能为空");
// 1. 调用认证 API
IotDeviceAuthReqDTO authReq = new IotDeviceAuthReqDTO()
.setClientId(clientId).setUsername(username).setPassword(password);
CommonResult<Boolean> authResult = deviceApi.authDevice(authReq);
authResult.checkError();
if (BooleanUtil.isFalse(authResult.getData())) {
log.warn("[handleAuth][认证失败, clientId={}, username={}]", clientId, username);
sendCustomResponse(socket, frame, frameFormat, 1, "认证失败");
// 2.1 调用认证 API
CommonResult<Boolean> result = deviceApi.authDevice(request);
result.checkError();
if (BooleanUtil.isFalse(result.getData())) {
log.warn("[handleAuth][认证失败, clientId={}, username={}]", request.getClientId(), request.getUsername());
sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH, 1, "认证失败");
return;
}
// 2.1 认证成功,查找设备配置
IotModbusDeviceConfigRespDTO config = configCacheService.findConfigByAuth(clientId, username, password);
if (config == null) {
log.info("[handleAuth][认证成功但未找到设备配置, clientId={}, username={}]", clientId, username);
}
// 2.2 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(request.getUsername());
Assert.notNull(deviceInfo, "解析设备信息失败");
// 2.3 获取设备信息
// DONE @AIIotDeviceService 作为构造参数传入,不通过 SpringUtil.getBean
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
Assert.notNull(device, "设备不存在");
// 3. 注册连接
// 3.1 注册连接
ConnectionInfo connectionInfo = new ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(deviceInfo.getProductKey())
.setDeviceName(deviceInfo.getDeviceName())
.setSlaveId(frame.getSlaveId())
.setFrameFormat(frameFormat)
.setMode(config != null ? config.getMode() : IotModbusModeEnum.POLLING.getMode());
if (config != null) {
connectionInfo.setDeviceId(config.getDeviceId())
.setProductKey(config.getProductKey())
.setDeviceName(config.getDeviceName());
}
.setFrameFormat(frameFormat);
connectionManager.registerConnection(socket, connectionInfo);
// 3.2 发送上线消息
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
messageService.sendDeviceMessage(onlineMessage, deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
// 3.3 发送成功响应
sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH,
GlobalErrorCodeConstants.SUCCESS.getCode(), "success");
log.info("[handleAuth][认证成功, clientId={}, deviceId={}]", request.getClientId(), device.getId());
// 4. 发送认证成功响应
sendCustomResponse(socket, frame, frameFormat, 0, "success");
log.info("[handleAuth][认证成功, clientId={}, deviceId={}]", clientId,
config != null ? config.getDeviceId() : device.getId());
// 5. 直接启动轮询
// 4. 加载设备配置并启动轮询
IotModbusDeviceConfigRespDTO config = configCacheService.loadDeviceConfig(device.getId());
if (config != null) {
pollScheduler.updatePolling(config);
} else {
log.warn("[handleAuth][认证成功但未找到设备配置, deviceId={}]", device.getId());
}
}
@@ -236,12 +207,13 @@ public class IotModbusTcpSlaveUpstreamHandler {
*/
private void sendCustomResponse(NetSocket socket, IotModbusFrame frame,
IotModbusFrameFormatEnum frameFormat,
int code, String message) {
JSONObject resp = new JSONObject();
resp.set("method", METHOD_AUTH);
resp.set("code", code);
resp.set("message", message);
byte[] data = frameEncoder.encodeCustomFrame(frame.getSlaveId(), resp.toString(),
String method, int code, String message) {
Map<String, Object> response = MapUtil.<String, Object>builder()
.put("method", method)
.put("code", code)
.put("message", message)
.build();
byte[] data = frameEncoder.encodeCustomFrame(frame.getSlaveId(), JsonUtils.toJsonString(response),
frameFormat, frame.getTransactionId() != null ? frame.getTransactionId() : 0);
connectionManager.sendToSocket(socket, data);
}
@@ -251,9 +223,16 @@ public class IotModbusTcpSlaveUpstreamHandler {
/**
* 处理轮询响应(云端轮询模式)
*/
private void handlePollingResponse(ConnectionInfo info, IotModbusFrame frame,
private void handlePollingResponse(NetSocket socket, IotModbusFrame frame,
IotModbusFrameFormatEnum frameFormat) {
// 1.1 匹配 PendingRequest
// 1. 获取连接信息(未认证连接丢弃)
ConnectionInfo info = connectionManager.getConnectionInfo(socket);
if (info == null) {
log.warn("[handlePollingResponse][未认证连接, 丢弃数据, remoteAddress={}]", socket.remoteAddress());
return;
}
// 2.1 匹配 PendingRequest
PendingRequest request = pendingRequestManager.matchResponse(
info.getDeviceId(), frame, frameFormat);
if (request == null) {
@@ -261,26 +240,27 @@ public class IotModbusTcpSlaveUpstreamHandler {
info.getDeviceId(), frame.getFunctionCode());
return;
}
// 1.2 提取寄存器值
// 2.2 提取寄存器值
int[] rawValues = IotModbusUtils.extractValues(frame);
if (rawValues == null) {
log.warn("[handlePollingResponse][提取寄存器值失败, deviceId={}, identifier={}]",
info.getDeviceId(), request.getIdentifier());
return;
}
// 1.3 查找点位配置
// 2.3 查找点位配置
IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(info.getDeviceId());
if (config == null || CollUtil.isEmpty(config.getPoints())) {
return;
}
var point = CollUtil.findOne(config.getPoints(), p -> p.getId().equals(request.getPointId()));
IotModbusPointRespDTO point = CollUtil.findOne(config.getPoints(),
p -> p.getId().equals(request.getPointId()));
if (point == null) {
return;
}
// 2.1 点位翻译
// 3.1 点位翻译
Object convertedValue = dataConverter.convertToPropertyValue(rawValues, point);
// 2.2 上报属性
// 3.2 上报属性
Map<String, Object> params = MapUtil.of(request.getIdentifier(), convertedValue);
IotDeviceMessage message = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params);

View File

@@ -80,7 +80,7 @@ public class IotModbusTcpSlaveConfigCacheService {
*/
private List<IotModbusDeviceConfigRespDTO> buildMockConfigs() {
IotModbusDeviceConfigRespDTO config = new IotModbusDeviceConfigRespDTO();
config.setDeviceId(1L);
config.setDeviceId(25L);
config.setProductKey("4aymZgOTOOCrDKRT");
config.setDeviceName("small");
config.setSlaveId(1);

View File

@@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;