add 增加 ruoyi-common-mqtt 模块

This commit is contained in:
疯狂的狮子Li
2026-03-19 10:14:08 +08:00
parent ac8bf12aa5
commit 261d00131e
11 changed files with 304 additions and 0 deletions

View File

@@ -55,6 +55,8 @@
<anyline.version>8.7.3-20251210</anyline.version>
<!-- 工作流配置 -->
<warm-flow.version>1.8.4</warm-flow.version>
<!-- mqtt客户端 -->
<mica-mqtt.version>2.5.11</mica-mqtt.version>
<!-- mq配置 -->
<rocketmq.version>2.3.4</rocketmq.version>
@@ -382,6 +384,12 @@
<version>${warm-flow.version}</version>
</dependency>
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
<version>${mica-mqtt.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@@ -43,6 +43,7 @@
<module>ruoyi-common-nacos</module>
<module>ruoyi-common-bus</module>
<module>ruoyi-common-sse</module>
<module>ruoyi-common-mqtt</module>
</modules>
<artifactId>ruoyi-common</artifactId>

View File

@@ -244,6 +244,12 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-mqtt</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ruoyi-common-mqtt</artifactId>
<description>
ruoyi-common-mqtt mqtt模块
</description>
<dependencies>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-core</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-json</artifactId>
</dependency>
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,73 @@
package org.dromara.common.mqtt.config;
import org.dromara.common.mqtt.listener.MqttClientConnectListener;
import org.dromara.common.mqtt.listener.MqttClientGlobalMessageListener;
import org.dromara.mica.mqtt.core.client.MqttClientCreator;
import org.dromara.mica.mqtt.core.client.MqttClientCustomizer;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.VirtualThreadTaskExecutor;
import org.tio.utils.thread.ThreadUtils;
import org.tio.utils.thread.pool.SynThreadPoolExecutor;
import org.tio.utils.thread.pool.TioCallerRunsPolicy;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* mqtt客户端配置初始化
* <p>
* 用法文档 <a href="https://mica-mqtt.dreamlu.net/guide/spring/client.html">...</a>
* 测试server搭建:
* 可执行下载其他mqtt服务端搭建
* 也可使用 mica自带的server搭建 <a href="https://mica-mqtt.dreamlu.net/guide/spring/server.html">...</a>
*
* @author Lion Li
*/
@AutoConfiguration
@ConditionalOnProperty(value = "mqtt.client.enabled", havingValue = "true")
public class MqttAutoConfiguration {
@Bean
public MqttClientConnectListener mqttClientConnectListener(MqttClientCreator mqttClientCreator) {
return new MqttClientConnectListener(mqttClientCreator);
}
@Bean
public MqttClientGlobalMessageListener mqttClientGlobalMessageListener() {
return new MqttClientGlobalMessageListener();
}
/**
* 客户端使用虚拟线程配置
*/
@Bean
public MqttClientCustomizer mqttClientCustomizer() {
return creator -> {
// 这个数不重要 已经使用虚拟线程 就是填一下防止报错
int corePoolSize = ThreadUtils.CORE_POOL_SIZE;
ThreadFactory factory = new VirtualThreadTaskExecutor("tio-worker-virtual").getVirtualThreadFactory();
SynThreadPoolExecutor tioExecutor = new SynThreadPoolExecutor(corePoolSize, corePoolSize,
0L, new LinkedBlockingQueue<>(), factory, new TioCallerRunsPolicy());
tioExecutor.prestartCoreThread();
creator.tioExecutor(tioExecutor);
ThreadFactory factory1 = new VirtualThreadTaskExecutor("tio-group-virtual").getVirtualThreadFactory();
ThreadPoolExecutor groupExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize,
0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory1, new TioCallerRunsPolicy());
groupExecutor.prestartCoreThread();
creator.groupExecutor(groupExecutor);
ThreadFactory factory2 = new VirtualThreadTaskExecutor("biz-worker-virtual").getVirtualThreadFactory();
ThreadPoolExecutor mqttExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize,
0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory2, new TioCallerRunsPolicy());
mqttExecutor.prestartCoreThread();
creator.mqttExecutor(mqttExecutor);
};
}
}

View File

@@ -0,0 +1,37 @@
package org.dromara.common.mqtt.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.core.client.IMqttClientConnectListener;
import org.dromara.mica.mqtt.core.client.MqttClientCreator;
import org.tio.core.ChannelContext;
/**
* 客户端连接状态监听
*
* @author Lion Li
*/
@Slf4j
public class MqttClientConnectListener implements IMqttClientConnectListener {
//
private final MqttClientCreator mqttClientCreator;
public MqttClientConnectListener(MqttClientCreator mqttClientCreator) {
this.mqttClientCreator = mqttClientCreator;
}
@Override
public void onConnected(ChannelContext context, boolean isReconnect) {
// 创建连接
log.info("MqttConnectedEvent:{}", context);
}
@Override
public void onDisconnect(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
// 离线时更新重连
log.info("MqttDisconnectEvent:{}", context, throwable);
// 在断线时更新 clientId、username、password
// mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
// .username("newUserName")
// .password("newPassword");
}
}

