From 31a8a686fdadf034ab4c4c3b2615a7cdd2450789 Mon Sep 17 00:00:00 2001 From: wuweifeng10 Date: Wed, 17 Nov 2021 12:04:40 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=BE=E7=BD=AE=E5=8D=95=E4=B8=AA=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4=E8=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../async/executor/timeout/ITimeoutTask.java | 19 ++++ .../async/executor/timeout/SingleTask.java | 61 +++++++++++ .../async/executor/timeout/WheelMain.java | 100 ++++++++++++++++++ 3 files changed, 180 insertions(+) create mode 100644 src/main/java/com/jd/platform/async/executor/timeout/ITimeoutTask.java create mode 100644 src/main/java/com/jd/platform/async/executor/timeout/SingleTask.java create mode 100644 src/main/java/com/jd/platform/async/executor/timeout/WheelMain.java diff --git a/src/main/java/com/jd/platform/async/executor/timeout/ITimeoutTask.java b/src/main/java/com/jd/platform/async/executor/timeout/ITimeoutTask.java new file mode 100644 index 0000000..1d2c60a --- /dev/null +++ b/src/main/java/com/jd/platform/async/executor/timeout/ITimeoutTask.java @@ -0,0 +1,19 @@ +package com.jd.platform.async.executor.timeout; + +/** + * 一个单一的任务 + * @author wuweifeng + * @version 1.0 + * @date 2021-11-16 + */ +public interface ITimeoutTask { + /** + * 超时时间 + */ + int delayMs(); + + /** + * 时间到了后的回调 + */ + void timeoutCallback(); +} diff --git a/src/main/java/com/jd/platform/async/executor/timeout/SingleTask.java b/src/main/java/com/jd/platform/async/executor/timeout/SingleTask.java new file mode 100644 index 0000000..5719b8e --- /dev/null +++ b/src/main/java/com/jd/platform/async/executor/timeout/SingleTask.java @@ -0,0 +1,61 @@ +package com.jd.platform.async.executor.timeout; + +/** + * 单个任务,任务最小单元 + * @author wuweifeng + * @version 1.0 + * @date 2021-11-16 + */ +public class SingleTask { + + /** + * 处于时间轮的第几圈,第0圈就代表本次要执行,第1圈代表下一次才执行自己 + */ + private int level; + /** + * 超时时间 + */ + private int delayMs; + /** + * 创建时时间 + */ + private long currentTime; + /** + * 可用于回调 + */ + private ITimeoutTask timeoutTask; + + /** + * 构建该task + */ + public static SingleTask build(ITimeoutTask timeoutTask) { + SingleTask singleTask = new SingleTask(); + singleTask.delayMs = timeoutTask.delayMs(); + singleTask.currentTime = System.currentTimeMillis(); + singleTask.timeoutTask = timeoutTask; + //计算在第几轮 + singleTask.level = singleTask.delayMs / WheelMain.MAX_SIZE; + + return singleTask; + } + + public void setLevel(int level) { + this.level = level; + } + + public int getLevel() { + return level; + } + + public int getDelayMs() { + return delayMs; + } + + public long getCurrentTime() { + return currentTime; + } + + public ITimeoutTask getTimeoutTask() { + return timeoutTask; + } +} diff --git a/src/main/java/com/jd/platform/async/executor/timeout/WheelMain.java b/src/main/java/com/jd/platform/async/executor/timeout/WheelMain.java new file mode 100644 index 0000000..56e64c5 --- /dev/null +++ b/src/main/java/com/jd/platform/async/executor/timeout/WheelMain.java @@ -0,0 +1,100 @@ +package com.jd.platform.async.executor.timeout; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 入口调度器 + * + * @author wuweifeng + * @version 1.0 + * @date 2021-11-16 + */ +public class WheelMain { + /** + * 共20个槽位 + */ + public static final int MAX_SIZE = 20; + /** + * 总的队列,外层是20个槽位,内层是每个槽位内的任务集合 + * 注意,即便是这个List,一样会发生在遍历时,同时有数据新插入的情况。造成无法对level进行减1的操作 + */ + private static List> allTaskList = new ArrayList<>(MAX_SIZE); + + /** + * 执行到的index + */ + private static final AtomicInteger INDEX = new AtomicInteger(0); + + /** + * 初始化 + */ + public static void init() { + for (int i = 0; i < MAX_SIZE; i++) { + CopyOnWriteArrayList list = new CopyOnWriteArrayList<>(); + allTaskList.add(list); + } + //单线程调度器,1ms执行一次。注意,当发生fullGc时,该定时器将不再能准确执行 + ScheduledExecutorService bossThreadPool = Executors.newSingleThreadScheduledExecutor(); + bossThreadPool.scheduleAtFixedRate(WheelMain::advanceClock, 0, 1, TimeUnit.MILLISECONDS); + } + + /** + * 获取过期任务并执行 + *

+ * 这个方法作为定时任务,1毫秒一执行 + */ + private static void advanceClock() { + try { + int index = INDEX.get(); + if (index >= MAX_SIZE) { + INDEX.set(index % MAX_SIZE); + } + //取当前走到的槽位 + CopyOnWriteArrayList timerTaskList = allTaskList.get(index % 20); + //索引前进一格 + INDEX.incrementAndGet(); + + //遍历槽内所有任务 + for (SingleTask singleTask : timerTaskList) { + //如果level = 0,代表时间到了,或者创建时间+超时时间>当前时间了,也是时间到了 + if (singleTask.getLevel() <= 0 || singleTask.getCurrentTime() + singleTask.getDelayMs() >= System.currentTimeMillis()) { + //给予回调 + singleTask.getTimeoutTask().timeoutCallback(); + //从列表删除 + timerTaskList.remove(singleTask); + } else { + singleTask.setLevel(singleTask.getLevel() - 1); + } + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + + + /** + * 添加一个任务到时间轮的对应槽位 + */ + public synchronized static void addTask(ITimeoutTask timeoutTask) { + SingleTask singleTask = SingleTask.build(timeoutTask); + //获取超时时间 + int delay = timeoutTask.delayMs(); + if (delay <= 0) { + return; + } + //放到第几个槽 + int putIndex = INDEX.get() + delay % MAX_SIZE; + CopyOnWriteArrayList list = allTaskList.get(putIndex % MAX_SIZE); + //添加到该槽位的队列中 + list.add(singleTask); + } + + +}