From d8d9c2d3d6dfa099173e4ba7b85d9e460eb46fb6 Mon Sep 17 00:00:00 2001 From: shenkaiwen5 Date: Mon, 15 Nov 2021 10:55:46 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4=E8=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jd/platform/async/executor/Async.java | 13 ++ .../executor/wheel/TimeOutCheckMession.java | 42 +++++++ .../async/executor/wheel/TimeWheel.java | 112 ++++++++++++++++++ .../platform/async/executor/wheel/Timer.java | 74 ++++++++++++ .../async/executor/wheel/TimerTask.java | 58 +++++++++ .../async/executor/wheel/TimerTaskList.java | 98 +++++++++++++++ .../platform/async/wrapper/WorkerWrapper.java | 24 +++- 7 files changed, 419 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/jd/platform/async/executor/wheel/TimeOutCheckMession.java create mode 100644 src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java create mode 100644 src/main/java/com/jd/platform/async/executor/wheel/Timer.java create mode 100644 src/main/java/com/jd/platform/async/executor/wheel/TimerTask.java create mode 100644 src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java diff --git a/src/main/java/com/jd/platform/async/executor/Async.java b/src/main/java/com/jd/platform/async/executor/Async.java index f88d50b..b7a4d96 100644 --- a/src/main/java/com/jd/platform/async/executor/Async.java +++ b/src/main/java/com/jd/platform/async/executor/Async.java @@ -3,6 +3,7 @@ package com.jd.platform.async.executor; import com.jd.platform.async.callback.DefaultGroupCallback; import com.jd.platform.async.callback.IGroupCallback; +import com.jd.platform.async.executor.wheel.Timer; import com.jd.platform.async.wrapper.WorkerWrapper; import java.util.*; @@ -23,6 +24,10 @@ public class Async { * 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 */ private static ExecutorService executorService; + /** + * 超时管理时间轮 + */ + private static Timer timer = new Timer(); /** * 出发点 @@ -154,4 +159,12 @@ public class Async { " completedCount " + COMMON_POOL.getCompletedTaskCount() + " largestCount " + COMMON_POOL.getLargestPoolSize(); } + + public static Timer getTimer() { + return timer; + } + + public static void setTimer(Timer timer) { + Async.timer = timer; + } } diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeOutCheckMession.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeOutCheckMession.java new file mode 100644 index 0000000..3db9edc --- /dev/null +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeOutCheckMession.java @@ -0,0 +1,42 @@ +package com.jd.platform.async.executor.wheel; + +import com.jd.platform.async.wrapper.WorkerWrapper; + +/** + * 任务的超时检查 + * @author shenkaiwen5 + * @version 1.0 + * @date 2021-11-12 + */ +public class TimeOutCheckMession implements Runnable { + + /** + * 任务 + */ + private WorkerWrapper wrapper; + + /** + * 状态 + */ + private static final int WORKING = 3; + private static final int INIT = 0; + + public TimeOutCheckMession(WorkerWrapper wrapper) { + this.wrapper = wrapper; + } + + /** + * 超时定义: + * + * 1.单个依赖: 当前任务超时,全链路超时 + * 2.多个依赖: 当前任务超时, + */ + + @Override + public void run() { + //超时时间已到,查看是否为INIT或RUN,是的话自身快速失败 + if (wrapper.getState() == INIT || wrapper.getState() == WORKING) { + wrapper.fastFail(wrapper.getState(), null); + } + } +} diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java new file mode 100644 index 0000000..a317941 --- /dev/null +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java @@ -0,0 +1,112 @@ +package com.jd.platform.async.executor.wheel; + +import java.util.concurrent.DelayQueue; + +/** + * 时间轮 + */ +public class TimeWheel { + + /** + * 一个时间槽的范围 + */ + private long tickMs; + + /** + * 时间轮大小 + */ + private int wheelSize; + + /** + * 时间跨度 + */ + private long interval; + + /** + * 时间槽 + */ + private TimerTaskList[] timerTaskLists; + + /** + * 当前时间 + */ + private long currentTime; + + /** + * 上层时间轮 + */ + private volatile TimeWheel overflowWheel; + + /** + * 一个Timer只有一个delayQueue + */ + private DelayQueue delayQueue; + + public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue delayQueue) { + this.currentTime = currentTime; + this.tickMs = tickMs; + this.wheelSize = wheelSize; + this.interval = tickMs * wheelSize; + this.timerTaskLists = new TimerTaskList[wheelSize]; + //currentTime为tickMs的整数倍 这里做取整操作 + this.currentTime = currentTime - (currentTime % tickMs); + this.delayQueue = delayQueue; + for (int i = 0; i < wheelSize; i++) { + timerTaskLists[i] = new TimerTaskList(); + } + } + + /** + * 创建或者获取上层时间轮 + */ + private TimeWheel getOverflowWheel() { + if (overflowWheel == null) { + synchronized (this) { + if (overflowWheel == null) { + overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue); + } + } + } + return overflowWheel; + } + + /** + * 添加任务到时间轮 + */ + public boolean addTask(TimerTask timerTask) { + long expiration = timerTask.getDelayMs(); + //过期任务直接执行 + if (expiration < currentTime + tickMs) { + return false; + } else if (expiration < currentTime + interval) { + //当前时间轮可以容纳该任务 加入时间槽 + Long virtualId = expiration / tickMs; + int index = (int) (virtualId % wheelSize); + System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration); + TimerTaskList timerTaskList = timerTaskLists[index]; + timerTaskList.addTask(timerTask); + if (timerTaskList.setExpiration(virtualId * tickMs)) { + //添加到delayQueue中 + delayQueue.offer(timerTaskList); + } + } else { + //放到上一层的时间轮 + TimeWheel timeWheel = getOverflowWheel(); + timeWheel.addTask(timerTask); + } + return true; + } + + /** + * 推进时间 + */ + public void advanceClock(long timestamp) { + if (timestamp >= currentTime + tickMs) { + currentTime = timestamp - (timestamp % tickMs); + if (overflowWheel != null) { + //推进上层时间轮时间 + this.getOverflowWheel().advanceClock(timestamp); + } + } + } +} diff --git a/src/main/java/com/jd/platform/async/executor/wheel/Timer.java b/src/main/java/com/jd/platform/async/executor/wheel/Timer.java new file mode 100644 index 0000000..30f86d2 --- /dev/null +++ b/src/main/java/com/jd/platform/async/executor/wheel/Timer.java @@ -0,0 +1,74 @@ +package com.jd.platform.async.executor.wheel; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * 定时器 + */ +public class Timer { + + /** + * 底层时间轮 + */ + private TimeWheel timeWheel; + + /** + * 一个Timer只有一个delayQueue + */ + private DelayQueue delayQueue = new DelayQueue<>(); + + /** + * 过期任务执行线程 + */ + private ExecutorService workerThreadPool; + + /** + * 轮询delayQueue获取过期任务线程 + */ + private ExecutorService bossThreadPool; + + /** + * 构造函数 + */ + public Timer() { + timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue); + workerThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1); + bossThreadPool = Executors.newFixedThreadPool(1); + //20ms获取一次过期任务 + bossThreadPool.submit(() -> { + while (true) { + this.advanceClock(20); + } + }); + } + + /** + * 添加任务 + */ + public void addTask(TimerTask timerTask) { + //添加失败任务直接执行 + if (!timeWheel.addTask(timerTask)) { + workerThreadPool.submit(timerTask.getTask()); + } + } + + /** + * 获取过期任务 + */ + private void advanceClock(long timeout) { + try { + TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); + if (timerTaskList != null) { + //推进时间 + timeWheel.advanceClock(timerTaskList.getExpiration()); + //执行过期任务(包含降级操作) + timerTaskList.flush(this::addTask); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimerTask.java b/src/main/java/com/jd/platform/async/executor/wheel/TimerTask.java new file mode 100644 index 0000000..f5e0279 --- /dev/null +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimerTask.java @@ -0,0 +1,58 @@ +package com.jd.platform.async.executor.wheel; + +/** + * 任务 + */ +public class TimerTask { + + /** + * 延迟时间 + */ + private long delayMs; + + /** + * 任务 + */ + private Runnable task; + + /** + * 时间槽 + */ + protected TimerTaskList timerTaskList; + + /** + * 下一个节点 + */ + protected TimerTask next; + + /** + * 上一个节点 + */ + protected TimerTask pre; + + /** + * 描述 + */ + public String desc; + + public TimerTask(long delayMs, Runnable task) { + this.delayMs = System.currentTimeMillis() + delayMs; + this.task = task; + this.timerTaskList = null; + this.next = null; + this.pre = null; + } + + public Runnable getTask() { + return task; + } + + public long getDelayMs() { + return delayMs; + } + + @Override + public String toString() { + return desc; + } +} diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java b/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java new file mode 100644 index 0000000..d25b962 --- /dev/null +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java @@ -0,0 +1,98 @@ +package com.jd.platform.async.executor.wheel; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +/** + * 时间槽 + */ +public class TimerTaskList implements Delayed { + + /** + * 过期时间 + */ + private AtomicLong expiration = new AtomicLong(-1L); + + /** + * 根节点 + */ + private TimerTask root = new TimerTask(-1L, null); + + { + root.pre = root; + root.next = root; + } + + /** + * 设置过期时间 + */ + public boolean setExpiration(long expire) { + return expiration.getAndSet(expire) != expire; + } + + /** + * 获取过期时间 + */ + public long getExpiration() { + return expiration.get(); + } + + /** + * 新增任务 + */ + public void addTask(TimerTask timerTask) { + synchronized (this) { + if (timerTask.timerTaskList == null) { + timerTask.timerTaskList = this; + TimerTask tail = root.pre; + timerTask.next = root; + timerTask.pre = tail; + tail.next = timerTask; + root.pre = timerTask; + } + } + } + + /** + * 移除任务 + */ + public void removeTask(TimerTask timerTask) { + synchronized (this) { + if (timerTask.timerTaskList.equals(this)) { + timerTask.next.pre = timerTask.pre; + timerTask.pre.next = timerTask.next; + timerTask.timerTaskList = null; + timerTask.next = null; + timerTask.pre = null; + } + } + } + + /** + * 重新分配 + */ + public synchronized void flush(Consumer flush) { + TimerTask timerTask = root.next; + while (!timerTask.equals(root)) { + this.removeTask(timerTask); + flush.accept(timerTask); + timerTask = root.next; + } + expiration.set(-1L); + } + + @Override + public long getDelay(TimeUnit unit) { + return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + } + + @Override + public int compareTo(Delayed o) { + if (o instanceof TimerTaskList) { + return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get()); + } + return 0; + } +} diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index d722e4d..bbb54f5 100755 --- a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -4,7 +4,10 @@ import com.jd.platform.async.callback.DefaultCallback; import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.exception.SkippedException; +import com.jd.platform.async.executor.Async; import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.executor.wheel.TimeOutCheckMession; +import com.jd.platform.async.executor.wheel.TimerTask; import com.jd.platform.async.worker.DependWrapper; import com.jd.platform.async.worker.ResultState; import com.jd.platform.async.worker.WorkResult; @@ -72,6 +75,10 @@ public class WorkerWrapper { * 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的 */ private volatile boolean needCheckNextWrapperResult = true; + /** + * 超时时间 + */ + private Long delayMs; private static final int FINISH = 1; private static final int ERROR = 2; @@ -289,6 +296,12 @@ public class WorkerWrapper { * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程 */ private void fire() { + //如果对任务有单独超时设置 + if (delayMs != null) { + com.jd.platform.async.executor.wheel.TimerTask timerTask = new TimerTask(delayMs, new TimeOutCheckMession(this)); + Async.getTimer().addTask(timerTask); + } + //阻塞取结果 workResult = workerDoJob(); } @@ -296,7 +309,7 @@ public class WorkerWrapper { /** * 快速失败 */ - private boolean fastFail(int expect, Exception e) { + public boolean fastFail(int expect, Exception e) { //试图将它从expect状态,改成Error if (!compareAndSetState(expect, ERROR)) { return false; @@ -433,7 +446,7 @@ public class WorkerWrapper { } - private int getState() { + public int getState() { return state.get(); } @@ -604,6 +617,13 @@ public class WorkerWrapper { return wrapper; } + } + public Long getDelayMs() { + return delayMs; + } + + public void setDelayMs(Long delayMs) { + this.delayMs = delayMs; } }