diff --git a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java index a317941..ca33017 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/TimeWheel.java @@ -87,6 +87,7 @@ public class TimeWheel { timerTaskList.addTask(timerTask); if (timerTaskList.setExpiration(virtualId * tickMs)) { //添加到delayQueue中 + //TODO 改成加到list对应元素 delayQueue.offer(timerTaskList); } } else { diff --git a/src/main/java/com/jd/platform/async/executor/wheel/Timer.java b/src/main/java/com/jd/platform/async/executor/wheel/Timer.java index 30f86d2..526cade 100644 --- a/src/main/java/com/jd/platform/async/executor/wheel/Timer.java +++ b/src/main/java/com/jd/platform/async/executor/wheel/Timer.java @@ -1,9 +1,6 @@ package com.jd.platform.async.executor.wheel; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * 定时器 @@ -35,7 +32,8 @@ public class Timer { */ public Timer() { timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue); - workerThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1); +// workerThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1); + workerThreadPool = Executors.newCachedThreadPool(); bossThreadPool = Executors.newFixedThreadPool(1); //20ms获取一次过期任务 bossThreadPool.submit(() -> { @@ -51,6 +49,7 @@ public class Timer { public void addTask(TimerTask timerTask) { //添加失败任务直接执行 if (!timeWheel.addTask(timerTask)) { + //TODO 不要线程池,改成一个线程直接执行 workerThreadPool.submit(timerTask.getTask()); } } @@ -60,12 +59,15 @@ public class Timer { */ private void advanceClock(long timeout) { try { + //取20-list对应的时间槽 TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); if (timerTaskList != null) { //推进时间 timeWheel.advanceClock(timerTaskList.getExpiration()); //执行过期任务(包含降级操作) + //TODO 加到要执行的列表 timerTaskList.flush(this::addTask); + //TODO 执行。这个方法作为定时任务,1毫秒一执行 } } catch (Exception e) { e.printStackTrace();