diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java index 11a90a4..96c62c5 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java @@ -85,7 +85,6 @@ public class TimeWheel { */ public boolean addTask(TimerTask timerTask, long currentTime) { long expiration = timerTask.getDelayMsAndCur(); - //urrentTime = currentTime - (currentTime % tickMs); //过期任务直接执行 if (expiration < currentTime + tickMs) { @@ -97,15 +96,14 @@ public class TimeWheel { //System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration); TimerTaskList timerTaskList = timerTaskLists[index]; timerTaskList.addTask(timerTask); - if (timerTaskList.setExpiration(virtualId * tickMs)) { - //添加到delayList中 - //TODO 改成加到list对应元素 【20槽, 时间轮层数】,这里添加前一定要初始化到对应层数 - while (delayList.get(index).size() < overflowIndex + 1) { - delayList.get(index).add(new TimerTaskList()); - } - //TODO 想办法只设置一次 - delayList.get(index).set(overflowIndex, timerTaskList); + + //加到delayList对应元素 【20槽, 时间轮层数】,这里添加前一定要初始化到对应层数 + while (delayList.get(index).size() < overflowIndex + 1) { + delayList.get(index).add(new TimerTaskList()); } + //这里每次清空整个槽位的List,并且flush List的元素链表,所以没问题 + delayList.get(index).set(overflowIndex, timerTaskList); + } else { //放到上一层的时间轮 TimeWheel timeWheel = getOverflowWheel(); diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java b/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java index d25b962..332e8fe 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java @@ -1,19 +1,11 @@ package com.jd.platform.async.executor.wheel; -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; /** * 时间槽 */ -public class TimerTaskList implements Delayed { - - /** - * 过期时间 - */ - private AtomicLong expiration = new AtomicLong(-1L); +public class TimerTaskList { /** * 根节点 @@ -25,20 +17,6 @@ public class TimerTaskList implements Delayed { root.next = root; } - /** - * 设置过期时间 - */ - public boolean setExpiration(long expire) { - return expiration.getAndSet(expire) != expire; - } - - /** - * 获取过期时间 - */ - public long getExpiration() { - return expiration.get(); - } - /** * 新增任务 */ @@ -80,19 +58,5 @@ public class TimerTaskList implements Delayed { flush.accept(timerTask); timerTask = root.next; } - expiration.set(-1L); - } - - @Override - public long getDelay(TimeUnit unit) { - return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); - } - - @Override - public int compareTo(Delayed o) { - if (o instanceof TimerTaskList) { - return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get()); - } - return 0; } } diff --git a/src/test/java/depend/Test.java b/src/test/java/depend/Test.java index a8dd4c0..713ec30 100644 --- a/src/test/java/depend/Test.java +++ b/src/test/java/depend/Test.java @@ -41,7 +41,7 @@ public class Test { .callback(w) .build(); - workerWrapper.setDelayMs(1003L); + workerWrapper.setDelayMs(1005L); //虽然尚未执行,但是也可以先取得结果的引用,作为下一个任务的入参。V1.2前写法,需要手工给 //V1.3后,不用给wrapper setParam了,直接在worker的action里自行根据id获取即可.参考dependnew包下代码