feat(iot):【协议改造】移除 codecType,使用 protocolType 替代

This commit is contained in:
YunaiV
2026-02-04 10:00:15 +08:00
parent cc0d786d0f
commit a77e1780cc
25 changed files with 159 additions and 662 deletions

View File

@@ -58,7 +58,7 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi {
return success(BeanUtils.toBean(device, IotDeviceRespDTO.class, deviceDTO -> {
IotProductDO product = productService.getProductFromCache(deviceDTO.getProductId());
if (product != null) {
deviceDTO.setCodecType(product.getCodecType());
deviceDTO.setProtocolType(product.getProtocolType()).setSerializeType(product.getSerializeType());
}
}));
}

View File

@@ -67,10 +67,15 @@ public class IotProductRespVO {
@DictFormat(DictTypeConstants.NET_TYPE)
private Integer netType;
@Schema(description = "数据格式", requiredMode = Schema.RequiredMode.REQUIRED, example = "2")
@ExcelProperty(value = "数据格式", converter = DictConvert.class)
@DictFormat(DictTypeConstants.CODEC_TYPE)
private String codecType;
@Schema(description = "协议类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "mqtt")
@ExcelProperty(value = "协议类型", converter = DictConvert.class)
@DictFormat(DictTypeConstants.PROTOCOL_TYPE)
private String protocolType;
@Schema(description = "序列化类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "json")
@ExcelProperty(value = "序列化类型", converter = DictConvert.class)
@DictFormat(DictTypeConstants.SERIALIZE_TYPE)
private String serializeType;
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("创建时间")

View File

@@ -1,6 +1,8 @@
package cn.iocoder.yudao.module.iot.controller.admin.product.vo.product;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.enums.product.IotNetTypeEnum;
import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -44,9 +46,15 @@ public class IotProductSaveReqVO {
@InEnum(value = IotNetTypeEnum.class, message = "联网方式必须是 {value}")
private Integer netType;
@Schema(description = "数据格式", requiredMode = Schema.RequiredMode.REQUIRED, example = "2")
@NotEmpty(message = "数据格式不能为空")
private String codecType;
@Schema(description = "协议类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "mqtt")
@InEnum(value = IotProtocolTypeEnum.class, message = "协议类型必须是 {value}")
@NotEmpty(message = "协议类型不能为空")
private String protocolType;
@Schema(description = "序列化类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "json")
@InEnum(value = IotSerializeTypeEnum.class, message = "序列化类型必须是 {value}")
@NotEmpty(message = "序列化类型不能为空")
private String serializeType;
@Schema(description = "是否开启动态注册", example = "false")
@NotNull(message = "是否开启动态注册不能为空")

View File

@@ -78,12 +78,16 @@ public class IotProductDO extends TenantBaseDO {
*/
private Integer netType;
/**
* 数据格式(编解码器类型
* 协议类型
* <p>
* 字典 {@link cn.iocoder.yudao.module.iot.enums.DictTypeConstants#CODEC_TYPE}
*
* 目的:用于 gateway-server 解析消息格式
* 枚举 {@link cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum}
*/
private String codecType;
private String protocolType;
/**
* 序列化类型
* <p>
* 枚举 {@link cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum}
*/
private String serializeType;
}

View File

@@ -8,8 +8,8 @@ package cn.iocoder.yudao.module.iot.enums;
public class DictTypeConstants {
public static final String NET_TYPE = "iot_net_type";
public static final String LOCATION_TYPE = "iot_location_type";
public static final String CODEC_TYPE = "iot_codec_type";
public static final String PROTOCOL_TYPE = "iot_protocol_type";
public static final String SERIALIZE_TYPE = "iot_serialize_type";
public static final String PRODUCT_STATUS = "iot_product_status";
public static final String PRODUCT_DEVICE_TYPE = "iot_product_device_type";

View File

@@ -34,8 +34,12 @@ public class IotDeviceRespDTO {
*/
private Long productId;
/**
* 编解码器类型
* 协议类型
*/
private String codecType;
private String protocolType;
/**
* 序列化类型
*/
private String serializeType;
}

View File

@@ -1,33 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.codec;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
/**
* {@link cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage} 的编解码器
*
* @author 芋道源码
*/
public interface IotDeviceMessageCodec {
/**
* 编码消息
*
* @param message 消息
* @return 编码后的消息内容
*/
byte[] encode(IotDeviceMessage message);
/**
* 解码消息
*
* @param bytes 消息内容
* @return 解码后的消息内容
*/
IotDeviceMessage decode(byte[] bytes);
/**
* @return 数据格式(编码器类型)
*/
String type();
}

View File

@@ -1,89 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.codec.alink;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
/**
* 阿里云 Alink {@link IotDeviceMessage} 的编解码器
*
* @author 芋道源码
*/
@Component
public class IotAlinkDeviceMessageCodec implements IotDeviceMessageCodec {
public static final String TYPE = "Alink";
@Data
@NoArgsConstructor
@AllArgsConstructor
private static class AlinkMessage {
public static final String VERSION_1 = "1.0";
/**
* 消息 ID且每个消息 ID 在当前设备具有唯一性
*/
private String id;
/**
* 版本号
*/
private String version;
/**
* 请求方法
*/
private String method;
/**
* 请求参数
*/
private Object params;
/**
* 响应结果
*/
private Object data;
/**
* 响应错误码
*/
private Integer code;
/**
* 响应提示
*
* 特殊:这里阿里云是 message为了保持和项目的 {@link CommonResult#getMsg()} 一致。
*/
private String msg;
}
@Override
public String type() {
return TYPE;
}
@Override
public byte[] encode(IotDeviceMessage message) {
AlinkMessage alinkMessage = new AlinkMessage(message.getRequestId(), AlinkMessage.VERSION_1,
message.getMethod(), message.getParams(), message.getData(), message.getCode(), message.getMsg());
return JsonUtils.toJsonByte(alinkMessage);
}
@Override
@SuppressWarnings("DataFlowIssue")
public IotDeviceMessage decode(byte[] bytes) {
AlinkMessage alinkMessage = JsonUtils.parseObject(bytes, AlinkMessage.class);
Assert.notNull(alinkMessage, "消息不能为空");
Assert.equals(alinkMessage.getVersion(), AlinkMessage.VERSION_1, "消息版本号必须是 1.0");
return IotDeviceMessage.of(alinkMessage.getId(), alinkMessage.getMethod(), alinkMessage.getParams(),
alinkMessage.getData(), alinkMessage.getCode(), alinkMessage.getMsg());
}
}

View File

@@ -1,4 +0,0 @@
/**
* 提供设备接入的各种数据(请求、响应)的编解码
*/
package cn.iocoder.yudao.module.iot.gateway.codec;

View File

@@ -1,286 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.codec.tcp;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import io.vertx.core.buffer.Buffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* TCP/UDP 二进制格式 {@link IotDeviceMessage} 编解码器
* <p>
* 二进制协议格式(所有数值使用大端序):
*
* <pre>
* +--------+--------+--------+---------------------------+--------+--------+
* | 魔术字 | 版本号 | 消息类型| 消息长度(4 字节) |
* +--------+--------+--------+---------------------------+--------+--------+
* | 消息 ID 长度(2 字节) | 消息 ID (变长字符串) |
* +--------+--------+--------+--------+--------+--------+--------+--------+
* | 方法名长度(2 字节) | 方法名(变长字符串) |
* +--------+--------+--------+--------+--------+--------+--------+--------+
* | 消息体数据(变长) |
* +--------+--------+--------+--------+--------+--------+--------+--------+
* </pre>
* <p>
* 消息体格式:
* - 请求消息params 数据(JSON)
* - 响应消息code (4字节) + msg 长度(2字节) + msg 字符串 + data 数据(JSON)
* <p>
* 注意deviceId 不包含在协议中,由服务器根据连接上下文自动设置
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
public static final String TYPE = "TCP_BINARY";
/**
* 协议魔术字,用于协议识别
*/
private static final byte MAGIC_NUMBER = (byte) 0x7E;
/**
* 协议版本号
*/
private static final byte PROTOCOL_VERSION = (byte) 0x01;
/**
* 请求消息类型
*/
private static final byte REQUEST = (byte) 0x01;
/**
* 响应消息类型
*/
private static final byte RESPONSE = (byte) 0x02;
/**
* 协议头部固定长度(魔术字 + 版本号 + 消息类型 + 消息长度)
*/
private static final int HEADER_FIXED_LENGTH = 7;
/**
* 最小消息长度(头部 + 消息ID长度 + 方法名长度)
*/
private static final int MIN_MESSAGE_LENGTH = HEADER_FIXED_LENGTH + 4;
@Override
public String type() {
return TYPE;
}
@Override
public byte[] encode(IotDeviceMessage message) {
Assert.notNull(message, "消息不能为空");
Assert.notBlank(message.getMethod(), "消息方法不能为空");
try {
// 1. 确定消息类型
byte messageType = determineMessageType(message);
// 2. 构建消息体
byte[] bodyData = buildMessageBody(message, messageType);
// 3. 构建完整消息
return buildCompleteMessage(message, messageType, bodyData);
} catch (Exception e) {
log.error("[encode][TCP 二进制消息编码失败,消息: {}]", message, e);
throw new RuntimeException("TCP 二进制消息编码失败: " + e.getMessage(), e);
}
}
@Override
public IotDeviceMessage decode(byte[] bytes) {
Assert.notNull(bytes, "待解码数据不能为空");
Assert.isTrue(bytes.length >= MIN_MESSAGE_LENGTH, "数据包长度不足");
try {
Buffer buffer = Buffer.buffer(bytes);
// 解析协议头部和消息内容
int index = 0;
// 1. 验证魔术字
byte magic = buffer.getByte(index++);
Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
// 2. 验证版本号
byte version = buffer.getByte(index++);
Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
// 3. 读取消息类型
byte messageType = buffer.getByte(index++);
// 直接验证消息类型,无需抽取方法
Assert.isTrue(messageType == REQUEST || messageType == RESPONSE,
"无效的消息类型: " + messageType);
// 4. 读取消息长度
int messageLength = buffer.getInt(index);
index += 4;
Assert.isTrue(messageLength == buffer.length(),
"消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
// 5. 读取消息 ID
short messageIdLength = buffer.getShort(index);
index += 2;
String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
index += messageIdLength;
// 6. 读取方法名
short methodLength = buffer.getShort(index);
index += 2;
String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name());
index += methodLength;
// 7. 解析消息体
return parseMessageBody(buffer, index, messageType, messageId, method);
} catch (Exception e) {
log.error("[decode][TCP 二进制消息解码失败,数据长度: {}]", bytes.length, e);
throw new RuntimeException("TCP 二进制消息解码失败: " + e.getMessage(), e);
}
}
/**
* 确定消息类型
* 优化后的判断逻辑:有响应字段就是响应消息,否则就是请求消息
*/
private byte determineMessageType(IotDeviceMessage message) {
// 判断是否为响应消息:有响应码或响应消息时为响应
if (message.getCode() != null) {
return RESPONSE;
}
// 默认为请求消息
return REQUEST;
}
/**
* 构建消息体
*/
private byte[] buildMessageBody(IotDeviceMessage message, byte messageType) {
Buffer bodyBuffer = Buffer.buffer();
if (messageType == RESPONSE) {
// code
bodyBuffer.appendInt(message.getCode() != null ? message.getCode() : 0);
// msg
String msg = message.getMsg() != null ? message.getMsg() : "";
byte[] msgBytes = StrUtil.utf8Bytes(msg);
bodyBuffer.appendShort((short) msgBytes.length);
bodyBuffer.appendBytes(msgBytes);
// data
if (message.getData() != null) {
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getData()));
}
} else {
// 请求消息只处理 params 参数
// TODO @haohao如果为空是不是得写个长度 0 哈?
if (message.getParams() != null) {
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getParams()));
}
}
return bodyBuffer.getBytes();
}
/**
* 构建完整消息
*/
private byte[] buildCompleteMessage(IotDeviceMessage message, byte messageType, byte[] bodyData) {
Buffer buffer = Buffer.buffer();
// 1. 写入协议头部
buffer.appendByte(MAGIC_NUMBER);
buffer.appendByte(PROTOCOL_VERSION);
buffer.appendByte(messageType);
// 2. 预留消息长度位置(在 5. 更新消息长度)
int lengthPosition = buffer.length();
buffer.appendInt(0);
// 3. 写入消息 ID
String messageId = StrUtil.isNotBlank(message.getRequestId()) ? message.getRequestId()
: IotDeviceMessageUtils.generateMessageId();
byte[] messageIdBytes = StrUtil.utf8Bytes(messageId);
buffer.appendShort((short) messageIdBytes.length);
buffer.appendBytes(messageIdBytes);
// 4. 写入方法名
byte[] methodBytes = StrUtil.utf8Bytes(message.getMethod());
buffer.appendShort((short) methodBytes.length);
buffer.appendBytes(methodBytes);
// 5. 写入消息体
buffer.appendBytes(bodyData);
// 6. 更新消息长度
buffer.setInt(lengthPosition, buffer.length());
return buffer.getBytes();
}
/**
* 解析消息体
*/
private IotDeviceMessage parseMessageBody(Buffer buffer, int startIndex, byte messageType,
String messageId, String method) {
if (startIndex >= buffer.length()) {
// 空消息体
return IotDeviceMessage.of(messageId, method, null, null, null, null);
}
if (messageType == RESPONSE) {
// 响应消息:解析 code + msg + data
return parseResponseMessage(buffer, startIndex, messageId, method);
} else {
// 请求消息:解析 payload
Object payload = parseJsonData(buffer, startIndex, buffer.length());
return IotDeviceMessage.of(messageId, method, payload, null, null, null);
}
}
/**
* 解析响应消息
*/
private IotDeviceMessage parseResponseMessage(Buffer buffer, int startIndex, String messageId, String method) {
int index = startIndex;
// 1. 读取响应码
Integer code = buffer.getInt(index);
index += 4;
// 2. 读取响应消息
short msgLength = buffer.getShort(index);
index += 2;
String msg = msgLength > 0 ? buffer.getString(index, index + msgLength, StandardCharsets.UTF_8.name()) : null;
index += msgLength;
// 3. 读取响应数据
Object data = null;
if (index < buffer.length()) {
data = parseJsonData(buffer, index, buffer.length());
}
return IotDeviceMessage.of(messageId, method, null, data, code, msg);
}
/**
* 解析 JSON 数据
*/
private Object parseJsonData(Buffer buffer, int startIndex, int endIndex) {
if (startIndex >= endIndex) {
return null;
}
try {
String jsonStr = buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
return JsonUtils.parseObject(jsonStr, Object.class);
} catch (Exception e) {
log.warn("[parseJsonData][JSON 解析失败,返回原始字符串]", e);
return buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
}
}
/**
* 快速检测是否为二进制格式
*
* @param data 数据
* @return 是否为二进制格式
*/
public static boolean isBinaryFormatQuick(byte[] data) {
return data != null && data.length >= 1 && data[0] == MAGIC_NUMBER;
}
}

View File

@@ -1,110 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.codec.tcp;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
/**
* TCP/UDP JSON 格式 {@link IotDeviceMessage} 编解码器
*
* 采用纯 JSON 格式传输,格式如下:
* {
* "id": "消息 ID",
* "method": "消息方法",
* "params": {...}, // 请求参数
* "data": {...}, // 响应结果
* "code": 200, // 响应错误码
* "msg": "success", // 响应提示
* "timestamp": 时间戳
* }
*
* @author 芋道源码
*/
@Component
public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
public static final String TYPE = "TCP_JSON";
@Data
@NoArgsConstructor
@AllArgsConstructor
private static class TcpJsonMessage {
/**
* 消息 ID且每个消息 ID 在当前设备具有唯一性
*/
private String id;
/**
* 请求方法
*/
private String method;
/**
* 请求参数
*/
private Object params;
/**
* 响应结果
*/
private Object data;
/**
* 响应错误码
*/
private Integer code;
/**
* 响应提示
*/
private String msg;
/**
* 时间戳
*/
private Long timestamp;
}
@Override
public String type() {
return TYPE;
}
@Override
public byte[] encode(IotDeviceMessage message) {
TcpJsonMessage tcpJsonMessage = new TcpJsonMessage(
message.getRequestId(),
message.getMethod(),
message.getParams(),
message.getData(),
message.getCode(),
message.getMsg(),
System.currentTimeMillis());
return JsonUtils.toJsonByte(tcpJsonMessage);
}
@Override
@SuppressWarnings("DataFlowIssue")
public IotDeviceMessage decode(byte[] bytes) {
String jsonStr = StrUtil.utf8Str(bytes).trim();
TcpJsonMessage tcpJsonMessage = JsonUtils.parseObject(jsonStr, TcpJsonMessage.class);
Assert.notNull(tcpJsonMessage, "消息不能为空");
Assert.notBlank(tcpJsonMessage.getMethod(), "消息方法不能为空");
return IotDeviceMessage.of(
tcpJsonMessage.getId(),
tcpJsonMessage.getMethod(),
tcpJsonMessage.getParams(),
tcpJsonMessage.getData(),
tcpJsonMessage.getCode(),
tcpJsonMessage.getMsg());
}
}

View File

@@ -53,9 +53,10 @@ public class IotEmqxDownstreamHandler {
return;
}
// 2.2 构建载荷
byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
byte[] payload = deviceMessageService.serializeDeviceMessage(message, deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
// 2.3 发布消息
// 3. 发布消息
protocol.publishMessage(topic, payload);
}

View File

@@ -452,16 +452,17 @@ public class IotEmqxAuthEventHandler {
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
Assert.notNull(deviceInfo, "设备信息不能为空");
try {
// 1. 构建响应消息
// 1.1 构建响应消息
String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod();
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(null, method, result, 0, null);
// 2. 编码消息
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, "Alink");
// 3. 构建响应主题并延迟发布(等待设备连接成功并完成订阅)
// 1.2 序列化消息
byte[] encodedData = deviceMessageService.serializeDeviceMessage(responseMessage,
cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum.JSON);
// 1.3 构建响应主题
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), true);
// 2. 构建响应主题,并延迟发布(等待设备连接成功并完成订阅)
protocol.publishDelayMessage(replyTopic, encodedData, 5000);
log.info("[sendRegisterResultMessage][发送注册结果: topic={}]", replyTopic);
} catch (Exception e) {

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -34,22 +35,21 @@ public class IotEmqxUpstreamHandler {
try {
// 1. 解析主题,一次性获取所有信息
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
String productKey = ArrayUtil.get(topicParts, 2);
String deviceName = ArrayUtil.get(topicParts, 3);
if (topicParts.length < 4 || StrUtil.hasBlank(productKey, deviceName)) {
log.warn("[handle][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic);
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
// 3. 解码消息
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
// 2. 反序列化消息
IotDeviceMessage message = deviceMessageService.deserializeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[handle][topic({}) payload({}) 消息解码失败]", topic, new String(payload));
return;
}
// 4. 发送消息到队列
// 3. 发送消息到队列
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
} catch (Exception e) {
log.error("[handle][topic({}) payload({}) 处理异常]", topic, new String(payload), e);

View File

@@ -43,7 +43,7 @@ public class IotMqttDownstreamHandler {
}
// 2.1 序列化消息
byte[] payload = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getProductKey(),
byte[] payload = deviceMessageService.serializeDeviceMessage(message, connectionInfo.getProductKey(),
connectionInfo.getDeviceName());
Assert.isTrue(payload != null && payload.length > 0, "消息编码结果不能为空");
// 2.2 构建主题

View File

@@ -21,12 +21,6 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public abstract class IotMqttAbstractHandler {
// TODO @AI当前使用 Alink 序列化类型,后续可考虑支持更多序列化方式
/**
* 默认编解码类型MQTT 使用 Alink 协议)
*/
protected static final String DEFAULT_CODEC_TYPE = "Alink";
protected final IotMqttConnectionManager connectionManager;
protected final IotDeviceMessageService deviceMessageService;
@@ -40,8 +34,9 @@ public abstract class IotMqttAbstractHandler {
* @param method 方法名
* @param data 响应数据
*/
@SuppressWarnings("SameParameterValue")
protected void sendSuccessResponse(MqttEndpoint endpoint, String productKey, String deviceName,
String requestId, String method, Object data) {
String requestId, String method, Object data) {
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, 0, null);
writeResponse(endpoint, productKey, deviceName, method, responseMessage);
}
@@ -75,11 +70,12 @@ public abstract class IotMqttAbstractHandler {
private void writeResponse(MqttEndpoint endpoint, String productKey, String deviceName,
String method, IotDeviceMessage responseMessage) {
try {
// 1. 编码消息(使用默认编解码器
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
// 2. 构建响应主题,并发送
// 1.1 序列化消息(根据设备配置的序列化类型
byte[] encodedData = deviceMessageService.serializeDeviceMessage(responseMessage, productKey, deviceName);
// 1.2 构建响应主题
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true);
// 2. 发送响应消息
endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false);
log.debug("[writeResponse][发送响应,主题: {}code: {}]", replyTopic, responseMessage.getCode());
} catch (Exception e) {

View File

@@ -14,6 +14,8 @@ import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessa
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 网关 MQTT 设备注册处理器:处理设备动态注册消息(一型一密)
*
@@ -76,7 +78,8 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler {
log.warn("[handleRegister][注册失败,客户端 ID: {},错误: {}]", clientId, e.getMessage());
// 接受连接,并发送错误响应
endpoint.accept(false);
sendErrorResponse(endpoint, productKey, deviceName, null, method, 500, e.getMessage());
sendErrorResponse(endpoint, productKey, deviceName, null, method,
INTERNAL_SERVER_ERROR.getCode(), e.getMessage());
} finally {
// 注册完成后关闭连接(一型一密只用于获取 deviceSecret不保持连接
endpoint.close();

View File

@@ -60,7 +60,7 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
Assert.equals(deviceName, connectionInfo.getDeviceName(), "设备名称不匹配");
// 2. 反序列化消息
message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
message = deviceMessageService.deserializeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[handleBusinessRequest][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
sendErrorResponse(endpoint, productKey, deviceName, null, null,

View File

@@ -1,4 +1,4 @@
/**
* 提供设备接入的各种协议的实现
* 设备接入协议MQTT、EMQX、HTTP、TCP 等协议的实现
*/
package cn.iocoder.yudao.module.iot.gateway.protocol;
package cn.iocoder.yudao.module.iot.gateway.protocol;

View File

@@ -0,0 +1,6 @@
/**
* 消息序列化将设备消息转换为字节数组JSON、二进制等格式
*
* @see cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer
*/
package cn.iocoder.yudao.module.iot.gateway.serialize;

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.service.device.message;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
/**
@@ -10,45 +11,45 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
public interface IotDeviceMessageService {
/**
* 编码消息
* 序列化消息
*
* @param message 消息
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 编码后的消息内容
* @return 序列化后的消息内容
*/
byte[] encodeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName);
byte[] serializeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName);
/**
* 编码消息
* 序列化消息
*
* @param message 消息
* @param codecType 编解码器类型
* @return 编码后的消息内容
* @param message 消息
* @param serializeType 序列化类型
* @return 序列化后的消息内容
*/
byte[] encodeDeviceMessage(IotDeviceMessage message,
String codecType);
byte[] serializeDeviceMessage(IotDeviceMessage message,
IotSerializeTypeEnum serializeType);
/**
* 解码消息
* 反序列化消息
*
* @param bytes 消息内容
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 解码后的消息内容
* @return 反序列化后的消息内容
*/
IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName);
IotDeviceMessage deserializeDeviceMessage(byte[] bytes,
String productKey, String deviceName);
/**
* 解码消息
* 反序列化消息
*
* @param bytes 消息内容
* @param codecType 编解码器类型
* @return 解码后的消息内容
* @param bytes 消息内容
* @param serializeType 序列化类型
* @return 反序列化后的消息内容
*/
IotDeviceMessage decodeDeviceMessage(byte[] bytes, String codecType);
IotDeviceMessage deserializeDeviceMessage(byte[] bytes, IotSerializeTypeEnum serializeType);
/**
* 发送消息

View File

@@ -1,20 +1,20 @@
package cn.iocoder.yudao.module.iot.gateway.service.device.message;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_NOT_EXISTS;
@@ -28,80 +28,70 @@ import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVIC
@Slf4j
public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
/**
* 编解码器
*/
private final Map<String, IotDeviceMessageCodec> codes;
@Resource
private IotDeviceService deviceService;
@Resource
private IotDeviceMessageProducer deviceMessageProducer;
public IotDeviceMessageServiceImpl(List<IotDeviceMessageCodec> codes) {
this.codes = CollectionUtils.convertMap(codes, IotDeviceMessageCodec::type);
}
@Resource
private IotMessageSerializerManager messageSerializerManager;
@Override
public byte[] encodeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName) {
public byte[] serializeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName) {
// 1.1 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName);
if (device == null) {
throw exception(DEVICE_NOT_EXISTS, productKey, deviceName);
}
// 1.2 获取编解码
IotDeviceMessageCodec codec = codes.get(device.getCodecType());
if (codec == null) {
throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", device.getCodecType()));
}
// 1.2 获取序列化
IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(device.getSerializeType());
Assert.notNull(serializeType, "设备序列化类型不能为空");
// 2. 编码消息
return codec.encode(message);
// 2. 序列化消息
return serializeDeviceMessage(message, serializeType);
}
@Override
public byte[] encodeDeviceMessage(IotDeviceMessage message,
String codecType) {
// 1. 获取编解码
IotDeviceMessageCodec codec = codes.get(codecType);
if (codec == null) {
throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", codecType));
public byte[] serializeDeviceMessage(IotDeviceMessage message,
IotSerializeTypeEnum serializeType) {
// 1. 获取序列化
IotMessageSerializer serializer = messageSerializerManager.get(serializeType);
if (serializer == null) {
throw new IllegalArgumentException(StrUtil.format("序列化器({}) 不存在", serializeType));
}
// 2. 编码消息
return codec.encode(message);
// 2. 序列化消息
return serializer.serialize(message);
}
@Override
public IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName) {
public IotDeviceMessage deserializeDeviceMessage(byte[] bytes,
String productKey, String deviceName) {
// 1.1 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName);
if (device == null) {
throw exception(DEVICE_NOT_EXISTS, productKey, deviceName);
}
// 1.2 获取编解码
IotDeviceMessageCodec codec = codes.get(device.getCodecType());
if (codec == null) {
throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", device.getCodecType()));
}
// 1.2 获取序列化
IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(device.getSerializeType());
Assert.notNull(serializeType, "设备序列化类型不能为空");
// 2. 解码消息
return codec.decode(bytes);
// 2. 反序列化消息
return deserializeDeviceMessage(bytes, serializeType);
}
@Override
public IotDeviceMessage decodeDeviceMessage(byte[] bytes, String codecType) {
// 1. 获取编解码
IotDeviceMessageCodec codec = codes.get(codecType);
if (codec == null) {
throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", codecType));
public IotDeviceMessage deserializeDeviceMessage(byte[] bytes, IotSerializeTypeEnum serializeType) {
// 1. 获取序列化
IotMessageSerializer serializer = messageSerializerManager.get(serializeType);
if (serializer == null) {
throw new IllegalArgumentException(StrUtil.format("序列化器({}) 不存在", serializeType));
}
// 2. 解码消息
return codec.decode(bytes);
// 2. 反序列化消息
return serializer.deserialize(bytes);
}
@Override

View File

@@ -8,8 +8,8 @@ import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.core.util.IotProductAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
@@ -57,9 +57,9 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
private static Vertx vertx;
// ===================== 编解码器MQTT 使用 Alink 协议) =====================
// ===================== 序列化器 =====================
private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec();
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) =====================
@@ -211,7 +211,7 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
client.publishHandler(message -> {
log.info("[testDeviceRegister][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
IotDeviceMessage response = CODEC.decode(message.payload().getBytes());
IotDeviceMessage response = SERIALIZER.deserialize(message.payload().getBytes());
responseFuture.complete(response);
});
// 3.2 订阅 _reply 主题
@@ -314,14 +314,14 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
client.publishHandler(message -> {
log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
IotDeviceMessage response = CODEC.decode(message.payload().getBytes());
IotDeviceMessage response = SERIALIZER.deserialize(message.payload().getBytes());
responseFuture.complete(response);
});
// 2. 编码并发布消息
byte[] payload = CODEC.encode(request);
log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]",
CODEC.type(), topic, new String(payload));
// 2. 序列化并发布消息
byte[] payload = SERIALIZER.serialize(request);
log.info("[publishAndWaitReply][Serializer: {}, 发送消息: topic={}, payload={}]",
SERIALIZER.getType(), topic, new String(payload));
client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[publishAndWaitReply][消息发布成功]");

View File

@@ -12,8 +12,8 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
@@ -65,9 +65,9 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
private static Vertx vertx;
// ===================== 编解码器MQTT 使用 Alink 协议) =====================
// ===================== 序列化器 =====================
private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec();
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) =====================
@@ -399,14 +399,14 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
client.publishHandler(message -> {
log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
IotDeviceMessage response = CODEC.decode(message.payload().getBytes());
IotDeviceMessage response = SERIALIZER.deserialize(message.payload().getBytes());
responseFuture.complete(response);
});
// 2. 编码并发布消息
byte[] payload = CODEC.encode(request);
log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]",
CODEC.type(), topic, new String(payload));
// 2. 序列化并发布消息
byte[] payload = SERIALIZER.serialize(request);
log.info("[publishAndWaitReply][Serializer: {}, 发送消息: topic={}, payload={}]",
SERIALIZER.getType(), topic, new String(payload));
client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[publishAndWaitReply][消息发布成功]");

View File

@@ -7,8 +7,8 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
@@ -59,9 +59,9 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
private static Vertx vertx;
// ===================== 编解码器MQTT 使用 Alink 协议) =====================
// ===================== 序列化器 =====================
private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec();
private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer();
// ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
@@ -236,14 +236,14 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
client.publishHandler(message -> {
log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
IotDeviceMessage response = CODEC.decode(message.payload().getBytes());
IotDeviceMessage response = SERIALIZER.deserialize(message.payload().getBytes());
responseFuture.complete(response);
});
// 2. 编码并发布消息
byte[] payload = CODEC.encode(request);
log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]",
CODEC.type(), topic, new String(payload));
// 2. 序列化并发布消息
byte[] payload = SERIALIZER.serialize(request);
log.info("[publishAndWaitReply][Serializer: {}, 发送消息: topic={}, payload={}]",
SERIALIZER.getType(), topic, new String(payload));
client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[publishAndWaitReply][消息发布成功]");