From bfbc352a1ca80c7ddedd5dfa535b4083218ebe86 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sat, 24 Jan 2026 13:12:37 +0800 Subject: [PATCH] =?UTF-8?q?feat(iot):=E3=80=90=E7=BD=91=E5=85=B3=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=EF=BC=9A65%=E3=80=91=E6=95=B4=E4=BD=93=E5=88=9D?= =?UTF-8?q?=E6=AD=A5=E5=AE=9E=E7=8E=B0=EF=BC=88=E4=BC=98=E5=8C=96=E9=83=A8?= =?UTF-8?q?=E5=88=86=E4=BB=A3=E7=A0=81=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework/common/util/json/JsonUtils.java | 33 ++ .../iot/service/device/IotDeviceService.java | 41 ++ .../service/device/IotDeviceServiceImpl.java | 274 ++++++++- .../message/IotDeviceMessageServiceImpl.java | 526 +++--------------- .../iot/core/mq/message/IotDeviceMessage.java | 17 + .../iot/core/topic/IotDeviceIdentify.java | 6 - .../iot/core/topic/IotDeviceIdentity.java | 32 ++ .../auth/IotSubDeviceRegisterRespDTO.java | 4 + .../topic/event/IotDeviceEventPostReqDTO.java | 57 ++ .../IotDevicePropertyPackPostReqDTO.java | 54 +- .../property/IotDevicePropertyPostReqDTO.java | 36 ++ .../topic/topo/IotDeviceTopoDeleteReqDTO.java | 27 +- .../topic/topo/IotDeviceTopoGetReqDTO.java | 16 + .../topic/topo/IotDeviceTopoGetRespDTO.java | 24 + .../core/topic/topo/IotDeviceTopoRespDTO.java | 28 - 15 files changed, 599 insertions(+), 576 deletions(-) delete mode 100644 yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/IotDeviceIdentify.java create mode 100644 yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/IotDeviceIdentity.java create mode 100644 yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/event/IotDeviceEventPostReqDTO.java create mode 100644 yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/property/IotDevicePropertyPostReqDTO.java create mode 100644 yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoGetReqDTO.java create mode 100644 yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoGetRespDTO.java delete mode 100644 yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoRespDTO.java diff --git a/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java b/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java index e35cd9b437..fe55d9bf51 100644 --- a/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java +++ b/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java @@ -229,4 +229,37 @@ public class JsonUtils { return JSONUtil.isTypeJSONObject(str); } + /** + * 将 Object 转换为目标类型 + *

+ * 避免先转 jsonString 再 parseObject 的性能损耗 + * + * @param obj 源对象(可以是 Map、POJO 等) + * @param clazz 目标类型 + * @return 转换后的对象 + */ + public static T convertObject(Object obj, Class clazz) { + if (obj == null) { + return null; + } + if (clazz.isInstance(obj)) { + return clazz.cast(obj); + } + return objectMapper.convertValue(obj, clazz); + } + + /** + * 将 Object 转换为目标类型(支持泛型) + * + * @param obj 源对象 + * @param typeReference 目标类型引用 + * @return 转换后的对象 + */ + public static T convertObject(Object obj, TypeReference typeReference) { + if (obj == null) { + return null; + } + return objectMapper.convertValue(obj, typeReference); + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java index 5ddc973667..d69195a98c 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java @@ -4,6 +4,10 @@ import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.*; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetRespDTO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; import jakarta.validation.Valid; @@ -321,4 +325,41 @@ public interface IotDeviceService { */ List getDeviceListByGatewayId(Long gatewayId); + // ========== 网关-拓扑管理(设备上报) ========== + + /** + * 处理添加拓扑关系消息(网关设备上报) + * + * @param message 消息 + * @param gatewayDevice 网关设备 + * @return 成功添加的子设备列表 + */ + List handleTopoAddMessage(IotDeviceMessage message, IotDeviceDO gatewayDevice); + + /** + * 处理删除拓扑关系消息(网关设备上报) + * + * @param message 消息 + * @param gatewayDevice 网关设备 + * @return 成功删除的子设备列表 + */ + List handleTopoDeleteMessage(IotDeviceMessage message, IotDeviceDO gatewayDevice); + + /** + * 处理获取拓扑关系消息(网关设备上报) + * + * @param gatewayDevice 网关设备 + * @return 拓扑关系响应 + */ + IotDeviceTopoGetRespDTO handleTopoGetMessage(IotDeviceDO gatewayDevice); + + /** + * 处理子设备动态注册消息(网关设备上报) + * + * @param message 消息 + * @param gatewayDevice 网关设备 + * @return 注册结果列表 + */ + List handleSubDeviceRegisterMessage(IotDeviceMessage message, IotDeviceDO gatewayDevice); + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java index 569085f89f..4104a30617 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java @@ -7,6 +7,7 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.common.util.object.BeanUtils; import cn.iocoder.yudao.framework.common.util.validation.ValidationUtils; import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore; @@ -14,6 +15,13 @@ import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.*; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetRespDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceGroupDO; @@ -64,6 +72,10 @@ public class IotDeviceServiceImpl implements IotDeviceService { @Override public Long createDevice(IotDeviceSaveReqVO createReqVO) { + return createDevice0(createReqVO).getId(); + } + + private IotDeviceDO createDevice0(IotDeviceSaveReqVO createReqVO) { // 1.1 校验产品是否存在 IotProductDO product = productService.getProduct(createReqVO.getProductId()); if (product == null) { @@ -81,7 +93,7 @@ public class IotDeviceServiceImpl implements IotDeviceService { IotDeviceDO device = BeanUtils.toBean(createReqVO, IotDeviceDO.class); initDevice(device, product); deviceMapper.insert(device); - return device.getId(); + return device; } private void validateCreateDeviceParam(String productKey, String deviceName, @@ -298,6 +310,37 @@ public class IotDeviceServiceImpl implements IotDeviceService { // 2. 清空对应缓存 deleteDeviceCache(device); + + // 3. 网关设备下线时,联动所有子设备下线 + if (Objects.equals(state, IotDeviceStateEnum.OFFLINE.getState()) + && IotProductDeviceTypeEnum.isGateway(device.getDeviceType())) { + handleGatewayOffline(device); + } + } + + /** + * 处理网关下线,联动所有子设备下线 + * + * @param gatewayDevice 网关设备 + */ + private void handleGatewayOffline(IotDeviceDO gatewayDevice) { + List subDevices = deviceMapper.selectListByGatewayId(gatewayDevice.getId()); + if (CollUtil.isEmpty(subDevices)) { + return; + } + for (IotDeviceDO subDevice : subDevices) { + if (Objects.equals(subDevice.getState(), IotDeviceStateEnum.ONLINE.getState())) { + try { + updateDeviceState(subDevice, IotDeviceStateEnum.OFFLINE.getState()); + log.info("[handleGatewayOffline][网关({}/{}) 下线,子设备({}/{}) 联动下线]", + gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), + subDevice.getProductKey(), subDevice.getDeviceName()); + } catch (Exception ex) { + log.error("[handleGatewayOffline][子设备({}/{}) 下线失败]", + subDevice.getProductKey(), subDevice.getDeviceName(), ex); + } + } + } } @Override @@ -533,19 +576,10 @@ public class IotDeviceServiceImpl implements IotDeviceService { } // 1.1 校验网关设备存在且类型正确 validateGatewayDeviceExists(gatewayId); - // 1.2 校验子设备存在 + // 1.2 校验每个设备是否可绑定 List devices = deviceMapper.selectByIds(ids); - if (devices.size() != ids.size()) { - throw exception(DEVICE_NOT_EXISTS); - } - // 1.3 校验每个设备是否可绑定 for (IotDeviceDO device : devices) { - if (!IotProductDeviceTypeEnum.isGatewaySub(device.getDeviceType())) { - throw exception(DEVICE_NOT_GATEWAY_SUB, device.getProductKey(), device.getDeviceName()); - } - if (device.getGatewayId() != null && !device.getGatewayId().equals(gatewayId)) { - throw exception(DEVICE_GATEWAY_BINDTO_EXISTS, device.getProductKey(), device.getDeviceName()); - } + checkSubDeviceCanBind(device, gatewayId); } // 2. 批量更新数据库 @@ -555,21 +589,33 @@ public class IotDeviceServiceImpl implements IotDeviceService { // 3. 清空对应缓存 deleteDeviceCache(devices); + + // TODO @AI:需要下发网关设备,让其建立拓扑关系吗?(增加) + } + + private void checkSubDeviceCanBind(IotDeviceDO device, Long gatewayId) { + if (!IotProductDeviceTypeEnum.isGatewaySub(device.getDeviceType())) { + throw exception(DEVICE_NOT_GATEWAY_SUB, device.getProductKey(), device.getDeviceName()); + } + if (ObjUtil.equals(device.getGatewayId(), gatewayId)) { + throw exception(DEVICE_GATEWAY_BINDTO_EXISTS, device.getProductKey(), device.getDeviceName()); + } } @Override @Transactional(rollbackFor = Exception.class) public void unbindDeviceGateway(Collection ids) { + // 1. 校验设备存在 if (CollUtil.isEmpty(ids)) { return; } - // 1. 校验设备存在 List devices = deviceMapper.selectByIds(ids); - if (devices.size() != ids.size()) { - throw exception(DEVICE_NOT_EXISTS); + if (CollUtil.isNotEmpty(devices)) { + return; } // 2. 批量更新数据库(将 gatewayId 设置为 null) + // TODO @AI:需要搞个方法,专门批量更新某个字段为 null。 List updateList = devices.stream() .filter(device -> device.getGatewayId() != null) .map(device -> new IotDeviceDO().setId(device.getId()).setGatewayId(null)) @@ -580,6 +626,8 @@ public class IotDeviceServiceImpl implements IotDeviceService { // 3. 清空对应缓存 deleteDeviceCache(devices); + + // TODO @AI:需要下发网关设备,让其建立拓扑关系吗?(减少) } @Override @@ -592,6 +640,202 @@ public class IotDeviceServiceImpl implements IotDeviceService { return deviceMapper.selectListByGatewayId(gatewayId); } + // ========== 网关-拓扑管理(设备上报) ========== + + @Override + public List handleTopoAddMessage(IotDeviceMessage message, IotDeviceDO gatewayDevice) { + // 1.1 校验网关设备类型 + if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) { + throw exception(DEVICE_NOT_GATEWAY); + } + // 1.2 解析参数 + IotDeviceTopoAddReqDTO params = JsonUtils.parseObject(JsonUtils.toJsonString(message.getParams()), + IotDeviceTopoAddReqDTO.class); + if (params == null || CollUtil.isEmpty(params.getSubDevices())) { + throw exception(DEVICE_TOPO_PARAMS_INVALID); + } + + // 2. 遍历处理每个子设备 + List addedSubDevices = new ArrayList<>(); + for (IotDeviceAuthReqDTO subDeviceAuth : params.getSubDevices()) { + try { + IotDeviceDO subDevice = addDeviceTopo(gatewayDevice, subDeviceAuth); + addedSubDevices.add(new IotDeviceIdentity(subDevice.getProductKey(), subDevice.getDeviceName())); + } catch (Exception ex) { + log.warn("[handleTopoAddMessage][网关({}/{}) 添加子设备失败,message={}]", + gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), message, ex); + } + } + + // 3. 返回响应数据(包含成功添加的子设备列表) + return addedSubDevices; + } + + private IotDeviceDO addDeviceTopo(IotDeviceDO gatewayDevice, IotDeviceAuthReqDTO subDeviceAuth) { + // 1.1 解析子设备信息 + IotDeviceAuthUtils.DeviceInfo subDeviceInfo = IotDeviceAuthUtils.parseUsername(subDeviceAuth.getUsername()); + if (subDeviceInfo == null) { + throw exception(DEVICE_TOPO_SUB_DEVICE_USERNAME_INVALID); + } + // 1.2 校验子设备认证信息 + if (!authDevice(subDeviceAuth)) { + throw exception(DEVICE_TOPO_SUB_DEVICE_AUTH_FAILED); + } + // 1.3 获取子设备 + IotDeviceDO subDevice = getSelf().getDeviceFromCache(subDeviceInfo.getProductKey(), subDeviceInfo.getDeviceName()); + if (subDevice == null) { + throw exception(DEVICE_NOT_EXISTS); + } + // 1.4 校验子设备类型 + checkSubDeviceCanBind(subDevice, gatewayDevice.getId()); + + // 2. 更新数据库 + deviceMapper.updateById(new IotDeviceDO().setId(subDevice.getId()).setGatewayId(subDevice.getGatewayId())); + log.info("[addDeviceTopo][网关({}/{}) 绑定子设备({}/{})]", + gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), + subDevice.getProductKey(), subDevice.getDeviceName()); + + // 3. 清空对应缓存 + deleteDeviceCache(subDevice); + return subDevice; + } + + @Override + public List handleTopoDeleteMessage(IotDeviceMessage message, IotDeviceDO gatewayDevice) { + // 1.1 校验网关设备类型 + if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) { + throw exception(DEVICE_NOT_GATEWAY); + } + // 1.2 解析参数 + IotDeviceTopoDeleteReqDTO params = JsonUtils.parseObject(JsonUtils.toJsonString(message.getParams()), + IotDeviceTopoDeleteReqDTO.class); + if (params == null || CollUtil.isEmpty(params.getSubDevices())) { + throw exception(DEVICE_TOPO_PARAMS_INVALID); + } + + // 2. 遍历处理每个子设备 + List deletedSubDevices = new ArrayList<>(); + for (IotDeviceIdentity subDeviceIdentity : params.getSubDevices()) { + try { + deleteDeviceTopo(gatewayDevice, subDeviceIdentity); + deletedSubDevices.add(subDeviceIdentity); + } catch (Exception ex) { + log.warn("[handleTopoDeleteMessage][网关({}/{}) 删除子设备失败,productKey={}, deviceName={}]", + gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), + subDeviceIdentity.getProductKey(), subDeviceIdentity.getDeviceName(), ex); + } + } + + // 3. 返回响应数据(包含成功删除的子设备列表) + return deletedSubDevices; + } + + private void deleteDeviceTopo(IotDeviceDO gatewayDevice, IotDeviceIdentity subDeviceIdentity) { + // 1.1 获取子设备 + IotDeviceDO subDevice = getSelf().getDeviceFromCache(subDeviceIdentity.getProductKey(), subDeviceIdentity.getDeviceName()); + if (subDevice == null) { + throw exception(DEVICE_NOT_EXISTS); + } + // 1.2 校验子设备是否绑定到该网关 + if (ObjUtil.notEqual(subDevice.getGatewayId(), gatewayDevice.getId())) { + throw exception(DEVICE_TOPO_SUB_NOT_BINDTO_GATEWAY, + subDeviceIdentity.getProductKey(), subDeviceIdentity.getDeviceName()); + } + + // 2. 更新数据库 + // TODO @AI:直接调用更新方法; +// unbindDeviceGateway(Collections.singletonList(subDevice.getId())); + log.info("[deleteDeviceTopo][网关({}/{}) 解绑子设备({}/{})]", + gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), + subDevice.getProductKey(), subDevice.getDeviceName()); + + // 3. 清空对应缓存 + deleteDeviceCache(subDevice); + + // 4. 子设备下线 + if (Objects.equals(subDevice.getState(), IotDeviceStateEnum.ONLINE.getState())) { + updateDeviceState(subDevice, IotDeviceStateEnum.OFFLINE.getState()); + } + } + + @Override + public IotDeviceTopoGetRespDTO handleTopoGetMessage(IotDeviceDO gatewayDevice) { + // 1. 校验网关设备类型 + if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) { + throw exception(DEVICE_NOT_GATEWAY); + } + + // 2. 获取子设备列表并转换 + List subDevices = deviceMapper.selectListByGatewayId(gatewayDevice.getId()); + List subDeviceIdentities = convertList(subDevices, subDevice -> + new IotDeviceIdentity(subDevice.getProductKey(), subDevice.getDeviceName())); + return new IotDeviceTopoGetRespDTO().setSubDevices(subDeviceIdentities); + } + + @Override + public List handleSubDeviceRegisterMessage(IotDeviceMessage message, IotDeviceDO gatewayDevice) { + // 1.1 校验网关设备类型 + if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) { + throw exception(DEVICE_NOT_GATEWAY); + } + // 1.2 解析参数 + if (!(message.getParams() instanceof List)) { + throw exception(DEVICE_SUB_REGISTER_PARAMS_INVALID); + } + // TODO @AI:这个要不也弄到 JsonUtils 里面去?感觉类似 convertObject 呀。 + List paramsList = JsonUtils.parseArray(JsonUtils.toJsonString(message.getParams()), + IotSubDeviceRegisterReqDTO.class); + if (CollUtil.isEmpty(paramsList)) { + throw exception(DEVICE_SUB_REGISTER_PARAMS_INVALID); + } + + // 2. 遍历注册每个子设备 + List results = new ArrayList<>(paramsList.size()); + for (IotSubDeviceRegisterReqDTO params : paramsList) { + try { + IotDeviceDO device = registerSubDevice(gatewayDevice, params); + results.add(new IotSubDeviceRegisterRespDTO( + params.getProductKey(), params.getDeviceName(), device.getDeviceSecret())); + } catch (Exception ex) { + log.error("[handleSubDeviceRegisterMessage][子设备({}/{}) 注册失败]", + params.getProductKey(), params.getDeviceName(), ex); + } + } + + // 3. 返回响应数据(包含成功注册的子设备列表) + return results; + } + + private IotDeviceDO registerSubDevice(IotDeviceDO gatewayDevice, IotSubDeviceRegisterReqDTO params) { + // 1.1 校验产品 + IotProductDO product = productService.getProductByProductKey(params.getProductKey()); + if (product == null) { + throw exception(PRODUCT_NOT_EXISTS); + } + // 1.2 校验产品是否为网关子设备类型 + if (!IotProductDeviceTypeEnum.isGatewaySub(product.getDeviceType())) { + throw exception(DEVICE_SUB_REGISTER_PRODUCT_NOT_GATEWAY_SUB, params.getProductKey()); + } + // 1.3 查找设备是否已存在 + // TODO @AI:存在的时候,必须父设备是自己,才返回,否则抛出业务异常; + IotDeviceDO existDevice = getSelf().getDeviceFromCache(params.getProductKey(), params.getDeviceName()); + if (existDevice != null) { + // 已存在则返回设备信息 + return existDevice; + } + + // 2. 创建新设备 + IotDeviceSaveReqVO createReqVO = new IotDeviceSaveReqVO() + .setDeviceName(params.getDeviceName()) + .setProductId(product.getId()) + .setGatewayId(gatewayDevice.getId()); + IotDeviceDO newDevice = createDevice0(createReqVO); + log.info("[registerSubDevice][网关({}/{}) 注册子设备({}/{})]", + gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), + newDevice.getProductKey(), newDevice.getDeviceName()); + return newDevice; + } + private IotDeviceServiceImpl getSelf() { return SpringUtil.getBean(getClass()); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java index 302b4bfdfd..36f65c3c5a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java @@ -4,7 +4,6 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.exception.ServiceException; @@ -12,30 +11,21 @@ import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.common.util.date.LocalDateTimeUtils; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.common.util.object.BeanUtils; -import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.IotDeviceSaveReqVO; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO; -import cn.iocoder.yudao.module.iot.core.biz.dto.*; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; -import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO; -import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO; -import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO; -import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO; -import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO; -import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoRespDTO; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceMessageDO; -import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO; import cn.iocoder.yudao.module.iot.dal.tdengine.IotDeviceMessageMapper; -import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum; import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; -import cn.iocoder.yudao.module.iot.service.product.IotProductService; import cn.iocoder.yudao.module.iot.service.device.property.IotDevicePropertyService; import cn.iocoder.yudao.module.iot.service.ota.IotOtaTaskRecordService; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -50,11 +40,12 @@ import org.springframework.validation.annotation.Validated; import java.sql.Timestamp; import java.time.LocalDateTime; -import java.util.*; +import java.util.List; +import java.util.Map; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList; -import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*; +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL; /** * IoT 设备消息 Service 实现类 @@ -73,9 +64,6 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { @Resource @Lazy // 延迟加载,避免循环依赖 private IotOtaTaskRecordService otaTaskRecordService; - @Resource - @Lazy // 延迟加载,避免循环依赖 - private IotProductService productService; @Resource private IotDeviceMessageMapper deviceMessageMapper; @@ -210,11 +198,6 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { Assert.notEmpty(stateStr, "设备状态不能为空"); Integer state = Integer.valueOf(stateStr); deviceService.updateDeviceState(device, state); - // 特殊:网关设备下线时,网关子设备联动下线 - if (Objects.equal(state, IotDeviceStateEnum.OFFLINE.getState()) - && IotProductDeviceTypeEnum.isGateway(device.getDeviceType())) { - handleGatewayOffline(device, message.getServerId()); - } return null; } @@ -237,349 +220,31 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { // 添加拓扑关系 if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.TOPO_ADD.getMethod())) { - return handleTopoAdd(message, device); + return deviceService.handleTopoAddMessage(message, device); } // 删除拓扑关系 if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod())) { - return handleTopoDelete(message, device); + return deviceService.handleTopoDeleteMessage(message, device); } // 获取拓扑关系 if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.TOPO_GET.getMethod())) { - return handleTopoGet(device); + return deviceService.handleTopoGetMessage(device); } // 子设备动态注册 if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod())) { - return handleSubDeviceRegister(message, device); + return deviceService.handleSubDeviceRegisterMessage(message, device); } return null; } - // ========== 拓扑管理处理方法 ========== - - // TODO @AI:是不是更适合在 deviceService 里面处理? - /** - * 处理添加拓扑关系请求 - * - * @param message 消息 - * @param gatewayDevice 网关设备 - * @return 响应数据 - */ - private Object handleTopoAdd(IotDeviceMessage message, IotDeviceDO gatewayDevice) { - // 1.1 校验网关设备类型 - if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) { - throw exception(DEVICE_NOT_GATEWAY); - } - // 1.2 解析参数 - // TODO @AI:是不是 parseObject 增加一个方法,允许传入 object 类型,避免先转 jsonString 再 parseObject ; - IotDeviceTopoAddReqDTO params = JsonUtils.parseObject(JsonUtils.toJsonString(message.getParams()), - IotDeviceTopoAddReqDTO.class); - if (params == null || CollUtil.isEmpty(params.getSubDevices())) { - throw exception(DEVICE_TOPO_PARAMS_INVALID); - } - - // 2. 遍历处理每个子设备 - // TODO @AI:processTopoAddSubDevice 不要抽成小方法; - List addedSubDevices = new ArrayList<>(params.getSubDevices().size()); - for (IotDeviceAuthReqDTO subDeviceAuth : params.getSubDevices()) { - try { - IotDeviceDO subDevice = processTopoAddSubDevice(subDeviceAuth, gatewayDevice); - addedSubDevices.add(subDevice); - } catch (Exception ex) { - log.warn("[handleTopoAdd][网关({}/{}) 添加子设备失败,username={}]", - gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), - subDeviceAuth.getUsername(), ex); - } - } - // TODO @AI:http://help.aliyun.com/zh/marketplace/add-topological-relationship 要回复的! - - // 3. 发送拓扑变更通知 - // TODO @AI:这里不应该发,它更多发生在,管理后台改动后,主动下发;http://help.aliyun.com/zh/marketplace/notify-gateway-topology-changes - for (IotDeviceDO subDevice : addedSubDevices) { - sendTopoChangeNotify(gatewayDevice, "add", subDevice); - } - return null; - } - - /** - * 处理单个子设备的拓扑添加 - * - * @param subDeviceAuth 子设备认证信息 - * @param gatewayDevice 网关设备 - * @return 添加成功的子设备,失败返回 null - */ - private IotDeviceDO processTopoAddSubDevice(IotDeviceAuthReqDTO subDeviceAuth, IotDeviceDO gatewayDevice) { - // 1.1 解析子设备信息 - IotDeviceAuthUtils.DeviceInfo subDeviceInfo = IotDeviceAuthUtils.parseUsername(subDeviceAuth.getUsername()); - if (subDeviceInfo == null) { - throw exception(DEVICE_TOPO_SUB_DEVICE_USERNAME_INVALID); - } - // 1.2 校验子设备认证信息 - if (!deviceService.authDevice(subDeviceAuth)) { - throw exception(DEVICE_TOPO_SUB_DEVICE_AUTH_FAILED); - } - - // 1.3 获取子设备 - IotDeviceDO subDevice = deviceService.getDeviceFromCache(subDeviceInfo.getProductKey(), subDeviceInfo.getDeviceName()); - if (subDevice == null) { - throw exception(DEVICE_NOT_EXISTS); - } - // 1.4 校验子设备类型 - if (!IotProductDeviceTypeEnum.isGatewaySub(subDevice.getDeviceType())) { - throw exception(DEVICE_NOT_GATEWAY_SUB, subDevice.getProductKey(), subDevice.getDeviceName()); - } - // 1.5 校验子设备是否已绑定到其他网关 - if (subDevice.getGatewayId() != null - && ObjectUtil.notEqual(subDevice.getGatewayId(), gatewayDevice.getId())) { - throw exception(DEVICE_GATEWAY_BINDTO_EXISTS, subDevice.getProductKey(), subDevice.getDeviceName()); - } - - // 2. 绑定拓扑关系 - // TODO @AI:上面的校验,貌似和 bindDeviceGateway 里的,有点重复; - deviceService.bindDeviceGateway(Collections.singletonList(subDevice.getId()), gatewayDevice.getId()); - log.info("[processTopoAddSubDevice][网关({}/{}) 绑定子设备({}/{})]", - gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), - subDevice.getProductKey(), subDevice.getDeviceName()); - return subDevice; - } - - // TODO @AI:是不是更适合在 deviceService 里面处理? - - /** - * 处理删除拓扑关系请求 - * - * @param message 消息 - * @param gatewayDevice 网关设备 - * @return 响应数据 - */ - private Object handleTopoDelete(IotDeviceMessage message, IotDeviceDO gatewayDevice) { - // 1. 校验网关设备类型 - if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) { - throw exception(DEVICE_NOT_GATEWAY); - } - - // 2. 解析参数(数组格式) - IotDeviceTopoDeleteReqDTO params = JsonUtils.parseObject(JsonUtils.toJsonString(message.getParams()), - IotDeviceTopoDeleteReqDTO.class); - if (params == null || params.getSubDevices() == null || params.getSubDevices().isEmpty()) { - throw exception(DEVICE_TOPO_PARAMS_INVALID); - } - - // 3. 遍历处理每个子设备 - List deletedSubDevices = new ArrayList<>(); - for (IotDeviceTopoDeleteReqDTO.SubDevice subDeviceInfo : params.getSubDevices()) { - try { - IotDeviceDO subDevice = processTopoDeleteSubDevice(subDeviceInfo, gatewayDevice); - deletedSubDevices.add(subDevice); - } catch (Exception ex) { - log.warn("[handleTopoDelete][网关({}/{}) 删除子设备失败,productKey={}, deviceName={}]", - gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), - subDeviceInfo.getProductKey(), subDeviceInfo.getDeviceName(), ex); - } - } - - // 4. 发送拓扑变更通知 - for (IotDeviceDO subDevice : deletedSubDevices) { - sendTopoChangeNotify(gatewayDevice, "delete", subDevice); - } - return null; - } - - // TODO @AI:是不是更适合在 deviceService 里面处理? - - /** - * 处理单个子设备的拓扑删除 - * - * @param subDeviceInfo 子设备标识 - * @param gatewayDevice 网关设备 - * @return 删除成功的子设备,失败返回 null - */ - private IotDeviceDO processTopoDeleteSubDevice(IotDeviceTopoDeleteReqDTO.SubDevice subDeviceInfo, - IotDeviceDO gatewayDevice) { - // 1. 获取子设备 - IotDeviceDO subDevice = deviceService.getDeviceFromCache(subDeviceInfo.getProductKey(), subDeviceInfo.getDeviceName()); - if (subDevice == null) { - throw exception(DEVICE_NOT_EXISTS); - } - - // 2. 校验子设备是否绑定到该网关 - if (!Objects.equal(subDevice.getGatewayId(), gatewayDevice.getId())) { - throw exception(DEVICE_TOPO_SUB_NOT_BINDTO_GATEWAY, subDeviceInfo.getProductKey(), subDeviceInfo.getDeviceName()); - } - - // 3. 解绑拓扑关系 - deviceService.unbindDeviceGateway(Collections.singletonList(subDevice.getId())); - log.info("[processTopoDeleteSubDevice][网关({}/{}) 解绑子设备({}/{})]", - gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), - subDevice.getProductKey(), subDevice.getDeviceName()); - - // 4. 子设备下线 - if (Objects.equal(subDevice.getState(), IotDeviceStateEnum.ONLINE.getState())) { - deviceService.updateDeviceState(subDevice, IotDeviceStateEnum.OFFLINE.getState()); - } - - return subDevice; - } - - // TODO @AI:是不是更适合在 deviceService 里面处理? - /** - * 处理获取拓扑关系请求 - * - * @param gatewayDevice 网关设备 - * @return 子设备列表 - */ - private Object handleTopoGet(IotDeviceDO gatewayDevice) { - // 1. 校验网关设备类型 - if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) { - throw exception(DEVICE_NOT_GATEWAY); - } - - // 2. 获取子设备列表 - List subDevices = deviceService.getDeviceListByGatewayId(gatewayDevice.getId()); - - // 3. 转换为响应格式 - return convertList(subDevices, subDevice -> new IotDeviceTopoRespDTO() - .setProductKey(subDevice.getProductKey()) - .setDeviceName(subDevice.getDeviceName())); - } - - // TODO @AI:是不是更适合在 deviceService 里面处理? - /** - * 发送拓扑变更通知 - * - * @param gatewayDevice 网关设备 - * @param changeType 变更类型:add/delete - * @param subDevice 子设备 - */ - private void sendTopoChangeNotify(IotDeviceDO gatewayDevice, String changeType, IotDeviceDO subDevice) { - try { - String serverId = devicePropertyService.getDeviceServerId(gatewayDevice.getId()); - if (StrUtil.isEmpty(serverId)) { - log.warn("[sendTopoChangeNotify][网关({}/{}) serverId 为空,无法发送拓扑变更通知]", - gatewayDevice.getProductKey(), gatewayDevice.getDeviceName()); - return; - } - - Map params = MapUtil.builder(new HashMap()) - .put("changeType", changeType) - .put("subDevice", MapUtil.builder(new HashMap()) - .put("productKey", subDevice.getProductKey()) - .put("deviceName", subDevice.getDeviceName()) - .build()) - .build(); - - IotDeviceMessage notifyMessage = IotDeviceMessage.requestOf( - IotDeviceMessageMethodEnum.TOPO_CHANGE.getMethod(), params); - sendDeviceMessage(notifyMessage, gatewayDevice, serverId); - } catch (Exception ex) { - log.error("[sendTopoChangeNotify][发送拓扑变更通知失败,网关({}/{}), 子设备({}/{})]", - gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), - subDevice.getProductKey(), subDevice.getDeviceName(), ex); - } - } - - // ========== 子设备注册处理方法 ========== - - // TODO @AI:是不是更适合在 deviceService 里面处理? - /** - * 处理子设备动态注册请求 - * - * @param message 消息 - * @param gatewayDevice 网关设备 - * @return 注册结果列表 - */ - private Object handleSubDeviceRegister(IotDeviceMessage message, IotDeviceDO gatewayDevice) { - // 1. 校验网关设备类型 - if (!IotProductDeviceTypeEnum.isGateway(gatewayDevice.getDeviceType())) { - throw exception(DEVICE_NOT_GATEWAY); - } - - // 2. 解析参数(数组) - List paramsList; - if (message.getParams() instanceof List) { - paramsList = JsonUtils.parseArray(JsonUtils.toJsonString(message.getParams()), - IotSubDeviceRegisterReqDTO.class); - } else { - throw exception(DEVICE_SUB_REGISTER_PARAMS_INVALID); - } - - if (paramsList == null || paramsList.isEmpty()) { - throw exception(DEVICE_SUB_REGISTER_PARAMS_INVALID); - } - - // 3. 遍历注册每个子设备 - List results = new ArrayList<>(); - for (IotSubDeviceRegisterReqDTO params : paramsList) { - try { - IotSubDeviceRegisterRespDTO result = registerSubDevice(params, gatewayDevice); - results.add(result); - } catch (Exception ex) { - log.error("[handleSubDeviceRegister][子设备({}/{}) 注册失败]", - params.getProductKey(), params.getDeviceName(), ex); - // 继续处理其他子设备,不影响整体流程 - } - } - - return results; - } - - // TODO @AI:是不是更适合在 deviceService 里面处理? - /** - * 注册单个子设备 - * - * @param params 注册参数 - * @param gatewayDevice 网关设备 - * @return 注册结果 - */ - private IotSubDeviceRegisterRespDTO registerSubDevice(IotSubDeviceRegisterReqDTO params, - IotDeviceDO gatewayDevice) { - // 1. 查找产品 - IotProductDO product = productService.getProductByProductKey(params.getProductKey()); - if (product == null) { - throw exception(PRODUCT_NOT_EXISTS); - } - - // 2. 校验产品是否为网关子设备类型 - if (!IotProductDeviceTypeEnum.isGatewaySub(product.getDeviceType())) { - throw exception(DEVICE_SUB_REGISTER_PRODUCT_NOT_GATEWAY_SUB, params.getProductKey()); - } - - // 3. 查找设备是否已存在 - IotDeviceDO existDevice = deviceService.getDeviceFromCache(params.getProductKey(), params.getDeviceName()); - if (existDevice != null) { - // 已存在则返回设备信息 - return new IotSubDeviceRegisterRespDTO() - .setProductKey(existDevice.getProductKey()) - .setDeviceName(existDevice.getDeviceName()) - .setDeviceSecret(existDevice.getDeviceSecret()); - } - - // 4. 创建新设备 - IotDeviceSaveReqVO createReqVO = new IotDeviceSaveReqVO() - .setDeviceName(params.getDeviceName()) - .setProductId(product.getId()) - .setGatewayId(gatewayDevice.getId()); - Long deviceId = deviceService.createDevice(createReqVO); - - // 5. 获取新创建的设备信息 - IotDeviceDO newDevice = deviceService.getDevice(deviceId); - log.info("[registerSubDevice][网关({}/{}) 注册子设备({}/{})]", - gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), - newDevice.getProductKey(), newDevice.getDeviceName()); - - return new IotSubDeviceRegisterRespDTO() - .setProductKey(newDevice.getProductKey()) - .setDeviceName(newDevice.getDeviceName()) - .setDeviceSecret(newDevice.getDeviceSecret()); - } - // ========== 批量上报处理方法 ========== /** * 处理批量上报消息 *

- * 将 pack 消息拆分成多条标准消息,递归处理 + * 将 pack 消息拆分成多条标准消息,发送到 MQ 让规则引擎处理 * * @param packMessage 批量消息 * @param gatewayDevice 网关设备 @@ -590,134 +255,71 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { JsonUtils.toJsonString(packMessage.getParams()), IotDevicePropertyPackPostReqDTO.class); if (params == null) { - log.warn("[handlePackMessage][消息({}) 参数解析失败]", packMessage.getId()); + log.warn("[handlePackMessage][消息({}) 参数解析失败]", packMessage); return; } - // 2. 处理网关自身属性 - // TODO @AI:是不是经过总线会更好: - // TODO @AI:是不是少处理了 event 事件? - if (params.getProperties() != null && !params.getProperties().isEmpty()) { - Map gatewayProperties = convertPackProperties(params.getProperties()); - IotDeviceMessage gatewayMsg = IotDeviceMessage.builder() - .id(IotDeviceMessageUtils.generateMessageId()) - .deviceId(gatewayDevice.getId()) - .tenantId(gatewayDevice.getTenantId()) - .serverId(packMessage.getServerId()) - .method(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()) - .params(gatewayProperties) - .reportTime(LocalDateTime.now()) - .build(); - // 直接调用处理,不通过消息总线 + // 2. 处理网关设备(自身)的数据 + sendDevicePackData(gatewayDevice, packMessage.getServerId(), params.getProperties(), params.getEvents()); + + // 3. 处理子设备的数据 + if (CollUtil.isEmpty(params.getSubDevices())) { + return; + } + for (IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData : params.getSubDevices()) { try { - devicePropertyService.saveDeviceProperty(gatewayDevice, gatewayMsg); - getSelf().createDeviceLogAsync(gatewayMsg); + IotDeviceIdentity identity = subDeviceData.getIdentity(); + IotDeviceDO subDevice = deviceService.getDeviceFromCache(identity.getProductKey(), identity.getDeviceName()); + if (subDevice == null) { + log.warn("[handlePackMessage][子设备({}/{}) 不存在]", identity.getProductKey(), identity.getDeviceName()); + return; + } + sendDevicePackData(subDevice, packMessage.getServerId(), subDeviceData.getProperties(), subDeviceData.getEvents()); } catch (Exception ex) { - log.error("[handlePackMessage][网关({}) 属性处理失败]", gatewayDevice.getId(), ex); + log.error("[handlePackMessage][子设备({}/{}) 数据处理失败]", subDeviceData.getIdentity().getProductKey(), + subDeviceData.getIdentity().getDeviceName(), ex); } } + } - // 3. 处理子设备数据 - if (params.getSubDevices() != null) { - for (IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData : params.getSubDevices()) { - try { - handleSubDevicePackData(packMessage, subDeviceData); - } catch (Exception ex) { - log.error("[handlePackMessage][子设备({}/{}) 数据处理失败]", - subDeviceData.getIdentity().getProductKey(), - subDeviceData.getIdentity().getDeviceName(), ex); + /** + * 发送设备 pack 数据到 MQ(属性 + 事件) + * + * @param device 设备 + * @param serverId 服务标识 + * @param properties 属性数据 + * @param events 事件数据 + */ + private void sendDevicePackData(IotDeviceDO device, String serverId, + Map properties, + Map events) { + // 1. 发送属性消息 + if (MapUtil.isNotEmpty(properties)) { + IotDeviceMessage propertyMsg = IotDeviceMessage.requestOf( + device.getId(), device.getTenantId(), serverId, + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(properties)); + deviceMessageProducer.sendDeviceMessage(propertyMsg); + } + + // 2. 发送事件消息 + if (MapUtil.isNotEmpty(events)) { + for (Map.Entry eventEntry : events.entrySet()) { + String eventId = eventEntry.getKey(); + IotDevicePropertyPackPostReqDTO.EventValue eventValue = eventEntry.getValue(); + if (eventValue == null) { + continue; } + IotDeviceMessage eventMsg = IotDeviceMessage.requestOf( + device.getId(), device.getTenantId(), serverId, + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of(eventId, eventValue.getValue(), eventValue.getTime())); + deviceMessageProducer.sendDeviceMessage(eventMsg); } } } - /** - * 处理子设备的 pack 数据 - * - * @param packMessage 原始 pack 消息 - * @param subDeviceData 子设备数据 - */ - private void handleSubDevicePackData(IotDeviceMessage packMessage, - IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData) { - // 1. 获取子设备 - IotDevicePropertyPackPostReqDTO.DeviceIdentity identity = subDeviceData.getIdentity(); - IotDeviceDO subDevice = deviceService.getDeviceFromCache(identity.getProductKey(), identity.getDeviceName()); - if (subDevice == null) { - log.warn("[handleSubDevicePackData][子设备({}/{}) 不存在]", - identity.getProductKey(), identity.getDeviceName()); - return; - } - - // 2. 处理子设备属性 - if (subDeviceData.getProperties() != null && !subDeviceData.getProperties().isEmpty()) { - Map properties = convertPackProperties(subDeviceData.getProperties()); - IotDeviceMessage subMsg = IotDeviceMessage.builder() - .id(IotDeviceMessageUtils.generateMessageId()) - .deviceId(subDevice.getId()) - .tenantId(subDevice.getTenantId()) - .serverId(packMessage.getServerId()) - .method(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()) - .params(properties) - .reportTime(LocalDateTime.now()) - .build(); - devicePropertyService.saveDeviceProperty(subDevice, subMsg); - getSelf().createDeviceLogAsync(subMsg); - } - - // 3. 处理子设备事件 - // TODO @AI:事件处理可以后续扩展 - } - - /** - * 转换 pack 属性格式为标准属性格式 - *

- * pack 格式:{"temperature": {"value": 25.5, "time": 1524448722000}} - * 标准格式:{"temperature": 25.5} - * - * @param packProperties pack 属性 - * @return 标准属性 - */ - private Map convertPackProperties(Map packProperties) { - Map result = new HashMap<>(); - for (Map.Entry entry : packProperties.entrySet()) { - if (entry.getValue() != null) { - result.put(entry.getKey(), entry.getValue().getValue()); - } - } - return result; - } - - // ========== 网关下线联动处理 ========== - - // TODO 芋艿:是不是写到 deviceService 里更合适?更解耦。 - /** - * 处理网关下线,联动所有子设备下线 - * - * @param gatewayDevice 网关设备 - * @param serverId 服务标识 - */ - private void handleGatewayOffline(IotDeviceDO gatewayDevice, String serverId) { - // 1. 获取网关下所有子设备 - List subDevices = deviceService.getDeviceListByGatewayId(gatewayDevice.getId()); - if (subDevices == null || subDevices.isEmpty()) { - return; - } - - // 2. 将在线的子设备设置为下线 - for (IotDeviceDO subDevice : subDevices) { - if (Objects.equal(subDevice.getState(), IotDeviceStateEnum.ONLINE.getState())) { - try { - deviceService.updateDeviceState(subDevice, IotDeviceStateEnum.OFFLINE.getState()); - log.info("[handleGatewayOffline][网关({}/{}) 下线,子设备({}/{}) 联动下线]", - gatewayDevice.getProductKey(), gatewayDevice.getDeviceName(), - subDevice.getProductKey(), subDevice.getDeviceName()); - } catch (Exception ex) { - log.error("[handleGatewayOffline][子设备({}/{}) 下线失败]", - subDevice.getProductKey(), subDevice.getDeviceName(), ex); - } - } - } - } + // ========= 设备消息查询 ========== @Override public PageResult getDeviceMessagePage(IotDeviceMessagePageReqVO pageReqVO) { diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java index 6821c0d160..feed3eb2a2 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java @@ -108,6 +108,23 @@ public class IotDeviceMessage { return of(requestId, method, params, null, null, null); } + /** + * 创建设备请求消息(包含设备信息) + * + * @param deviceId 设备编号 + * @param tenantId 租户编号 + * @param serverId 服务标识 + * @param method 消息方法 + * @param params 消息参数 + * @return 消息对象 + */ + public static IotDeviceMessage requestOf(Long deviceId, Long tenantId, String serverId, + String method, Object params) { + IotDeviceMessage message = of(null, method, params, null, null, null); + return message.setId(IotDeviceMessageUtils.generateMessageId()) + .setDeviceId(deviceId).setTenantId(tenantId).setServerId(serverId); + } + public static IotDeviceMessage replyOf(String requestId, String method, Object data, Integer code, String msg) { if (code == null) { diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/IotDeviceIdentify.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/IotDeviceIdentify.java deleted file mode 100644 index 98396bc334..0000000000 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/IotDeviceIdentify.java +++ /dev/null @@ -1,6 +0,0 @@ -package cn.iocoder.yudao.module.iot.core.topic; - -// TODO @AI:增加 productKey、DeviceName -// TODO @AI:有更合适的类名么??? -public class IotDeviceIdentify { -} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/IotDeviceIdentity.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/IotDeviceIdentity.java new file mode 100644 index 0000000000..1987026718 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/IotDeviceIdentity.java @@ -0,0 +1,32 @@ +package cn.iocoder.yudao.module.iot.core.topic; + +import jakarta.validation.constraints.NotEmpty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * IoT 设备标识 + * + * 用于标识一个设备的基本信息(productKey + deviceName) + * + * @author 芋道源码 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IotDeviceIdentity { + + /** + * 产品标识 + */ + @NotEmpty(message = "产品标识不能为空") + private String productKey; + + /** + * 设备名称 + */ + @NotEmpty(message = "设备名称不能为空") + private String deviceName; + +} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/auth/IotSubDeviceRegisterRespDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/auth/IotSubDeviceRegisterRespDTO.java index cbf3289e4f..bf054f25c3 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/auth/IotSubDeviceRegisterRespDTO.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/auth/IotSubDeviceRegisterRespDTO.java @@ -1,6 +1,8 @@ package cn.iocoder.yudao.module.iot.core.topic.auth; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; /** * IoT 子设备动态注册 Response DTO @@ -11,6 +13,8 @@ import lombok.Data; * @see 阿里云 - 动态注册子设备 */ @Data +@NoArgsConstructor +@AllArgsConstructor public class IotSubDeviceRegisterRespDTO { /** diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/event/IotDeviceEventPostReqDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/event/IotDeviceEventPostReqDTO.java new file mode 100644 index 0000000000..f19a1bad68 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/event/IotDeviceEventPostReqDTO.java @@ -0,0 +1,57 @@ +package cn.iocoder.yudao.module.iot.core.topic.event; + +import lombok.Data; + +/** + * IoT 设备事件上报 Request DTO + *

+ * 用于 thing.event.{eventId}.post 消息的 params 参数 + * + * @author 芋道源码 + * @see 阿里云 - 设备上报事件 + */ +@Data +public class IotDeviceEventPostReqDTO { + + /** + * 事件标识符 + */ + private String eventId; + + /** + * 事件输出参数 + */ + private Object value; + + /** + * 上报时间(毫秒时间戳,可选) + */ + private Long time; + + /** + * 创建事件上报 DTO + * + * @param eventId 事件标识符 + * @param value 事件值 + * @return DTO 对象 + */ + public static IotDeviceEventPostReqDTO of(String eventId, Object value) { + return of(eventId, value, null); + } + + /** + * 创建事件上报 DTO(带时间) + * + * @param eventId 事件标识符 + * @param value 事件值 + * @param time 上报时间 + * @return DTO 对象 + */ + public static IotDeviceEventPostReqDTO of(String eventId, Object value, Long time) { + return new IotDeviceEventPostReqDTO() + .setEventId(eventId) + .setValue(value) + .setTime(time); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/property/IotDevicePropertyPackPostReqDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/property/IotDevicePropertyPackPostReqDTO.java index bfd214e772..24494984eb 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/property/IotDevicePropertyPackPostReqDTO.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/property/IotDevicePropertyPackPostReqDTO.java @@ -1,7 +1,7 @@ package cn.iocoder.yudao.module.iot.core.topic.property; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import lombok.Data; -import lombok.experimental.Accessors; import java.util.List; import java.util.Map; @@ -17,14 +17,13 @@ import java.util.Map; @Data public class IotDevicePropertyPackPostReqDTO { - // TODO @AI:不用 PropertyValue,直接使用 Object 接收就行! /** * 网关自身属性 *

* key: 属性标识符 - * value: 属性值对象(包含 value 和 time) + * value: 属性值 */ - private Map properties; + private Map properties; /** * 网关自身事件 @@ -39,24 +38,6 @@ public class IotDevicePropertyPackPostReqDTO { */ private List subDevices; - /** - * 属性值对象 - */ - @Data - public static class PropertyValue { - - /** - * 属性值 - */ - private Object value; - - /** - * 上报时间(毫秒时间戳) - */ - private Long time; - - } - /** * 事件值对象 */ @@ -84,37 +65,24 @@ public class IotDevicePropertyPackPostReqDTO { /** * 子设备标识 */ - private DeviceIdentity identity; + private IotDeviceIdentity identity; /** * 子设备属性 + *

+ * key: 属性标识符 + * value: 属性值 */ - private Map properties; + private Map properties; /** * 子设备事件 + *

+ * key: 事件标识符 + * value: 事件值对象(包含 value 和 time) */ private Map events; } - /** - * 设备标识 - */ - @Data - @Accessors(chain = true) - public static class DeviceIdentity { - - /** - * 产品标识 - */ - private String productKey; - - /** - * 设备名称 - */ - private String deviceName; - - } - } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/property/IotDevicePropertyPostReqDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/property/IotDevicePropertyPostReqDTO.java new file mode 100644 index 0000000000..4adc2f8d4b --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/property/IotDevicePropertyPostReqDTO.java @@ -0,0 +1,36 @@ +package cn.iocoder.yudao.module.iot.core.topic.property; + +import java.util.HashMap; +import java.util.Map; + +/** + * IoT 设备属性上报 Request DTO + *

+ * 用于 thing.property.post 消息的 params 参数 + *

+ * 本质是一个 Map,key 为属性标识符,value 为属性值 + * + * @author 芋道源码 + * @see 阿里云 - 设备上报属性 + */ +public class IotDevicePropertyPostReqDTO extends HashMap { + + public IotDevicePropertyPostReqDTO() { + super(); + } + + public IotDevicePropertyPostReqDTO(Map properties) { + super(properties); + } + + /** + * 创建属性上报 DTO + * + * @param properties 属性数据 + * @return DTO 对象 + */ + public static IotDevicePropertyPostReqDTO of(Map properties) { + return new IotDevicePropertyPostReqDTO(properties); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoDeleteReqDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoDeleteReqDTO.java index 11de07f7ba..71ee2bb8b2 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoDeleteReqDTO.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoDeleteReqDTO.java @@ -1,5 +1,7 @@ package cn.iocoder.yudao.module.iot.core.topic.topo; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import lombok.Data; @@ -16,30 +18,11 @@ import java.util.List; @Data public class IotDeviceTopoDeleteReqDTO { - // TODO @AI:应该是数组;IotDeviceIdentify /** * 子设备标识列表 */ - private List subDevices; - - /** - * 子设备标识 - */ - @Data - public static class SubDevice { - - /** - * 子设备 ProductKey - */ - @NotEmpty(message = "产品标识不能为空") - private String productKey; - - /** - * 子设备 DeviceName - */ - @NotEmpty(message = "设备名称不能为空") - private String deviceName; - - } + @Valid + @NotEmpty(message = "子设备标识列表不能为空") + private List subDevices; } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoGetReqDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoGetReqDTO.java new file mode 100644 index 0000000000..7a61af0a58 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoGetReqDTO.java @@ -0,0 +1,16 @@ +package cn.iocoder.yudao.module.iot.core.topic.topo; + +import lombok.Data; + +/** + * IoT 设备拓扑关系获取 Request DTO + *

+ * 用于 thing.topo.get 请求的 params 参数(目前为空,预留扩展) + * + * @author 芋道源码 + * @see 阿里云 - 获取拓扑关系 + */ +@Data +public class IotDeviceTopoGetReqDTO { + +} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoGetRespDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoGetRespDTO.java new file mode 100644 index 0000000000..69c9b1555e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoGetRespDTO.java @@ -0,0 +1,24 @@ +package cn.iocoder.yudao.module.iot.core.topic.topo; + +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import lombok.Data; + +import java.util.List; + +/** + * IoT 设备拓扑关系获取 Response DTO + *

+ * 用于 thing.topo.get 响应 + * + * @author 芋道源码 + * @see 阿里云 - 获取拓扑关系 + */ +@Data +public class IotDeviceTopoGetRespDTO { + + /** + * 子设备列表 + */ + private List subDevices; + +} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoRespDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoRespDTO.java deleted file mode 100644 index 79302b6203..0000000000 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/topic/topo/IotDeviceTopoRespDTO.java +++ /dev/null @@ -1,28 +0,0 @@ -package cn.iocoder.yudao.module.iot.core.topic.topo; - -import lombok.Data; - -// TODO @AI:IotDeviceTopoGetRespDTO -/** - * IoT 设备拓扑关系 Response DTO - *

- * 用于 thing.topo.get 响应的子设备信息 - * - * @author 芋道源码 - * @see 阿里云 - 获取拓扑关系 - */ -@Data -public class IotDeviceTopoRespDTO { - - // TODO @AI:应该是数组;IotDeviceIdentify - /** - * 子设备 ProductKey - */ - private String productKey; - - /** - * 子设备 DeviceName - */ - private String deviceName; - -}