View File

@@ -0,0 +1,23 @@
package org.dromara.common.mqtt.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.client.IMqttClientGlobalMessageListener;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
/**
* 全局消息监听,可以监听到所有订阅消息
*
* @author Lion Li
*/
@Slf4j
public class MqttClientGlobalMessageListener implements IMqttClientGlobalMessageListener {
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
log.info("MqttGlobalMessageEvent => topic: {}, msg: {}", topic, new String(payload, StandardCharsets.UTF_8));
}
}

View File

@@ -0,0 +1 @@
org.dromara.common.mqtt.config.MqttAutoConfiguration

View File

@@ -88,6 +88,11 @@
<artifactId>ruoyi-common-sensitive</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-mqtt</artifactId>
</dependency>
<!-- 测试消息总线使用 搭配 TestBusController -->
<!-- <dependency>-->
<!-- <groupId>org.dromara</groupId>-->

View File

@@ -0,0 +1,76 @@
package org.dromara.demo.controller;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.demo.domain.TestDemo;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.annotation.MqttClientSubscribe;
import org.dromara.mica.mqtt.core.deserialize.MqttJsonDeserializer;
import org.dromara.mica.mqtt.spring.client.MqttClientTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
/**
* mqtt 演示案例
* <p>
* 用法文档 <a href="https://mica-mqtt.dreamlu.net/guide/spring/client.html">...</a>
* 测试server搭建:
* 可执行下载其他mqtt服务端搭建
* 也可使用 mica自带的server搭建 <a href="https://mica-mqtt.dreamlu.net/guide/spring/server.html">...</a>
*
* @author Lion Li
*/
@RequiredArgsConstructor
@RestController
@RequestMapping("/demo/mqtt")
@Slf4j
public class MqttController {
@Lazy
@Autowired
private MqttClientTemplate client;
@GetMapping("/send")
public boolean send() {
client.publish("/test/client", "测试测试".getBytes(StandardCharsets.UTF_8));
return true;
}
@MqttClientSubscribe("/test/#")
public void subQos0(String topic, byte[] payload) {
log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.QOS1)
public void subQos1(String topic, byte[] payload) {
log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register")
public void thingSubRegister(String topic, byte[] payload) {
// 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
// 注意mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe(
value = "/test/json",
deserialize = MqttJsonDeserializer.class // 2.4.5 开始支持 自定义序列化,默认 json 序列化
)
public void testJson(String topic, MqttPublishMessage message, TestDemo data) {
// 2.4.5 开始支持,支持 2 到 3 个参数,字段类型映射规则如下
// String 字符串会默认映射到 topic
// MqttPublishMessage 会默认映射到 原始的消息,可以拿到 mqtt5 的 props 参数
// byte[] 会映射到 mqtt 消息内容 payload
// ByteBuffer 会映射到 mqtt 消息内容 payload
// 其他类型会走序列化,确保消息能够序列化,默认为 json 序列化
log.info("topic:{} json data:{}", topic, data);
}
}

View File

@@ -116,3 +116,43 @@ easy-es:
enable-track-total-hits: true
# 数据刷新策略,默认为不刷新
refresh-policy: immediate
--- # mqtt 配置
# 具体配置还需查看文档
# https://mica-mqtt.dreamlu.net/guide/spring/client.html
mqtt.client:
# 是否开启客户端默认true
enabled: false
# 连接的服务端 ip 默认127.0.0.1
ip: 127.0.0.1
# 端口默认1883
port: 1883
# 客户端名称
name: Mqtt-Client
# 客户端Id非常重要一般为设备 sn不可重复
client-id: 000001
username: ruoyi
password: 123456
# 超时时间单位默认5秒
timeout: 5
# 重连时间,默认 5000 毫秒
re-interval: 5000
# mqtt 协议版本,可选 MQTT_3_1、mqtt_3_1_1、mqtt_5默认mqtt_3_1_1
version: mqtt_3_1_1
# 接收数据的 buffer size默认8k
read-buffer-size: 8KB
# 消息解析最大 bytes 长度默认10M
max-bytes-in-message: 10MB
# keep-alive 时间,单位:秒
keep-alive-secs: 60
# 开启保留 session 时session 的有效期
session-expiry-interval-secs: 0
# 工作线程数,如果消息量比较大,例如做 emqx 的转发消息处理,可以调大此参数
biz-thread-pool-size: 2
# 是否开启 ssl
ssl:
enabled: false
keystore-path:
keystore-pass:
truststore-path:
truststore-pass: