mirror of
https://gitee.com/jd-platform-opensource/asyncTool.git
synced 2026-03-22 04:27:15 +08:00
超时时间轮
This commit is contained in:
@@ -3,6 +3,7 @@ package com.jd.platform.async.executor;
|
||||
|
||||
import com.jd.platform.async.callback.DefaultGroupCallback;
|
||||
import com.jd.platform.async.callback.IGroupCallback;
|
||||
import com.jd.platform.async.executor.wheel.Timer;
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
|
||||
import java.util.*;
|
||||
@@ -23,6 +24,10 @@ public class Async {
|
||||
* 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个
|
||||
*/
|
||||
private static ExecutorService executorService;
|
||||
/**
|
||||
* 超时管理时间轮
|
||||
*/
|
||||
private static Timer timer = new Timer();
|
||||
|
||||
/**
|
||||
* 出发点
|
||||
@@ -154,4 +159,12 @@ public class Async {
|
||||
" completedCount " + COMMON_POOL.getCompletedTaskCount() +
|
||||
" largestCount " + COMMON_POOL.getLargestPoolSize();
|
||||
}
|
||||
|
||||
public static Timer getTimer() {
|
||||
return timer;
|
||||
}
|
||||
|
||||
public static void setTimer(Timer timer) {
|
||||
Async.timer = timer;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.jd.platform.async.executor.wheel;
|
||||
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
|
||||
/**
|
||||
* 任务的超时检查
|
||||
* @author shenkaiwen5
|
||||
* @version 1.0
|
||||
* @date 2021-11-12
|
||||
*/
|
||||
public class TimeOutCheckMession implements Runnable {
|
||||
|
||||
/**
|
||||
* 任务
|
||||
*/
|
||||
private WorkerWrapper wrapper;
|
||||
|
||||
/**
|
||||
* 状态
|
||||
*/
|
||||
private static final int WORKING = 3;
|
||||
private static final int INIT = 0;
|
||||
|
||||
public TimeOutCheckMession(WorkerWrapper wrapper) {
|
||||
this.wrapper = wrapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* 超时定义:
|
||||
*
|
||||
* 1.单个依赖: 当前任务超时,全链路超时
|
||||
* 2.多个依赖: 当前任务超时,
|
||||
*/
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
//超时时间已到,查看是否为INIT或RUN,是的话自身快速失败
|
||||
if (wrapper.getState() == INIT || wrapper.getState() == WORKING) {
|
||||
wrapper.fastFail(wrapper.getState(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
package com.jd.platform.async.executor.wheel;
|
||||
|
||||
import java.util.concurrent.DelayQueue;
|
||||
|
||||
/**
|
||||
* 时间轮
|
||||
*/
|
||||
public class TimeWheel {
|
||||
|
||||
/**
|
||||
* 一个时间槽的范围
|
||||
*/
|
||||
private long tickMs;
|
||||
|
||||
/**
|
||||
* 时间轮大小
|
||||
*/
|
||||
private int wheelSize;
|
||||
|
||||
/**
|
||||
* 时间跨度
|
||||
*/
|
||||
private long interval;
|
||||
|
||||
/**
|
||||
* 时间槽
|
||||
*/
|
||||
private TimerTaskList[] timerTaskLists;
|
||||
|
||||
/**
|
||||
* 当前时间
|
||||
*/
|
||||
private long currentTime;
|
||||
|
||||
/**
|
||||
* 上层时间轮
|
||||
*/
|
||||
private volatile TimeWheel overflowWheel;
|
||||
|
||||
/**
|
||||
* 一个Timer只有一个delayQueue
|
||||
*/
|
||||
private DelayQueue<TimerTaskList> delayQueue;
|
||||
|
||||
public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
|
||||
this.currentTime = currentTime;
|
||||
this.tickMs = tickMs;
|
||||
this.wheelSize = wheelSize;
|
||||
this.interval = tickMs * wheelSize;
|
||||
this.timerTaskLists = new TimerTaskList[wheelSize];
|
||||
//currentTime为tickMs的整数倍 这里做取整操作
|
||||
this.currentTime = currentTime - (currentTime % tickMs);
|
||||
this.delayQueue = delayQueue;
|
||||
for (int i = 0; i < wheelSize; i++) {
|
||||
timerTaskLists[i] = new TimerTaskList();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建或者获取上层时间轮
|
||||
*/
|
||||
private TimeWheel getOverflowWheel() {
|
||||
if (overflowWheel == null) {
|
||||
synchronized (this) {
|
||||
if (overflowWheel == null) {
|
||||
overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
|
||||
}
|
||||
}
|
||||
}
|
||||
return overflowWheel;
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加任务到时间轮
|
||||
*/
|
||||
public boolean addTask(TimerTask timerTask) {
|
||||
long expiration = timerTask.getDelayMs();
|
||||
//过期任务直接执行
|
||||
if (expiration < currentTime + tickMs) {
|
||||
return false;
|
||||
} else if (expiration < currentTime + interval) {
|
||||
//当前时间轮可以容纳该任务 加入时间槽
|
||||
Long virtualId = expiration / tickMs;
|
||||
int index = (int) (virtualId % wheelSize);
|
||||
System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration);
|
||||
TimerTaskList timerTaskList = timerTaskLists[index];
|
||||
timerTaskList.addTask(timerTask);
|
||||
if (timerTaskList.setExpiration(virtualId * tickMs)) {
|
||||
//添加到delayQueue中
|
||||
delayQueue.offer(timerTaskList);
|
||||
}
|
||||
} else {
|
||||
//放到上一层的时间轮
|
||||
TimeWheel timeWheel = getOverflowWheel();
|
||||
timeWheel.addTask(timerTask);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 推进时间
|
||||
*/
|
||||
public void advanceClock(long timestamp) {
|
||||
if (timestamp >= currentTime + tickMs) {
|
||||
currentTime = timestamp - (timestamp % tickMs);
|
||||
if (overflowWheel != null) {
|
||||
//推进上层时间轮时间
|
||||
this.getOverflowWheel().advanceClock(timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package com.jd.platform.async.executor.wheel;
|
||||
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 定时器
|
||||
*/
|
||||
public class Timer {
|
||||
|
||||
/**
|
||||
* 底层时间轮
|
||||
*/
|
||||
private TimeWheel timeWheel;
|
||||
|
||||
/**
|
||||
* 一个Timer只有一个delayQueue
|
||||
*/
|
||||
private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
|
||||
|
||||
/**
|
||||
* 过期任务执行线程
|
||||
*/
|
||||
private ExecutorService workerThreadPool;
|
||||
|
||||
/**
|
||||
* 轮询delayQueue获取过期任务线程
|
||||
*/
|
||||
private ExecutorService bossThreadPool;
|
||||
|
||||
/**
|
||||
* 构造函数
|
||||
*/
|
||||
public Timer() {
|
||||
timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
|
||||
workerThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1);
|
||||
bossThreadPool = Executors.newFixedThreadPool(1);
|
||||
//20ms获取一次过期任务
|
||||
bossThreadPool.submit(() -> {
|
||||
while (true) {
|
||||
this.advanceClock(20);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加任务
|
||||
*/
|
||||
public void addTask(TimerTask timerTask) {
|
||||
//添加失败任务直接执行
|
||||
if (!timeWheel.addTask(timerTask)) {
|
||||
workerThreadPool.submit(timerTask.getTask());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取过期任务
|
||||
*/
|
||||
private void advanceClock(long timeout) {
|
||||
try {
|
||||
TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
|
||||
if (timerTaskList != null) {
|
||||
//推进时间
|
||||
timeWheel.advanceClock(timerTaskList.getExpiration());
|
||||
//执行过期任务(包含降级操作)
|
||||
timerTaskList.flush(this::addTask);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
package com.jd.platform.async.executor.wheel;
|
||||
|
||||
/**
|
||||
* 任务
|
||||
*/
|
||||
public class TimerTask {
|
||||
|
||||
/**
|
||||
* 延迟时间
|
||||
*/
|
||||
private long delayMs;
|
||||
|
||||
/**
|
||||
* 任务
|
||||
*/
|
||||
private Runnable task;
|
||||
|
||||
/**
|
||||
* 时间槽
|
||||
*/
|
||||
protected TimerTaskList timerTaskList;
|
||||
|
||||
/**
|
||||
* 下一个节点
|
||||
*/
|
||||
protected TimerTask next;
|
||||
|
||||
/**
|
||||
* 上一个节点
|
||||
*/
|
||||
protected TimerTask pre;
|
||||
|
||||
/**
|
||||
* 描述
|
||||
*/
|
||||
public String desc;
|
||||
|
||||
public TimerTask(long delayMs, Runnable task) {
|
||||
this.delayMs = System.currentTimeMillis() + delayMs;
|
||||
this.task = task;
|
||||
this.timerTaskList = null;
|
||||
this.next = null;
|
||||
this.pre = null;
|
||||
}
|
||||
|
||||
public Runnable getTask() {
|
||||
return task;
|
||||
}
|
||||
|
||||
public long getDelayMs() {
|
||||
return delayMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return desc;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
package com.jd.platform.async.executor.wheel;
|
||||
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 时间槽
|
||||
*/
|
||||
public class TimerTaskList implements Delayed {
|
||||
|
||||
/**
|
||||
* 过期时间
|
||||
*/
|
||||
private AtomicLong expiration = new AtomicLong(-1L);
|
||||
|
||||
/**
|
||||
* 根节点
|
||||
*/
|
||||
private TimerTask root = new TimerTask(-1L, null);
|
||||
|
||||
{
|
||||
root.pre = root;
|
||||
root.next = root;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置过期时间
|
||||
*/
|
||||
public boolean setExpiration(long expire) {
|
||||
return expiration.getAndSet(expire) != expire;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取过期时间
|
||||
*/
|
||||
public long getExpiration() {
|
||||
return expiration.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* 新增任务
|
||||
*/
|
||||
public void addTask(TimerTask timerTask) {
|
||||
synchronized (this) {
|
||||
if (timerTask.timerTaskList == null) {
|
||||
timerTask.timerTaskList = this;
|
||||
TimerTask tail = root.pre;
|
||||
timerTask.next = root;
|
||||
timerTask.pre = tail;
|
||||
tail.next = timerTask;
|
||||
root.pre = timerTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除任务
|
||||
*/
|
||||
public void removeTask(TimerTask timerTask) {
|
||||
synchronized (this) {
|
||||
if (timerTask.timerTaskList.equals(this)) {
|
||||
timerTask.next.pre = timerTask.pre;
|
||||
timerTask.pre.next = timerTask.next;
|
||||
timerTask.timerTaskList = null;
|
||||
timerTask.next = null;
|
||||
timerTask.pre = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 重新分配
|
||||
*/
|
||||
public synchronized void flush(Consumer<TimerTask> flush) {
|
||||
TimerTask timerTask = root.next;
|
||||
while (!timerTask.equals(root)) {
|
||||
this.removeTask(timerTask);
|
||||
flush.accept(timerTask);
|
||||
timerTask = root.next;
|
||||
}
|
||||
expiration.set(-1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit unit) {
|
||||
return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed o) {
|
||||
if (o instanceof TimerTaskList) {
|
||||
return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,10 @@ import com.jd.platform.async.callback.DefaultCallback;
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
import com.jd.platform.async.callback.IWorker;
|
||||
import com.jd.platform.async.exception.SkippedException;
|
||||
import com.jd.platform.async.executor.Async;
|
||||
import com.jd.platform.async.executor.timer.SystemClock;
|
||||
import com.jd.platform.async.executor.wheel.TimeOutCheckMession;
|
||||
import com.jd.platform.async.executor.wheel.TimerTask;
|
||||
import com.jd.platform.async.worker.DependWrapper;
|
||||
import com.jd.platform.async.worker.ResultState;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
@@ -72,6 +75,10 @@ public class WorkerWrapper<T, V> {
|
||||
* 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的
|
||||
*/
|
||||
private volatile boolean needCheckNextWrapperResult = true;
|
||||
/**
|
||||
* 超时时间
|
||||
*/
|
||||
private Long delayMs;
|
||||
|
||||
private static final int FINISH = 1;
|
||||
private static final int ERROR = 2;
|
||||
@@ -289,6 +296,12 @@ public class WorkerWrapper<T, V> {
|
||||
* 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
|
||||
*/
|
||||
private void fire() {
|
||||
//如果对任务有单独超时设置
|
||||
if (delayMs != null) {
|
||||
com.jd.platform.async.executor.wheel.TimerTask timerTask = new TimerTask(delayMs, new TimeOutCheckMession(this));
|
||||
Async.getTimer().addTask(timerTask);
|
||||
}
|
||||
|
||||
//阻塞取结果
|
||||
workResult = workerDoJob();
|
||||
}
|
||||
@@ -296,7 +309,7 @@ public class WorkerWrapper<T, V> {
|
||||
/**
|
||||
* 快速失败
|
||||
*/
|
||||
private boolean fastFail(int expect, Exception e) {
|
||||
public boolean fastFail(int expect, Exception e) {
|
||||
//试图将它从expect状态,改成Error
|
||||
if (!compareAndSetState(expect, ERROR)) {
|
||||
return false;
|
||||
@@ -433,7 +446,7 @@ public class WorkerWrapper<T, V> {
|
||||
}
|
||||
|
||||
|
||||
private int getState() {
|
||||
public int getState() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
@@ -604,6 +617,13 @@ public class WorkerWrapper<T, V> {
|
||||
|
||||
return wrapper;
|
||||
}
|
||||
}
|
||||
|
||||
public Long getDelayMs() {
|
||||
return delayMs;
|
||||
}
|
||||
|
||||
public void setDelayMs(Long delayMs) {
|
||||
this.delayMs = delayMs;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user