From 787abdb6f1b027dbbe36a5255a28e344af675a65 Mon Sep 17 00:00:00 2001 From: TcSnZh <1293969878@qq.com> Date: Sat, 8 May 2021 18:15:37 +0800 Subject: [PATCH] =?UTF-8?q?v1.5.1=20=E5=A2=9E=E5=8A=A0=E5=8D=95wrapper?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E5=88=A4=E5=AE=9A=E5=8A=9F=E8=83=BD=E3=80=81?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=BD=AE=E8=AF=A2=E7=AD=96=E7=95=A5=E3=80=81?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jd/platform/async/executor/Async.java | 13 +- .../executor/WrapperEndingInspector.java | 347 ------------- .../async/wrapper/StableWorkerWrapper.java | 7 - .../wrapper/StableWorkerWrapperBuilder.java | 77 ++- .../platform/async/wrapper/WorkerWrapper.java | 234 ++++++++- .../async/wrapper/WorkerWrapperBuilder.java | 53 ++ .../async/wrapper/WrapperEndingInspector.java | 486 ++++++++++++++++++ .../actionstrategy/DependenceStrategy.java | 22 +- src/test/java/v15/dependnew/Test.java | 83 ++- 9 files changed, 904 insertions(+), 418 deletions(-) delete mode 100644 src/main/java/com/jd/platform/async/executor/WrapperEndingInspector.java create mode 100644 src/main/java/com/jd/platform/async/wrapper/WrapperEndingInspector.java diff --git a/src/main/java/com/jd/platform/async/executor/Async.java b/src/main/java/com/jd/platform/async/executor/Async.java index 2a1ade4..7dcf855 100644 --- a/src/main/java/com/jd/platform/async/executor/Async.java +++ b/src/main/java/com/jd/platform/async/executor/Async.java @@ -5,10 +5,11 @@ import com.jd.platform.async.callback.DefaultGroupCallback; import com.jd.platform.async.callback.IGroupCallback; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WrapperEndingInspector; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /** @@ -28,15 +29,15 @@ public class Async { */ public static boolean beginWork(long timeout, ExecutorService executorService, - Collection workerWrappers) - throws ExecutionException, InterruptedException { + Collection> workerWrappers) + throws InterruptedException { if (workerWrappers == null || workerWrappers.size() == 0) { return false; } //保存上次执行的线程池变量(为了兼容以前的旧功能) Async.lastExecutorService = Objects.requireNonNull(executorService, "ExecutorService is null ! "); //定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result - final ConcurrentMap forParamUseWrappers = + final ConcurrentMap> forParamUseWrappers = new ConcurrentHashMap<>(Math.max(workerWrappers.size() * 3, 8)); final WrapperEndingInspector inspector = new WrapperEndingInspector(SystemClock.now() + timeout); inspector.addWrapper(workerWrappers); @@ -59,7 +60,7 @@ public class Async { if (workerWrapper == null || workerWrapper.length == 0) { return false; } - Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet()); + Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet()); return beginWork(timeout, executorService, workerWrappers); } @@ -148,7 +149,7 @@ public class Async { TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() { - private final AtomicInteger threadCount = new AtomicInteger(0); + private final AtomicLong threadCount = new AtomicLong(0); @Override public Thread newThread(Runnable r) { diff --git a/src/main/java/com/jd/platform/async/executor/WrapperEndingInspector.java b/src/main/java/com/jd/platform/async/executor/WrapperEndingInspector.java deleted file mode 100644 index 844f91c..0000000 --- a/src/main/java/com/jd/platform/async/executor/WrapperEndingInspector.java +++ /dev/null @@ -1,347 +0,0 @@ -package com.jd.platform.async.executor; - -import com.jd.platform.async.executor.timer.SystemClock; -import com.jd.platform.async.worker.WorkResult; -import com.jd.platform.async.wrapper.WorkerWrapper; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - - -/** - * 判断{@link WorkerWrapper}是否链路调用完成的轮询器。 - * ================================================================================= - *

- * 在v1.4及以前的版本,存在如下问题: - * > - * 在使用线程数量较少的线程池进行beginWork时,调用WorkerWrapper#beginNext方法时, - * 会因为本线程等待下游Wrapper执行完成而存在线程耗尽bug。线程池会死翘翘的僵住、动弹不得。 - * > - * 例如仅有2个线程的线程池,执行以下任务: - * {@code - *

- * 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况,在test/v15.dependnew中示例testThreadPolling_V14Bug说明了这个bug - * 线程数:2 - * A(5ms)--B1(10ms) ---|--> C1(5ms) - * . \ | (B1、B2全部完成可执行C1、C2) - * . ---> B2(20ms) --|--> C2(5ms) - *

- * } - * 线程1执行了A,然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。 - * 线程2执行了B1或B2中的一个,也在allOf方法等待C1、C2完成。 - * 结果没有线程执行C和B2了,导致超时而死,并且这个线程池线程有可能被耗尽。 - * > - * v1.5的解决方案是,放弃使工作线程遭致阻塞的{@link java.util.concurrent.CompletableFuture}, - * 而是让工作线程在工作前注册到本“完成检查器”{@link WrapperEndingInspector},然后交由轮询中心{@link PollingCenter}进行检查是否完成。 - *

- * ================================================================================= - *

