refactor: 如果线程发生OOM,提早取消任务,而不是等超时,以免发生节点空循环检测

This commit is contained in:
kyle
2023-02-01 19:00:15 +08:00
parent fb7e3419cc
commit ffe9056600
4 changed files with 188 additions and 2 deletions

View File

@@ -85,13 +85,30 @@ public class Async {
//保存上次执行的线程池变量(为了兼容以前的旧功能)
Async.lastExecutorService.set(Objects.requireNonNull(executorService, "ExecutorService is null ! "));
final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout);
final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId);
group.addWrapper(workerWrappers);
final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId);
//有多少个开始节点就有多少个线程,依赖任务靠被依赖任务的线程完成工作
workerWrappers.forEach(wrapper -> {
if (wrapper == null) {
return;
}
executorService.submit(() -> wrapper.work(executorService, timeout, group));
Future<?> future = executorService.submit(() -> wrapper.work(executorService, timeout, group));
onceWork.getAllThreadSubmit().add(future);
});
executorService.execute(() -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (onceWork.getAllThreadSubmit().stream().allMatch(Future::isDone)) {
if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) {
onceWork.pleaseCancel();
}
break;
}
}
});
return onceWork;
}

View File

@@ -1,5 +1,6 @@
package com.jd.platform.async.worker;
import com.jd.platform.async.executor.PollingCenter;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperGroup;
@@ -270,9 +271,19 @@ public interface OnceWork {
class Impl extends AbstractOnceWork {
protected final WorkerWrapperGroup group;
/**
* 本次任务中所有线程提交
*/
protected List<Future<?>> allThreadSubmit;
public List<Future<?>> getAllThreadSubmit() {
return allThreadSubmit;
}
public Impl(WorkerWrapperGroup group, String workId) {
super(workId);
this.group = group;
allThreadSubmit = new ArrayList<>(group.getForParamUseWrappers().size());
}
@Override
@@ -321,6 +332,8 @@ public interface OnceWork {
@Override
public void pleaseCancel() {
group.pleaseCancel();
//发起检查,看看所有是否取消完毕
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
}
}

View File

@@ -229,6 +229,19 @@ public abstract class WorkerWrapper<T, V> {
public void cancel() {
if (State.setState(state, states_of_beforeWorkingEnd, SKIP, null)) {
fastFail(false, new CancelException(), true);
//此处调用结果处理器让用户决定取消逻辑
final Consumer<Boolean> __function__callbackResult =
success -> {
WorkResult<V> _workResult = getWorkResult();
try {
callback.result(success, param, _workResult);
} catch (Exception e) {
if (setState(state, states_of_skipOrAfterWork, ERROR, null)) {
fastFail(false, e, _workResult.getEx() instanceof EndsNormallyException);
}
}
};
__function__callbackResult.accept(false);
}
}

View File

@@ -0,0 +1,143 @@
package v15.cases;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.OnceWork;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 示例:模拟内存溢出
* <p>
* 运行之前请设置
* -Xmx20m -Xms20m
*
* 当内存溢出时其中一个线程会OOMrunable不会继续调度
* 我通过添加一个线程主动cancel来达到提前结束任务而不是等超时
*
* @author create by kyle
*/
class Case15 {
private static WorkerWrapperBuilder<?, ?> builder(String id) {
return WorkerWrapper.<String, String>builder()
.id(id)
.param(id + "X")
.worker(new MyWorker(id))
.callback((new ICallback<String, String>() {
@Override
public void begin() {
System.out.println("wrapper(id=" + id + ") has begin . ");
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
System.out.println("\t\twrapper(id=" + id + ") callback "
+ (success ? "success " : "fail ")
+ ", workResult is " + workResult);
}
}))
.allowInterrupt(true);
}
public static void main(String[] args) {
long now = SystemClock.now();
WorkerWrapper<?, ?> a = builder("A").build();
WorkerWrapper<?, ?> d;
WorkerWrapper<?, ?> build = builder("H")
.depends(
builder("F")
.depends(builder("B").depends(a).build())
.depends(builder("C").depends(a).build())
.build(),
builder("G")
.depends(builder("E")
.depends(d = builder("D").build())
.build())
.build()
)
.build();
try {
OnceWork work = Async.work(5000, a, d);
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
pool.execute(() -> {
while (true) {
try {
if (work.isCancelled()) {
System.out.println("取消成功");
}
if (work.isFinish()) {
//注意这里的结果和“输出H节点的结果----”位置处的不一致,这是多线程写造成的
System.out.println("结束成功" + build.getWorkResult());
break;
}
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
work.awaitFinish();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cost:" + (SystemClock.now() - now));
while (build.getWorkResult().getEx() == null) {
//同步等待result数据写入
}
System.out.println("输出H节点的结果----" + build.getWorkResult());
/* 输出:
wrapper(id=D) is working
wrapper(id=A) is working
wrapper(id=E) is working
wrapper(id=B) is working
wrapper(id=C) is working
wrapper(id=G) is working
wrapper(id=F) is working
wrapper(id=H) is working
*/
}
private static class MyWorker implements IWorker<String, String> {
//用于存放模拟的对象防止GC回收用List做对象引用
private final List<byte[]> list = new LinkedList<>();
private String id;
private int i = 0;
public MyWorker(String id) {
this.id = id;
}
@Override
public String action(String param, Map<String, WorkerWrapper<?, ?>> allWrappers) {
if ("F".equals(id)) {
System.out.println("wrapper(id=" + id + ") is working");
while (true) {
System.out.println("I am alive" + i++);
byte[] buf = new byte[1024 * 1024];
list.add(buf);
}
}
return id;
}
}
}