mirror of
https://gitee.com/zhijiantianya/ruoyi-vue-pro.git
synced 2026-03-22 05:07:17 +08:00
feat(iot):【网关设备:50%】整体初步实现(未测试),基于 gateway-device-topic-design.md 规划
This commit is contained in:
@@ -35,6 +35,16 @@ public interface ErrorCodeConstants {
|
||||
ErrorCode DEVICE_SERIAL_NUMBER_EXISTS = new ErrorCode(1_050_003_008, "设备序列号已存在,序列号必须全局唯一");
|
||||
ErrorCode DEVICE_NOT_GATEWAY_SUB = new ErrorCode(1_050_003_009, "设备【{}/{}】不是网关子设备类型,无法绑定到网关");
|
||||
ErrorCode DEVICE_GATEWAY_BINDTO_EXISTS = new ErrorCode(1_050_003_010, "设备【{}/{}】已绑定到其他网关,请先解绑");
|
||||
// 拓扑管理相关错误码 1-050-003-100
|
||||
ErrorCode DEVICE_TOPO_PARAMS_INVALID = new ErrorCode(1_050_003_100, "拓扑管理参数无效");
|
||||
ErrorCode DEVICE_TOPO_SUB_DEVICE_USERNAME_INVALID = new ErrorCode(1_050_003_101, "子设备用户名格式无效");
|
||||
ErrorCode DEVICE_TOPO_SUB_DEVICE_AUTH_FAILED = new ErrorCode(1_050_003_102, "子设备认证失败");
|
||||
ErrorCode DEVICE_TOPO_SUB_NOT_BINDTO_GATEWAY = new ErrorCode(1_050_003_103, "子设备【{}/{}】未绑定到该网关");
|
||||
// TODO @AI:这里的错误码校验,要不要使用?
|
||||
ErrorCode DEVICE_TOPO_SUB_DEVICE_NOT_BOUND = new ErrorCode(1_050_003_104, "子设备【{}/{}】未绑定到任何网关");
|
||||
// 子设备注册相关错误码 1-050-003-200
|
||||
ErrorCode DEVICE_SUB_REGISTER_PARAMS_INVALID = new ErrorCode(1_050_003_200, "子设备注册参数无效");
|
||||
ErrorCode DEVICE_SUB_REGISTER_PRODUCT_NOT_GATEWAY_SUB = new ErrorCode(1_050_003_201, "产品【{}】不是网关子设备类型");
|
||||
|
||||
// ========== 产品分类 1-050-004-000 ==========
|
||||
ErrorCode PRODUCT_CATEGORY_NOT_EXISTS = new ErrorCode(1_050_004_000, "产品分类不存在");
|
||||
|
||||
@@ -468,6 +468,13 @@ public class IotDeviceServiceImpl implements IotDeviceService {
|
||||
log.error("[authDevice][设备({}/{}) 密码不正确]", productKey, deviceName);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 3. 校验子设备拓扑关系:子设备必须先绑定到某网关才能认证上线
|
||||
if (IotProductDeviceTypeEnum.isGatewaySub(device.getDeviceType())
|
||||
&& device.getGatewayId() != null) {
|
||||
log.warn("[authDevice][子设备({}/{}) 未绑定到任何网关,认证失败]", productKey, deviceName);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -10,17 +10,30 @@ import cn.iocoder.yudao.framework.common.pojo.PageResult;
|
||||
import cn.iocoder.yudao.framework.common.util.date.LocalDateTimeUtils;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.IotDeviceSaveReqVO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.*;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceMessageDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDeviceMessageMapper;
|
||||
import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
|
||||
import cn.iocoder.yudao.module.iot.service.device.property.IotDevicePropertyService;
|
||||
import cn.iocoder.yudao.module.iot.service.ota.IotOtaTaskRecordService;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
@@ -35,12 +48,11 @@ import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
|
||||
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL;
|
||||
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*;
|
||||
|
||||
/**
|
||||
* IoT 设备消息 Service 实现类
|
||||
@@ -59,6 +71,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
@Resource
|
||||
@Lazy // 延迟加载,避免循环依赖
|
||||
private IotOtaTaskRecordService otaTaskRecordService;
|
||||
@Resource
|
||||
@Lazy // 延迟加载,避免循环依赖
|
||||
private IotProductService productService;
|
||||
|
||||
@Resource
|
||||
private IotDeviceMessageMapper deviceMessageMapper;
|
||||
@@ -168,9 +183,10 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
// 2. 记录消息
|
||||
getSelf().createDeviceLogAsync(message);
|
||||
|
||||
// 3. 回复消息。前提:非 _reply 消息,并且非禁用回复的消息
|
||||
// TODO @AI:我在想,是不是批量上传后,还是得 reply 。因为打包上传的时候,只是那条消息的回复。然后,需要单独给每个子消息回复,后续至于怎么使用,是不是得看具体业务了;例如说:1)批量上传属性,默认回复是批量上传的消息;然后,每个属性、事件拆包消息,单独回复,后续网关设备按需回复给子设备。
|
||||
// 3. 回复消息。前提:非 _reply 消息、非禁用回复的消息、非拆包消息
|
||||
if (IotDeviceMessageUtils.isReplyMessage(message)
|
||||
|| IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod())
|
||||
|| !message.needReply()
|
||||
|| StrUtil.isEmpty(message.getServerId())) {
|
||||
return;
|
||||
}
|
||||
@@ -185,15 +201,19 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
}
|
||||
|
||||
// TODO @芋艿:可优化:未来逻辑复杂后,可以独立拆除 Processor 处理器
|
||||
@SuppressWarnings("SameReturnValue")
|
||||
private Object handleUpstreamDeviceMessage0(IotDeviceMessage message, IotDeviceDO device) {
|
||||
// 设备上下线
|
||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
|
||||
String stateStr = IotDeviceMessageUtils.getIdentifier(message);
|
||||
assert stateStr != null;
|
||||
Assert.notEmpty(stateStr, "设备状态不能为空");
|
||||
deviceService.updateDeviceState(device, Integer.valueOf(stateStr));
|
||||
// TODO 芋艿:子设备的关联
|
||||
Integer state = Integer.valueOf(stateStr);
|
||||
deviceService.updateDeviceState(device, state);
|
||||
// 特殊:网关设备下线时,网关子设备联动下线
|
||||
if (Objects.equal(state, IotDeviceStateEnum.OFFLINE.getState())
|
||||
&& IotProductDeviceTypeEnum.isGateway(device.getDeviceType())) {
|
||||
handleGatewayOffline(device, message.getServerId());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -202,6 +222,11 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
devicePropertyService.saveDeviceProperty(device, message);
|
||||
return null;
|
||||
}
|
||||
// 批量上报(属性+事件+子设备)
|
||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod())) {
|
||||
handlePackMessage(message, device);
|
||||
return null;
|
||||
}
|
||||
|
||||
// OTA 上报升级进度
|
||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.OTA_PROGRESS.getMethod())) {
|
||||
@@ -209,10 +234,435 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
return null;
|
||||
}
|
||||
|
||||
// TODO @芋艿:这里可以按需,添加别的逻辑;
|
||||
// 添加拓扑关系
|
||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.TOPO_ADD.getMethod())) {
|
||||
return handleTopoAdd(message, device);
|
||||
}
|
||||
// 删除拓扑关系
|
||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod())) {
|
||||
return handleTopoDelete(message, device);
|
||||
}
|
||||
|
||||
// 获取拓扑关系
|
||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.TOPO_GET.getMethod())) {
|
||||
return handleTopoGet(device);
|
||||
}
|
||||
|
||||
// 子设备动态注册
|
||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod())) {
|
||||
return handleSubDeviceRegister(message, device);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
// ========== 拓扑管理处理方法 ==========
|
||||
|
||||
// TODO @AI:是不是更适合在 deviceService 里面处理?
|
||||
/**
|
||||
* 处理添加拓扑关系请求
|
||||
*
|
||||
* @param message 消息
|
||||
* @param gatewayDevice 网关设备
|
||||
* @return 响应数据
|
||||
*/
|
||||
private Object handleTopoAdd(IotDeviceMessage message, IotDeviceDO gatewayDevice) {
|
||||
// TODO @AI:这里是不是 1.1 1.2 1.3 这样?修改前,我们确认下,有没模块是这么写的;iot 消息处理里;
|
||||
// 1. 校验网关设备类型
|
||||
if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) {
|
||||
throw exception(DEVICE_NOT_GATEWAY);
|
||||
}
|
||||
|
||||
// 2. 解析参数
|
||||
// TODO @AI:是不是 parseObject 增加一个方法,允许传入 object 类型,避免先转 jsonString 再 parseObject ;
|
||||
IotDeviceTopoAddReqDTO params = JsonUtils.parseObject(JsonUtils.toJsonString(message.getParams()),
|
||||
IotDeviceTopoAddReqDTO.class);
|
||||
if (params == null) {
|
||||
throw exception(DEVICE_TOPO_PARAMS_INVALID);
|
||||
}
|
||||
|
||||
// 3. 解析子设备信息
|
||||
IotDeviceAuthUtils.DeviceInfo subDeviceInfo = IotDeviceAuthUtils.parseUsername(params.getUsername());
|
||||
if (subDeviceInfo == null) {
|
||||
throw exception(DEVICE_TOPO_SUB_DEVICE_USERNAME_INVALID);
|
||||
}
|
||||
|
||||
// 4. 校验子设备认证信息
|
||||
// TODO @AI:链式调用;
|
||||
IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO();
|
||||
authReqDTO.setClientId(params.getClientId());
|
||||
authReqDTO.setUsername(params.getUsername());
|
||||
authReqDTO.setPassword(params.getPassword());
|
||||
if (!deviceService.authDevice(authReqDTO)) {
|
||||
throw exception(DEVICE_TOPO_SUB_DEVICE_AUTH_FAILED);
|
||||
}
|
||||
|
||||
// 5. 获取子设备
|
||||
IotDeviceDO subDevice = deviceService.getDeviceFromCache(subDeviceInfo.getProductKey(), subDeviceInfo.getDeviceName());
|
||||
if (subDevice == null) {
|
||||
throw exception(DEVICE_NOT_EXISTS);
|
||||
}
|
||||
|
||||
// 6. 校验子设备类型
|
||||
if (!IotProductDeviceTypeEnum.isGatewaySub(subDevice.getDeviceType())) {
|
||||
throw exception(DEVICE_NOT_GATEWAY_SUB, subDevice.getProductKey(), subDevice.getDeviceName());
|
||||
}
|
||||
|
||||
// 7. 绑定拓扑关系
|
||||
// TODO @AI:这里要考虑,校验是不是老设备已经绑定到其他网关了?
|
||||
deviceService.bindDeviceGateway(Collections.singletonList(subDevice.getId()), gatewayDevice.getId());
|
||||
log.info("[handleTopoAdd][网关({}/{}) 绑定子设备({}/{})]",
|
||||
gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(),
|
||||
subDevice.getProductKey(), subDevice.getDeviceName());
|
||||
|
||||
// 8. 发送拓扑变更通知
|
||||
sendTopoChangeNotify(gatewayDevice, "add", subDevice);
|
||||
return null;
|
||||
}
|
||||
|
||||
// TODO @AI:是不是更适合在 deviceService 里面处理?
|
||||
|
||||
/**
|
||||
* 处理删除拓扑关系请求
|
||||
*
|
||||
* @param message 消息
|
||||
* @param gatewayDevice 网关设备
|
||||
* @return 响应数据
|
||||
*/
|
||||
private Object handleTopoDelete(IotDeviceMessage message, IotDeviceDO gatewayDevice) {
|
||||
// 1. 校验网关设备类型
|
||||
if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) {
|
||||
throw exception(DEVICE_NOT_GATEWAY);
|
||||
}
|
||||
|
||||
// 2. 解析参数
|
||||
IotDeviceTopoDeleteReqDTO params = JsonUtils.parseObject(JsonUtils.toJsonString(message.getParams()),
|
||||
IotDeviceTopoDeleteReqDTO.class);
|
||||
if (params == null) {
|
||||
throw exception(DEVICE_TOPO_PARAMS_INVALID);
|
||||
}
|
||||
|
||||
// 3. 获取子设备
|
||||
IotDeviceDO subDevice = deviceService.getDeviceFromCache(params.getProductKey(), params.getDeviceName());
|
||||
if (subDevice == null) {
|
||||
throw exception(DEVICE_NOT_EXISTS);
|
||||
}
|
||||
|
||||
// 4. 校验子设备是否绑定到该网关
|
||||
if (!Objects.equal(subDevice.getGatewayId(), gatewayDevice.getId())) {
|
||||
throw exception(DEVICE_TOPO_SUB_NOT_BINDTO_GATEWAY, params.getProductKey(), params.getDeviceName());
|
||||
}
|
||||
|
||||
// 5. 解绑拓扑关系
|
||||
deviceService.unbindDeviceGateway(Collections.singletonList(subDevice.getId()));
|
||||
log.info("[handleTopoDelete][网关({}/{}) 解绑子设备({}/{})]",
|
||||
gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(),
|
||||
subDevice.getProductKey(), subDevice.getDeviceName());
|
||||
|
||||
// 6. 子设备下线
|
||||
if (Objects.equal(subDevice.getState(), IotDeviceStateEnum.ONLINE.getState())) {
|
||||
deviceService.updateDeviceState(subDevice, IotDeviceStateEnum.OFFLINE.getState());
|
||||
}
|
||||
|
||||
// 7. 发送拓扑变更通知
|
||||
sendTopoChangeNotify(gatewayDevice, "delete", subDevice);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理获取拓扑关系请求
|
||||
*
|
||||
* @param gatewayDevice 网关设备
|
||||
* @return 子设备列表
|
||||
*/
|
||||
private Object handleTopoGet(IotDeviceDO gatewayDevice) {
|
||||
// 1. 校验网关设备类型
|
||||
if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) {
|
||||
throw exception(DEVICE_NOT_GATEWAY);
|
||||
}
|
||||
|
||||
// 2. 获取子设备列表
|
||||
List<IotDeviceDO> subDevices = deviceService.getDeviceListByGatewayId(gatewayDevice.getId());
|
||||
|
||||
// 3. 转换为响应格式
|
||||
return convertList(subDevices, subDevice -> new IotDeviceTopoRespDTO()
|
||||
.setProductKey(subDevice.getProductKey())
|
||||
.setDeviceName(subDevice.getDeviceName()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送拓扑变更通知
|
||||
*
|
||||
* @param gatewayDevice 网关设备
|
||||
* @param changeType 变更类型:add/delete
|
||||
* @param subDevice 子设备
|
||||
*/
|
||||
private void sendTopoChangeNotify(IotDeviceDO gatewayDevice, String changeType, IotDeviceDO subDevice) {
|
||||
try {
|
||||
String serverId = devicePropertyService.getDeviceServerId(gatewayDevice.getId());
|
||||
if (StrUtil.isEmpty(serverId)) {
|
||||
log.warn("[sendTopoChangeNotify][网关({}/{}) serverId 为空,无法发送拓扑变更通知]",
|
||||
gatewayDevice.getProductKey(), gatewayDevice.getDeviceName());
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, Object> params = MapUtil.builder(new HashMap<String, Object>())
|
||||
.put("changeType", changeType)
|
||||
.put("subDevice", MapUtil.builder(new HashMap<String, Object>())
|
||||
.put("productKey", subDevice.getProductKey())
|
||||
.put("deviceName", subDevice.getDeviceName())
|
||||
.build())
|
||||
.build();
|
||||
|
||||
IotDeviceMessage notifyMessage = IotDeviceMessage.requestOf(
|
||||
IotDeviceMessageMethodEnum.TOPO_CHANGE.getMethod(), params);
|
||||
sendDeviceMessage(notifyMessage, gatewayDevice, serverId);
|
||||
} catch (Exception ex) {
|
||||
log.error("[sendTopoChangeNotify][发送拓扑变更通知失败,网关({}/{}), 子设备({}/{})]",
|
||||
gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(),
|
||||
subDevice.getProductKey(), subDevice.getDeviceName(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
// ========== 子设备注册处理方法 ==========
|
||||
|
||||
/**
|
||||
* 处理子设备动态注册请求
|
||||
*
|
||||
* @param message 消息
|
||||
* @param gatewayDevice 网关设备
|
||||
* @return 注册结果列表
|
||||
*/
|
||||
private Object handleSubDeviceRegister(IotDeviceMessage message, IotDeviceDO gatewayDevice) {
|
||||
// 1. 校验网关设备类型
|
||||
if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) {
|
||||
throw exception(DEVICE_NOT_GATEWAY);
|
||||
}
|
||||
|
||||
// 2. 解析参数(数组)
|
||||
List<IotSubDeviceRegisterReqDTO> paramsList;
|
||||
if (message.getParams() instanceof List) {
|
||||
paramsList = JsonUtils.parseArray(JsonUtils.toJsonString(message.getParams()),
|
||||
IotSubDeviceRegisterReqDTO.class);
|
||||
} else {
|
||||
throw exception(DEVICE_SUB_REGISTER_PARAMS_INVALID);
|
||||
}
|
||||
|
||||
if (paramsList == null || paramsList.isEmpty()) {
|
||||
throw exception(DEVICE_SUB_REGISTER_PARAMS_INVALID);
|
||||
}
|
||||
|
||||
// 3. 遍历注册每个子设备
|
||||
List<IotSubDeviceRegisterRespDTO> results = new ArrayList<>();
|
||||
for (IotSubDeviceRegisterReqDTO params : paramsList) {
|
||||
try {
|
||||
IotSubDeviceRegisterRespDTO result = registerSubDevice(params, gatewayDevice);
|
||||
results.add(result);
|
||||
} catch (Exception ex) {
|
||||
log.error("[handleSubDeviceRegister][子设备({}/{}) 注册失败]",
|
||||
params.getProductKey(), params.getDeviceName(), ex);
|
||||
// 继续处理其他子设备,不影响整体流程
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册单个子设备
|
||||
*
|
||||
* @param params 注册参数
|
||||
* @param gatewayDevice 网关设备
|
||||
* @return 注册结果
|
||||
*/
|
||||
private IotSubDeviceRegisterRespDTO registerSubDevice(IotSubDeviceRegisterReqDTO params,
|
||||
IotDeviceDO gatewayDevice) {
|
||||
// 1. 查找产品
|
||||
IotProductDO product = productService.getProductByProductKey(params.getProductKey());
|
||||
if (product == null) {
|
||||
throw exception(PRODUCT_NOT_EXISTS);
|
||||
}
|
||||
|
||||
// 2. 校验产品是否为网关子设备类型
|
||||
if (!IotProductDeviceTypeEnum.isGatewaySub(product.getDeviceType())) {
|
||||
throw exception(DEVICE_SUB_REGISTER_PRODUCT_NOT_GATEWAY_SUB, params.getProductKey());
|
||||
}
|
||||
|
||||
// 3. 查找设备是否已存在
|
||||
IotDeviceDO existDevice = deviceService.getDeviceFromCache(params.getProductKey(), params.getDeviceName());
|
||||
if (existDevice != null) {
|
||||
// 已存在则返回设备信息
|
||||
return new IotSubDeviceRegisterRespDTO()
|
||||
.setProductKey(existDevice.getProductKey())
|
||||
.setDeviceName(existDevice.getDeviceName())
|
||||
.setDeviceSecret(existDevice.getDeviceSecret());
|
||||
}
|
||||
|
||||
// 4. 创建新设备
|
||||
IotDeviceSaveReqVO createReqVO = new IotDeviceSaveReqVO()
|
||||
.setDeviceName(params.getDeviceName())
|
||||
.setProductId(product.getId())
|
||||
.setGatewayId(gatewayDevice.getId());
|
||||
Long deviceId = deviceService.createDevice(createReqVO);
|
||||
|
||||
// 5. 获取新创建的设备信息
|
||||
IotDeviceDO newDevice = deviceService.getDevice(deviceId);
|
||||
log.info("[registerSubDevice][网关({}/{}) 注册子设备({}/{})]",
|
||||
gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(),
|
||||
newDevice.getProductKey(), newDevice.getDeviceName());
|
||||
|
||||
return new IotSubDeviceRegisterRespDTO()
|
||||
.setProductKey(newDevice.getProductKey())
|
||||
.setDeviceName(newDevice.getDeviceName())
|
||||
.setDeviceSecret(newDevice.getDeviceSecret());
|
||||
}
|
||||
|
||||
// ========== 批量上报处理方法 ==========
|
||||
|
||||
/**
|
||||
* 处理批量上报消息
|
||||
* <p>
|
||||
* 将 pack 消息拆分成多条标准消息,递归处理
|
||||
*
|
||||
* @param packMessage 批量消息
|
||||
* @param gatewayDevice 网关设备
|
||||
*/
|
||||
private void handlePackMessage(IotDeviceMessage packMessage, IotDeviceDO gatewayDevice) {
|
||||
// 1. 解析参数
|
||||
IotDevicePropertyPackPostReqDTO params = JsonUtils.parseObject(
|
||||
JsonUtils.toJsonString(packMessage.getParams()),
|
||||
IotDevicePropertyPackPostReqDTO.class);
|
||||
if (params == null) {
|
||||
log.warn("[handlePackMessage][消息({}) 参数解析失败]", packMessage.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 处理网关自身属性
|
||||
if (params.getProperties() != null && !params.getProperties().isEmpty()) {
|
||||
Map<String, Object> gatewayProperties = convertPackProperties(params.getProperties());
|
||||
IotDeviceMessage gatewayMsg = IotDeviceMessage.builder()
|
||||
.id(IotDeviceMessageUtils.generateMessageId())
|
||||
.parentMessageId(packMessage.getId())
|
||||
.deviceId(gatewayDevice.getId())
|
||||
.tenantId(gatewayDevice.getTenantId())
|
||||
.serverId(packMessage.getServerId())
|
||||
.method(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
|
||||
.params(gatewayProperties)
|
||||
.reportTime(LocalDateTime.now())
|
||||
.build();
|
||||
// 直接调用处理,不通过消息总线
|
||||
try {
|
||||
devicePropertyService.saveDeviceProperty(gatewayDevice, gatewayMsg);
|
||||
getSelf().createDeviceLogAsync(gatewayMsg);
|
||||
} catch (Exception ex) {
|
||||
log.error("[handlePackMessage][网关({}) 属性处理失败]", gatewayDevice.getId(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 处理子设备数据
|
||||
if (params.getSubDevices() != null) {
|
||||
for (IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData : params.getSubDevices()) {
|
||||
try {
|
||||
handleSubDevicePackData(packMessage, subDeviceData);
|
||||
} catch (Exception ex) {
|
||||
log.error("[handlePackMessage][子设备({}/{}) 数据处理失败]",
|
||||
subDeviceData.getIdentity().getProductKey(),
|
||||
subDeviceData.getIdentity().getDeviceName(), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理子设备的 pack 数据
|
||||
*
|
||||
* @param packMessage 原始 pack 消息
|
||||
* @param subDeviceData 子设备数据
|
||||
*/
|
||||
private void handleSubDevicePackData(IotDeviceMessage packMessage,
|
||||
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData) {
|
||||
// 1. 获取子设备
|
||||
IotDevicePropertyPackPostReqDTO.DeviceIdentity identity = subDeviceData.getIdentity();
|
||||
IotDeviceDO subDevice = deviceService.getDeviceFromCache(identity.getProductKey(), identity.getDeviceName());
|
||||
if (subDevice == null) {
|
||||
log.warn("[handleSubDevicePackData][子设备({}/{}) 不存在]",
|
||||
identity.getProductKey(), identity.getDeviceName());
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 处理子设备属性
|
||||
if (subDeviceData.getProperties() != null && !subDeviceData.getProperties().isEmpty()) {
|
||||
Map<String, Object> properties = convertPackProperties(subDeviceData.getProperties());
|
||||
IotDeviceMessage subMsg = IotDeviceMessage.builder()
|
||||
.id(IotDeviceMessageUtils.generateMessageId())
|
||||
.parentMessageId(packMessage.getId())
|
||||
.deviceId(subDevice.getId())
|
||||
.tenantId(subDevice.getTenantId())
|
||||
.serverId(packMessage.getServerId())
|
||||
.method(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
|
||||
.params(properties)
|
||||
.reportTime(LocalDateTime.now())
|
||||
.build();
|
||||
devicePropertyService.saveDeviceProperty(subDevice, subMsg);
|
||||
getSelf().createDeviceLogAsync(subMsg);
|
||||
}
|
||||
|
||||
// 3. 处理子设备事件(如果需要)
|
||||
// TODO: 事件处理可以后续扩展
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换 pack 属性格式为标准属性格式
|
||||
* <p>
|
||||
* pack 格式:{"temperature": {"value": 25.5, "time": 1524448722000}}
|
||||
* 标准格式:{"temperature": 25.5}
|
||||
*
|
||||
* @param packProperties pack 属性
|
||||
* @return 标准属性
|
||||
*/
|
||||
private Map<String, Object> convertPackProperties(Map<String, IotDevicePropertyPackPostReqDTO.PropertyValue> packProperties) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
for (Map.Entry<String, IotDevicePropertyPackPostReqDTO.PropertyValue> entry : packProperties.entrySet()) {
|
||||
if (entry.getValue() != null) {
|
||||
result.put(entry.getKey(), entry.getValue().getValue());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// ========== 网关下线联动处理 ==========
|
||||
|
||||
// TODO @AI:是不是写到 deviceService 里更合适?更解耦。
|
||||
/**
|
||||
* 处理网关下线,联动所有子设备下线
|
||||
*
|
||||
* @param gatewayDevice 网关设备
|
||||
* @param serverId 服务标识
|
||||
*/
|
||||
private void handleGatewayOffline(IotDeviceDO gatewayDevice, String serverId) {
|
||||
// 1. 获取网关下所有子设备
|
||||
List<IotDeviceDO> subDevices = deviceService.getDeviceListByGatewayId(gatewayDevice.getId());
|
||||
if (subDevices == null || subDevices.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 将在线的子设备设置为下线
|
||||
for (IotDeviceDO subDevice : subDevices) {
|
||||
if (Objects.equal(subDevice.getState(), IotDeviceStateEnum.ONLINE.getState())) {
|
||||
try {
|
||||
deviceService.updateDeviceState(subDevice, IotDeviceStateEnum.OFFLINE.getState());
|
||||
log.info("[handleGatewayOffline][网关({}/{}) 下线,子设备({}/{}) 联动下线]",
|
||||
gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(),
|
||||
subDevice.getProductKey(), subDevice.getDeviceName());
|
||||
} catch (Exception ex) {
|
||||
log.error("[handleGatewayOffline][子设备({}/{}) 下线失败]",
|
||||
subDevice.getProductKey(), subDevice.getDeviceName(), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageResult<IotDeviceMessageDO> getDeviceMessagePage(IotDeviceMessagePageReqVO pageReqVO) {
|
||||
try {
|
||||
|
||||
@@ -24,12 +24,28 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
|
||||
|
||||
// TODO 芋艿:要不要加个 ping 消息;
|
||||
|
||||
// ========== 拓扑管理 ==========
|
||||
// 可参考:https://help.aliyun.com/zh/iot/user-guide/manage-topological-relationships
|
||||
|
||||
TOPO_ADD("thing.topo.add", "添加拓扑关系", true),
|
||||
TOPO_DELETE("thing.topo.delete", "删除拓扑关系", true),
|
||||
TOPO_GET("thing.topo.get", "获取拓扑关系", true),
|
||||
TOPO_CHANGE("thing.topo.change", "拓扑关系变更通知", false),
|
||||
|
||||
// ========== 设备注册 ==========
|
||||
// 可参考:https://help.aliyun.com/zh/iot/user-guide/register-devices
|
||||
|
||||
SUB_DEVICE_REGISTER("thing.sub.register", "子设备动态注册", true),
|
||||
|
||||
// ========== 设备属性 ==========
|
||||
// 可参考:https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
|
||||
|
||||
PROPERTY_POST("thing.property.post", "属性上报", true),
|
||||
PROPERTY_SET("thing.property.set", "属性设置", false),
|
||||
|
||||
// TODO @AI:改成 thing.property.pack.post
|
||||
PROPERTY_PACK_POST("thing.event.property.pack.post", "批量上报(属性 + 事件 + 子设备)", true), // 网关独有
|
||||
|
||||
// ========== 设备事件 ==========
|
||||
// 可参考:https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
|
||||
|
||||
@@ -50,6 +66,7 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
|
||||
|
||||
OTA_UPGRADE("thing.ota.upgrade", "OTA 固定信息推送", false),
|
||||
OTA_PROGRESS("thing.ota.progress", "OTA 升级进度上报", true),
|
||||
|
||||
;
|
||||
|
||||
public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod)
|
||||
@@ -60,7 +77,8 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
|
||||
*/
|
||||
public static final Set<String> REPLY_DISABLED = SetUtils.asSet(
|
||||
STATE_UPDATE.getMethod(),
|
||||
OTA_PROGRESS.getMethod() // 参考阿里云,OTA 升级进度上报,不进行回复
|
||||
OTA_PROGRESS.getMethod(), // 参考阿里云,OTA 升级进度上报,不进行回复
|
||||
TOPO_CHANGE.getMethod() // 拓扑变更通知,下行消息,不需要回复 TODO @AI:看看阿里云的文档,确认下是不是这样的
|
||||
);
|
||||
|
||||
private final String method;
|
||||
|
||||
@@ -94,6 +94,42 @@ public class IotDeviceMessage {
|
||||
*/
|
||||
private String msg;
|
||||
|
||||
/**
|
||||
* 父消息 ID
|
||||
* <p>
|
||||
* - null:原始消息,需要 reply
|
||||
* - 非 null:从父消息(如 pack)拆分而来,不需要单独 reply
|
||||
*/
|
||||
private String parentMessageId;
|
||||
|
||||
// TODO @TODO @AI:抽到工具类里,具体哪个,一起讨论下;
|
||||
/**
|
||||
* 判断是否需要发送 reply
|
||||
*
|
||||
* @return 是否需要回复
|
||||
*/
|
||||
public boolean needReply() {
|
||||
// 1. 来自拆包的消息,不单独 reply
|
||||
if (parentMessageId != null) {
|
||||
return false;
|
||||
}
|
||||
// 2. 某些方法本身不需要 reply(如 STATE_UPDATE)
|
||||
if (IotDeviceMessageMethodEnum.isReplyDisabled(method)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO @AI:没用的字段,删除删除
|
||||
/**
|
||||
* 获取原始消息 ID(用于日志追踪)
|
||||
*
|
||||
* @return 原始消息 ID
|
||||
*/
|
||||
public String getOriginMessageId() {
|
||||
return parentMessageId != null ? parentMessageId : id;
|
||||
}
|
||||
|
||||
// ========== 基础方法:只传递"codec(编解码)字段" ==========
|
||||
|
||||
public static IotDeviceMessage requestOf(String method) {
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
package cn.iocoder.yudao.module.iot.core.topic.auth;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import lombok.Data;
|
||||
|
||||
// TODO @AI:不用带 device 关键字;
|
||||
// TODO @AI:挂个阿里云的链接,https://help.aliyun.com/zh/iot/user-guide/register-devices 的「子设备的 MQTT 动态注册」小节
|
||||
/**
|
||||
* IoT 子设备动态注册 Request DTO
|
||||
* <p>
|
||||
* 用于 thing.sub.register 消息的 params 数组元素
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class IotSubDeviceRegisterReqDTO {
|
||||
|
||||
/**
|
||||
* 子设备 ProductKey
|
||||
*/
|
||||
@NotEmpty(message = "产品标识不能为空")
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 子设备 DeviceName
|
||||
*/
|
||||
@NotEmpty(message = "设备名称不能为空")
|
||||
private String deviceName;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package cn.iocoder.yudao.module.iot.core.topic.auth;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
// TODO @AI:修复建议,参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/auth/IotSubDeviceRegisterReqDTO.java
|
||||
/**
|
||||
* IoT 子设备动态注册 Response DTO
|
||||
* <p>
|
||||
* 用于 thing.sub.register 响应的设备信息
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class IotSubDeviceRegisterRespDTO {
|
||||
|
||||
/**
|
||||
* 子设备 ProductKey
|
||||
*/
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 子设备 DeviceName
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 分配的 DeviceSecret
|
||||
*/
|
||||
private String deviceSecret;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* TODO @AI:写下注释
|
||||
*/
|
||||
package cn.iocoder.yudao.module.iot.core.topic;
|
||||
@@ -0,0 +1,164 @@
|
||||
package cn.iocoder.yudao.module.iot.core.topic.property;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
// TODO @AI:挂个阿里云的链接,http://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services 的「设备批量上报属性、事件」小节
|
||||
/**
|
||||
* IoT 设备属性批量上报 Request DTO
|
||||
* <p>
|
||||
* 用于 thing.event.property.pack.post 消息的 params 参数
|
||||
* 参考阿里云 Alink 协议
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class IotDevicePropertyPackPostReqDTO {
|
||||
|
||||
// TODO @AI:去掉里面的 time,直接平铺值(可能就是直接的 map);例如说 "Power":value, 而不是 PropertyValue "properties": {
|
||||
// "Power": [
|
||||
// {
|
||||
// "value": "on",
|
||||
// "time": 1524448722000
|
||||
// },
|
||||
// {
|
||||
// "value": "off",
|
||||
// "time": 1524448722001
|
||||
// }
|
||||
// ],
|
||||
// "WF": [
|
||||
// {
|
||||
// "value": 3,
|
||||
// "time": 1524448722000
|
||||
// },
|
||||
// {
|
||||
// "value": 4,
|
||||
// "time": 1524448722009
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
|
||||
/**
|
||||
* 网关自身属性
|
||||
* <p>
|
||||
* key: 属性标识符
|
||||
* value: 属性值对象(包含 value 和 time)
|
||||
*/
|
||||
private Map<String, PropertyValue> properties;
|
||||
|
||||
// TODO @AI:EventValue {
|
||||
//
|
||||
// "method": "thing.event.post",
|
||||
//
|
||||
// "version": "1.0",
|
||||
//
|
||||
// "params": {
|
||||
//
|
||||
// "identifier": "eat",
|
||||
//
|
||||
// "params": {
|
||||
//
|
||||
// "rice": 100
|
||||
//
|
||||
// }
|
||||
//
|
||||
// }
|
||||
//
|
||||
//}
|
||||
|
||||
|
||||
/**
|
||||
* 网关自身事件
|
||||
* <p>
|
||||
* key: 事件标识符
|
||||
* value: 事件值对象(包含 value 和 time)
|
||||
*/
|
||||
private Map<String, EventValue> events;
|
||||
|
||||
/**
|
||||
* 子设备数据列表
|
||||
*/
|
||||
private List<SubDeviceData> subDevices;
|
||||
|
||||
/**
|
||||
* 属性值对象
|
||||
*/
|
||||
@Data
|
||||
public static class PropertyValue {
|
||||
|
||||
/**
|
||||
* 属性值
|
||||
*/
|
||||
private Object value;
|
||||
|
||||
/**
|
||||
* 上报时间(毫秒时间戳)
|
||||
*/
|
||||
private Long time;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 事件值对象
|
||||
*/
|
||||
@Data
|
||||
public static class EventValue {
|
||||
|
||||
/**
|
||||
* 事件参数
|
||||
*/
|
||||
private Object value;
|
||||
|
||||
/**
|
||||
* 上报时间(毫秒时间戳)
|
||||
*/
|
||||
private Long time;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 子设备数据
|
||||
*/
|
||||
@Data
|
||||
public static class SubDeviceData {
|
||||
|
||||
/**
|
||||
* 子设备标识
|
||||
*/
|
||||
private DeviceIdentity identity;
|
||||
|
||||
/**
|
||||
* 子设备属性
|
||||
*/
|
||||
private Map<String, PropertyValue> properties;
|
||||
|
||||
/**
|
||||
* 子设备事件
|
||||
*/
|
||||
private Map<String, EventValue> events;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 设备标识
|
||||
*/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public static class DeviceIdentity {
|
||||
|
||||
/**
|
||||
* 产品标识
|
||||
*/
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package cn.iocoder.yudao.module.iot.core.topic.topo;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import lombok.Data;
|
||||
|
||||
// TODO @AI:得一起讨论下,到底使用什么后缀合适:1)一方面要体现出请求、响应;2)一方面体现出上下行(设备 to server,还是 server to 设备),可以一起讨论?
|
||||
// TODO @AI:文档地址:https://help.aliyun.com/zh/iot/user-guide/manage-topological-relationships?spm=a2c4g.11186623.help-menu-30520.d_2_2_7_3_2.2e983f47Z2iGbo&scm=20140722.H_89299._.OR_help-T_cn~zh-V_1#section-w33-vyg-12b
|
||||
/**
|
||||
* IoT 设备拓扑添加 Request DTO
|
||||
* <p>
|
||||
* 用于 thing.topo.add 消息的 params 参数
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class IotDeviceTopoAddReqDTO {
|
||||
|
||||
// TODO @AI:是个数组;
|
||||
// TODO @AI:有响应结果的;
|
||||
|
||||
/**
|
||||
* 子设备客户端 ID
|
||||
*/
|
||||
@NotEmpty(message = "客户端 ID 不能为空")
|
||||
private String clientId;
|
||||
|
||||
/**
|
||||
* 子设备用户名
|
||||
*/
|
||||
@NotEmpty(message = "用户名不能为空")
|
||||
private String username;
|
||||
|
||||
/**
|
||||
* 子设备认证密码
|
||||
*/
|
||||
@NotEmpty(message = "密码不能为空")
|
||||
private String password;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package cn.iocoder.yudao.module.iot.core.topic.topo;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import lombok.Data;
|
||||
|
||||
// TODO @AI:是个数组;https://help.aliyun.com/zh/iot/user-guide/manage-topological-relationships?spm=a2c4g.11186623.help-menu-30520.d_2_2_7_3_2.540c390beZSDOG&scm=20140722.H_89299._.OR_help-T_cn~zh-V_1#section-rb1-wzw-y2b
|
||||
|
||||
// todo @AI:删除完,需要有个 reply 响应;
|
||||
|
||||
/**
|
||||
* IoT 设备拓扑删除 Request DTO
|
||||
* <p>
|
||||
* 用于 thing.topo.delete 消息的 params 参数
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class IotDeviceTopoDeleteReqDTO {
|
||||
|
||||
/**
|
||||
* 子设备 ProductKey
|
||||
*/
|
||||
@NotEmpty(message = "产品标识不能为空")
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 子设备 DeviceName
|
||||
*/
|
||||
@NotEmpty(message = "设备名称不能为空")
|
||||
private String deviceName;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package cn.iocoder.yudao.module.iot.core.topic.topo;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
// TODO @AI:是不是改成 IotDeviceTopoGetRespDTO
|
||||
/**
|
||||
* IoT 设备拓扑关系 Response DTO
|
||||
* <p>
|
||||
* 用于 thing.topo.get 响应的子设备信息
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class IotDeviceTopoRespDTO {
|
||||
|
||||
/**
|
||||
* 子设备 ProductKey
|
||||
*/
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 子设备 DeviceName
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
}
|
||||
@@ -63,4 +63,50 @@ public final class IotMqttTopicUtils {
|
||||
return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/" + topicSuffix;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建拓扑管理 Topic
|
||||
* <p>
|
||||
* 拓扑管理类 Topic 使用网关设备的 productKey/deviceName
|
||||
*
|
||||
* @param method 方法,如 thing.topo.add
|
||||
* @param gatewayProductKey 网关 ProductKey
|
||||
* @param gatewayDeviceName 网关 DeviceName
|
||||
* @param isReply 是否为响应
|
||||
* @return Topic
|
||||
*/
|
||||
public static String buildTopoTopic(String method, String gatewayProductKey,
|
||||
String gatewayDeviceName, boolean isReply) {
|
||||
return buildTopicByMethod(method, gatewayProductKey, gatewayDeviceName, isReply);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为拓扑管理 Topic(通过 method 判断)
|
||||
*
|
||||
* @param method 消息方法
|
||||
* @return 是否为拓扑管理 Topic
|
||||
*/
|
||||
public static boolean isTopoMethod(String method) {
|
||||
return method != null && method.startsWith("thing.topo.");
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为子设备注册 Topic
|
||||
*
|
||||
* @param method 消息方法
|
||||
* @return 是否为子设备注册 Topic
|
||||
*/
|
||||
public static boolean isSubDeviceRegisterMethod(String method) {
|
||||
return "thing.sub.register".equals(method);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为批量上报 Topic
|
||||
*
|
||||
* @param method 消息方法
|
||||
* @return 是否为批量上报 Topic
|
||||
*/
|
||||
public static boolean isPackPostMethod(String method) {
|
||||
return "thing.event.property.pack.post".equals(method);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user