- * 本类的工作原理: - * . - * 原理: - * (1)首先在Async代码中,将主动运行的wrapper都保存到一个inspector{@link #addWrapper(WorkerWrapper)}, - * (2)主动运行的wrapper于FINISH/ERROR时,先异步submit所有下游wrapper,在其执行时将自身(下游wrapper)保存到inspector, - * (3)然后在异步submit完所有下游wrapper后,将调用{@link #setWrapperEndWithTryPolling(WorkerWrapper)}方法, - * . 设置自己的{@link #wrapper2called}为true,并呼叫轮询{@link PollingCenter#tryPolling()}。 - * (4)在下游wrapper中,经过策略器判断后, - * . 若是不需要运行,则把本wrapper计数-1{@link Node#count},若是计数<1则将{@link Node}移出{@link #wrapper2called}。 - * . 若是需要运行,则运行之,然后跳转到 (2) 的情节。如此递归,执行链路上所有需要执行的wrapper最后都会存在于{@link #wrapper2called}中。 - * . - * 因此,若是存在任一其{@link Node#called}为false的wrapper,则表示这条链路还没有调用完。 - * 若是在{@link #wrapper2called}中所有的{@link Node#called}为true时,即可判断出链路执行完毕了。 - *

- * - * @author create by TcSnZh on 2021/5/5-下午3:22 - */ -public class WrapperEndingInspector implements Comparable { - /** - * 最迟完成时间 - */ - private final long latestFinishTime; - - /** - * 保存 需要检查的wrapper--相关属性 的Map。 - */ - private final ConcurrentHashMap wrapper2called = new ConcurrentHashMap<>(); - - /** - * 当全部wrapper都调用结束,它会countDown - */ - private final CountDownLatch endCDL = new CountDownLatch(1); - - /** - * 读锁用于修改数据,写锁用于轮询。使用公平锁让wrapper的时间波动不会太长。 - *

- * 在轮询到本inspector时,之所以要上写锁,是因为: - * 假如此时有个Wrapper正在调用{{@link #addWrapper(WorkerWrapper)}},则wrapper2called发生了改变。 - * 假如现在恰巧访问到的是{@link #wrapper2called}迭代器的最后一个,但此时又加入了另一个,且这另一个又是需要去执行的。 - * 那么假如在迭代器遍历到目前访问到的wrapper都是呼叫完毕的,那么这新加入的一个就会被忽略,从而判定为全部完成。致使bug发生。 - *

- * 此外,即便轮询时上写锁,对性能的影响也是有限的。因为这只会在“呼叫别人”的时候发生工作线程与轮询线程的锁争抢, - * 而在工作线程执行{@link com.jd.platform.async.callback.IWorker#action(Object, Map)}或 - * {@link com.jd.platform.async.callback.ICallback#result(boolean, Object, WorkResult)}时,并不会与轮询线程去 - * 争抢锁,而通常这个工作的时间才是最耗时的。 - */ - private final ReentrantReadWriteLock writePollingLock = new ReentrantReadWriteLock(true); - - public WrapperEndingInspector(long latestFinishTime) { - this.latestFinishTime = latestFinishTime; - } - - public void registerToPollingCenter() { - writePollingLock.readLock().lock(); - try { - PollingCenter.getInstance().inspectionSet.add(this); - } finally { - writePollingLock.readLock().unlock(); - } - } - - public void addWrapper(WorkerWrapper wrapper) { - writePollingLock.readLock().lock(); - try { - wrapper2called.computeIfAbsent(wrapper, k -> new Node()).count.incrementAndGet(); - } finally { - writePollingLock.readLock().unlock(); - } - } - - public void addWrapper(Collection wrappers) { - writePollingLock.readLock().lock(); - try { - Objects.requireNonNull(wrappers).forEach(this::addWrapper); - } finally { - writePollingLock.readLock().unlock(); - } - } - - public void reduceWrapper(WorkerWrapper wrapper) { - writePollingLock.readLock().lock(); - try { - /* - * 有可能发生这情况,一个Wrapper刚被加进去,执行了零/一/多次,均不满足执行条件,但是下次调用却应当使其启动。 - */ - if (wrapper.getState() != WorkerWrapper.INIT) { - Node node = wrapper2called.get(wrapper); - if (node == null) { - return; - } - synchronized (node) { - if (node.count.decrementAndGet() < 1) { - wrapper2called.remove(wrapper); - } - } - } - } finally { - writePollingLock.readLock().unlock(); - } - } - - /** - * 原子的设置这个Wrapper已经呼叫完成了。 - *

- * 该方法会调用{@link PollingCenter#tryPolling()},呼叫轮询线程 - * - * @return 如果为true,表示设置成功。为false表示已经被设置过了。 - */ - public boolean setWrapperEndWithTryPolling(WorkerWrapper wrapper) { - writePollingLock.readLock().lock(); - try { - return !wrapper2called.get(wrapper).called.getAndSet(true); - } finally { - writePollingLock.readLock().unlock(); - PollingCenter.getInstance().tryPolling(); - } - } - - /** - * 供外部调用的等待方法 - * - * @return 在超时前完成,返回true。超时时间一到,就会返回false。就像,人被杀,就会死。 - * @throws InterruptedException 外部调用的当前线程被中断时,会抛出这个异常。 - */ - public boolean await() throws InterruptedException { - return endCDL.await(latestFinishTime - SystemClock.now(), TimeUnit.MILLISECONDS); - } - - /** - * {@link PollingCenter}会优先把最迟完成时间(即开始时间+超时时间)较早的Inspection放在前面。 - */ - @Override - public int compareTo(WrapperEndingInspector other) { - if (this.latestFinishTime - other.latestFinishTime < 0) { - return -1; - } - return 1; - } - - @Override - public String toString() { - return "WrapperEndingInspector{" + - "remainTime=" + (latestFinishTime - SystemClock.now()) + - ", wrapper2called=" + - wrapper2called.entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue)) - + - ", endCDL.getCount()=" + endCDL.getCount() + - ", writePollingLock={read=" + writePollingLock.getReadLockCount() + ",write=" + writePollingLock.getWriteHoldCount() + - "} }"; - } - - /** - * 节点对象,保存属性信息于{@link #wrapper2called}中。 - *

- * 当试图把Node移出本Map时,该Node对象自身将会被上锁。 - */ - public static class Node { - /** - * 是否已经呼叫完了下游wrapper - */ - AtomicBoolean called = new AtomicBoolean(false); - /** - * 本wrapper总共被呼叫次数的统计。若小于1则会被移出map。 - */ - AtomicInteger count = new AtomicInteger(0); - - @Override - public String toString() { - return "{" + - "called=" + called.get() + - ", count=" + count.get() + - '}'; - } - } - - /** - * 轮询中心。具体的轮询调度由其完成。 - *

- * {@link #registerToPollingCenter()}调用时,就会将inspector注册到本轮询中心以供轮询。 - */ - public static class PollingCenter { - /** - * 将被轮询的WrapperFinishInspection集合。 - */ - private final Set inspectionSet = new ConcurrentSkipListSet<>(); - - /** - * 请求轮询。 - */ - private void tryPolling() { - if (inspectionSet.size() < SINGLETON_POLLING_POOL.getActiveCount()) { - // 线程数 > inspector数,理论上已经各个线程都在忙活了,不去新开线程。 - return; - } - SINGLETON_POLLING_POOL.submit(() -> { - int expectCount; - while (!inspectionSet.isEmpty()) { - // expectCount是本线程用来记录本次循环开始时inspectionSet的个数。 - // 每当移出一个inspector时,该值-1。 - expectCount = inspectionSet.size(); - // 开始检查 - for (WrapperEndingInspector inspector : inspectionSet) { - // 直接抢锁,轮询期间禁止修改inspector - inspector.writePollingLock.writeLock().lock(); - try { - if (PollingCenter.this.inspectorIsEnd(inspector)) { - // inspector中的wrapper调用结束了 - if (inspector.endCDL.getCount() > 0) { - // 双重检查使endCDL原子性countDown。 - synchronized (inspector.endCDL) { - if (inspector.endCDL.getCount() > 0) { - inspectionSet.remove(inspector); - expectCount--; - inspector.endCDL.countDown(); - } - } - } - } - } finally { - inspector.writePollingLock.writeLock().unlock(); - } - } - /* - * 根据 expectCount == inspectionSet.size() 的值,由于本线程1个线程在轮询: - * 1. 若值为true,表示轮询过程中没有新的inspector被添加进set中。此时就可以break了。 - * . 之所以可以break,是因为这个inspection还没有调用结束,在其结束前还会来催促轮询的。 - * 2. 若值为false,表示有新的inspector在本线程轮询时,被加入到了set中,且没有被我们迭代到。此时还要重新轮询一次。 - */ - if (expectCount == inspectionSet.size()) { - break; - } - } - }); - } - - private boolean inspectorIsEnd(WrapperEndingInspector inspector) { - if (inspector.latestFinishTime < SystemClock.now()) { - inspector.wrapper2called.forEach(((wrapper, node) -> { - wrapper.stopNow(); - node.called.set(true); - })); - return true; - } - for (Map.Entry entry : inspector.wrapper2called.entrySet()) { - WorkerWrapper wrapper = entry.getKey(); - Node node = entry.getValue(); - if (wrapper.getState() == WorkerWrapper.INIT - // 上值如果为false,表示该Wrapper要么还没来得及执行,要么判断不需要执行但是还未被移出 - || !node.called.get() - // 上值如果为false,表示该Wrapper正在工作或是刚刚结束/失败,还未将所有下游Wrapper调用一遍。 - ) { - return false; - } - // 这里需要去判断一下超时。 - } - return true; - } - - // ========== static ========== - - private final static PollingCenter instance = new PollingCenter(); - - public static PollingCenter getInstance() { - return instance; - } - - /** - * 单线程的轮询线程池 - */ - private static final ThreadPoolExecutor SINGLETON_POLLING_POOL = new ThreadPoolExecutor( - 0, - // 轮询线程数必须为1 - 1, - 15L, - TimeUnit.SECONDS, - // 必须保存至少一个轮询请求,以便在本线程轮询结束时,获取到已轮询过的线程提交的轮询请求 - new ArrayBlockingQueue<>(1), - new ThreadFactory() { - private final AtomicInteger threadCount = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "asyncTool-wrapperEndingInspectorPollingCenterPool-thread-" + threadCount.getAndIncrement()); - t.setDaemon(true); - // 线程优先级不高 - t.setPriority(1); - return t; - } - - @Override - public String toString() { - return "asyncTool-wrapperEndingInspectorPollingCenterPool-threadFactory"; - } - }, - // 多的就丢了,反正都是催这一个线程去轮询 - new ThreadPoolExecutor.DiscardPolicy() - ) { - @Override - public String toString() { - return "asyncTool-wrapperEndingInspectorPollingCenterPool"; - } - }; - } -} diff --git a/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapper.java b/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapper.java index 3ff9554..bda426a 100644 --- a/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapper.java +++ b/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapper.java @@ -2,15 +2,8 @@ package com.jd.platform.async.wrapper; import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; -import com.jd.platform.async.exception.SkippedException; -import com.jd.platform.async.executor.WrapperEndingInspector; -import com.jd.platform.async.executor.timer.SystemClock; -import com.jd.platform.async.worker.ResultState; -import com.jd.platform.async.wrapper.actionstrategy.DependenceAction; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; /** * {@link WorkerWrapper}默认实现类,将上下游Wrapper保存在自己的Set中。 diff --git a/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapperBuilder.java b/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapperBuilder.java index e3cf080..5f98330 100644 --- a/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapperBuilder.java +++ b/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapperBuilder.java @@ -7,6 +7,7 @@ import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy; import com.jd.platform.async.wrapper.actionstrategy.*; import java.util.*; +import java.util.concurrent.TimeUnit; /** * 一个稳定的Builder,兼容1.4版本之前的代码。 @@ -86,7 +87,17 @@ class StableWorkerWrapperBuilder setTimeOut() { + return new SetTimeOutImpl(); + } + + public class SetTimeOutImpl implements SetTimeOut { + @Override + public SetTimeOutImpl enableTimeOut(boolean enableElseDisable) { + StableWorkerWrapperBuilder.this.enableTimeOut = enableElseDisable; + return this; + } + + @Override + public SetTimeOutImpl setTime(long time, TimeUnit unit) { + if (time <= 0 || unit == null) { + throw new IllegalStateException("Illegal argument : time=" + time + " must > 0, unit=" + unit + " must not null"); + } + StableWorkerWrapperBuilder.this.time = time; + StableWorkerWrapperBuilder.this.unit = unit; + return this; + } + + @Override + public SetTimeOutImpl allowInterrupt(boolean allow) { + StableWorkerWrapperBuilder.this.allowInterrupt = allow; + return this; + } + + @Override + public BUILDER_SUB_CLASS end() { + return returnThisBuilder(); + } + } + @Override public WorkerWrapper build() { isBuilding = true; @@ -264,25 +309,7 @@ class StableWorkerWrapperBuilder> dependWrappers, - WorkerWrapper thisWrapper, - WorkerWrapper fromWrapper) { - DependMustStrategyMapper mustMapper = thisWrapper.getWrapperStrategy().getDependMustStrategyMapper(); - if (mustMapper != null && mustMapper.getMustDependSet().size() > 0) { - // 至少有一个must,则因为must未完全完成而等待。 - return DependenceAction.TAKE_REST.emptyProperty(); - } - // 如果一个must也没有,则认为应该是ANY模式。 - return DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS.judgeAction(dependWrappers, thisWrapper, fromWrapper); - } - - @Override - public String toString() { - return "IF_HAS_MUST_ALL_MUST_ELSE_ANY"; - } - }); + wrapper.getWrapperStrategy().setDependenceStrategy(DependenceStrategy.IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY); } else { if (mustDependSet != null && mustDependSet.size() > 0) { wrapper.getWrapperStrategy().setDependMustStrategyMapper(new DependMustStrategyMapper().addDependMust(mustDependSet)); @@ -317,6 +344,18 @@ class StableWorkerWrapperBuilder " + 0); + } + if (unit == null) { + throw new IllegalStateException("timeout unit must not null"); + } + wrapper.setTimeOut(new WorkerWrapper.TimeOutProperties(true, time, unit, allowInterrupt, wrapper)); + } + } return wrapper; } diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index 0d4d264..fe074bd 100755 --- a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -4,7 +4,6 @@ import com.jd.platform.async.callback.DefaultCallback; import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.exception.SkippedException; -import com.jd.platform.async.executor.WrapperEndingInspector; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.*; import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy; @@ -60,6 +59,10 @@ public abstract class WorkerWrapper { * IDEA可以使用JOL Java Object Layout插件查看对象大小。 */ private final WrapperStrategy wrapperStrategy = new WrapperStrategy(); + /** + * 超时检查,该值允许为null。表示不设置。 + */ + private volatile TimeOutProperties timeOutProperties; // ***** state属性的常量值 ***** @@ -121,14 +124,24 @@ public abstract class WorkerWrapper { public abstract Set> getNextWrappers(); /** - * 总控制台超时,停止所有任务 + * 使wrapper状态修改为超时失败。(但如果已经执行完成则不会修改) + *

+ * 本方法不会试图执行超时判定逻辑。 + * 如果要执行超时逻辑判断,请用{@link TimeOutProperties#checkTimeOut(boolean)}并传入参数true。 */ - public void stopNow() { - if (getState() == INIT || getState() == WORKING) { - fastFail(getState(), null); + public void failNow() { + int state = getState(); + if (state == INIT || state == WORKING) { + fastFail(state, null); } } + public WrapperStrategy getWrapperStrategy() { + return wrapperStrategy; + } + + // ========== protected ========== + /** * 快速失败 * @@ -266,16 +279,16 @@ public abstract class WorkerWrapper { // nextWrappers有多个 try { inspector.addWrapper(nextWrappers); - nextWrappers.forEach(next -> { - executorService.submit(() -> next.work(executorService, this, nextRemainTIme, getForParamUseWrappers(), inspector)); - }); + nextWrappers.forEach(next -> executorService.submit(() -> + next.work(executorService, this, nextRemainTIme, getForParamUseWrappers(), inspector)) + ); } finally { inspector.setWrapperEndWithTryPolling(this); } } /** - * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程 + * 本工作线程执行自己的job.判断阻塞超时这里开始时会判断一次总超时时间,但在轮询线程会判断单个wrapper超时时间,并也会判断总超时时间。 */ protected void fire() { //阻塞取结果 @@ -288,9 +301,19 @@ public abstract class WorkerWrapper { if (!compareAndSetState(INIT, WORKING)) { return; } - callback.begin(); - //执行耗时操作 - V resultValue = resultValue = (V) worker.action(param, (Map) getForParamUseWrappers()); + V resultValue; + try { + callback.begin(); + if (timeOutProperties != null) { + timeOutProperties.startWorking(); + } + //执行耗时操作 + resultValue = (V) worker.action(param, (Map) getForParamUseWrappers()); + } finally { + if (timeOutProperties != null) { + timeOutProperties.endWorking(); + } + } //如果状态不是在working,说明别的地方已经修改了 if (!compareAndSetState(WORKING, FINISH)) { return; @@ -386,29 +409,46 @@ public abstract class WorkerWrapper { abstract void setDependWrappers(Set> dependWrappers); - WrapperStrategy getWrapperStrategy() { - return wrapperStrategy; + TimeOutProperties getTimeOut() { + return timeOutProperties; + } + + void setTimeOut(TimeOutProperties timeOutProperties) { + this.timeOutProperties = timeOutProperties; } // ========== toString ========== @Override public String toString() { - final StringBuilder sb = new StringBuilder(150) + final StringBuilder sb = new StringBuilder(200) .append("WorkerWrapper{id=").append(id) .append(", param=").append(param) .append(", worker=").append(worker) .append(", callback=").append(callback) - .append(", state=").append(state) + .append(", state="); + int state = this.state.get(); + if (state == FINISH) { + sb.append("FINISH"); + } else if (state == WORKING) { + sb.append("WORKING"); + } else if (state == INIT) { + sb.append("INIT"); + } else if (state == ERROR) { + sb.append("ERROR"); + } else { + throw new IllegalStateException("unknown state : " + state); + } + sb .append(", workResult=").append(workResult) // 防止循环引用,这里只输出相关Wrapper的id - .append(", forParamUseWrappers::getId="); + .append(", forParamUseWrappers::getId=["); getForParamUseWrappers().keySet().forEach(wrapperId -> sb.append(wrapperId).append(", ")); if (getForParamUseWrappers().keySet().size() > 0) { sb.delete(sb.length() - 2, sb.length()); } sb - .append(", dependWrappers::getId=["); + .append("], dependWrappers::getId=["); getDependWrappers().stream().map(WorkerWrapper::getId).forEach(wrapperId -> sb.append(wrapperId).append(", ")); if (getDependWrappers().size() > 0) { sb.delete(sb.length() - 2, sb.length()); @@ -422,6 +462,7 @@ public abstract class WorkerWrapper { sb .append("]") .append(", wrapperStrategy=").append(getWrapperStrategy()) + .append(", timeOutProperties=").append(getTimeOut()) .append('}'); return sb.toString(); } @@ -510,7 +551,6 @@ public abstract class WorkerWrapper { // ========== toString ========== - @Override public String toString() { return "WrapperStrategy{" + @@ -521,4 +561,160 @@ public abstract class WorkerWrapper { '}'; } } + + public static class TimeOutProperties { + private final boolean enable; + private final long time; + private final TimeUnit unit; + private final boolean allowInterrupt; + private final WorkerWrapper wrapper; + + private final Object lock = new Object(); + + private volatile boolean started = false; + private volatile boolean ended = false; + private volatile long startWorkingTime; + private volatile long endWorkingTime; + private volatile Thread doWorkingThread; + + public TimeOutProperties(boolean enable, long time, TimeUnit unit, boolean allowInterrupt, WorkerWrapper wrapper) { + this.enable = enable; + this.time = time; + this.unit = unit; + this.allowInterrupt = allowInterrupt; + this.wrapper = wrapper; + } + + // ========== 工作线程调用 ========== + + public void startWorking() { + synchronized (lock) { + started = true; + startWorkingTime = SystemClock.now(); + doWorkingThread = Thread.currentThread(); + } + } + + public void endWorking() { + synchronized (lock) { + ended = true; + doWorkingThread = null; + endWorkingTime = SystemClock.now(); + } + } + + // ========== 轮询线程调用 ========== + + /** + * 检查超时。 + * 可以将boolean参数传入true以在超时的时候直接失败。 + * + * @param withStop 如果为false,不会发生什么,仅仅是单纯的判断是否超时。 + * 如果为true,则会去快速失败wrapper{@link #failNow()},有必要的话还会打断线程。 + * @return 如果 超时 或 执行时间超过限制 返回true;未超时返回false。 + */ + public boolean checkTimeOut(boolean withStop) { + if (enable) { + synchronized (lock) { + if (started) { + // 判断执行中的wrapper是否超时 + long dif = (ended ? endWorkingTime : SystemClock.now()) - startWorkingTime; + if (dif > unit.toMillis(time)) { + if (withStop) { + if (allowInterrupt) { + doWorkingThread.interrupt(); + } + wrapper.failNow(); + ended = true; + } + return true; + } + return false; + } + } + } + return false; + } + + // ========== package ========== + + boolean isEnable() { + return enable; + } + + long getTime() { + return time; + } + + TimeUnit getUnit() { + return unit; + } + + boolean isAllowInterrupt() { + return allowInterrupt; + } + + Object getLock() { + return lock; + } + + boolean isStarted() { + return started; + } + + void setStarted(boolean started) { + this.started = started; + } + + boolean isEnded() { + return ended; + } + + void setEnded(boolean ended) { + this.ended = ended; + } + + long getStartWorkingTime() { + return startWorkingTime; + } + + void setStartWorkingTime(long startWorkingTime) { + this.startWorkingTime = startWorkingTime; + } + + long getEndWorkingTime() { + return endWorkingTime; + } + + void setEndWorkingTime(long endWorkingTime) { + this.endWorkingTime = endWorkingTime; + } + + Thread getDoWorkingThread() { + return doWorkingThread; + } + + void setDoWorkingThread(Thread doWorkingThread) { + this.doWorkingThread = doWorkingThread; + } + + + // ========== toString ========== + + @Override + public String toString() { + return "TimeOutProperties{" + + "enable=" + enable + + ", time=" + time + + ", unit=" + unit + + ", allowInterrupt=" + allowInterrupt + + ", wrapper::getId=" + wrapper.getId() + + ", started=" + started + + ", ended=" + ended + + ", startWorkingTime=" + startWorkingTime + + ", endWorkingTime=" + endWorkingTime + + ", doWorkingThread=" + doWorkingThread + + '}'; + } + } } diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java index c861cb1..4a519c0 100644 --- a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java +++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java @@ -8,6 +8,7 @@ import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy; import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy; import java.util.Collection; +import java.util.concurrent.TimeUnit; /** * 作为优化编排依赖策略后,新增的Builder接口。 @@ -233,5 +234,57 @@ public interface WorkerWrapperBuilder { return setNext().wrapper(wrappers).end(); } + /** + * 设置超时时间的具体属性 + */ + SetTimeOut setTimeOut(); + + interface SetTimeOut { + /** + * 是否启动超时判断。 + *

+ * 默认为true + * + * @param enableElseDisable 是则true + */ + SetTimeOut enableTimeOut(boolean enableElseDisable); + + /** + * 设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断 + * + * @param time 时间数值 + * @param unit 时间单位 + */ + SetTimeOut setTime(long time, TimeUnit unit); + + /** + * 是否允许被试图中断线程 + * + * @param allow 是则true + */ + SetTimeOut allowInterrupt(boolean allow); + + WorkerWrapperBuilder end(); + } + + /** + * 便携式设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断 + * + * @param time 时间数值 + * @param unit 时间单位 + */ + default WorkerWrapperBuilder timeout(long time, TimeUnit unit) { + return timeout(true, time, unit, false); + } + + default WorkerWrapperBuilder timeout(boolean enableTimeOut, long time, TimeUnit unit, boolean allowInterrupt) { + return setTimeOut().enableTimeOut(enableTimeOut).setTime(time, unit).allowInterrupt(allowInterrupt).end(); + } + + /** + * 构建Wrapper。 + * + * @return 返回WorkerWrapper + */ WorkerWrapper build(); } diff --git a/src/main/java/com/jd/platform/async/wrapper/WrapperEndingInspector.java b/src/main/java/com/jd/platform/async/wrapper/WrapperEndingInspector.java new file mode 100644 index 0000000..66bb3b9 --- /dev/null +++ b/src/main/java/com/jd/platform/async/wrapper/WrapperEndingInspector.java @@ -0,0 +1,486 @@ +package com.jd.platform.async.wrapper; + +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + + +/** + * 判断{@link WorkerWrapper}是否链路调用完成的轮询器。 + * ================================================================================= + *

+ * 在v1.4及以前的版本,存在如下问题: + * > + * 在使用线程数量较少的线程池进行beginWork时,调用WorkerWrapper#beginNext方法时, + * 会因为本线程等待下游Wrapper执行完成而存在线程耗尽bug。线程池会死翘翘的僵住、动弹不得。 + * > + * 例如仅有2个线程的线程池,执行以下任务: + * {@code + *

+ * 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况,在test/v15.dependnew中示例testThreadPolling_V14Bug说明了这个bug + * 线程数:2 + * A(5ms)--B1(10ms) ---|--> C1(5ms) + * . \ | (B1、B2全部完成可执行C1、C2) + * . ---> B2(20ms) --|--> C2(5ms) + *

+ * } + * 线程1执行了A,然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。 + * 线程2执行了B1或B2中的一个,也在allOf方法等待C1、C2完成。 + * 结果没有线程执行C和B2了,导致超时而死,并且这个线程池线程有可能被耗尽。 + * > + * v1.5的解决方案是,放弃使工作线程遭致阻塞的{@link java.util.concurrent.CompletableFuture}, + * 而是让工作线程在工作前注册到本“完成检查器”{@link WrapperEndingInspector},然后交由轮询中心{@link PollingCenter}进行检查是否完成。 + *

+ * ================================================================================= + *

+ * 本类的工作原理: + * . + * 原理: + * (1)首先在Async代码中,将主动运行的wrapper都保存到一个inspector{@link #addWrapper(WorkerWrapper)}, + * (2)主动运行的wrapper于FINISH/ERROR时,先异步submit所有下游wrapper,在其执行时将自身(下游wrapper)保存到inspector, + * (3)然后在异步submit完所有下游wrapper后,将调用{@link #setWrapperEndWithTryPolling(WorkerWrapper)}方法, + * . 设置自己的{@link #wrappers}为true,并呼叫轮询{@link PollingCenter#tryPolling()}。 + * (4)在下游wrapper中,经过策略器判断后, + * . 若是不需要运行,则把本wrapper计数-1{@link WrapperNode#count},若是计数<1则将{@link WrapperNode}移出{@link #wrappers}。 + * . 若是需要运行,则运行之,然后跳转到 (2) 的情节。如此递归,执行链路上所有需要执行的wrapper最后都会存在于{@link #wrappers}中。 + * . + * 因此,若是存在任一其{@link WrapperNode#called}为false的wrapper,则表示这条链路还没有调用完。 + * 若是在{@link #wrappers}中所有的{@link WrapperNode#called}为true时,即可判断出链路执行完毕了。 + *

+ * + * @author create by TcSnZh on 2021/5/5-下午3:22 + */ +public class WrapperEndingInspector implements Comparable { + /** + * 最迟完成时间 + */ + private final long latestFinishTime; + + /** + * 保存 需要检查的wrapper--相关属性 的Map。 + */ + private final ConcurrentHashMap wrappers = new ConcurrentHashMap<>(); + + /** + * 当全部wrapper都调用结束,它会countDown + */ + private final CountDownLatch endCDL = new CountDownLatch(1); + + /** + * 读锁用于修改数据,写锁用于轮询。使用公平锁让wrapper的时间波动不会太长。 + *

+ * 在轮询到本inspector时,之所以要上写锁,是因为: + * 假如此时有个Wrapper正在调用{@link #addWrapper(WorkerWrapper)},则wrappers发生了改变。 + * 假如现在恰巧访问到的是{@link #wrappers}迭代器的最后一个,但此时又加入了另一个,且这另一个又是需要去执行的。 + * 那么假如在迭代器遍历到目前访问到的wrapper都是呼叫完毕的,那么这新加入的一个就会被忽略,从而判定为全部完成。致使bug发生。 + *

+ * 此外,即便轮询时上写锁,对性能的影响也是有限的。因为这只会在“呼叫别人”的时候发生工作线程与轮询线程的锁争抢, + * 而在工作线程执行{@link com.jd.platform.async.callback.IWorker#action(Object, Map)}或 + * {@link com.jd.platform.async.callback.ICallback#result(boolean, Object, WorkResult)}时,并不会与轮询线程去 + * 争抢锁,而通常这个工作的时间才是最耗时的。 + */ + private final ReentrantReadWriteLock modifyPollingLock = new ReentrantReadWriteLock(true); + + /** + * 当轮询发现超时时,该值被设为false + */ + private final AtomicBoolean haveNotTimeOut = new AtomicBoolean(true); + + public WrapperEndingInspector(long latestFinishTime) { + this.latestFinishTime = latestFinishTime; + } + + public void registerToPollingCenter() { + modifyPollingLock.readLock().lock(); + try { + // 不重复put,以免InspectorNode被替换为另一个 + PollingCenter.getInstance().inspectionMap.putIfAbsent(this, new PollingCenter.InspectorNode()); + } finally { + modifyPollingLock.readLock().unlock(); + } + } + + public void addWrapper(WorkerWrapper wrapper) { + modifyPollingLock.readLock().lock(); + try { + wrappers.computeIfAbsent(wrapper, k -> new WrapperNode()).count.incrementAndGet(); + } finally { + modifyPollingLock.readLock().unlock(); + } + } + + public void addWrapper(Collection wrappers) { + modifyPollingLock.readLock().lock(); + try { + Objects.requireNonNull(wrappers).forEach(this::addWrapper); + } finally { + modifyPollingLock.readLock().unlock(); + } + } + + public void reduceWrapper(WorkerWrapper wrapper) { + modifyPollingLock.readLock().lock(); + try { + /* + * 有可能发生这情况,一个Wrapper刚被加进去,执行了零/一/多次,均不满足执行条件,但是下次调用却应当使其启动。 + */ + if (wrapper.getState() != WorkerWrapper.INIT) { + final WrapperNode wrapperNode = wrappers.get(wrapper); + if (wrapperNode == null) { + return; + } + synchronized (wrapperNode) { + if (wrapperNode.count.decrementAndGet() < 1) { + wrappers.remove(wrapper); + } + } + } + } finally { + modifyPollingLock.readLock().unlock(); + } + } + + /** + * 原子的设置这个Wrapper已经呼叫完成了。 + *

+ * 该方法会调用{@link PollingCenter#tryPolling()},呼叫轮询线程 + * + * @return 如果为true,表示设置成功。为false表示已经被设置过了。 + */ + public boolean setWrapperEndWithTryPolling(WorkerWrapper wrapper) { + modifyPollingLock.readLock().lock(); + try { + return !wrappers.get(wrapper).called.getAndSet(true); + } finally { + modifyPollingLock.readLock().unlock(); + PollingCenter.getInstance().tryPolling(); + } + } + + /** + * 供外部调用的等待方法 + * + * @return 在超时前完成,返回true。超时时间一到,就会返回false。就像,人被杀,就会死。 + * @throws InterruptedException 外部调用的当前线程被中断时,会抛出这个异常。 + */ + public boolean await() throws InterruptedException { + endCDL.await(); + return haveNotTimeOut.get(); + } + + /** + * {@link PollingCenter}会优先把最迟完成时间(即开始时间+超时时间)较早的Inspection放在前面。 + */ + @Override + public int compareTo(WrapperEndingInspector other) { + if (this.latestFinishTime - other.latestFinishTime < 0) { + return -1; + } + return 1; + } + + @Override + public String toString() { + return "WrapperEndingInspector{" + + "remainTime=" + (latestFinishTime - SystemClock.now()) + + ", wrappers=" + + wrappers.entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue)) + + + ", endCDL.getCount()=" + endCDL.getCount() + + ", writePollingLock={read=" + modifyPollingLock.getReadLockCount() + ",write=" + modifyPollingLock.getWriteHoldCount() + + "} }"; + } + + /** + * 节点对象,保存属性信息于{@link #wrappers}中。 + *

+ * 当试图把Node移出本Map时,该Node对象自身将会被上锁。 + */ + public static class WrapperNode { + /** + * 是否已经呼叫完了下游wrapper + */ + AtomicBoolean called = new AtomicBoolean(false); + /** + * 本wrapper总共被呼叫次数的统计。若小于1则会被移出map。 + */ + AtomicInteger count = new AtomicInteger(0); + + @Override + public String toString() { + return "{" + + "called=" + called.get() + + ", count=" + count.get() + + '}'; + } + } + + /** + * 轮询中心。具体的轮询调度由其完成。 + *

+ * {@link #registerToPollingCenter()}调用时,就会将inspector注册到本轮询中心以供轮询。 + */ + public static class PollingCenter { + public static class InspectorNode { + /** + * 延迟轮询时间戳。 + */ + private volatile long delayTimeStamp = Long.MAX_VALUE; + + private final ReadWriteLock lockOfDelayTimeStamp = new ReentrantReadWriteLock(); + + /** + * 比较传入时间戳与{@link #delayTimeStamp},并设置小的那个为{@link #delayTimeStamp}的值。 + * + * @param otherDelayTimeStamp 试图用来比较的另一个时间戳 + */ + public void compareAndSetMinDelayTimeStamp(long otherDelayTimeStamp) { + lockOfDelayTimeStamp.writeLock().lock(); + try { + long dif = otherDelayTimeStamp - delayTimeStamp; + if (dif > 0) { + return; + } + delayTimeStamp = otherDelayTimeStamp; + } finally { + lockOfDelayTimeStamp.writeLock().unlock(); + } + } + + public long getDelayTimeStamp() { + lockOfDelayTimeStamp.readLock().lock(); + try { + return delayTimeStamp; + } finally { + lockOfDelayTimeStamp.readLock().unlock(); + } + } + + public long clearTimeStamp() { + lockOfDelayTimeStamp.writeLock().lock(); + try { + long old = this.delayTimeStamp; + delayTimeStamp = Long.MAX_VALUE; + return old; + } finally { + lockOfDelayTimeStamp.writeLock().unlock(); + } + } + + @Override + public String toString() { + return "InspectorNode{" + + "delayTimeStamp=" + delayTimeStamp + + ", lockOfDelayTimeStamp=" + lockOfDelayTimeStamp + + '}'; + } + } + + /** + * 将被轮询的WrapperFinishInspection集合。 + */ + private final Map inspectionMap = new ConcurrentSkipListMap<>(); + + /** + * 请求轮询。 + */ + private void tryPolling() { + // 开始轮询 + SINGLETON_POLLING_POOL.submit(() -> { + // 用来判断在轮询过程中是否有新增的inspector的值 + int expectCount; + // 如果此值变化过,则在结束时让自己在此值后的时间再启动轮询 + while (!inspectionMap.isEmpty()) { + // expectCount是本线程用来记录本次循环开始时inspectionMap的个数。 + // 每当移出一个inspector时,该值-1。 + expectCount = inspectionMap.size(); + // 开始检查 + for (Map.Entry entry : inspectionMap.entrySet()) { + final WrapperEndingInspector inspector = entry.getKey(); + final InspectorNode inspectorNode = entry.getValue(); + // 直接抢锁,轮询期间禁止修改inspector + inspector.modifyPollingLock.writeLock().lock(); + try { + // 对一个inspector进行检查 + if (PollingCenter.this.checkInspectorIsEnd(inspector, inspectorNode)) { + // inspector中的wrapper调用结束了 + // 先要把wrapper给停了 + inspector.wrappers.forEach((wrapper, wrapperNode) -> { + WorkerWrapper.TimeOutProperties timeOut = wrapper.getTimeOut(); + if (timeOut != null) { + timeOut.checkTimeOut(true); + } + }); + // 修改此inspector和expectCount的状态 + if (inspector.endCDL.getCount() > 0) { + // 双重检查使endCDL原子性countDown。 + synchronized (inspector.endCDL) { + if (inspector.endCDL.getCount() > 0) { + inspectionMap.remove(inspector); + expectCount--; + inspector.endCDL.countDown(); + } + } + } + } + } finally { + inspector.modifyPollingLock.writeLock().unlock(); + } + } + /* + * 根据 expectCount == inspectionMap.size() 的值,在仅有本线程1个线程在轮询的情况下: + * 1. 若值为true,表示轮询过程中没有新的inspector被添加进set中。此时就可以break了。 + * . 之所以可以break,是因为这个inspection还没有调用结束,在其结束前还会来催促轮询的。 + * 2. 若值为false,表示有新的inspector在本线程轮询时,被加入到了set中,且没有被我们迭代到。此时还要重新轮询一次。 + */ + if (expectCount == inspectionMap.size()) { + break; + } + } + }); + } + + private boolean checkInspectorIsEnd(WrapperEndingInspector inspector, InspectorNode inspectorNode) { + // 判断一下inspector整组是否超时 + if (inspector.latestFinishTime < SystemClock.now()) { + inspector.haveNotTimeOut.set(false); + inspector.wrappers.forEach(((wrapper, wrapperNode) -> { + wrapper.failNow(); + wrapperNode.called.set(true); + })); + return true; + } + // 将延迟检查时间设为离现在最近的值。 + // 此处判断的是inspector所代表整次任务的超时时间 + inspectorNode.compareAndSetMinDelayTimeStamp(inspector.latestFinishTime); + // 判断inspector是否结束,并顺便记录、判断、修改wrapper的超时信息 + for (Map.Entry entry : inspector.wrappers.entrySet()) { + WorkerWrapper wrapper = entry.getKey(); + // 判断单个wrapper是否超时 + WorkerWrapper.TimeOutProperties timeOutProperties = wrapper.getTimeOut(); + if (timeOutProperties != null && timeOutProperties.isEnable()) { + // 将延迟检查时间设为离现在最近的值。 + // 此处判断的是wrapper的超时时间 + if (timeOutProperties.checkTimeOut(true)) { + inspector.haveNotTimeOut.set(false); + } + // 未超时但是设置了超时检查的话,记录一下inspector延时轮询时间 + else { + inspectorNode.compareAndSetMinDelayTimeStamp( + (timeOutProperties.isStarted() ? timeOutProperties.getStartWorkingTime() : SystemClock.now()) + + timeOutProperties.getUnit().toMillis(timeOutProperties.getTime()) + ); + } + } + // 判断wrapper是否执行完毕 + WrapperNode node = entry.getValue(); + if (wrapper.getState() == WorkerWrapper.INIT + // 上值如果为false,表示该Wrapper要么还没来得及执行,要么判断不需要执行但是还未被移出 + || !node.called.get() + // 上值如果为false,表示该Wrapper正在工作或是刚刚结束/失败,还未将所有下游Wrapper调用一遍。 + ) { + return false; + } + } + return true; + } + + { + final String executorName = "asyncTool-pollingDelayCaller"; + ScheduledThreadPoolExecutor delayPollingExecutor = new ScheduledThreadPoolExecutor( + 1, + new ThreadFactory() { + private final AtomicLong threadCount = new AtomicLong(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, executorName + "-thread-" + threadCount.getAndIncrement()); + t.setDaemon(true); + // 线程优先级不高 + t.setPriority(1); + return t; + } + + @Override + public String toString() { + return executorName + "-threadFactory"; + } + } + ) { + @Override + public String toString() { + return executorName + "{PollingCenter.this=" + PollingCenter.this + "}"; + } + }; + // 每毫秒判断一次:map.value的每个延迟轮询队列的头号元素是否抵达当前时间,如果到了,则清除并调用轮询 + delayPollingExecutor.scheduleAtFixedRate(() -> inspectionMap.values().stream() + .min(Comparator.comparingLong(InspectorNode::getDelayTimeStamp)) + .ifPresent(node -> { + long delayTimeStamp = node.getDelayTimeStamp(); + if (Long.MAX_VALUE != delayTimeStamp && SystemClock.now() > delayTimeStamp) { + tryPolling(); + } + }), 1, 1, TimeUnit.MILLISECONDS); + } + + // ========== static ========== + + private final static PollingCenter instance = new PollingCenter(); + + public static PollingCenter getInstance() { + return instance; + } + + /** + * 单线程的轮询线程池 + */ + private static final ThreadPoolExecutor SINGLETON_POLLING_POOL; + + static { + SINGLETON_POLLING_POOL = new ThreadPoolExecutor( + 0, + // 轮询线程数必须为1 + 1, + 15L, + TimeUnit.SECONDS, + // 必须保存至少一个轮询请求,以便在本线程轮询结束时,获取到已轮询过的线程提交的轮询请求 + new ArrayBlockingQueue<>(1), + new ThreadFactory() { + private final AtomicLong threadCount = new AtomicLong(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "asyncTool-pollingCenterPool-thread-" + threadCount.getAndIncrement()); + t.setDaemon(true); + // 线程优先级不高 + t.setPriority(3); + return t; + } + + @Override + public String toString() { + return "asyncTool-pollingCenterPool-threadFactory"; + } + }, + // 多的就丢了,反正都是催这一个线程去轮询 + new ThreadPoolExecutor.DiscardPolicy() + ) { + @Override + public String toString() { + return "asyncTool-pollingCenterPool"; + } + }; + } + } + +} + diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java index 493148c..56368ea 100644 --- a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java +++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java @@ -1,6 +1,6 @@ package com.jd.platform.async.wrapper.actionstrategy; -import com.jd.platform.async.executor.WrapperEndingInspector; +import com.jd.platform.async.wrapper.WrapperEndingInspector; import com.jd.platform.async.worker.ResultState; import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.wrapper.WorkerWrapper; @@ -232,4 +232,24 @@ public interface DependenceStrategy { }; } + DependenceStrategy IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY = new DependenceStrategy() { + @Override + public DependenceAction.WithProperty judgeAction(Set> dependWrappers, + WorkerWrapper thisWrapper, + WorkerWrapper fromWrapper) { + DependMustStrategyMapper mustMapper = thisWrapper.getWrapperStrategy().getDependMustStrategyMapper(); + if (mustMapper != null && !mustMapper.getMustDependSet().isEmpty()) { + // 至少有一个must,则因为must未完全完成而等待。 + return DependenceAction.TAKE_REST.emptyProperty(); + } + // 如果一个must也没有,则认为应该是ANY模式。 + return DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS.judgeAction(dependWrappers, thisWrapper, fromWrapper); + } + + @Override + public String toString() { + return "IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY"; + } + }; + } diff --git a/src/test/java/v15/dependnew/Test.java b/src/test/java/v15/dependnew/Test.java index 28b636c..d0b149b 100644 --- a/src/test/java/v15/dependnew/Test.java +++ b/src/test/java/v15/dependnew/Test.java @@ -26,15 +26,24 @@ class Test { // ExecutorService pool = Executors.newFixedThreadPool(3); ExecutorService pool = Async.getCommonPool(); try { + // 先随便找个任务让线程池跑一把,把线程用一下,后面的测试效果会明显一点 testNew2(pool); System.out.println("\n\n\n"); + testNew1(pool); System.out.println("\n\n\n"); + testNew2(pool); System.out.println("\n\n\n"); + testThreadPolling_Speed(pool); System.out.println("\n\n\n"); + testThreadPolling_V14Bug(); + System.out.println("\n\n\n"); + + testTimeOut(pool); + } finally { //Async.shutDownCommonPool(); pool.shutdown(); @@ -51,21 +60,22 @@ class Test { * . \--> E2 */ private static void testNew1(ExecutorService pool) throws ExecutionException, InterruptedException { - WorkerWrapper a = builder("A") + System.out.println("测试新的builder Api"); + WorkerWrapper a = testBuilder("A") .build(); - WorkerWrapper b1 = builder("B1").depends(a).build(); - WorkerWrapper b2 = builder("B2").depends(a).build(); - WorkerWrapper b3 = builder("B3").depends(a).build(); - WorkerWrapper b4 = builder("B4").depends(a).build(); - WorkerWrapper c1 = builder("C1") + WorkerWrapper b1 = testBuilder("B1").depends(a).build(); + WorkerWrapper b2 = testBuilder("B2").depends(a).build(); + WorkerWrapper b3 = testBuilder("B3").depends(a).build(); + WorkerWrapper b4 = testBuilder("B4").depends(a).build(); + WorkerWrapper c1 = testBuilder("C1") .depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, b1, b2, b3, b4) - .nextOf(builder("D1").build(), - builder("D2").build()) + .nextOf(testBuilder("D1").build(), + testBuilder("D2").build()) .build(); - WorkerWrapper c2 = builder("C2") + WorkerWrapper c2 = testBuilder("C2") .depends(DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS, b1, b2, b3, b4) - .nextOf(builder("E1").depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, c1).build(), - builder("E2").build()) + .nextOf(testBuilder("E1").depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, c1).build(), + testBuilder("E2").build()) .build(); Async.beginWork(2000, pool, a); logAll(); @@ -80,12 +90,13 @@ class Test { *

*/ private static void testNew2(ExecutorService pool) throws ExecutionException, InterruptedException { - WorkerWrapper a = builder("A").build(); + System.out.println("测试10个B中成功三个才能执行C"); + WorkerWrapper a = testBuilder("A").build(); ArrayList bList = new ArrayList<>(); for (int i = 1; i <= 10; i++) { - bList.add(builder("B" + i).depends(a).build()); + bList.add(testBuilder("B" + i).depends(a).build()); } - WorkerWrapper c = builder("C") + WorkerWrapper c = testBuilder("C") .setDepend().strategy((dependWrappers, thisWrapper, fromWrapper) -> { if (dependWrappers.stream() .filter(w -> w.getWorkResult().getResultState() == ResultState.SUCCESS).count() >= 3) { @@ -101,7 +112,7 @@ class Test { /** * 测试线程轮询的效率 */ - private static void testThreadPolling_Speed(ExecutorService pool) throws ExecutionException, InterruptedException { + private static void testThreadPolling_Speed(ExecutorService pool) throws InterruptedException { int MAX = 1000; Collection> wrappers = new ArrayList<>(MAX); AtomicLong a = new AtomicLong(0); @@ -169,23 +180,57 @@ class Test { System.out.println(a.getNextWrappers()); } + /** + * 超时测试 + */ + private static void testTimeOut(ExecutorService pool) throws ExecutionException, InterruptedException { + System.out.println("超时测试:"); + System.err.println("如果抛出" + InterruptedException.class.getName() + "异常,则打断线程成功"); + WorkerWrapper a = testBuilder("A") + // B1、B2不超时 + .nextOf(testBuilder("B1", 100).timeout(150, TimeUnit.MILLISECONDS).build()) + .nextOf(testBuilder("B2", 100).build()) + // B3单wrapper超时 + .nextOf(testBuilder("B3", 200).timeout(150, TimeUnit.MILLISECONDS).build()) + // B4、B5总任务超时 + .nextOf(testBuilder("B4", 250).build()) + .nextOf(testBuilder("B5", 250) + .setTimeOut().enableTimeOut(true).setTime(300, TimeUnit.MILLISECONDS).allowInterrupt(false).end() + .build()) + // 测试打断B6线程 + .nextOf(testBuilder("B6", 250).timeout(true, 150, TimeUnit.MILLISECONDS, true).build()) + .build(); + long t1 = SystemClock.now(); + boolean success = Async.beginWork(200, pool, a); + System.out.println("time=" + (SystemClock.now() - t1) + ", success=" + success); + a.getNextWrappers().forEach(System.out::println); + logAll(); + } + // ========== util method ========== static final AtomicInteger count = new AtomicInteger(1); static final AtomicReference> logger = new AtomicReference<>(new ConcurrentHashMap<>()); - static WorkerWrapperBuilder builder(String id) { - return builder(id, -1); + static WorkerWrapperBuilder testBuilder(String id) { + return testBuilder(id, -1); } - static WorkerWrapperBuilder builder(String id, long sleepTime) { + static WorkerWrapperBuilder testBuilder(String id, long sleepTime) { return WorkerWrapper.builder() .id(id) .worker((param, allWrap) -> { logger.get().put(count.getAndIncrement(), id + " working "); if (sleepTime >= 0) { + for (int i = 0; i < 10; i++) { + try { + Thread.sleep(sleepTime / 10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } try { - Thread.sleep(sleepTime); + Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }