diff --git a/mallchat-chat-server/src/main/resources/application.yml b/mallchat-chat-server/src/main/resources/application.yml index 961a9f9..88a3e29 100644 --- a/mallchat-chat-server/src/main/resources/application.yml +++ b/mallchat-chat-server/src/main/resources/application.yml @@ -1,87 +1,87 @@ logging: - level: - org.springframework.web: INFO - com.github.binarywang.demo.wx.mp: DEBUG - me.chanjar.weixin: DEBUG + level: + org.springframework.web: INFO + com.github.binarywang.demo.wx.mp: DEBUG + me.chanjar.weixin: DEBUG mybatis-plus: - mapper-locations: classpath:mapper/**/*.xml - configuration: - log-impl: org.apache.ibatis.logging.stdout.StdOutImpl - map-underscore-to-camel-case: true + mapper-locations: classpath:mapper/**/*.xml + configuration: + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + map-underscore-to-camel-case: true spring: - profiles: - #运行的环境 - active: my-test - application: - name: mallchat - datasource: - url: jdbc:mysql://${mallchat.mysql.ip}:${mallchat.mysql.port}/${mallchat.mysql.db}?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai - username: ${mallchat.mysql.username} - password: ${mallchat.mysql.password} - driver-class-name: com.mysql.cj.jdbc.Driver - hikari: - minimum-idle: 3 - maximum-pool-size: 10 - max-lifetime: 30000 #不能小于30秒,否则默认回到1800秒 - connection-test-query: SELECT 1 - mvc: - pathmatch: - matching-strategy: ANT_PATH_MATCHER - redis: - # Redis服务器地址 - host: ${mallchat.redis.host} - # Redis服务器端口号 - port: ${mallchat.redis.port} - # 使用的数据库索引,默认是0 - database: 0 - # 连接超时时间 - timeout: 1800000 - # 设置密码 - password: ${mallchat.redis.password} - jackson: - serialization: - write-dates-as-timestamps: true + profiles: + #运行的环境 + active: my-test + application: + name: mallchat + datasource: + url: jdbc:mysql://${mallchat.mysql.ip}:${mallchat.mysql.port}/${mallchat.mysql.db}?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai + username: ${mallchat.mysql.username} + password: ${mallchat.mysql.password} + driver-class-name: com.mysql.cj.jdbc.Driver + hikari: + minimum-idle: 3 + maximum-pool-size: 10 + max-lifetime: 30000 #不能小于30秒,否则默认回到1800秒 + connection-test-query: SELECT 1 + mvc: + pathmatch: + matching-strategy: ANT_PATH_MATCHER + redis: + # Redis服务器地址 + host: ${mallchat.redis.host} + # Redis服务器端口号 + port: ${mallchat.redis.port} + # 使用的数据库索引,默认是0 + database: 0 + # 连接超时时间 + timeout: 1800000 + # 设置密码 + password: ${mallchat.redis.password} + jackson: + serialization: + write-dates-as-timestamps: true jwt: - secret: ${mallchat.jwt.secret} + secret: ${mallchat.jwt.secret} wx: - mp: - # callback: http://f4cd-113-92-129-127.ngrok.io - callback: ${mallchat.wx.callback} - configs: - - appId: ${mallchat.wx.appId} # 第一个公众号的appid - secret: ${mallchat.wx.secret} # 公众号的appsecret - token: ${mallchat.wx.token} # 接口配置里的Token值 - aesKey: ${mallchat.wx.aesKey} # 接口配置里的EncodingAESKey值 + mp: + # callback: http://f4cd-113-92-129-127.ngrok.io + callback: ${mallchat.wx.callback} + configs: + - appId: ${mallchat.wx.appId} # 第一个公众号的appid + secret: ${mallchat.wx.secret} # 公众号的appsecret + token: ${mallchat.wx.token} # 接口配置里的Token值 + aesKey: ${mallchat.wx.aesKey} # 接口配置里的EncodingAESKey值 chatai: - chatgpt: - use: ${mallchat.chatgpt.use} - AIUserId: ${mallchat.chatgpt.uid} - key: ${mallchat.chatgpt.key} - proxyUrl: ${mallchat.chatgpt.proxyUrl} - chatglm2: - use: ${mallchat.chatglm2.use} - url: ${mallchat.chatglm2.url} - minute: 3 # 每个用户每3分钟可以请求一次 - AIUserId: ${mallchat.chatglm2.uid} + chatgpt: + use: ${mallchat.chatgpt.use} + AIUserId: ${mallchat.chatgpt.uid} + key: ${mallchat.chatgpt.key} + proxyUrl: ${mallchat.chatgpt.proxyUrl} + chatglm2: + use: ${mallchat.chatglm2.use} + url: ${mallchat.chatglm2.url} + minute: 3 # 每个用户每3分钟可以请求一次 + AIUserId: ${mallchat.chatglm2.uid} rocketmq: - name-server: ${rocketmq.name-server} - # 默认的消息组 - producer: - group: chatGroup - send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。 - compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B - max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B - retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。 - retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。 - retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false - access-key: ${rocketmq.access-key} # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 - secret-key: ${rocketmq.secret-key} # Secret Key - enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 - customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。 - # Consumer 配置项 - consumer: - access-key: ${rocketmq.access-key} # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 - secret-key: ${rocketmq.secret-key} # Secret Key - listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, > 。默认情况下,不配置表示监听。 - erbadagang-consumer-group: - topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费 + name-server: ${rocketmq.name-server} + # 默认的消息组 + producer: + group: chatGroup + send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。 + compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B + max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B + retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。 + retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。 + retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false + access-key: ${rocketmq.access-key} # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 + secret-key: ${rocketmq.secret-key} # Secret Key + enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 + customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。 + # Consumer 配置项 + consumer: + access-key: ${rocketmq.access-key} # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 + secret-key: ${rocketmq.secret-key} # Secret Key + listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, > 。默认情况下,不配置表示监听。 + erbadagang-consumer-group: + topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费 diff --git a/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/FixWindow.java b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/FixWindow.java new file mode 100644 index 0000000..72c7dba --- /dev/null +++ b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/FixWindow.java @@ -0,0 +1,28 @@ +package com.abin.frequencycontrol.service.frequencycontrol.single; + +public class FixWindow { + public Integer count; //当前窗口累计请求数 + public long lastAcquireTime;//最后一次请求时间 + public Long windowInMillis; //固定窗口时间区间(毫秒) + public Integer maxRequests; // 最大请求限制 + + public FixWindow(Long windowInMillis, Integer maxRequests) { + this.windowInMillis = windowInMillis; + this.maxRequests = maxRequests; + } + + public synchronized boolean tryAcquire() { + long currentTime = System.currentTimeMillis(); //获取系统当前时间 + //当前和上次不在同一时间窗口 + if (currentTime - lastAcquireTime > windowInMillis) { + count = 0; // 计数器清0 + lastAcquireTime = currentTime; //开启新的时间窗口 + } else { //同一窗口内 + if (count < maxRequests) { // 小于阀值 + count++; //计数统计器加1 + return true; + } + } + return false; + } +} diff --git a/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/LeakyBucketRateLimiter.java b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/LeakyBucketRateLimiter.java new file mode 100644 index 0000000..8e33563 --- /dev/null +++ b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/LeakyBucketRateLimiter.java @@ -0,0 +1,39 @@ +package com.abin.frequencycontrol.service.frequencycontrol.single; + +public class LeakyBucketRateLimiter { + private final int capacity; // 桶的容量 + private final int rate; // 出桶速率 (每秒的请求数) + private long lastRequestTime; // 上一个请求的时间戳 + + public LeakyBucketRateLimiter(int capacity, int rate) { + this.capacity = capacity; + this.rate = rate; + this.lastRequestTime = System.currentTimeMillis(); + } + + /** + * 尝试请求 + * + * @return >0 请求进入桶里,返回的是需要休眠的时间。 + */ + public synchronized long tryAcquire() { + //当前时间 + long currentTime = System.currentTimeMillis(); + //漏桶空的 + if (currentTime > lastRequestTime) { + lastRequestTime = currentTime; + return 0; // 请求被允许 + } + //上次取水的间隔时间 + long elapsedTime = lastRequestTime - currentTime; + // 计算桶中的水量 + int water = (int) elapsedTime * rate / 1000; + //水量不超过容量 + if (water < capacity) { + long sleepTime = (1000 / rate) + elapsedTime; + lastRequestTime = currentTime + sleepTime; + return sleepTime; + } + return -1; // 请求被拒绝 + } +} diff --git a/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/SlideWindow.java b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/SlideWindow.java new file mode 100644 index 0000000..e38519f --- /dev/null +++ b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/SlideWindow.java @@ -0,0 +1,36 @@ +package com.abin.frequencycontrol.service.frequencycontrol.single; + +import java.util.LinkedList; + +public class SlideWindow { + + private final int maxRequests;//最大请求 + private final long windowInMillis;//窗口范围 + private LinkedList requestTimestamps;//每个请求的时间戳 + + public SlideWindow(int maxRequests, long windowInMillis) { + this.maxRequests = maxRequests; + this.windowInMillis = windowInMillis; + this.requestTimestamps = new LinkedList<>(); + } + + public synchronized boolean tryAcquire() { + long currentTime = System.currentTimeMillis(); + //清除过期窗口的请求 + cleanExpiredRequests(currentTime); + //统计窗口内的请求数小于总限制 + if (requestTimestamps.size() < maxRequests) { + requestTimestamps.addLast(currentTime); + return true; + } + + return false; + } + + private void cleanExpiredRequests(long currentTime) { + //由于是LinkedList,头结点就是最早的请求,判断超出时间窗口就移除,留下的都是窗口内的请求 + while (!requestTimestamps.isEmpty() && (currentTime - requestTimestamps.getFirst() > windowInMillis)) { + requestTimestamps.removeFirst(); + } + } +} diff --git a/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/Test.java b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/Test.java new file mode 100644 index 0000000..65ef931 --- /dev/null +++ b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/Test.java @@ -0,0 +1,157 @@ +package com.abin.frequencycontrol.service.frequencycontrol.single; + +import lombok.SneakyThrows; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class Test { + private static AtomicInteger pass1 = new AtomicInteger(); + private static AtomicInteger pass2 = new AtomicInteger(); + private static AtomicInteger pass3 = new AtomicInteger(); + private static AtomicInteger pass4 = new AtomicInteger(); + private static AtomicInteger total = new AtomicInteger(); + private static List list1 = new ArrayList<>(); + private static List list2 = new ArrayList<>(); + private static List list3 = new ArrayList<>(); + private static List list4 = new ArrayList<>(); + private static List listTotal = new ArrayList<>(); + private static List x = new ArrayList<>(); + private static volatile boolean stop = false; + private static final Integer QPS = 20; + private static final Integer accuracy = 4; + + + public static void main(String[] args) throws InterruptedException { + List list = Arrays.asList(2, 2, 5, 20, 20, 20, 10, 5, 30, 30, 30, 2, 2, 2, 2, 2, 2, 2, 20, 20, 10, 10, 20, 10, 10, 5, 5, 5, 5, 2, 6, 4, 5, 7, 30, 30, 20, 20, 10, 20, 10, 20, 10, 20, 30, 20, 20, 10, 20, 10, 20, 10, 20, 30, 20, 20, 10, 20, 3, 3, 3, 20, 30, 20, 20, 10, 20, 10, 20, 10, 20); + List time = split(list); + AtomicInteger index = new AtomicInteger(0); + tick(); + TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(30, QPS); + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); + FixWindow fixWindow = new FixWindow(1000L, QPS); + SlideWindow slideWindow = new SlideWindow(QPS, 1000L); + LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(30, QPS); + ExecutorService executorService = Executors.newFixedThreadPool(100); + ExecutorService common = Executors.newFixedThreadPool(100); + scheduledExecutorService.scheduleAtFixedRate(() -> { + int andIncrement = index.getAndIncrement(); + if (andIncrement < time.size()) { + Integer integer = time.get(andIncrement); + for (Integer i = 0; i < integer; i++) { + common.execute(() -> { + total.incrementAndGet(); + }); + common.execute(() -> { + fixWindowLimit(fixWindow); + }); + common.execute(() -> { + slideWindowLimit(slideWindow); + }); + executorService.execute(() -> { + leakyBucketRateLimit(leakyBucketRateLimiter); + }); + common.execute(() -> { + tokenBucketRateLimit(tokenBucketRateLimiter); + }); + } + } else { + stop = true; + } + + }, 0, 250 / accuracy, TimeUnit.MILLISECONDS); + Thread.sleep(100000); + } + + private static List split(List result) { + ArrayList res = new ArrayList<>(result.size() * 4); + for (int i = 0; i < result.size(); i++) { + Integer integer = result.get(i); + Integer divisor = integer / accuracy; + if (divisor == 0) divisor = 1; + res.add(Math.min(integer, divisor)); + integer -= divisor; + res.add(Math.min(integer, divisor)); + integer -= divisor; + res.add(Math.min(integer, divisor)); + integer -= divisor; + res.add(Math.min(integer, divisor)); + } + return res; + } + + private static void tokenBucketRateLimit(TokenBucketRateLimiter tokenBucketRateLimiter) { + if (tokenBucketRateLimiter.tryAcquire()) { + pass4.incrementAndGet(); + } + } + + private static void leakyBucketRateLimit(LeakyBucketRateLimiter leakyBucketRateLimiter) { + long l = leakyBucketRateLimiter.tryAcquire(); + if (l == 0) { + pass3.incrementAndGet(); + } else if (l > 0) { + try { + Thread.sleep(l); + } catch (InterruptedException e) { + e.printStackTrace(); + } + pass3.incrementAndGet(); + } + } + + private static void slideWindowLimit(SlideWindow fixWindow) { + boolean b = fixWindow.tryAcquire(); + if (b) { + pass2.incrementAndGet(); + } + } + + private static void fixWindowLimit(FixWindow fixWindow) { + boolean b = fixWindow.tryAcquire(); + if (b) { + pass1.incrementAndGet(); + } + } + + private static void limit(List result) { + } + + private static void tick() { + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-task"); + timer.start(); + } + + static class TimerTask implements Runnable { + @SneakyThrows + @Override + public void run() { + long start = System.currentTimeMillis(); + int i = 0; + while (!stop) { + i++; + Thread.sleep(250); + list1.add(pass1.getAndSet(0)); + list2.add(pass2.getAndSet(0)); + list3.add(pass3.getAndSet(0)); + list4.add(pass4.getAndSet(0)); + listTotal.add(total.getAndSet(0)); + x.add(i * 0.25); + } + System.out.println(x); + System.out.println(listTotal); + System.out.println(list1); + System.out.println(list2); + System.out.println(list3); + System.out.println(list4); + + } + } +} diff --git a/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/TokenBucketRateLimiter.java b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/TokenBucketRateLimiter.java new file mode 100644 index 0000000..721bda6 --- /dev/null +++ b/mallchat-tools/mallchat-frequency-control/src/main/java/com/abin/frequencycontrol/service/frequencycontrol/single/TokenBucketRateLimiter.java @@ -0,0 +1,41 @@ +package com.abin.frequencycontrol.service.frequencycontrol.single; + +public class TokenBucketRateLimiter { + private final int capacity; // 令牌桶容量 + private final int rate; // 令牌产生速率 (每秒的令牌数) + private int tokens; // 当前令牌数量 + private long lastRefillTime; // 上次令牌补充时间 + private long left = 0; + + public TokenBucketRateLimiter(int capacity, int rate) { + this.capacity = capacity; + this.rate = rate; + this.tokens = capacity; + this.lastRefillTime = System.currentTimeMillis(); + } + + public synchronized boolean tryAcquire() { + refillTokens(); + if (tokens > 0) { + tokens--; + return true; + } + return false; + } + + //补充令牌 + private void refillTokens() { + long currentTime = System.currentTimeMillis(); + //当前时间和上次请求时间相差的令牌数 + long elapsedTime = currentTime - lastRefillTime; + + if (elapsedTime > 0) { + //需要补充的令牌 + int newTokens = (int) ((elapsedTime * rate + left) / 1000); + left = (elapsedTime * rate + left) % 1000; + //令牌不能超过桶的大小 + tokens = Math.min(tokens + newTokens, capacity); + lastRefillTime = currentTime; + } + } +}