超时时间轮---完成,测试有1ms误差,在找原因

This commit is contained in:
shenkaiwen5
2021-11-16 00:03:24 +08:00
parent 6ff3b0771f
commit 1a590cf691
5 changed files with 92 additions and 56 deletions

View File

@@ -161,10 +161,6 @@ public class Async {
}
public static Timer getTimer() {
return timer;
}
public static void setTimer(Timer timer) {
Async.timer = timer;
return Async.timer;
}
}

View File

@@ -1,6 +1,5 @@
package com.jd.platform.async.executor.wheel;
import java.util.ArrayList;
import java.util.List;
/**
@@ -32,13 +31,17 @@ public class TimeWheel {
* 上层时间轮
*/
private volatile TimeWheel overflowWheel;
/**
* 时间轮层数
*/
private int overflowIndex;
/**
* 一个Timer只有一个delayQueue
*/
private List<List<TimerTaskList>> delayList;
public TimeWheel(long tickMs, int wheelSize, List<List<TimerTaskList>> delayList) {
public TimeWheel(long tickMs, int wheelSize, List<List<TimerTaskList>> delayList, int overflowIndex) {
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
@@ -47,6 +50,7 @@ public class TimeWheel {
for (int i = 0; i < wheelSize; i++) {
timerTaskLists[i] = new TimerTaskList();
}
this.overflowIndex = overflowIndex;
}
/**
@@ -56,7 +60,7 @@ public class TimeWheel {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, delayList);
overflowWheel = new TimeWheel(interval, wheelSize, delayList, overflowIndex+1);
}
}
}
@@ -64,12 +68,24 @@ public class TimeWheel {
}
/**
* 添加任务到时间轮
* 添加任务到时间轮 (底层轮方法)
*/
public boolean addTask(TimerTask timerTask) {
long expiration = timerTask.getDelayMs();
//currentTime应该所有时间轮都一致
long currentTime = System.currentTimeMillis();
currentTime = currentTime - (currentTime % tickMs);
if (!timerTask.getFlag()) {
timerTask.setCurrentTime(currentTime);
}
return addTask(timerTask, currentTime);
}
/**
* 添加任务到时间轮
*/
public boolean addTask(TimerTask timerTask, long currentTime) {
long expiration = timerTask.getDelayMsAndCur();
//urrentTime = currentTime - (currentTime % tickMs);
//过期任务直接执行
if (expiration < currentTime + tickMs) {
@@ -78,21 +94,18 @@ public class TimeWheel {
//当前时间轮可以容纳该任务 加入时间槽
Long virtualId = expiration / tickMs;
int index = (int) (virtualId % wheelSize);
System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration);
//System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration);
TimerTaskList timerTaskList = timerTaskLists[index];
timerTaskList.addTask(timerTask);
if (timerTaskList.setExpiration(virtualId * tickMs)) {
//添加到delayList中
if (delayList.get(index) == null) {
delayList.set(index, new ArrayList<>(16));
}
//TODO 改成加到list对应元素
delayList.get(index).add(timerTaskList);
delayList.get(overflowIndex).add(timerTaskList);
}
} else {
//放到上一层的时间轮
TimeWheel timeWheel = getOverflowWheel();
timeWheel.addTask(timerTask);
timeWheel.addTask(timerTask, currentTime);
}
return true;
}

View File

@@ -33,7 +33,11 @@ public class Timer {
* 构造函数
*/
public Timer() {
timeWheel = new TimeWheel(1, 20, delayList);
for (int i = 0; i < 20; i++) {
delayList.add(new ArrayList<>(16));
}
timeWheel = new TimeWheel(1, 20, delayList, 0);
//默认就1线程
bossThreadPool = Executors.newSingleThreadScheduledExecutor();
//20ms获取一次过期任务
@@ -66,7 +70,7 @@ public class Timer {
}
//取20-list对应的时间槽
List<TimerTaskList> timerTaskList = delayList.get(index);
List<TimerTaskList> timerTaskList = delayList.get(index % 20);
//delayList一个时间槽里所有时间轮的对应槽先清空再重新添加
List<TimerTaskList> tmpList = new ArrayList<>(16);
tmpList.addAll(timerTaskList);
@@ -74,11 +78,8 @@ public class Timer {
//遍历所有轮的槽,执行
for (TimerTaskList singleWheelList: tmpList) {
if (singleWheelList != null) {
//执行过期任务(包含降级操作)
//TODO 加到要执行的列表,并执行
singleWheelList.flush(this::addTask);
}
//TODO 加到要执行的列表,并执行
singleWheelList.flush(this::addTask);
}
INDEX.incrementAndGet();
} catch (Exception e) {

View File

@@ -29,14 +29,22 @@ public class TimerTask {
* 上一个节点
*/
protected TimerTask pre;
/**
* 创建时时间
*/
protected long currentTime;
/**
* 描述
*/
public String desc;
/**
* 设置当前时间标志位
*/
private boolean flag = false;
public TimerTask(long delayMs, Runnable task) {
this.delayMs = System.currentTimeMillis() + delayMs;
this.delayMs = delayMs;
this.task = task;
this.timerTaskList = null;
this.next = null;
@@ -47,8 +55,20 @@ public class TimerTask {
return task;
}
public long getDelayMs() {
return delayMs;
public long getDelayMsAndCur() {
return delayMs + currentTime;
}
public void setCurrentTime(long currentTime) {
this.currentTime = currentTime;
flag = true;
}
/**
* 获取标志位
*/
public boolean getFlag() {
return flag;
}
@Override

View File

@@ -15,41 +15,47 @@ import java.util.concurrent.ExecutionException;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeWorker w = new DeWorker();
DeWorker1 w1 = new DeWorker1();
DeWorker2 w2 = new DeWorker2();
for (int i = 0; i < 1; i++) {
DeWorker w = new DeWorker();
DeWorker1 w1 = new DeWorker1();
DeWorker2 w2 = new DeWorker2();
WorkerWrapper<WorkResult<User>, String> workerWrapper2 = new WorkerWrapper.Builder<WorkResult<User>, String>()
.worker(w2)
.callback(w2)
.id("third")
.build();
WorkerWrapper<WorkResult<User>, String> workerWrapper2 = new WorkerWrapper.Builder<WorkResult<User>, String>()
.worker(w2)
.callback(w2)
.id("third")
.build();
WorkerWrapper<WorkResult<User>, User> workerWrapper1 = new WorkerWrapper.Builder<WorkResult<User>, User>()
.worker(w1)
.callback(w1)
.id("second")
.next(workerWrapper2)
.build();
WorkerWrapper<WorkResult<User>, User> workerWrapper1 = new WorkerWrapper.Builder<WorkResult<User>, User>()
.worker(w1)
.callback(w1)
.id("second")
.next(workerWrapper2)
.build();
WorkerWrapper<String, User> workerWrapper = new WorkerWrapper.Builder<String, User>()
.worker(w)
.param("0")
.id("first")
.next(workerWrapper1, true)
.callback(w)
.build();
WorkerWrapper<String, User> workerWrapper = new WorkerWrapper.Builder<String, User>()
.worker(w)
.param("0")
.id("first")
.next(workerWrapper1, true)
.callback(w)
.build();
//虽然尚未执行但是也可以先取得结果的引用作为下一个任务的入参。V1.2前写法,需要手工给
//V1.3后不用给wrapper setParam了直接在worker的action里自行根据id获取即可.参考dependnew包下代码
WorkResult<User> result = workerWrapper.getWorkResult();
WorkResult<User> result1 = workerWrapper1.getWorkResult();
workerWrapper1.setParam(result);
workerWrapper2.setParam(result1);
workerWrapper.setDelayMs(1002L);
Async.beginWork(3500, workerWrapper);
//虽然尚未执行但是也可以先取得结果的引用作为下一个任务的入参。V1.2前写法,需要手工给
//V1.3后不用给wrapper setParam了直接在worker的action里自行根据id获取即可.参考dependnew包下代码
WorkResult<User> result = workerWrapper.getWorkResult();
WorkResult<User> result1 = workerWrapper1.getWorkResult();
workerWrapper1.setParam(result);
workerWrapper2.setParam(result1);
Async.beginWork(3500L, workerWrapper);
System.out.println(workerWrapper2.getWorkResult());
}
System.out.println(workerWrapper2.getWorkResult());
Async.shutDown();
}
}