From 6ff3b0771f8c3dd4261d961ab3da4fd254422afc Mon Sep 17 00:00:00 2001 From: shenkaiwen5 Date: Mon, 15 Nov 2021 20:48:55 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4=E8=BD=AE--?= =?UTF-8?q?-=E4=BC=98=E5=8C=96=E6=97=B6=E9=97=B4=E6=8E=A8=E9=80=81?= =?UTF-8?q?=EF=BC=8C=E8=BF=98=E5=89=A9=E7=AE=80=E5=8C=96Runnable=EF=BC=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../async/executor/wheel/TimeWheel.java | 29 +++------------- .../executor/wheel/TimeWheelFlushMession.java | 26 --------------- .../platform/async/executor/wheel/Timer.java | 33 +++++++------------ 3 files changed, 16 insertions(+), 72 deletions(-) delete mode 100644 src/main/java/com/jd/platform/async/executor/wheel/TimeWheelFlushMession.java diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java index e9e305a..e6f47c8 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java @@ -2,7 +2,6 @@ package com.jd.platform.async.executor.wheel; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.DelayQueue; /** * 时间轮 @@ -29,11 +28,6 @@ public class TimeWheel { */ private TimerTaskList[] timerTaskLists; - /** - * 当前时间 - */ - private long currentTime; - /** * 上层时间轮 */ @@ -44,14 +38,11 @@ public class TimeWheel { */ private List> delayList; - public TimeWheel(long tickMs, int wheelSize, long currentTime, List> delayList) { - this.currentTime = currentTime; + public TimeWheel(long tickMs, int wheelSize, List> delayList) { this.tickMs = tickMs; this.wheelSize = wheelSize; this.interval = tickMs * wheelSize; this.timerTaskLists = new TimerTaskList[wheelSize]; - //currentTime为tickMs的整数倍 这里做取整操作 - this.currentTime = currentTime - (currentTime % tickMs); this.delayList = delayList; for (int i = 0; i < wheelSize; i++) { timerTaskLists[i] = new TimerTaskList(); @@ -65,7 +56,7 @@ public class TimeWheel { if (overflowWheel == null) { synchronized (this) { if (overflowWheel == null) { - overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayList); + overflowWheel = new TimeWheel(interval, wheelSize, delayList); } } } @@ -77,6 +68,9 @@ public class TimeWheel { */ public boolean addTask(TimerTask timerTask) { long expiration = timerTask.getDelayMs(); + long currentTime = System.currentTimeMillis(); + currentTime = currentTime - (currentTime % tickMs); + //过期任务直接执行 if (expiration < currentTime + tickMs) { return false; @@ -102,17 +96,4 @@ public class TimeWheel { } return true; } - - /** - * 推进时间 - */ - public void advanceClock(long timestamp) { - if (timestamp >= currentTime + tickMs) { - currentTime = timestamp - (timestamp % tickMs); - if (overflowWheel != null) { - //推进上层时间轮时间 - this.getOverflowWheel().advanceClock(timestamp); - } - } - } } diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheelFlushMession.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheelFlushMession.java deleted file mode 100644 index 11412c3..0000000 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheelFlushMession.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.jd.platform.async.executor.wheel; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * @author shenkaiwen5 - * @version 1.0 - * @date 2021-11-15 - */ -public class TimeWheelFlushMession implements Runnable { - - /** - * 定时器 - */ - private Timer timer; - - public TimeWheelFlushMession(Timer timer) { - this.timer = timer; - } - - @Override - public void run() { - timer.advanceClock(); - } -} diff --git a/src/main/java/com/jd/platform/async/executor/wheel/Timer.java b/src/main/java/com/jd/platform/async/executor/wheel/Timer.java index c032298..90de228 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/Timer.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/Timer.java @@ -15,11 +15,6 @@ public class Timer { */ private TimeWheel timeWheel; - /** - * 一个Timer只有一个delayQueue - */ - private DelayQueue delayQueue = new DelayQueue<>(); - /** * 轮询delayQueue获取过期任务线程 */ @@ -29,11 +24,6 @@ public class Timer { * 总的队列 */ private List> delayList = new ArrayList<>(20); - - /** - * 待执行的任务队列 - */ - private List todoList = new ArrayList<>(16); /** * 执行到的index */ @@ -43,7 +33,7 @@ public class Timer { * 构造函数 */ public Timer() { - timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayList); + timeWheel = new TimeWheel(1, 20, delayList); //默认就1线程 bossThreadPool = Executors.newSingleThreadScheduledExecutor(); //20ms获取一次过期任务 @@ -58,8 +48,8 @@ public class Timer { public void addTask(TimerTask timerTask) { //尝试添加任务 if (!timeWheel.addTask(timerTask)) { - //添加失败任务直接执行 - todoList.add(timerTask); + //添加失败任务直接执行,反正判断超时的任务很快! + timerTask.getTask().run(); } } @@ -70,8 +60,13 @@ public class Timer { */ protected void advanceClock() { try { + int index = INDEX.get(); + if (index >= 20) { + INDEX.set(index % 20); + } + //取20-list对应的时间槽 - List timerTaskList = delayList.get(INDEX.getAndIncrement()); + List timerTaskList = delayList.get(index); //delayList一个时间槽里所有时间轮的对应槽,先清空再重新添加 List tmpList = new ArrayList<>(16); tmpList.addAll(timerTaskList); @@ -80,18 +75,12 @@ public class Timer { //遍历所有轮的槽,执行 for (TimerTaskList singleWheelList: tmpList) { if (singleWheelList != null) { - //推进时间 - timeWheel.advanceClock(singleWheelList.getExpiration()); //执行过期任务(包含降级操作) - //TODO 加到要执行的列表 + //TODO 加到要执行的列表,并执行 singleWheelList.flush(this::addTask); - //TODO 执行 - for (TimerTask task: todoList) { - task.getTask().run(); - } - todoList.clear(); } } + INDEX.incrementAndGet(); } catch (Exception e) { e.printStackTrace(); }