diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java index ca33017..e9e305a 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java @@ -1,5 +1,7 @@ package com.jd.platform.async.executor.wheel; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.DelayQueue; /** @@ -40,9 +42,9 @@ public class TimeWheel { /** * 一个Timer只有一个delayQueue */ - private DelayQueue delayQueue; + private List> delayList; - public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue delayQueue) { + public TimeWheel(long tickMs, int wheelSize, long currentTime, List> delayList) { this.currentTime = currentTime; this.tickMs = tickMs; this.wheelSize = wheelSize; @@ -50,7 +52,7 @@ public class TimeWheel { this.timerTaskLists = new TimerTaskList[wheelSize]; //currentTime为tickMs的整数倍 这里做取整操作 this.currentTime = currentTime - (currentTime % tickMs); - this.delayQueue = delayQueue; + this.delayList = delayList; for (int i = 0; i < wheelSize; i++) { timerTaskLists[i] = new TimerTaskList(); } @@ -63,7 +65,7 @@ public class TimeWheel { if (overflowWheel == null) { synchronized (this) { if (overflowWheel == null) { - overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue); + overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayList); } } } @@ -86,9 +88,12 @@ public class TimeWheel { TimerTaskList timerTaskList = timerTaskLists[index]; timerTaskList.addTask(timerTask); if (timerTaskList.setExpiration(virtualId * tickMs)) { - //添加到delayQueue中 + //添加到delayList中 + if (delayList.get(index) == null) { + delayList.set(index, new ArrayList<>(16)); + } //TODO 改成加到list对应元素 - delayQueue.offer(timerTaskList); + delayList.get(index).add(timerTaskList); } } else { //放到上一层的时间轮 diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheelFlushMession.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheelFlushMession.java new file mode 100644 index 0000000..11412c3 --- /dev/null +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheelFlushMession.java @@ -0,0 +1,26 @@ +package com.jd.platform.async.executor.wheel; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author shenkaiwen5 + * @version 1.0 + * @date 2021-11-15 + */ +public class TimeWheelFlushMession implements Runnable { + + /** + * 定时器 + */ + private Timer timer; + + public TimeWheelFlushMession(Timer timer) { + this.timer = timer; + } + + @Override + public void run() { + timer.advanceClock(); + } +} diff --git a/src/main/java/com/jd/platform/async/executor/wheel/Timer.java b/src/main/java/com/jd/platform/async/executor/wheel/Timer.java index 526cade..c032298 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/Timer.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/Timer.java @@ -1,6 +1,9 @@ package com.jd.platform.async.executor.wheel; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * 定时器 @@ -17,57 +20,77 @@ public class Timer { */ private DelayQueue delayQueue = new DelayQueue<>(); - /** - * 过期任务执行线程 - */ - private ExecutorService workerThreadPool; - /** * 轮询delayQueue获取过期任务线程 */ - private ExecutorService bossThreadPool; + private ScheduledExecutorService bossThreadPool; + + /** + * 总的队列 + */ + private List> delayList = new ArrayList<>(20); + + /** + * 待执行的任务队列 + */ + private List todoList = new ArrayList<>(16); + /** + * 执行到的index + */ + private AtomicInteger INDEX = new AtomicInteger(0); /** * 构造函数 */ public Timer() { - timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue); -// workerThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1); - workerThreadPool = Executors.newCachedThreadPool(); - bossThreadPool = Executors.newFixedThreadPool(1); + timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayList); + //默认就1线程 + bossThreadPool = Executors.newSingleThreadScheduledExecutor(); //20ms获取一次过期任务 - bossThreadPool.submit(() -> { - while (true) { - this.advanceClock(20); - } - }); + bossThreadPool.scheduleAtFixedRate(() -> { + advanceClock(); + }, 0, 1, TimeUnit.MILLISECONDS); } /** * 添加任务 */ public void addTask(TimerTask timerTask) { - //添加失败任务直接执行 + //尝试添加任务 if (!timeWheel.addTask(timerTask)) { - //TODO 不要线程池,改成一个线程直接执行 - workerThreadPool.submit(timerTask.getTask()); + //添加失败任务直接执行 + todoList.add(timerTask); } } /** - * 获取过期任务 + * 获取过期任务并执行 + * + * 这个方法作为定时任务,1毫秒一执行 */ - private void advanceClock(long timeout) { + protected void advanceClock() { try { //取20-list对应的时间槽 - TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); - if (timerTaskList != null) { - //推进时间 - timeWheel.advanceClock(timerTaskList.getExpiration()); - //执行过期任务(包含降级操作) - //TODO 加到要执行的列表 - timerTaskList.flush(this::addTask); - //TODO 执行。这个方法作为定时任务,1毫秒一执行 + List timerTaskList = delayList.get(INDEX.getAndIncrement()); + //delayList一个时间槽里所有时间轮的对应槽,先清空再重新添加 + List tmpList = new ArrayList<>(16); + tmpList.addAll(timerTaskList); + timerTaskList.clear(); + + //遍历所有轮的槽,执行 + for (TimerTaskList singleWheelList: tmpList) { + if (singleWheelList != null) { + //推进时间 + timeWheel.advanceClock(singleWheelList.getExpiration()); + //执行过期任务(包含降级操作) + //TODO 加到要执行的列表 + singleWheelList.flush(this::addTask); + //TODO 执行 + for (TimerTask task: todoList) { + task.getTask().run(); + } + todoList.clear(); + } } } catch (Exception e) { e.printStackTrace();