Merge remote-tracking branch 'remote/timewheel' into timewheel

This commit is contained in:
wuweifeng10
2021-11-17 12:04:44 +08:00
3 changed files with 9 additions and 47 deletions

View File

@@ -85,7 +85,6 @@ public class TimeWheel {
*/
public boolean addTask(TimerTask timerTask, long currentTime) {
long expiration = timerTask.getDelayMsAndCur();
//urrentTime = currentTime - (currentTime % tickMs);
//过期任务直接执行
if (expiration < currentTime + tickMs) {
@@ -97,15 +96,14 @@ public class TimeWheel {
//System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration);
TimerTaskList timerTaskList = timerTaskLists[index];
timerTaskList.addTask(timerTask);
if (timerTaskList.setExpiration(virtualId * tickMs)) {
//加到delayList
//TODO 改成加到list对应元素 【20槽, 时间轮层数】,这里添加前一定要初始化到对应层数
while (delayList.get(index).size() < overflowIndex + 1) {
delayList.get(index).add(new TimerTaskList());
}
//TODO 想办法只设置一次
delayList.get(index).set(overflowIndex, timerTaskList);
//加到delayList对应元素 【20槽, 时间轮层数】,这里添加前一定要初始化到对应层数
while (delayList.get(index).size() < overflowIndex + 1) {
delayList.get(index).add(new TimerTaskList());
}
//这里每次清空整个槽位的List并且flush List的元素链表所以没问题
delayList.get(index).set(overflowIndex, timerTaskList);
} else {
//放到上一层的时间轮
TimeWheel timeWheel = getOverflowWheel();

View File

@@ -1,19 +1,11 @@
package com.jd.platform.async.executor.wheel;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
/**
* 时间槽
*/
public class TimerTaskList implements Delayed {
/**
* 过期时间
*/
private AtomicLong expiration = new AtomicLong(-1L);
public class TimerTaskList {
/**
* 根节点
@@ -25,20 +17,6 @@ public class TimerTaskList implements Delayed {
root.next = root;
}
/**
* 设置过期时间
*/
public boolean setExpiration(long expire) {
return expiration.getAndSet(expire) != expire;
}
/**
* 获取过期时间
*/
public long getExpiration() {
return expiration.get();
}
/**
* 新增任务
*/
@@ -80,19 +58,5 @@ public class TimerTaskList implements Delayed {
flush.accept(timerTask);
timerTask = root.next;
}
expiration.set(-1L);
}
@Override
public long getDelay(TimeUnit unit) {
return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
}
@Override
public int compareTo(Delayed o) {
if (o instanceof TimerTaskList) {
return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
}
return 0;
}
}

View File

@@ -41,7 +41,7 @@ public class Test {
.callback(w)
.build();
workerWrapper.setDelayMs(1003L);
workerWrapper.setDelayMs(1005L);
//虽然尚未执行但是也可以先取得结果的引用作为下一个任务的入参。V1.2前写法,需要手工给
//V1.3后不用给wrapper setParam了直接在worker的action里自行根据id获取即可.参考dependnew包下代码