From ab9b48b08137671a797c4bf3c7afa7b9809c48c2 Mon Sep 17 00:00:00 2001 From: shenkaiwen5 Date: Tue, 16 Nov 2021 11:29:09 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E8=BD=AE---=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/jd/platform/async/executor/wheel/TimeWheel.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java index 11a90a4..e21224c 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java @@ -98,12 +98,11 @@ public class TimeWheel { TimerTaskList timerTaskList = timerTaskLists[index]; timerTaskList.addTask(timerTask); if (timerTaskList.setExpiration(virtualId * tickMs)) { - //添加到delayList中 - //TODO 改成加到list对应元素 【20槽, 时间轮层数】,这里添加前一定要初始化到对应层数 + //加到delayList对应元素 【20槽, 时间轮层数】,这里添加前一定要初始化到对应层数 while (delayList.get(index).size() < overflowIndex + 1) { delayList.get(index).add(new TimerTaskList()); } - //TODO 想办法只设置一次 + //这里每次清空整个槽位的List,并且flush List的元素链表,所以没问题 delayList.get(index).set(overflowIndex, timerTaskList); } } else { From 47f707f2872d915e3bd8004328fcc806e401d8e0 Mon Sep 17 00:00:00 2001 From: shenkaiwen5 Date: Tue, 16 Nov 2021 16:33:02 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E8=BD=AE---=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/jd/platform/async/executor/wheel/TimeWheel.java | 2 +- src/test/java/depend/Test.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java index e21224c..0d3133b 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java @@ -85,7 +85,7 @@ public class TimeWheel { */ public boolean addTask(TimerTask timerTask, long currentTime) { long expiration = timerTask.getDelayMsAndCur(); - //urrentTime = currentTime - (currentTime % tickMs); + currentTime = currentTime - (currentTime % tickMs); //过期任务直接执行 if (expiration < currentTime + tickMs) { diff --git a/src/test/java/depend/Test.java b/src/test/java/depend/Test.java index a8dd4c0..713ec30 100644 --- a/src/test/java/depend/Test.java +++ b/src/test/java/depend/Test.java @@ -41,7 +41,7 @@ public class Test { .callback(w) .build(); - workerWrapper.setDelayMs(1003L); + workerWrapper.setDelayMs(1005L); //虽然尚未执行,但是也可以先取得结果的引用,作为下一个任务的入参。V1.2前写法,需要手工给 //V1.3后,不用给wrapper setParam了,直接在worker的action里自行根据id获取即可.参考dependnew包下代码 From d41c09383b01a5d28b82b69d30134edaf33c627a Mon Sep 17 00:00:00 2001 From: shenkaiwen5 Date: Tue, 16 Nov 2021 17:45:35 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E8=BD=AE---=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../async/executor/wheel/TimeWheel.java | 15 ++++---- .../async/executor/wheel/TimerTaskList.java | 38 +------------------ 2 files changed, 8 insertions(+), 45 deletions(-) diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java index 0d3133b..96c62c5 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java @@ -85,7 +85,6 @@ public class TimeWheel { */ public boolean addTask(TimerTask timerTask, long currentTime) { long expiration = timerTask.getDelayMsAndCur(); - currentTime = currentTime - (currentTime % tickMs); //过期任务直接执行 if (expiration < currentTime + tickMs) { @@ -97,14 +96,14 @@ public class TimeWheel { //System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration); TimerTaskList timerTaskList = timerTaskLists[index]; timerTaskList.addTask(timerTask); - if (timerTaskList.setExpiration(virtualId * tickMs)) { - //加到delayList对应元素 【20槽, 时间轮层数】,这里添加前一定要初始化到对应层数 - while (delayList.get(index).size() < overflowIndex + 1) { - delayList.get(index).add(new TimerTaskList()); - } - //这里每次清空整个槽位的List,并且flush List的元素链表,所以没问题 - delayList.get(index).set(overflowIndex, timerTaskList); + + //加到delayList对应元素 【20槽, 时间轮层数】,这里添加前一定要初始化到对应层数 + while (delayList.get(index).size() < overflowIndex + 1) { + delayList.get(index).add(new TimerTaskList()); } + //这里每次清空整个槽位的List,并且flush List的元素链表,所以没问题 + delayList.get(index).set(overflowIndex, timerTaskList); + } else { //放到上一层的时间轮 TimeWheel timeWheel = getOverflowWheel(); diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java b/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java index d25b962..332e8fe 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimerTaskList.java @@ -1,19 +1,11 @@ package com.jd.platform.async.executor.wheel; -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; /** * 时间槽 */ -public class TimerTaskList implements Delayed { - - /** - * 过期时间 - */ - private AtomicLong expiration = new AtomicLong(-1L); +public class TimerTaskList { /** * 根节点 @@ -25,20 +17,6 @@ public class TimerTaskList implements Delayed { root.next = root; } - /** - * 设置过期时间 - */ - public boolean setExpiration(long expire) { - return expiration.getAndSet(expire) != expire; - } - - /** - * 获取过期时间 - */ - public long getExpiration() { - return expiration.get(); - } - /** * 新增任务 */ @@ -80,19 +58,5 @@ public class TimerTaskList implements Delayed { flush.accept(timerTask); timerTask = root.next; } - expiration.set(-1L); - } - - @Override - public long getDelay(TimeUnit unit) { - return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); - } - - @Override - public int compareTo(Delayed o) { - if (o instanceof TimerTaskList) { - return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get()); - } - return 0; } }