设置单个任务超时时间轮

This commit is contained in:
wuweifeng10
2021-11-17 12:04:40 +08:00
parent dc547b8feb
commit 31a8a686fd
3 changed files with 180 additions and 0 deletions

View File

@@ -0,0 +1,19 @@
package com.jd.platform.async.executor.timeout;
/**
* 一个单一的任务
* @author wuweifeng
* @version 1.0
* @date 2021-11-16
*/
public interface ITimeoutTask {
/**
* 超时时间
*/
int delayMs();
/**
* 时间到了后的回调
*/
void timeoutCallback();
}

View File

@@ -0,0 +1,61 @@
package com.jd.platform.async.executor.timeout;
/**
* 单个任务,任务最小单元
* @author wuweifeng
* @version 1.0
* @date 2021-11-16
*/
public class SingleTask {
/**
* 处于时间轮的第几圈第0圈就代表本次要执行第1圈代表下一次才执行自己
*/
private int level;
/**
* 超时时间
*/
private int delayMs;
/**
* 创建时时间
*/
private long currentTime;
/**
* 可用于回调
*/
private ITimeoutTask timeoutTask;
/**
* 构建该task
*/
public static SingleTask build(ITimeoutTask timeoutTask) {
SingleTask singleTask = new SingleTask();
singleTask.delayMs = timeoutTask.delayMs();
singleTask.currentTime = System.currentTimeMillis();
singleTask.timeoutTask = timeoutTask;
//计算在第几轮
singleTask.level = singleTask.delayMs / WheelMain.MAX_SIZE;
return singleTask;
}
public void setLevel(int level) {
this.level = level;
}
public int getLevel() {
return level;
}
public int getDelayMs() {
return delayMs;
}
public long getCurrentTime() {
return currentTime;
}
public ITimeoutTask getTimeoutTask() {
return timeoutTask;
}
}

View File

@@ -0,0 +1,100 @@
package com.jd.platform.async.executor.timeout;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 入口调度器
*
* @author wuweifeng
* @version 1.0
* @date 2021-11-16
*/
public class WheelMain {
/**
* 共20个槽位
*/
public static final int MAX_SIZE = 20;
/**
* 总的队列外层是20个槽位内层是每个槽位内的任务集合
* 注意即便是这个List一样会发生在遍历时同时有数据新插入的情况。造成无法对level进行减1的操作
*/
private static List<CopyOnWriteArrayList<SingleTask>> allTaskList = new ArrayList<>(MAX_SIZE);
/**
* 执行到的index
*/
private static final AtomicInteger INDEX = new AtomicInteger(0);
/**
* 初始化
*/
public static void init() {
for (int i = 0; i < MAX_SIZE; i++) {
CopyOnWriteArrayList<SingleTask> list = new CopyOnWriteArrayList<>();
allTaskList.add(list);
}
//单线程调度器1ms执行一次。注意当发生fullGc时该定时器将不再能准确执行
ScheduledExecutorService bossThreadPool = Executors.newSingleThreadScheduledExecutor();
bossThreadPool.scheduleAtFixedRate(WheelMain::advanceClock, 0, 1, TimeUnit.MILLISECONDS);
}
/**
* 获取过期任务并执行
* <p>
* 这个方法作为定时任务1毫秒一执行
*/
private static void advanceClock() {
try {
int index = INDEX.get();
if (index >= MAX_SIZE) {
INDEX.set(index % MAX_SIZE);
}
//取当前走到的槽位
CopyOnWriteArrayList<SingleTask> timerTaskList = allTaskList.get(index % 20);
//索引前进一格
INDEX.incrementAndGet();
//遍历槽内所有任务
for (SingleTask singleTask : timerTaskList) {
//如果level = 0代表时间到了或者创建时间+超时时间>当前时间了,也是时间到了
if (singleTask.getLevel() <= 0 || singleTask.getCurrentTime() + singleTask.getDelayMs() >= System.currentTimeMillis()) {
//给予回调
singleTask.getTimeoutTask().timeoutCallback();
//从列表删除
timerTaskList.remove(singleTask);
} else {
singleTask.setLevel(singleTask.getLevel() - 1);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 添加一个任务到时间轮的对应槽位
*/
public synchronized static void addTask(ITimeoutTask timeoutTask) {
SingleTask singleTask = SingleTask.build(timeoutTask);
//获取超时时间
int delay = timeoutTask.delayMs();
if (delay <= 0) {
return;
}
//放到第几个槽
int putIndex = INDEX.get() + delay % MAX_SIZE;
CopyOnWriteArrayList<SingleTask> list = allTaskList.get(putIndex % MAX_SIZE);
//添加到该槽位的队列中
list.add(singleTask);
}
}