diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 7cc703d..8f234be 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -85,13 +85,30 @@ public class Async { //保存上次执行的线程池变量(为了兼容以前的旧功能) Async.lastExecutorService.set(Objects.requireNonNull(executorService, "ExecutorService is null ! ")); final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout); - final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId); group.addWrapper(workerWrappers); + final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId); + //有多少个开始节点就有多少个线程,依赖任务靠被依赖任务的线程完成工作 workerWrappers.forEach(wrapper -> { if (wrapper == null) { return; } - executorService.submit(() -> wrapper.work(executorService, timeout, group)); + Future future = executorService.submit(() -> wrapper.work(executorService, timeout, group)); + onceWork.getAllThreadSubmit().add(future); + }); + executorService.execute(() -> { + while (true) { + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (onceWork.getAllThreadSubmit().stream().allMatch(Future::isDone)) { + if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { + onceWork.pleaseCancel(); + } + break; + } + } }); return onceWork; } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java index de733f2..9f137d3 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java @@ -1,5 +1,6 @@ package com.jd.platform.async.worker; +import com.jd.platform.async.executor.PollingCenter; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperGroup; @@ -270,9 +271,19 @@ public interface OnceWork { class Impl extends AbstractOnceWork { protected final WorkerWrapperGroup group; + /** + * 本次任务中所有线程提交 + */ + protected List> allThreadSubmit; + + public List> getAllThreadSubmit() { + return allThreadSubmit; + } + public Impl(WorkerWrapperGroup group, String workId) { super(workId); this.group = group; + allThreadSubmit = new ArrayList<>(group.getForParamUseWrappers().size()); } @Override @@ -321,6 +332,8 @@ public interface OnceWork { @Override public void pleaseCancel() { group.pleaseCancel(); + //发起检查,看看所有是否取消完毕 + PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); } } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index db522b5..aef7e4e 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -229,6 +229,19 @@ public abstract class WorkerWrapper { public void cancel() { if (State.setState(state, states_of_beforeWorkingEnd, SKIP, null)) { fastFail(false, new CancelException(), true); + //此处调用结果处理器让用户决定取消逻辑 + final Consumer __function__callbackResult = + success -> { + WorkResult _workResult = getWorkResult(); + try { + callback.result(success, param, _workResult); + } catch (Exception e) { + if (setState(state, states_of_skipOrAfterWork, ERROR, null)) { + fastFail(false, e, _workResult.getEx() instanceof EndsNormallyException); + } + } + }; + __function__callbackResult.accept(false); } } diff --git a/asyncTool-core/src/test/java/v15/cases/Case15.java b/asyncTool-core/src/test/java/v15/cases/Case15.java new file mode 100644 index 0000000..be1d997 --- /dev/null +++ b/asyncTool-core/src/test/java/v15/cases/Case15.java @@ -0,0 +1,143 @@ +package v15.cases; + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.OnceWork; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperBuilder; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 示例:模拟内存溢出 + *

+ * 运行之前请设置 + * -Xmx20m -Xms20m + * + * 当内存溢出时,其中一个线程会OOM,runable不会继续调度, + * 我通过添加一个线程主动cancel来达到提前结束任务而不是等超时 + * + * @author create by kyle + */ +class Case15 { + + private static WorkerWrapperBuilder builder(String id) { + + return WorkerWrapper.builder() + .id(id) + .param(id + "X") + .worker(new MyWorker(id)) + .callback((new ICallback() { + @Override + public void begin() { + System.out.println("wrapper(id=" + id + ") has begin . "); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("\t\twrapper(id=" + id + ") callback " + + (success ? "success " : "fail ") + + ", workResult is " + workResult); + } + })) + .allowInterrupt(true); + } + + public static void main(String[] args) { + long now = SystemClock.now(); + WorkerWrapper a = builder("A").build(); + WorkerWrapper d; + WorkerWrapper build = builder("H") + .depends( + builder("F") + .depends(builder("B").depends(a).build()) + .depends(builder("C").depends(a).build()) + .build(), + builder("G") + .depends(builder("E") + .depends(d = builder("D").build()) + .build()) + .build() + ) + .build(); + try { + OnceWork work = Async.work(5000, a, d); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); + + pool.execute(() -> { + while (true) { + try { + if (work.isCancelled()) { + System.out.println("取消成功"); + } + if (work.isFinish()) { + //注意,这里的结果和“输出H节点的结果----”位置处的不一致,这是多线程写造成的 + System.out.println("结束成功" + build.getWorkResult()); + break; + } + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + work.awaitFinish(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("cost:" + (SystemClock.now() - now)); + while (build.getWorkResult().getEx() == null) { + //同步等待result数据写入 + } + System.out.println("输出H节点的结果----" + build.getWorkResult()); + /* 输出: + wrapper(id=D) is working + wrapper(id=A) is working + wrapper(id=E) is working + wrapper(id=B) is working + wrapper(id=C) is working + wrapper(id=G) is working + wrapper(id=F) is working + wrapper(id=H) is working + */ + } + + private static class MyWorker implements IWorker { + + //用于存放模拟的对象,防止GC回收,用List做对象引用 + private final List list = new LinkedList<>(); + + private String id; + + private int i = 0; + + public MyWorker(String id) { + this.id = id; + } + + @Override + public String action(String param, Map> allWrappers) { + if ("F".equals(id)) { + System.out.println("wrapper(id=" + id + ") is working"); + while (true) { + System.out.println("I am alive:" + i++); + byte[] buf = new byte[1024 * 1024]; + list.add(buf); + } + } + return id; + } + + } + +} +