超时时间轮

This commit is contained in:
shenkaiwen5
2021-11-15 18:50:06 +08:00
parent a107323425
commit 39db175412
3 changed files with 88 additions and 34 deletions

View File

@@ -1,5 +1,7 @@
package com.jd.platform.async.executor.wheel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
/**
@@ -40,9 +42,9 @@ public class TimeWheel {
/**
* 一个Timer只有一个delayQueue
*/
private DelayQueue<TimerTaskList> delayQueue;
private List<List<TimerTaskList>> delayList;
public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
public TimeWheel(long tickMs, int wheelSize, long currentTime, List<List<TimerTaskList>> delayList) {
this.currentTime = currentTime;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
@@ -50,7 +52,7 @@ public class TimeWheel {
this.timerTaskLists = new TimerTaskList[wheelSize];
//currentTime为tickMs的整数倍 这里做取整操作
this.currentTime = currentTime - (currentTime % tickMs);
this.delayQueue = delayQueue;
this.delayList = delayList;
for (int i = 0; i < wheelSize; i++) {
timerTaskLists[i] = new TimerTaskList();
}
@@ -63,7 +65,7 @@ public class TimeWheel {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayList);
}
}
}
@@ -86,9 +88,12 @@ public class TimeWheel {
TimerTaskList timerTaskList = timerTaskLists[index];
timerTaskList.addTask(timerTask);
if (timerTaskList.setExpiration(virtualId * tickMs)) {
//添加到delayQueue
//添加到delayList
if (delayList.get(index) == null) {
delayList.set(index, new ArrayList<>(16));
}
//TODO 改成加到list对应元素
delayQueue.offer(timerTaskList);
delayList.get(index).add(timerTaskList);
}
} else {
//放到上一层的时间轮

View File

@@ -0,0 +1,26 @@
package com.jd.platform.async.executor.wheel;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author shenkaiwen5
* @version 1.0
* @date 2021-11-15
*/
public class TimeWheelFlushMession implements Runnable {
/**
* 定时器
*/
private Timer timer;
public TimeWheelFlushMession(Timer timer) {
this.timer = timer;
}
@Override
public void run() {
timer.advanceClock();
}
}

View File

@@ -1,6 +1,9 @@
package com.jd.platform.async.executor.wheel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 定时器
@@ -17,57 +20,77 @@ public class Timer {
*/
private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
/**
* 过期任务执行线程
*/
private ExecutorService workerThreadPool;
/**
* 轮询delayQueue获取过期任务线程
*/
private ExecutorService bossThreadPool;
private ScheduledExecutorService bossThreadPool;
/**
* 总的队列
*/
private List<List<TimerTaskList>> delayList = new ArrayList<>(20);
/**
* 待执行的任务队列
*/
private List<TimerTask> todoList = new ArrayList<>(16);
/**
* 执行到的index
*/
private AtomicInteger INDEX = new AtomicInteger(0);
/**
* 构造函数
*/
public Timer() {
timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
// workerThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1);
workerThreadPool = Executors.newCachedThreadPool();
bossThreadPool = Executors.newFixedThreadPool(1);
timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayList);
//默认就1线程
bossThreadPool = Executors.newSingleThreadScheduledExecutor();
//20ms获取一次过期任务
bossThreadPool.submit(() -> {
while (true) {
this.advanceClock(20);
}
});
bossThreadPool.scheduleAtFixedRate(() -> {
advanceClock();
}, 0, 1, TimeUnit.MILLISECONDS);
}
/**
* 添加任务
*/
public void addTask(TimerTask timerTask) {
//添加失败任务直接执行
//尝试添加任务
if (!timeWheel.addTask(timerTask)) {
//TODO 不要线程池,改成一个线程直接执行
workerThreadPool.submit(timerTask.getTask());
//添加失败任务直接执行
todoList.add(timerTask);
}
}
/**
* 获取过期任务
* 获取过期任务并执行
*
* 这个方法作为定时任务1毫秒一执行
*/
private void advanceClock(long timeout) {
protected void advanceClock() {
try {
//取20-list对应的时间槽
TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (timerTaskList != null) {
//推进时间
timeWheel.advanceClock(timerTaskList.getExpiration());
//执行过期任务(包含降级操作)
//TODO 加到要执行的列表
timerTaskList.flush(this::addTask);
//TODO 执行。这个方法作为定时任务1毫秒一执行
List<TimerTaskList> timerTaskList = delayList.get(INDEX.getAndIncrement());
//delayList一个时间槽里所有时间轮的对应槽先清空再重新添加
List<TimerTaskList> tmpList = new ArrayList<>(16);
tmpList.addAll(timerTaskList);
timerTaskList.clear();
//遍历所有轮的槽,执行
for (TimerTaskList singleWheelList: tmpList) {
if (singleWheelList != null) {
//推进时间
timeWheel.advanceClock(singleWheelList.getExpiration());
//执行过期任务(包含降级操作)
//TODO 加到要执行的列表
singleWheelList.flush(this::addTask);
//TODO 执行
for (TimerTask task: todoList) {
task.getTask().run();
}
todoList.clear();
}
}
} catch (Exception e) {
e.printStackTrace();