超时时间轮---优化时间推送,还剩简化Runnable!

This commit is contained in:
shenkaiwen5 2021-11-15 20:48:55 +08:00
parent 39db175412
commit 6ff3b0771f
3 changed files with 16 additions and 72 deletions

View File

@ -2,7 +2,6 @@ package com.jd.platform.async.executor.wheel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
/**
* 时间轮
@ -29,11 +28,6 @@ public class TimeWheel {
*/
private TimerTaskList[] timerTaskLists;
/**
* 当前时间
*/
private long currentTime;
/**
* 上层时间轮
*/
@ -44,14 +38,11 @@ public class TimeWheel {
*/
private List<List<TimerTaskList>> delayList;
public TimeWheel(long tickMs, int wheelSize, long currentTime, List<List<TimerTaskList>> delayList) {
this.currentTime = currentTime;
public TimeWheel(long tickMs, int wheelSize, List<List<TimerTaskList>> delayList) {
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
this.timerTaskLists = new TimerTaskList[wheelSize];
//currentTime为tickMs的整数倍 这里做取整操作
this.currentTime = currentTime - (currentTime % tickMs);
this.delayList = delayList;
for (int i = 0; i < wheelSize; i++) {
timerTaskLists[i] = new TimerTaskList();
@ -65,7 +56,7 @@ public class TimeWheel {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayList);
overflowWheel = new TimeWheel(interval, wheelSize, delayList);
}
}
}
@ -77,6 +68,9 @@ public class TimeWheel {
*/
public boolean addTask(TimerTask timerTask) {
long expiration = timerTask.getDelayMs();
long currentTime = System.currentTimeMillis();
currentTime = currentTime - (currentTime % tickMs);
//过期任务直接执行
if (expiration < currentTime + tickMs) {
return false;
@ -102,17 +96,4 @@ public class TimeWheel {
}
return true;
}
/**
* 推进时间
*/
public void advanceClock(long timestamp) {
if (timestamp >= currentTime + tickMs) {
currentTime = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
//推进上层时间轮时间
this.getOverflowWheel().advanceClock(timestamp);
}
}
}
}

View File

@ -1,26 +0,0 @@
package com.jd.platform.async.executor.wheel;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author shenkaiwen5
* @version 1.0
* @date 2021-11-15
*/
public class TimeWheelFlushMession implements Runnable {
/**
* 定时器
*/
private Timer timer;
public TimeWheelFlushMession(Timer timer) {
this.timer = timer;
}
@Override
public void run() {
timer.advanceClock();
}
}

View File

@ -15,11 +15,6 @@ public class Timer {
*/
private TimeWheel timeWheel;
/**
* 一个Timer只有一个delayQueue
*/
private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
/**
* 轮询delayQueue获取过期任务线程
*/
@ -29,11 +24,6 @@ public class Timer {
* 总的队列
*/
private List<List<TimerTaskList>> delayList = new ArrayList<>(20);
/**
* 待执行的任务队列
*/
private List<TimerTask> todoList = new ArrayList<>(16);
/**
* 执行到的index
*/
@ -43,7 +33,7 @@ public class Timer {
* 构造函数
*/
public Timer() {
timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayList);
timeWheel = new TimeWheel(1, 20, delayList);
//默认就1线程
bossThreadPool = Executors.newSingleThreadScheduledExecutor();
//20ms获取一次过期任务
@ -58,8 +48,8 @@ public class Timer {
public void addTask(TimerTask timerTask) {
//尝试添加任务
if (!timeWheel.addTask(timerTask)) {
//添加失败任务直接执行
todoList.add(timerTask);
//添加失败任务直接执行反正判断超时的任务很快
timerTask.getTask().run();
}
}
@ -70,8 +60,13 @@ public class Timer {
*/
protected void advanceClock() {
try {
int index = INDEX.get();
if (index >= 20) {
INDEX.set(index % 20);
}
//取20-list对应的时间槽
List<TimerTaskList> timerTaskList = delayList.get(INDEX.getAndIncrement());
List<TimerTaskList> timerTaskList = delayList.get(index);
//delayList一个时间槽里所有时间轮的对应槽先清空再重新添加
List<TimerTaskList> tmpList = new ArrayList<>(16);
tmpList.addAll(timerTaskList);
@ -80,18 +75,12 @@ public class Timer {
//遍历所有轮的槽执行
for (TimerTaskList singleWheelList: tmpList) {
if (singleWheelList != null) {
//推进时间
timeWheel.advanceClock(singleWheelList.getExpiration());
//执行过期任务包含降级操作
//TODO 加到要执行的列表
//TODO 加到要执行的列表并执行
singleWheelList.flush(this::addTask);
//TODO 执行
for (TimerTask task: todoList) {
task.getTask().run();
}
todoList.clear();
}
}
INDEX.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
}