mirror of
https://gitee.com/zhijiantianya/ruoyi-vue-pro.git
synced 2025-12-26 00:26:20 +08:00
reactor:【IoT 物联网】清理 yudao-module-iot-api
This commit is contained in:
parent
456423b5aa
commit
0faee76ffd
@ -7,7 +7,6 @@
|
|||||||
<version>${revision}</version>
|
<version>${revision}</version>
|
||||||
</parent>
|
</parent>
|
||||||
<modules>
|
<modules>
|
||||||
<module>yudao-module-iot-api</module>
|
|
||||||
<module>yudao-module-iot-biz</module>
|
<module>yudao-module-iot-biz</module>
|
||||||
<module>yudao-module-iot-core</module>
|
<module>yudao-module-iot-core</module>
|
||||||
<module>yudao-module-iot-gateway</module>
|
<module>yudao-module-iot-gateway</module>
|
||||||
|
|||||||
@ -1,53 +0,0 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
||||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
|
||||||
<parent>
|
|
||||||
<artifactId>yudao-module-iot</artifactId>
|
|
||||||
<groupId>cn.iocoder.boot</groupId>
|
|
||||||
<version>${revision}</version>
|
|
||||||
</parent>
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
|
||||||
<artifactId>yudao-module-iot-api</artifactId>
|
|
||||||
<packaging>jar</packaging>
|
|
||||||
|
|
||||||
<name>${project.artifactId}</name>
|
|
||||||
<!-- TODO 芋艿:需要在整理下,特别是 PF4J -->
|
|
||||||
<description>
|
|
||||||
物联网 模块 API,暴露给其它模块调用
|
|
||||||
</description>
|
|
||||||
|
|
||||||
<dependencies>
|
|
||||||
<dependency>
|
|
||||||
<groupId>cn.iocoder.boot</groupId>
|
|
||||||
<artifactId>yudao-common</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- Web 相关 -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework</groupId>
|
|
||||||
<artifactId>spring-web</artifactId>
|
|
||||||
<scope>provided</scope> <!-- 设置为 provided,只有工具类需要使用到 -->
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- 工具类相关 -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
|
||||||
<artifactId>jackson-databind</artifactId>
|
|
||||||
<scope>provided</scope> <!-- 设置为 provided,只有工具类需要使用到 -->
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- <dependency>-->
|
|
||||||
<!-- <groupId>org.pf4j</groupId> <!– PF4J:内置插件机制 –>-->
|
|
||||||
<!-- <artifactId>pf4j-spring</artifactId>-->
|
|
||||||
<!-- </dependency>-->
|
|
||||||
|
|
||||||
<!-- 参数校验 -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.boot</groupId>
|
|
||||||
<artifactId>spring-boot-starter-validation</artifactId>
|
|
||||||
<optional>true</optional>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
|
||||||
|
|
||||||
</project>
|
|
||||||
@ -1,26 +0,0 @@
|
|||||||
package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream;
|
|
||||||
|
|
||||||
import jakarta.validation.constraints.NotEmpty;
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* IoT 设备【事件】上报 Request DTO
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
public class IotDeviceEventReportReqDTO extends IotDeviceUpstreamAbstractReqDTO {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 事件标识
|
|
||||||
*/
|
|
||||||
@NotEmpty(message = "事件标识不能为空")
|
|
||||||
private String identifier;
|
|
||||||
/**
|
|
||||||
* 事件参数
|
|
||||||
*/
|
|
||||||
private Map<String, Object> params;
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,22 +0,0 @@
|
|||||||
package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream;
|
|
||||||
|
|
||||||
import jakarta.validation.constraints.NotEmpty;
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* IoT 设备【属性】上报 Request DTO
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
public class IotDevicePropertyReportReqDTO extends IotDeviceUpstreamAbstractReqDTO {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 属性参数
|
|
||||||
*/
|
|
||||||
@NotEmpty(message = "属性参数不能为空")
|
|
||||||
private Map<String, Object> properties;
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,45 +0,0 @@
|
|||||||
package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream;
|
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.common.util.json.databind.TimestampLocalDateTimeSerializer;
|
|
||||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
|
||||||
import jakarta.validation.constraints.NotEmpty;
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* IoT 设备上行的抽象 Request DTO
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
public abstract class IotDeviceUpstreamAbstractReqDTO {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 请求编号
|
|
||||||
*/
|
|
||||||
private String requestId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 插件实例的进程编号
|
|
||||||
*/
|
|
||||||
private String processId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 产品标识
|
|
||||||
*/
|
|
||||||
@NotEmpty(message = "产品标识不能为空")
|
|
||||||
private String productKey;
|
|
||||||
/**
|
|
||||||
* 设备名称
|
|
||||||
*/
|
|
||||||
@NotEmpty(message = "设备名称不能为空")
|
|
||||||
private String deviceName;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 上报时间
|
|
||||||
*/
|
|
||||||
@JsonSerialize(using = TimestampLocalDateTimeSerializer.class) // 解决 iot plugins 序列化 LocalDateTime 是数组,导致无法解析的问题
|
|
||||||
private LocalDateTime reportTime;
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -23,11 +23,6 @@
|
|||||||
<artifactId>yudao-module-system</artifactId>
|
<artifactId>yudao-module-system</artifactId>
|
||||||
<version>${revision}</version>
|
<version>${revision}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>cn.iocoder.boot</groupId>
|
|
||||||
<artifactId>yudao-module-iot-api</artifactId>
|
|
||||||
<version>${revision}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>cn.iocoder.boot</groupId>
|
<groupId>cn.iocoder.boot</groupId>
|
||||||
<artifactId>yudao-module-iot-core</artifactId>
|
<artifactId>yudao-module-iot-core</artifactId>
|
||||||
|
|||||||
@ -1,14 +1,21 @@
|
|||||||
package cn.iocoder.yudao.module.iot.dal.dataobject.rule;
|
package cn.iocoder.yudao.module.iot.dal.dataobject.alert;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
|
||||||
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
|
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
|
||||||
import cn.iocoder.yudao.module.iot.enums.rule.IotAlertConfigReceiveTypeEnum;
|
import cn.iocoder.yudao.framework.mybatis.core.type.IntegerListTypeHandler;
|
||||||
|
import cn.iocoder.yudao.framework.mybatis.core.type.LongListTypeHandler;
|
||||||
|
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
|
||||||
|
import cn.iocoder.yudao.module.iot.enums.DictTypeConstants;
|
||||||
|
import cn.iocoder.yudao.module.iot.enums.alert.IotAlertConfigReceiveTypeEnum;
|
||||||
import cn.iocoder.yudao.module.system.api.user.dto.AdminUserRespDTO;
|
import cn.iocoder.yudao.module.system.api.user.dto.AdminUserRespDTO;
|
||||||
import com.baomidou.mybatisplus.annotation.KeySequence;
|
import com.baomidou.mybatisplus.annotation.KeySequence;
|
||||||
import com.baomidou.mybatisplus.annotation.TableField;
|
import com.baomidou.mybatisplus.annotation.TableField;
|
||||||
import com.baomidou.mybatisplus.annotation.TableId;
|
import com.baomidou.mybatisplus.annotation.TableId;
|
||||||
import com.baomidou.mybatisplus.annotation.TableName;
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.*;
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -41,37 +48,37 @@ public class IotAlertConfigDO extends BaseDO {
|
|||||||
/**
|
/**
|
||||||
* 配置状态
|
* 配置状态
|
||||||
*
|
*
|
||||||
* TODO 数据字典
|
* 字典 {@link DictTypeConstants#ALERT_LEVEL}
|
||||||
*/
|
*/
|
||||||
private Integer level;
|
private Integer level;
|
||||||
/**
|
/**
|
||||||
* 配置状态
|
* 配置状态
|
||||||
*
|
*
|
||||||
* 枚举 {@link cn.iocoder.yudao.framework.common.enums.CommonStatusEnum}
|
* 枚举 {@link CommonStatusEnum}
|
||||||
*/
|
*/
|
||||||
private Integer status;
|
private Integer status;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 关联的规则场景编号数组
|
* 关联的场景联动规则编号数组
|
||||||
*
|
*
|
||||||
* 关联 {@link IotRuleSceneDO#getId()}
|
* 关联 {@link IotRuleSceneDO#getId()}
|
||||||
*/
|
*/
|
||||||
@TableField(typeHandler = JacksonTypeHandler.class)
|
@TableField(typeHandler = LongListTypeHandler.class)
|
||||||
private List<Long> ruleSceneIds;
|
private List<Long> sceneRuleIds;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 接收的用户编号数组
|
* 接收的用户编号数组
|
||||||
*
|
*
|
||||||
* 关联 {@link AdminUserRespDTO#getId()}
|
* 关联 {@link AdminUserRespDTO#getId()}
|
||||||
*/
|
*/
|
||||||
@TableField(typeHandler = JacksonTypeHandler.class)
|
@TableField(typeHandler = LongListTypeHandler.class)
|
||||||
private List<Long> receiveUserIds;
|
private List<Long> receiveUserIds;
|
||||||
/**
|
/**
|
||||||
* 接收的类型数组
|
* 接收的类型数组
|
||||||
*
|
*
|
||||||
* 枚举 {@link IotAlertConfigReceiveTypeEnum}
|
* 枚举 {@link IotAlertConfigReceiveTypeEnum}
|
||||||
*/
|
*/
|
||||||
@TableField(typeHandler = JacksonTypeHandler.class)
|
@TableField(typeHandler = IntegerListTypeHandler.class)
|
||||||
private List<Integer> receiveTypes;
|
private List<Integer> receiveTypes;
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package cn.iocoder.yudao.module.iot.dal.dataobject.rule;
|
package cn.iocoder.yudao.module.iot.dal.dataobject.alert;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
|
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
|
||||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||||
@ -45,17 +45,17 @@ public class IotAlertRecordDO extends BaseDO {
|
|||||||
private String name;
|
private String name;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 产品标识
|
* 产品编号
|
||||||
*
|
*
|
||||||
* 关联 {@link IotProductDO#getProductKey()} ()}
|
* 关联 {@link IotProductDO#getId()}
|
||||||
*/
|
*/
|
||||||
private String productKey;
|
private Long productId;
|
||||||
/**
|
/**
|
||||||
* 设备名称
|
* 设备编号
|
||||||
*
|
*
|
||||||
* 冗余 {@link IotDeviceDO#getDeviceName()}
|
* 关联 {@link IotDeviceDO#getId()}
|
||||||
*/
|
*/
|
||||||
private String deviceName;
|
private String deviceId;
|
||||||
|
|
||||||
// TODO @芋艿:有没更好的方式
|
// TODO @芋艿:有没更好的方式
|
||||||
/**
|
/**
|
||||||
@ -64,7 +64,6 @@ public class IotAlertRecordDO extends BaseDO {
|
|||||||
@TableField(typeHandler = JacksonTypeHandler.class)
|
@TableField(typeHandler = JacksonTypeHandler.class)
|
||||||
private IotDeviceMessage deviceMessage;
|
private IotDeviceMessage deviceMessage;
|
||||||
|
|
||||||
// TODO @芋艿:换成枚举,枚举对应 ApiErrorLogProcessStatusEnum
|
|
||||||
/**
|
/**
|
||||||
* 处理状态
|
* 处理状态
|
||||||
*
|
*
|
||||||
@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule;
|
|||||||
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
|
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
|
||||||
import cn.iocoder.yudao.framework.tenant.core.db.TenantBaseDO;
|
import cn.iocoder.yudao.framework.tenant.core.db.TenantBaseDO;
|
||||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||||
|
import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertConfigDO;
|
||||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||||
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
|
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
|
||||||
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
|
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
|
||||||
|
|||||||
@ -14,5 +14,6 @@ public class DictTypeConstants {
|
|||||||
|
|
||||||
public static final String DEVICE_STATE = "iot_device_state";
|
public static final String DEVICE_STATE = "iot_device_state";
|
||||||
|
|
||||||
|
public static final String ALERT_LEVEL = "iot_alert_level";
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package cn.iocoder.yudao.module.iot.enums.rule;
|
package cn.iocoder.yudao.module.iot.enums.alert;
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
|
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
@ -16,14 +16,13 @@ import java.util.Arrays;
|
|||||||
public enum IotDataSinkTypeEnum implements ArrayValuable<Integer> {
|
public enum IotDataSinkTypeEnum implements ArrayValuable<Integer> {
|
||||||
|
|
||||||
HTTP(1, "HTTP"),
|
HTTP(1, "HTTP"),
|
||||||
TCP(2, "TCP"),
|
TCP(2, "TCP"), // TODO @puhui999:待实现;
|
||||||
WEBSOCKET(3, "WebSocket"),
|
WEBSOCKET(3, "WebSocket"), // TODO @puhui999:待实现;
|
||||||
|
|
||||||
MQTT(10, "MQTT"),
|
MQTT(10, "MQTT"), // TODO 待实现;
|
||||||
|
|
||||||
DATABASE(20, "Database"),
|
DATABASE(20, "Database"), // TODO @puhui999:待实现;可以简单点,对应的表名是什么,字段先固定了。
|
||||||
// TODO @芋艿:改成 Redis;通过 execute 通用化;
|
REDIS_STREAM(21, "Redis Stream"), // TODO @puhui999:改成 Redis;然后枚举不同的数据结构?这样,枚举就可以是 Redis 了
|
||||||
REDIS_STREAM(21, "Redis Stream"),
|
|
||||||
|
|
||||||
ROCKETMQ(30, "RocketMQ"),
|
ROCKETMQ(30, "RocketMQ"),
|
||||||
RABBITMQ(31, "RabbitMQ"),
|
RABBITMQ(31, "RabbitMQ"),
|
||||||
@ -1,7 +1,6 @@
|
|||||||
package cn.iocoder.yudao.module.iot.mq.consumer.device;
|
package cn.iocoder.yudao.module.iot.mq.consumer.device;
|
||||||
|
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
|
|
||||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
|
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
|
||||||
@ -19,6 +18,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 针对 {@link IotDeviceMessage} 的业务处理器:调用 method 对应的逻辑。例如说:
|
* 针对 {@link IotDeviceMessage} 的业务处理器:调用 method 对应的逻辑。例如说:
|
||||||
@ -83,15 +83,14 @@ public class IotDeviceMessageSubscriber implements IotMessageSubscriber<IotDevic
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// 如果是 STATE 相关的消息,无需处理,不然就重复处理状态了
|
// 如果是 STATE 相关的消息,无需处理,不然就重复处理状态了
|
||||||
if (ObjectUtils.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod(),
|
if (Objects.equals(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
|
||||||
IotDeviceMessageMethodEnum.STATE_OFFLINE.getMethod())) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 特殊:设备非在线时,主动标记设备为在线
|
// 特殊:设备非在线时,主动标记设备为在线
|
||||||
// 为什么不直接更新状态呢?因为通过 IotDeviceMessage 可以经过一系列的处理,例如说记录日志、规则引擎等等
|
// 为什么不直接更新状态呢?因为通过 IotDeviceMessage 可以经过一系列的处理,例如说记录日志、规则引擎等等
|
||||||
try {
|
try {
|
||||||
deviceMessageService.sendDeviceMessage(IotDeviceMessage.buildStateOnline().setDeviceId(device.getId()));
|
deviceMessageService.sendDeviceMessage(IotDeviceMessage.buildStateUpdateOnline().setDeviceId(device.getId()));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// 注意:即使执行失败,也不影响主流程
|
// 注意:即使执行失败,也不影响主流程
|
||||||
log.error("[forceDeviceOnline][message({}) device({}) 强制设备上线失败]", message, device, e);
|
log.error("[forceDeviceOnline][message({}) device({}) 强制设备上线失败]", message, device, e);
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package cn.iocoder.yudao.module.iot.service.device.message;
|
package cn.iocoder.yudao.module.iot.service.device.message;
|
||||||
|
|
||||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||||
|
import cn.hutool.core.lang.Assert;
|
||||||
import cn.hutool.core.map.MapUtil;
|
import cn.hutool.core.map.MapUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import cn.hutool.extra.spring.SpringUtil;
|
import cn.hutool.extra.spring.SpringUtil;
|
||||||
@ -13,7 +14,6 @@ import cn.iocoder.yudao.module.iot.controller.admin.device.vo.message.IotDeviceM
|
|||||||
import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO;
|
import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO;
|
||||||
import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO;
|
import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO;
|
||||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
|
|
||||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||||
@ -176,15 +176,12 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
|||||||
// TODO @芋艿:可优化:未来逻辑复杂后,可以独立拆除 Processor 处理器
|
// TODO @芋艿:可优化:未来逻辑复杂后,可以独立拆除 Processor 处理器
|
||||||
@SuppressWarnings("SameReturnValue")
|
@SuppressWarnings("SameReturnValue")
|
||||||
private Object handleUpstreamDeviceMessage0(IotDeviceMessage message, IotDeviceDO device) {
|
private Object handleUpstreamDeviceMessage0(IotDeviceMessage message, IotDeviceDO device) {
|
||||||
// 设备上线
|
// 设备上下线
|
||||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod())) {
|
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
|
||||||
deviceService.updateDeviceState(device, IotDeviceStateEnum.ONLINE.getState());
|
String stateStr = IotDeviceMessageUtils.getIdentifier(message);
|
||||||
// TODO 芋艿:子设备的关联
|
assert stateStr != null;
|
||||||
return null;
|
Assert.notEmpty(stateStr, "设备状态不能为空");
|
||||||
}
|
deviceService.updateDeviceState(device, Integer.valueOf(stateStr));
|
||||||
// 设备下线
|
|
||||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.STATE_OFFLINE.getMethod())) {
|
|
||||||
deviceService.updateDeviceState(device, IotDeviceStateEnum.OFFLINE.getState());
|
|
||||||
// TODO 芋艿:子设备的关联
|
// TODO 芋艿:子设备的关联
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -172,7 +172,7 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
|
|||||||
for (IotDataRuleDO rule : rules) {
|
for (IotDataRuleDO rule : rules) {
|
||||||
IotDataRuleDO.SourceConfig found = CollUtil.findOne(rule.getSourceConfigs(),
|
IotDataRuleDO.SourceConfig found = CollUtil.findOne(rule.getSourceConfigs(),
|
||||||
config -> ObjectUtils.equalsAny(config.getDeviceId(), deviceId, IotDeviceDO.DEVICE_ID_ALL)
|
config -> ObjectUtils.equalsAny(config.getDeviceId(), deviceId, IotDeviceDO.DEVICE_ID_ALL)
|
||||||
&& (StrUtil.isNotEmpty(config.getMethod()) || ObjUtil.equal(config.getMethod(), method))
|
&& Objects.equals(config.getMethod(), method)
|
||||||
&& (StrUtil.isEmpty(config.getIdentifier()) || ObjUtil.equal(config.getIdentifier(), identifier)));
|
&& (StrUtil.isEmpty(config.getIdentifier()) || ObjUtil.equal(config.getIdentifier(), identifier)));
|
||||||
if (found != null) {
|
if (found != null) {
|
||||||
matchedRules.add(new IotDataRuleDO().setId(rule.getId()).setSinkIds(rule.getSinkIds()));
|
matchedRules.add(new IotDataRuleDO().setId(rule.getId()).setSinkIds(rule.getSinkIds()));
|
||||||
|
|||||||
@ -18,6 +18,8 @@ import org.springframework.web.util.UriComponentsBuilder;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HTTP 的 {@link IotDataRuleAction} 实现类
|
* HTTP 的 {@link IotDataRuleAction} 实现类
|
||||||
*
|
*
|
||||||
@ -36,6 +38,7 @@ public class IotHttpDataSinkAction implements IotDataRuleAction {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public void execute(IotDeviceMessage message, IotDataSinkDO dataSink) {
|
public void execute(IotDeviceMessage message, IotDataSinkDO dataSink) {
|
||||||
IotDataSinkHttpConfig config = (IotDataSinkHttpConfig) dataSink.getConfig();
|
IotDataSinkHttpConfig config = (IotDataSinkHttpConfig) dataSink.getConfig();
|
||||||
Assert.notNull(config, "配置({})不能为空", dataSink.getId());
|
Assert.notNull(config, "配置({})不能为空", dataSink.getId());
|
||||||
@ -49,8 +52,7 @@ public class IotHttpDataSinkAction implements IotDataRuleAction {
|
|||||||
if (CollUtil.isNotEmpty(config.getHeaders())) {
|
if (CollUtil.isNotEmpty(config.getHeaders())) {
|
||||||
config.getHeaders().putAll(config.getHeaders());
|
config.getHeaders().putAll(config.getHeaders());
|
||||||
}
|
}
|
||||||
// TODO @puhui999:@yunai:可能需要通过设备查询到租户,然后 set
|
headers.add(HEADER_TENANT_ID, message.getTenantId().toString());
|
||||||
// headers.add(HEADER_TENANT_ID, message.getTenantId().toString());
|
|
||||||
// 1.2 构建 URL
|
// 1.2 构建 URL
|
||||||
UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(config.getUrl());
|
UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(config.getUrl());
|
||||||
if (CollUtil.isNotEmpty(config.getQuery())) {
|
if (CollUtil.isNotEmpty(config.getQuery())) {
|
||||||
@ -72,18 +74,17 @@ public class IotHttpDataSinkAction implements IotDataRuleAction {
|
|||||||
requestEntity = new HttpEntity<>(JsonUtils.toJsonString(requestBody), headers);
|
requestEntity = new HttpEntity<>(JsonUtils.toJsonString(requestBody), headers);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2.1 发送请求
|
// 2. 发送请求
|
||||||
responseEntity = restTemplate.exchange(url, method, requestEntity, String.class);
|
responseEntity = restTemplate.exchange(url, method, requestEntity, String.class);
|
||||||
// 2.2 记录日志
|
|
||||||
if (responseEntity.getStatusCode().is2xxSuccessful()) {
|
if (responseEntity.getStatusCode().is2xxSuccessful()) {
|
||||||
log.info("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求成功({})]",
|
log.info("[execute][message({}) config({}) url({}) method({}) requestEntity({}) 请求成功({})]",
|
||||||
message, config, url, method, requestEntity, responseEntity);
|
message, config, url, method, requestEntity, responseEntity);
|
||||||
} else {
|
} else {
|
||||||
log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求失败({})]",
|
log.error("[execute][message({}) config({}) url({}) method({}) requestEntity({}) 请求失败({})]",
|
||||||
message, config, url, method, requestEntity, responseEntity);
|
message, config, url, method, requestEntity, responseEntity);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求异常({})]",
|
log.error("[execute][message({}) config({}) url({}) method({}) requestEntity({}) 请求异常({})]",
|
||||||
message, config, url, method, requestEntity, responseEntity, e);
|
message, config, url, method, requestEntity, responseEntity, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkKafkaConfig;
|
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkKafkaConfig;
|
||||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||||
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
||||||
@ -9,6 +10,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
|
|||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.support.SendResult;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
@ -36,13 +38,27 @@ public class IotKafkaDataRuleAction extends
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(IotDeviceMessage message, IotDataSinkKafkaConfig config) throws Exception {
|
public void execute(IotDeviceMessage message, IotDataSinkKafkaConfig config) throws Exception {
|
||||||
// 1. 获取或创建 KafkaTemplate
|
try {
|
||||||
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
|
// 1. 获取或创建 KafkaTemplate
|
||||||
|
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
|
||||||
|
|
||||||
// 2. 发送消息并等待结果
|
// 2. 发送消息并等待结果
|
||||||
kafkaTemplate.send(config.getTopic(), message.toString())
|
SendResult<String, String> sendResult = kafkaTemplate.send(config.getTopic(), JsonUtils.toJsonString(message))
|
||||||
.get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待
|
.get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
|
||||||
log.info("[execute0][message({}) 发送成功]", message);
|
// 3. 处理发送结果
|
||||||
|
if (sendResult != null && sendResult.getRecordMetadata() != null) {
|
||||||
|
log.info("[execute][message({}) config({}) 发送成功,结果: partition={}, offset={}, timestamp={}]",
|
||||||
|
message, config,
|
||||||
|
sendResult.getRecordMetadata().partition(),
|
||||||
|
sendResult.getRecordMetadata().offset(),
|
||||||
|
sendResult.getRecordMetadata().timestamp());
|
||||||
|
} else {
|
||||||
|
log.warn("[execute][message({}) config({}) 发送结果为空]", message, config);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[execute][message({}) config({}) 发送失败]", message, config, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -1,7 +1,8 @@
|
|||||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRabbitMQConfig;
|
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||||
|
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRabbitMQConfig;
|
||||||
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
import com.rabbitmq.client.Connection;
|
import com.rabbitmq.client.Connection;
|
||||||
@ -10,8 +11,6 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RabbitMQ 的 {@link IotDataRuleAction} 实现类
|
* RabbitMQ 的 {@link IotDataRuleAction} 实现类
|
||||||
*
|
*
|
||||||
@ -20,8 +19,8 @@ import java.nio.charset.StandardCharsets;
|
|||||||
@ConditionalOnClass(name = "com.rabbitmq.client.Channel")
|
@ConditionalOnClass(name = "com.rabbitmq.client.Channel")
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class IotRabbitMQDataRuleAction extends
|
public class IotRabbitMQDataRuleAction
|
||||||
IotDataRuleCacheableAction<IotDataSinkRabbitMQConfig, Channel> {
|
extends IotDataRuleCacheableAction<IotDataSinkRabbitMQConfig, Channel> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer getType() {
|
public Integer getType() {
|
||||||
@ -30,17 +29,22 @@ public class IotRabbitMQDataRuleAction extends
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(IotDeviceMessage message, IotDataSinkRabbitMQConfig config) throws Exception {
|
public void execute(IotDeviceMessage message, IotDataSinkRabbitMQConfig config) throws Exception {
|
||||||
// 1.1 获取或创建 Channel
|
try {
|
||||||
Channel channel = getProducer(config);
|
// 1.1 获取或创建 Channel
|
||||||
// 1.2 声明交换机、队列和绑定关系
|
Channel channel = getProducer(config);
|
||||||
channel.exchangeDeclare(config.getExchange(), "direct", true);
|
// 1.2 声明交换机、队列和绑定关系
|
||||||
channel.queueDeclare(config.getQueue(), true, false, false, null);
|
channel.exchangeDeclare(config.getExchange(), "direct", true);
|
||||||
channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey());
|
channel.queueDeclare(config.getQueue(), true, false, false, null);
|
||||||
|
channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey());
|
||||||
|
|
||||||
// 2. 发送消息
|
// 2. 发送消息
|
||||||
channel.basicPublish(config.getExchange(), config.getRoutingKey(), null,
|
channel.basicPublish(config.getExchange(), config.getRoutingKey(), null,
|
||||||
message.toString().getBytes(StandardCharsets.UTF_8));
|
JsonUtils.toJsonByte(message));
|
||||||
log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config);
|
log.info("[execute][message({}) config({}) 发送成功]", message, config);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[execute][message({}) config({}) 发送失败]", message, config, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRedisStreamConfig;
|
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRedisStreamConfig;
|
||||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||||
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
||||||
@ -38,10 +39,10 @@ public class IotRedisStreamRuleAction extends
|
|||||||
RedisTemplate<String, Object> redisTemplate = getProducer(config);
|
RedisTemplate<String, Object> redisTemplate = getProducer(config);
|
||||||
|
|
||||||
// 2. 创建并发送 Stream 记录
|
// 2. 创建并发送 Stream 记录
|
||||||
ObjectRecord<String, IotDeviceMessage> record = StreamRecords.newRecord()
|
ObjectRecord<String, ?> record = StreamRecords.newRecord()
|
||||||
.ofObject(message).withStreamKey(config.getTopic());
|
.ofObject(JsonUtils.toJsonString(message)).withStreamKey(config.getTopic());
|
||||||
String recordId = String.valueOf(redisTemplate.opsForStream().add(record));
|
String recordId = String.valueOf(redisTemplate.opsForStream().add(record));
|
||||||
log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config);
|
log.info("[execute][消息发送成功] messageId: {}, config: {}", recordId, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -56,11 +57,11 @@ public class IotRedisStreamRuleAction extends
|
|||||||
serverConfig.setPassword(config.getPassword());
|
serverConfig.setPassword(config.getPassword());
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建 RedisTemplate 并配置
|
// 2.1 创建 RedisTemplate 并配置
|
||||||
RedissonClient redisson = Redisson.create(redissonConfig);
|
RedissonClient redisson = Redisson.create(redissonConfig);
|
||||||
RedisTemplate<String, Object> template = new RedisTemplate<>();
|
RedisTemplate<String, Object> template = new RedisTemplate<>();
|
||||||
template.setConnectionFactory(new RedissonConnectionFactory(redisson));
|
template.setConnectionFactory(new RedissonConnectionFactory(redisson));
|
||||||
// 设置序列化器
|
// 2.2 设置序列化器
|
||||||
template.setKeySerializer(RedisSerializer.string());
|
template.setKeySerializer(RedisSerializer.string());
|
||||||
template.setHashKeySerializer(RedisSerializer.string());
|
template.setHashKeySerializer(RedisSerializer.string());
|
||||||
template.setValueSerializer(RedisSerializer.json());
|
template.setValueSerializer(RedisSerializer.json());
|
||||||
|
|||||||
@ -1,14 +1,14 @@
|
|||||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRocketMQConfig;
|
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||||
|
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRocketMQConfig;
|
||||||
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
import org.apache.rocketmq.client.producer.SendStatus;
|
import org.apache.rocketmq.client.producer.SendStatus;
|
||||||
import org.apache.rocketmq.common.message.Message;
|
import org.apache.rocketmq.common.message.Message;
|
||||||
import org.apache.rocketmq.remoting.common.RemotingHelper;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@ -33,19 +33,15 @@ public class IotRocketMQDataRuleAction extends
|
|||||||
// 1. 获取或创建 Producer
|
// 1. 获取或创建 Producer
|
||||||
DefaultMQProducer producer = getProducer(config);
|
DefaultMQProducer producer = getProducer(config);
|
||||||
|
|
||||||
// 2.1 创建消息对象,指定Topic、Tag和消息体
|
// 2.1 创建消息对象,指定 Topic、Tag 和消息体
|
||||||
Message msg = new Message(
|
Message msg = new Message(config.getTopic(), config.getTags(), JsonUtils.toJsonByte(message));
|
||||||
config.getTopic(),
|
|
||||||
config.getTags(),
|
|
||||||
message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
|
|
||||||
);
|
|
||||||
// 2.2 发送同步消息并处理结果
|
// 2.2 发送同步消息并处理结果
|
||||||
SendResult sendResult = producer.send(msg);
|
SendResult sendResult = producer.send(msg);
|
||||||
// 2.3 处理发送结果
|
// 2.3 处理发送结果
|
||||||
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
|
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
|
||||||
log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult);
|
log.info("[execute][message({}) config({}) 发送成功,结果({})]", message, config, sendResult);
|
||||||
} else {
|
} else {
|
||||||
log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult);
|
log.error("[execute][message({}) config({}) 发送失败,结果({})]", message, config, sendResult);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -39,6 +39,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
|
|||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
|
// TODO @芋艿:@puhui999:需要调整下;
|
||||||
// 创建共享的测试消息
|
// 创建共享的测试消息
|
||||||
//message = IotDeviceMessage.builder().messageId("TEST-001").reportTime(LocalDateTime.now())
|
//message = IotDeviceMessage.builder().messageId("TEST-001").reportTime(LocalDateTime.now())
|
||||||
// .productKey("testProduct").deviceName("testDevice")
|
// .productKey("testProduct").deviceName("testDevice")
|
||||||
|
|||||||
@ -19,10 +19,6 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
|
|||||||
|
|
||||||
// ========== 设备状态 ==========
|
// ========== 设备状态 ==========
|
||||||
|
|
||||||
// TODO @芋艿:要合并下;thing.state.update
|
|
||||||
STATE_ONLINE("thing.state.online", "设备上线", true),
|
|
||||||
STATE_OFFLINE("thing.state.offline", "设备下线", true),
|
|
||||||
|
|
||||||
STATE_UPDATE("thing.state.update", "设备状态更新", true),
|
STATE_UPDATE("thing.state.update", "设备状态更新", true),
|
||||||
|
|
||||||
// ========== 设备属性 ==========
|
// ========== 设备属性 ==========
|
||||||
@ -52,7 +48,7 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
|
|||||||
/**
|
/**
|
||||||
* 不进行 reply 回复的方法集合
|
* 不进行 reply 回复的方法集合
|
||||||
*/
|
*/
|
||||||
public static final Set<String> REPLY_DISABLED = Set.of(STATE_ONLINE.getMethod(), STATE_OFFLINE.getMethod());
|
public static final Set<String> REPLY_DISABLED = Set.of(STATE_UPDATE.getMethod());
|
||||||
|
|
||||||
private final String method;
|
private final String method;
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,9 @@
|
|||||||
package cn.iocoder.yudao.module.iot.core.mq.message;
|
package cn.iocoder.yudao.module.iot.core.mq.message;
|
||||||
|
|
||||||
|
import cn.hutool.core.map.MapUtil;
|
||||||
import cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants;
|
import cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants;
|
||||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||||
|
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
|
||||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
@ -128,12 +130,14 @@ public class IotDeviceMessage {
|
|||||||
|
|
||||||
// ========== 核心方法:在 of 基础方法之上,添加对应 method ==========
|
// ========== 核心方法:在 of 基础方法之上,添加对应 method ==========
|
||||||
|
|
||||||
public static IotDeviceMessage buildStateOnline() {
|
public static IotDeviceMessage buildStateUpdateOnline() {
|
||||||
return requestOf(IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod());
|
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
|
||||||
|
MapUtil.of("state", IotDeviceStateEnum.ONLINE.getState()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static IotDeviceMessage buildStateOffline() {
|
public static IotDeviceMessage buildStateOffline() {
|
||||||
return requestOf(IotDeviceMessageMethodEnum.STATE_OFFLINE.getMethod());
|
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
|
||||||
|
MapUtil.of("state", IotDeviceStateEnum.OFFLINE.getState()));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -55,11 +55,16 @@ public class IotDeviceMessageUtils {
|
|||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public static String getIdentifier(IotDeviceMessage message) {
|
public static String getIdentifier(IotDeviceMessage message) {
|
||||||
|
if (message.getParams() == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
|
if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
|
||||||
message.getMethod(), IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod())
|
IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod())) {
|
||||||
&& message.getParams() != null) {
|
|
||||||
Map<String, Object> params = (Map<String, Object>) message.getParams();
|
Map<String, Object> params = (Map<String, Object>) message.getParams();
|
||||||
return MapUtil.getStr(params, "identifier");
|
return MapUtil.getStr(params, "identifier");
|
||||||
|
} else if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
|
||||||
|
Map<String, Object> params = (Map<String, Object>) message.getParams();
|
||||||
|
return MapUtil.getStr(params, "state");
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -209,7 +209,7 @@ public class IotEmqxAuthEventHandler {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// 2. 构建设备状态消息
|
// 2. 构建设备状态消息
|
||||||
IotDeviceMessage message = online ? IotDeviceMessage.buildStateOnline()
|
IotDeviceMessage message = online ? IotDeviceMessage.buildStateUpdateOnline()
|
||||||
: IotDeviceMessage.buildStateOffline();
|
: IotDeviceMessage.buildStateOffline();
|
||||||
|
|
||||||
// 3. 发送设备状态消息
|
// 3. 发送设备状态消息
|
||||||
|
|||||||
@ -78,7 +78,7 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
|
|||||||
Assert.notBlank(token, "生成 token 不能为空位");
|
Assert.notBlank(token, "生成 token 不能为空位");
|
||||||
|
|
||||||
// 3. 执行上线
|
// 3. 执行上线
|
||||||
IotDeviceMessage message = IotDeviceMessage.buildStateOnline();
|
IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline();
|
||||||
deviceMessageService.sendDeviceMessage(message,
|
deviceMessageService.sendDeviceMessage(message,
|
||||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
|
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user