v1.5.1 增加单wrapper超时判定功能、优化轮询策略、优化代码

This commit is contained in:
TcSnZh
2021-05-08 18:15:37 +08:00
parent ced8181cf0
commit 787abdb6f1
9 changed files with 904 additions and 418 deletions

View File

@@ -5,10 +5,11 @@ import com.jd.platform.async.callback.DefaultGroupCallback;
import com.jd.platform.async.callback.IGroupCallback;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WrapperEndingInspector;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
@@ -28,15 +29,15 @@ public class Async {
*/
public static boolean beginWork(long timeout,
ExecutorService executorService,
Collection<? extends WorkerWrapper> workerWrappers)
throws ExecutionException, InterruptedException {
Collection<? extends WorkerWrapper<?,?>> workerWrappers)
throws InterruptedException {
if (workerWrappers == null || workerWrappers.size() == 0) {
return false;
}
//保存上次执行的线程池变量(为了兼容以前的旧功能)
Async.lastExecutorService = Objects.requireNonNull(executorService, "ExecutorService is null ! ");
//定义一个map存放所有的wrapperkey为wrapper的唯一idvalue是该wrapper可以从value中获取wrapper的result
final ConcurrentMap<String, WorkerWrapper> forParamUseWrappers =
final ConcurrentMap<String, WorkerWrapper<?,?>> forParamUseWrappers =
new ConcurrentHashMap<>(Math.max(workerWrappers.size() * 3, 8));
final WrapperEndingInspector inspector = new WrapperEndingInspector(SystemClock.now() + timeout);
inspector.addWrapper(workerWrappers);
@@ -59,7 +60,7 @@ public class Async {
if (workerWrapper == null || workerWrapper.length == 0) {
return false;
}
Set<WorkerWrapper> workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet());
Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet());
return beginWork(timeout, executorService, workerWrappers);
}
@@ -148,7 +149,7 @@ public class Async {
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactory() {
private final AtomicInteger threadCount = new AtomicInteger(0);
private final AtomicLong threadCount = new AtomicLong(0);
@Override
public Thread newThread(Runnable r) {

View File

@@ -1,347 +0,0 @@
package com.jd.platform.async.executor;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* 判断{@link WorkerWrapper}是否链路调用完成的轮询器。
* =================================================================================
* <p>
* 在v1.4及以前的版本,存在如下问题:
* >
* 在使用线程数量较少的线程池进行beginWork时调用WorkerWrapper#beginNext方法时
* 会因为本线程等待下游Wrapper执行完成而存在线程耗尽bug。线程池会死翘翘的僵住、动弹不得。
* >
* 例如仅有2个线程的线程池执行以下任务
* {@code
* <p>
* 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况在test/v15.dependnew中示例testThreadPolling_V14Bug说明了这个bug
* 线程数2
* A(5ms)--B1(10ms) ---|--> C1(5ms)
* . \ | (B1、B2全部完成可执行C1、C2)
* . ---> B2(20ms) --|--> C2(5ms)
* <p>
* }
* 线程1执行了A然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。
* 线程2执行了B1或B2中的一个也在allOf方法等待C1、C2完成。
* 结果没有线程执行C和B2了导致超时而死并且这个线程池线程有可能被耗尽。
* >
* v1.5的解决方案是,放弃使工作线程遭致阻塞的{@link java.util.concurrent.CompletableFuture}
* 而是让工作线程在工作前注册到本“完成检查器”{@link WrapperEndingInspector},然后交由轮询中心{@link PollingCenter}进行检查是否完成。
* </p>
* =================================================================================
* <p>
* 本类的工作原理:
* .
* 原理:
* (1)首先在Async代码中将主动运行的wrapper都保存到一个inspector{@link #addWrapper(WorkerWrapper)}
* (2)主动运行的wrapper于FINISH/ERROR时先异步submit所有下游wrapper在其执行时将自身(下游wrapper)保存到inspector
* (3)然后在异步submit完所有下游wrapper后将调用{@link #setWrapperEndWithTryPolling(WorkerWrapper)}方法,
* . 设置自己的{@link #wrapper2called}为true并呼叫轮询{@link PollingCenter#tryPolling()}。
* (4)在下游wrapper中经过策略器判断后
* . 若是不需要运行则把本wrapper计数-1{@link Node#count},若是计数<1则将{@link Node}移出{@link #wrapper2called}。
* . 若是需要运行,则运行之,然后跳转到 (2) 的情节。如此递归执行链路上所有需要执行的wrapper最后都会存在于{@link #wrapper2called}中。
* .
* 因此,若是存在任一其{@link Node#called}为false的wrapper则表示这条链路还没有调用完。
* 若是在{@link #wrapper2called}中所有的{@link Node#called}为true时即可判断出链路执行完毕了。
* </p>
*
* @author create by TcSnZh on 2021/5/5-下午3:22
*/
public class WrapperEndingInspector implements Comparable<WrapperEndingInspector> {
/**
* 最迟完成时间
*/
private final long latestFinishTime;
/**
* 保存 需要检查的wrapper--相关属性 的Map。
*/
private final ConcurrentHashMap<WorkerWrapper, Node> wrapper2called = new ConcurrentHashMap<>();
/**
* 当全部wrapper都调用结束它会countDown
*/
private final CountDownLatch endCDL = new CountDownLatch(1);
/**
* 读锁用于修改数据写锁用于轮询。使用公平锁让wrapper的时间波动不会太长。
* <p/>
* 在轮询到本inspector时之所以要上写锁是因为
* 假如此时有个Wrapper正在调用{{@link #addWrapper(WorkerWrapper)}}则wrapper2called发生了改变。
* 假如现在恰巧访问到的是{@link #wrapper2called}迭代器的最后一个,但此时又加入了另一个,且这另一个又是需要去执行的。
* 那么假如在迭代器遍历到目前访问到的wrapper都是呼叫完毕的那么这新加入的一个就会被忽略从而判定为全部完成。致使bug发生。
* <p/>
* 此外,即便轮询时上写锁,对性能的影响也是有限的。因为这只会在“呼叫别人”的时候发生工作线程与轮询线程的锁争抢,
* 而在工作线程执行{@link com.jd.platform.async.callback.IWorker#action(Object, Map)}或
* {@link com.jd.platform.async.callback.ICallback#result(boolean, Object, WorkResult)}时,并不会与轮询线程去
* 争抢锁,而通常这个工作的时间才是最耗时的。
*/
private final ReentrantReadWriteLock writePollingLock = new ReentrantReadWriteLock(true);
public WrapperEndingInspector(long latestFinishTime) {
this.latestFinishTime = latestFinishTime;
}
public void registerToPollingCenter() {
writePollingLock.readLock().lock();
try {
PollingCenter.getInstance().inspectionSet.add(this);
} finally {
writePollingLock.readLock().unlock();
}
}
public void addWrapper(WorkerWrapper wrapper) {
writePollingLock.readLock().lock();
try {
wrapper2called.computeIfAbsent(wrapper, k -> new Node()).count.incrementAndGet();
} finally {
writePollingLock.readLock().unlock();
}
}
public void addWrapper(Collection<? extends WorkerWrapper> wrappers) {
writePollingLock.readLock().lock();
try {
Objects.requireNonNull(wrappers).forEach(this::addWrapper);
} finally {
writePollingLock.readLock().unlock();
}
}
public void reduceWrapper(WorkerWrapper wrapper) {
writePollingLock.readLock().lock();
try {
/*
* 有可能发生这情况一个Wrapper刚被加进去执行了零/一/多次,均不满足执行条件,但是下次调用却应当使其启动。
*/
if (wrapper.getState() != WorkerWrapper.INIT) {
Node node = wrapper2called.get(wrapper);
if (node == null) {
return;
}
synchronized (node) {
if (node.count.decrementAndGet() < 1) {
wrapper2called.remove(wrapper);
}
}
}
} finally {
writePollingLock.readLock().unlock();
}
}
/**
* 原子的设置这个Wrapper已经呼叫完成了。
* <p/>
* 该方法会调用{@link PollingCenter#tryPolling()},呼叫轮询线程
*
* @return 如果为true表示设置成功。为false表示已经被设置过了。
*/
public boolean setWrapperEndWithTryPolling(WorkerWrapper wrapper) {
writePollingLock.readLock().lock();
try {
return !wrapper2called.get(wrapper).called.getAndSet(true);
} finally {
writePollingLock.readLock().unlock();
PollingCenter.getInstance().tryPolling();
}
}
/**
* 供外部调用的等待方法
*
* @return 在超时前完成返回true。超时时间一到就会返回false。就像人被杀就会死。
* @throws InterruptedException 外部调用的当前线程被中断时,会抛出这个异常。
*/
public boolean await() throws InterruptedException {
return endCDL.await(latestFinishTime - SystemClock.now(), TimeUnit.MILLISECONDS);
}
/**
* {@link PollingCenter}会优先把最迟完成时间(即开始时间+超时时间较早的Inspection放在前面。
*/
@Override
public int compareTo(WrapperEndingInspector other) {
if (this.latestFinishTime - other.latestFinishTime < 0) {
return -1;
}
return 1;
}
@Override
public String toString() {
return "WrapperEndingInspector{" +
"remainTime=" + (latestFinishTime - SystemClock.now()) +
", wrapper2called=" +
wrapper2called.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue))
+
", endCDL.getCount()=" + endCDL.getCount() +
", writePollingLock={read=" + writePollingLock.getReadLockCount() + ",write=" + writePollingLock.getWriteHoldCount() +
"} }";
}
/**
* 节点对象,保存属性信息于{@link #wrapper2called}中。
* <p/>
* 当试图把Node移出本Map时该Node对象自身将会被上锁。
*/
public static class Node {
/**
* 是否已经呼叫完了下游wrapper
*/
AtomicBoolean called = new AtomicBoolean(false);
/**
* 本wrapper总共被呼叫次数的统计。若小于1则会被移出map。
*/
AtomicInteger count = new AtomicInteger(0);
@Override
public String toString() {
return "{" +
"called=" + called.get() +
", count=" + count.get() +
'}';
}
}
/**
* 轮询中心。具体的轮询调度由其完成。
* <p/>
* {@link #registerToPollingCenter()}调用时就会将inspector注册到本轮询中心以供轮询。
*/
public static class PollingCenter {
/**
* 将被轮询的WrapperFinishInspection集合。
*/
private final Set<WrapperEndingInspector> inspectionSet = new ConcurrentSkipListSet<>();
/**
* 请求轮询。
*/
private void tryPolling() {
if (inspectionSet.size() < SINGLETON_POLLING_POOL.getActiveCount()) {
// 线程数 > inspector数理论上已经各个线程都在忙活了不去新开线程。
return;
}
SINGLETON_POLLING_POOL.submit(() -> {
int expectCount;
while (!inspectionSet.isEmpty()) {
// expectCount是本线程用来记录本次循环开始时inspectionSet的个数。
// 每当移出一个inspector时该值-1。
expectCount = inspectionSet.size();
// 开始检查
for (WrapperEndingInspector inspector : inspectionSet) {
// 直接抢锁轮询期间禁止修改inspector
inspector.writePollingLock.writeLock().lock();
try {
if (PollingCenter.this.inspectorIsEnd(inspector)) {
// inspector中的wrapper调用结束了
if (inspector.endCDL.getCount() > 0) {
// 双重检查使endCDL原子性countDown。
synchronized (inspector.endCDL) {
if (inspector.endCDL.getCount() > 0) {
inspectionSet.remove(inspector);
expectCount--;
inspector.endCDL.countDown();
}
}
}
}
} finally {
inspector.writePollingLock.writeLock().unlock();
}
}
/*
* 根据 expectCount == inspectionSet.size() 的值由于本线程1个线程在轮询
* 1. 若值为true表示轮询过程中没有新的inspector被添加进set中。此时就可以break了。
* . 之所以可以break是因为这个inspection还没有调用结束在其结束前还会来催促轮询的。
* 2. 若值为false表示有新的inspector在本线程轮询时被加入到了set中且没有被我们迭代到。此时还要重新轮询一次。
*/
if (expectCount == inspectionSet.size()) {
break;
}
}
});
}
private boolean inspectorIsEnd(WrapperEndingInspector inspector) {
if (inspector.latestFinishTime < SystemClock.now()) {
inspector.wrapper2called.forEach(((wrapper, node) -> {
wrapper.stopNow();
node.called.set(true);
}));
return true;
}
for (Map.Entry<WorkerWrapper, Node> entry : inspector.wrapper2called.entrySet()) {
WorkerWrapper wrapper = entry.getKey();
Node node = entry.getValue();
if (wrapper.getState() == WorkerWrapper.INIT
// 上值如果为false表示该Wrapper要么还没来得及执行要么判断不需要执行但是还未被移出
|| !node.called.get()
// 上值如果为false表示该Wrapper正在工作或是刚刚结束/失败还未将所有下游Wrapper调用一遍。
) {
return false;
}
// 这里需要去判断一下超时。
}
return true;
}
// ========== static ==========
private final static PollingCenter instance = new PollingCenter();
public static PollingCenter getInstance() {
return instance;
}
/**
* 单线程的轮询线程池
*/
private static final ThreadPoolExecutor SINGLETON_POLLING_POOL = new ThreadPoolExecutor(
0,
// 轮询线程数必须为1
1,
15L,
TimeUnit.SECONDS,
// 必须保存至少一个轮询请求,以便在本线程轮询结束时,获取到已轮询过的线程提交的轮询请求
new ArrayBlockingQueue<>(1),
new ThreadFactory() {
private final AtomicInteger threadCount = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "asyncTool-wrapperEndingInspectorPollingCenterPool-thread-" + threadCount.getAndIncrement());
t.setDaemon(true);
// 线程优先级不高
t.setPriority(1);
return t;
}
@Override
public String toString() {
return "asyncTool-wrapperEndingInspectorPollingCenterPool-threadFactory";
}
},
// 多的就丢了,反正都是催这一个线程去轮询
new ThreadPoolExecutor.DiscardPolicy()
) {
@Override
public String toString() {
return "asyncTool-wrapperEndingInspectorPollingCenterPool";
}
};
}
}

View File

@@ -2,15 +2,8 @@ package com.jd.platform.async.wrapper;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.executor.WrapperEndingInspector;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.wrapper.actionstrategy.DependenceAction;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
* {@link WorkerWrapper}默认实现类将上下游Wrapper保存在自己的Set中。

View File

@@ -7,6 +7,7 @@ import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
import com.jd.platform.async.wrapper.actionstrategy.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 一个稳定的Builder兼容1.4版本之前的代码。
@@ -86,7 +87,17 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
* {@link #useV15DeprecatedMustDependApi}
*/
private boolean useV15NewDependApi = false;
/**
* 单个Wrapper超时相关属性
*/
private boolean enableTimeOut = false;
private long time = -1;
private TimeUnit unit = null;
private boolean allowInterrupt = false;
/**
* 标记自己正在building
*/
private boolean isBuilding = false;
@Override
@@ -233,6 +244,40 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
}
}
@Override
public SetTimeOut<T, V> setTimeOut() {
return new SetTimeOutImpl();
}
public class SetTimeOutImpl implements SetTimeOut<T, V> {
@Override
public SetTimeOutImpl enableTimeOut(boolean enableElseDisable) {
StableWorkerWrapperBuilder.this.enableTimeOut = enableElseDisable;
return this;
}
@Override
public SetTimeOutImpl setTime(long time, TimeUnit unit) {
if (time <= 0 || unit == null) {
throw new IllegalStateException("Illegal argument : time=" + time + " must > 0, unit=" + unit + " must not null");
}
StableWorkerWrapperBuilder.this.time = time;
StableWorkerWrapperBuilder.this.unit = unit;
return this;
}
@Override
public SetTimeOutImpl allowInterrupt(boolean allow) {
StableWorkerWrapperBuilder.this.allowInterrupt = allow;
return this;
}
@Override
public BUILDER_SUB_CLASS end() {
return returnThisBuilder();
}
}
@Override
public WorkerWrapper<T, V> build() {
isBuilding = true;
@@ -264,25 +309,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
wrapper.getWrapperStrategy().setDependMustStrategyMapper(new DependMustStrategyMapper()
.addDependMust(mustDependSet));
}
wrapper.getWrapperStrategy().setDependenceStrategy(new DependenceStrategy() {
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
DependMustStrategyMapper mustMapper = thisWrapper.getWrapperStrategy().getDependMustStrategyMapper();
if (mustMapper != null && mustMapper.getMustDependSet().size() > 0) {
// 至少有一个must则因为must未完全完成而等待。
return DependenceAction.TAKE_REST.emptyProperty();
}
// 如果一个must也没有则认为应该是ANY模式。
return DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS.judgeAction(dependWrappers, thisWrapper, fromWrapper);
}
@Override
public String toString() {
return "IF_HAS_MUST_ALL_MUST_ELSE_ANY";
}
});
wrapper.getWrapperStrategy().setDependenceStrategy(DependenceStrategy.IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY);
} else {
if (mustDependSet != null && mustDependSet.size() > 0) {
wrapper.getWrapperStrategy().setDependMustStrategyMapper(new DependMustStrategyMapper().addDependMust(mustDependSet));
@@ -317,6 +344,18 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
wrapper.getWrapperStrategy().setSkipStrategy(skipStrategy);
}
}
// ========== 设置单wrapper超时检查 ==========
{
if (enableTimeOut) {
if (time <= 0) {
throw new IllegalStateException("timeout time " + time + " must > " + 0);
}
if (unit == null) {
throw new IllegalStateException("timeout unit must not null");
}
wrapper.setTimeOut(new WorkerWrapper.TimeOutProperties(true, time, unit, allowInterrupt, wrapper));
}
}
return wrapper;
}

View File

@@ -4,7 +4,6 @@ import com.jd.platform.async.callback.DefaultCallback;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.executor.WrapperEndingInspector;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.*;
import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
@@ -60,6 +59,10 @@ public abstract class WorkerWrapper<T, V> {
* IDEA可以使用JOL Java Object Layout插件查看对象大小。
*/
private final WrapperStrategy wrapperStrategy = new WrapperStrategy();
/**
* 超时检查该值允许为null。表示不设置。
*/
private volatile TimeOutProperties timeOutProperties;
// ***** state属性的常量值 *****
@@ -121,14 +124,24 @@ public abstract class WorkerWrapper<T, V> {
public abstract Set<WorkerWrapper<?, ?>> getNextWrappers();
/**
* 总控制台超时,停止所有任务
* 使wrapper状态修改为超时失败。但如果已经执行完成则不会修改
* <p/>
* 本方法不会试图执行超时判定逻辑。
* 如果要执行超时逻辑判断,请用{@link TimeOutProperties#checkTimeOut(boolean)}并传入参数true。
*/
public void stopNow() {
if (getState() == INIT || getState() == WORKING) {
fastFail(getState(), null);
public void failNow() {
int state = getState();
if (state == INIT || state == WORKING) {
fastFail(state, null);
}
}
public WrapperStrategy getWrapperStrategy() {
return wrapperStrategy;
}
// ========== protected ==========
/**
* 快速失败
*
@@ -266,16 +279,16 @@ public abstract class WorkerWrapper<T, V> {
// nextWrappers有多个
try {
inspector.addWrapper(nextWrappers);
nextWrappers.forEach(next -> {
executorService.submit(() -> next.work(executorService, this, nextRemainTIme, getForParamUseWrappers(), inspector));
});
nextWrappers.forEach(next -> executorService.submit(() ->
next.work(executorService, this, nextRemainTIme, getForParamUseWrappers(), inspector))
);
} finally {
inspector.setWrapperEndWithTryPolling(this);
}
}
/**
* 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
* 本工作线程执行自己的job.判断阻塞超时这里开始时会判断一次总超时时间但在轮询线程会判断单个wrapper超时时间并也会判断总超时时间。
*/
protected void fire() {
//阻塞取结果
@@ -288,9 +301,19 @@ public abstract class WorkerWrapper<T, V> {
if (!compareAndSetState(INIT, WORKING)) {
return;
}
callback.begin();
//执行耗时操作
V resultValue = resultValue = (V) worker.action(param, (Map) getForParamUseWrappers());
V resultValue;
try {
callback.begin();
if (timeOutProperties != null) {
timeOutProperties.startWorking();
}
//执行耗时操作
resultValue = (V) worker.action(param, (Map) getForParamUseWrappers());
} finally {
if (timeOutProperties != null) {
timeOutProperties.endWorking();
}
}
//如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) {
return;
@@ -386,29 +409,46 @@ public abstract class WorkerWrapper<T, V> {
abstract void setDependWrappers(Set<WorkerWrapper<?, ?>> dependWrappers);
WrapperStrategy getWrapperStrategy() {
return wrapperStrategy;
TimeOutProperties getTimeOut() {
return timeOutProperties;
}
void setTimeOut(TimeOutProperties timeOutProperties) {
this.timeOutProperties = timeOutProperties;
}
// ========== toString ==========
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(150)
final StringBuilder sb = new StringBuilder(200)
.append("WorkerWrapper{id=").append(id)
.append(", param=").append(param)
.append(", worker=").append(worker)
.append(", callback=").append(callback)
.append(", state=").append(state)
.append(", state=");
int state = this.state.get();
if (state == FINISH) {
sb.append("FINISH");
} else if (state == WORKING) {
sb.append("WORKING");
} else if (state == INIT) {
sb.append("INIT");
} else if (state == ERROR) {
sb.append("ERROR");
} else {
throw new IllegalStateException("unknown state : " + state);
}
sb
.append(", workResult=").append(workResult)
// 防止循环引用这里只输出相关Wrapper的id
.append(", forParamUseWrappers::getId=");
.append(", forParamUseWrappers::getId=[");
getForParamUseWrappers().keySet().forEach(wrapperId -> sb.append(wrapperId).append(", "));
if (getForParamUseWrappers().keySet().size() > 0) {
sb.delete(sb.length() - 2, sb.length());
}
sb
.append(", dependWrappers::getId=[");
.append("], dependWrappers::getId=[");
getDependWrappers().stream().map(WorkerWrapper::getId).forEach(wrapperId -> sb.append(wrapperId).append(", "));
if (getDependWrappers().size() > 0) {
sb.delete(sb.length() - 2, sb.length());
@@ -422,6 +462,7 @@ public abstract class WorkerWrapper<T, V> {
sb
.append("]")
.append(", wrapperStrategy=").append(getWrapperStrategy())
.append(", timeOutProperties=").append(getTimeOut())
.append('}');
return sb.toString();
}
@@ -510,7 +551,6 @@ public abstract class WorkerWrapper<T, V> {
// ========== toString ==========
@Override
public String toString() {
return "WrapperStrategy{" +
@@ -521,4 +561,160 @@ public abstract class WorkerWrapper<T, V> {
'}';
}
}
public static class TimeOutProperties {
private final boolean enable;
private final long time;
private final TimeUnit unit;
private final boolean allowInterrupt;
private final WorkerWrapper<?, ?> wrapper;
private final Object lock = new Object();
private volatile boolean started = false;
private volatile boolean ended = false;
private volatile long startWorkingTime;
private volatile long endWorkingTime;
private volatile Thread doWorkingThread;
public TimeOutProperties(boolean enable, long time, TimeUnit unit, boolean allowInterrupt, WorkerWrapper<?, ?> wrapper) {
this.enable = enable;
this.time = time;
this.unit = unit;
this.allowInterrupt = allowInterrupt;
this.wrapper = wrapper;
}
// ========== 工作线程调用 ==========
public void startWorking() {
synchronized (lock) {
started = true;
startWorkingTime = SystemClock.now();
doWorkingThread = Thread.currentThread();
}
}
public void endWorking() {
synchronized (lock) {
ended = true;
doWorkingThread = null;
endWorkingTime = SystemClock.now();
}
}
// ========== 轮询线程调用 ==========
/**
* 检查超时。
* 可以将boolean参数传入true以在超时的时候直接失败。
*
* @param withStop 如果为false不会发生什么仅仅是单纯的判断是否超时。
* 如果为true则会去快速失败wrapper{@link #failNow()},有必要的话还会打断线程。
* @return 如果 超时 或 执行时间超过限制 返回true未超时返回false。
*/
public boolean checkTimeOut(boolean withStop) {
if (enable) {
synchronized (lock) {
if (started) {
// 判断执行中的wrapper是否超时
long dif = (ended ? endWorkingTime : SystemClock.now()) - startWorkingTime;
if (dif > unit.toMillis(time)) {
if (withStop) {
if (allowInterrupt) {
doWorkingThread.interrupt();
}
wrapper.failNow();
ended = true;
}
return true;
}
return false;
}
}
}
return false;
}
// ========== package ==========
boolean isEnable() {
return enable;
}
long getTime() {
return time;
}
TimeUnit getUnit() {
return unit;
}
boolean isAllowInterrupt() {
return allowInterrupt;
}
Object getLock() {
return lock;
}
boolean isStarted() {
return started;
}
void setStarted(boolean started) {
this.started = started;
}
boolean isEnded() {
return ended;
}
void setEnded(boolean ended) {
this.ended = ended;
}
long getStartWorkingTime() {
return startWorkingTime;
}
void setStartWorkingTime(long startWorkingTime) {
this.startWorkingTime = startWorkingTime;
}
long getEndWorkingTime() {
return endWorkingTime;
}
void setEndWorkingTime(long endWorkingTime) {
this.endWorkingTime = endWorkingTime;
}
Thread getDoWorkingThread() {
return doWorkingThread;
}
void setDoWorkingThread(Thread doWorkingThread) {
this.doWorkingThread = doWorkingThread;
}
// ========== toString ==========
@Override
public String toString() {
return "TimeOutProperties{" +
"enable=" + enable +
", time=" + time +
", unit=" + unit +
", allowInterrupt=" + allowInterrupt +
", wrapper::getId=" + wrapper.getId() +
", started=" + started +
", ended=" + ended +
", startWorkingTime=" + startWorkingTime +
", endWorkingTime=" + endWorkingTime +
", doWorkingThread=" + doWorkingThread +
'}';
}
}
}

View File

@@ -8,6 +8,7 @@ import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
* 作为优化编排依赖策略后新增的Builder接口。
@@ -233,5 +234,57 @@ public interface WorkerWrapperBuilder<T, V> {
return setNext().wrapper(wrappers).end();
}
/**
* 设置超时时间的具体属性
*/
SetTimeOut<T, V> setTimeOut();
interface SetTimeOut<T, V> {
/**
* 是否启动超时判断。
* <p>
* 默认为true
*
* @param enableElseDisable 是则true
*/
SetTimeOut<T, V> enableTimeOut(boolean enableElseDisable);
/**
* 设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
*
* @param time 时间数值
* @param unit 时间单位
*/
SetTimeOut<T, V> setTime(long time, TimeUnit unit);
/**
* 是否允许被试图中断线程
*
* @param allow 是则true
*/
SetTimeOut<T, V> allowInterrupt(boolean allow);
WorkerWrapperBuilder<T, V> end();
}
/**
* 便携式设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
*
* @param time 时间数值
* @param unit 时间单位
*/
default WorkerWrapperBuilder<T, V> timeout(long time, TimeUnit unit) {
return timeout(true, time, unit, false);
}
default WorkerWrapperBuilder<T, V> timeout(boolean enableTimeOut, long time, TimeUnit unit, boolean allowInterrupt) {
return setTimeOut().enableTimeOut(enableTimeOut).setTime(time, unit).allowInterrupt(allowInterrupt).end();
}
/**
* 构建Wrapper。
*
* @return 返回WorkerWrapper
*/
WorkerWrapper<T, V> build();
}

View File

@@ -0,0 +1,486 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* 判断{@link WorkerWrapper}是否链路调用完成的轮询器。
* =================================================================================
* <p>
* 在v1.4及以前的版本,存在如下问题:
* >
* 在使用线程数量较少的线程池进行beginWork时调用WorkerWrapper#beginNext方法时
* 会因为本线程等待下游Wrapper执行完成而存在线程耗尽bug。线程池会死翘翘的僵住、动弹不得。
* >
* 例如仅有2个线程的线程池执行以下任务
* {@code
* <p>
* 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况在test/v15.dependnew中示例testThreadPolling_V14Bug说明了这个bug
* 线程数2
* A(5ms)--B1(10ms) ---|--> C1(5ms)
* . \ | (B1、B2全部完成可执行C1、C2)
* . ---> B2(20ms) --|--> C2(5ms)
* <p>
* }
* 线程1执行了A然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。
* 线程2执行了B1或B2中的一个也在allOf方法等待C1、C2完成。
* 结果没有线程执行C和B2了导致超时而死并且这个线程池线程有可能被耗尽。
* >
* v1.5的解决方案是,放弃使工作线程遭致阻塞的{@link java.util.concurrent.CompletableFuture}
* 而是让工作线程在工作前注册到本“完成检查器”{@link WrapperEndingInspector},然后交由轮询中心{@link PollingCenter}进行检查是否完成。
* </p>
* =================================================================================
* <p>
* 本类的工作原理:
* .
* 原理:
* (1)首先在Async代码中将主动运行的wrapper都保存到一个inspector{@link #addWrapper(WorkerWrapper)}
* (2)主动运行的wrapper于FINISH/ERROR时先异步submit所有下游wrapper在其执行时将自身(下游wrapper)保存到inspector
* (3)然后在异步submit完所有下游wrapper后将调用{@link #setWrapperEndWithTryPolling(WorkerWrapper)}方法,
* . 设置自己的{@link #wrappers}为true并呼叫轮询{@link PollingCenter#tryPolling()}。
* (4)在下游wrapper中经过策略器判断后
* . 若是不需要运行则把本wrapper计数-1{@link WrapperNode#count},若是计数<1则将{@link WrapperNode}移出{@link #wrappers}。
* . 若是需要运行,则运行之,然后跳转到 (2) 的情节。如此递归执行链路上所有需要执行的wrapper最后都会存在于{@link #wrappers}中。
* .
* 因此,若是存在任一其{@link WrapperNode#called}为false的wrapper则表示这条链路还没有调用完。
* 若是在{@link #wrappers}中所有的{@link WrapperNode#called}为true时即可判断出链路执行完毕了。
* </p>
*
* @author create by TcSnZh on 2021/5/5-下午3:22
*/
public class WrapperEndingInspector implements Comparable<WrapperEndingInspector> {
/**
* 最迟完成时间
*/
private final long latestFinishTime;
/**
* 保存 需要检查的wrapper--相关属性 的Map。
*/
private final ConcurrentHashMap<WorkerWrapper, WrapperNode> wrappers = new ConcurrentHashMap<>();
/**
* 当全部wrapper都调用结束它会countDown
*/
private final CountDownLatch endCDL = new CountDownLatch(1);
/**
* 读锁用于修改数据写锁用于轮询。使用公平锁让wrapper的时间波动不会太长。
* <p/>
* 在轮询到本inspector时之所以要上写锁是因为
* 假如此时有个Wrapper正在调用{@link #addWrapper(WorkerWrapper)}则wrappers发生了改变。
* 假如现在恰巧访问到的是{@link #wrappers}迭代器的最后一个,但此时又加入了另一个,且这另一个又是需要去执行的。
* 那么假如在迭代器遍历到目前访问到的wrapper都是呼叫完毕的那么这新加入的一个就会被忽略从而判定为全部完成。致使bug发生。
* <p/>
* 此外,即便轮询时上写锁,对性能的影响也是有限的。因为这只会在“呼叫别人”的时候发生工作线程与轮询线程的锁争抢,
* 而在工作线程执行{@link com.jd.platform.async.callback.IWorker#action(Object, Map)}或
* {@link com.jd.platform.async.callback.ICallback#result(boolean, Object, WorkResult)}时,并不会与轮询线程去
* 争抢锁,而通常这个工作的时间才是最耗时的。
*/
private final ReentrantReadWriteLock modifyPollingLock = new ReentrantReadWriteLock(true);
/**
* 当轮询发现超时时该值被设为false
*/
private final AtomicBoolean haveNotTimeOut = new AtomicBoolean(true);
public WrapperEndingInspector(long latestFinishTime) {
this.latestFinishTime = latestFinishTime;
}
public void registerToPollingCenter() {
modifyPollingLock.readLock().lock();
try {
// 不重复put以免InspectorNode被替换为另一个
PollingCenter.getInstance().inspectionMap.putIfAbsent(this, new PollingCenter.InspectorNode());
} finally {
modifyPollingLock.readLock().unlock();
}
}
public void addWrapper(WorkerWrapper wrapper) {
modifyPollingLock.readLock().lock();
try {
wrappers.computeIfAbsent(wrapper, k -> new WrapperNode()).count.incrementAndGet();
} finally {
modifyPollingLock.readLock().unlock();
}
}
public void addWrapper(Collection<? extends WorkerWrapper> wrappers) {
modifyPollingLock.readLock().lock();
try {
Objects.requireNonNull(wrappers).forEach(this::addWrapper);
} finally {
modifyPollingLock.readLock().unlock();
}
}
public void reduceWrapper(WorkerWrapper wrapper) {
modifyPollingLock.readLock().lock();
try {
/*
* 有可能发生这情况一个Wrapper刚被加进去执行了零/一/多次,均不满足执行条件,但是下次调用却应当使其启动。
*/
if (wrapper.getState() != WorkerWrapper.INIT) {
final WrapperNode wrapperNode = wrappers.get(wrapper);
if (wrapperNode == null) {
return;
}
synchronized (wrapperNode) {
if (wrapperNode.count.decrementAndGet() < 1) {
wrappers.remove(wrapper);
}
}
}
} finally {
modifyPollingLock.readLock().unlock();
}
}
/**
* 原子的设置这个Wrapper已经呼叫完成了。
* <p/>
* 该方法会调用{@link PollingCenter#tryPolling()},呼叫轮询线程
*
* @return 如果为true表示设置成功。为false表示已经被设置过了。
*/
public boolean setWrapperEndWithTryPolling(WorkerWrapper wrapper) {
modifyPollingLock.readLock().lock();
try {
return !wrappers.get(wrapper).called.getAndSet(true);
} finally {
modifyPollingLock.readLock().unlock();
PollingCenter.getInstance().tryPolling();
}
}
/**
* 供外部调用的等待方法
*
* @return 在超时前完成返回true。超时时间一到就会返回false。就像人被杀就会死。
* @throws InterruptedException 外部调用的当前线程被中断时,会抛出这个异常。
*/
public boolean await() throws InterruptedException {
endCDL.await();
return haveNotTimeOut.get();
}
/**
* {@link PollingCenter}会优先把最迟完成时间(即开始时间+超时时间较早的Inspection放在前面。
*/
@Override
public int compareTo(WrapperEndingInspector other) {
if (this.latestFinishTime - other.latestFinishTime < 0) {
return -1;
}
return 1;
}
@Override
public String toString() {
return "WrapperEndingInspector{" +
"remainTime=" + (latestFinishTime - SystemClock.now()) +
", wrappers=" +
wrappers.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue))
+
", endCDL.getCount()=" + endCDL.getCount() +
", writePollingLock={read=" + modifyPollingLock.getReadLockCount() + ",write=" + modifyPollingLock.getWriteHoldCount() +
"} }";
}
/**
* 节点对象,保存属性信息于{@link #wrappers}中。
* <p/>
* 当试图把Node移出本Map时该Node对象自身将会被上锁。
*/
public static class WrapperNode {
/**
* 是否已经呼叫完了下游wrapper
*/
AtomicBoolean called = new AtomicBoolean(false);
/**
* 本wrapper总共被呼叫次数的统计。若小于1则会被移出map。
*/
AtomicInteger count = new AtomicInteger(0);
@Override
public String toString() {
return "{" +
"called=" + called.get() +
", count=" + count.get() +
'}';
}
}
/**
* 轮询中心。具体的轮询调度由其完成。
* <p/>
* {@link #registerToPollingCenter()}调用时就会将inspector注册到本轮询中心以供轮询。
*/
public static class PollingCenter {
public static class InspectorNode {
/**
* 延迟轮询时间戳。
*/
private volatile long delayTimeStamp = Long.MAX_VALUE;
private final ReadWriteLock lockOfDelayTimeStamp = new ReentrantReadWriteLock();
/**
* 比较传入时间戳与{@link #delayTimeStamp},并设置小的那个为{@link #delayTimeStamp}的值。
*
* @param otherDelayTimeStamp 试图用来比较的另一个时间戳
*/
public void compareAndSetMinDelayTimeStamp(long otherDelayTimeStamp) {
lockOfDelayTimeStamp.writeLock().lock();
try {
long dif = otherDelayTimeStamp - delayTimeStamp;
if (dif > 0) {
return;
}
delayTimeStamp = otherDelayTimeStamp;
} finally {
lockOfDelayTimeStamp.writeLock().unlock();
}
}
public long getDelayTimeStamp() {
lockOfDelayTimeStamp.readLock().lock();
try {
return delayTimeStamp;
} finally {
lockOfDelayTimeStamp.readLock().unlock();
}
}
public long clearTimeStamp() {
lockOfDelayTimeStamp.writeLock().lock();
try {
long old = this.delayTimeStamp;
delayTimeStamp = Long.MAX_VALUE;
return old;
} finally {
lockOfDelayTimeStamp.writeLock().unlock();
}
}
@Override
public String toString() {
return "InspectorNode{" +
"delayTimeStamp=" + delayTimeStamp +
", lockOfDelayTimeStamp=" + lockOfDelayTimeStamp +
'}';
}
}
/**
* 将被轮询的WrapperFinishInspection集合。
*/
private final Map<WrapperEndingInspector, InspectorNode> inspectionMap = new ConcurrentSkipListMap<>();
/**
* 请求轮询。
*/
private void tryPolling() {
// 开始轮询
SINGLETON_POLLING_POOL.submit(() -> {
// 用来判断在轮询过程中是否有新增的inspector的值
int expectCount;
// 如果此值变化过,则在结束时让自己在此值后的时间再启动轮询
while (!inspectionMap.isEmpty()) {
// expectCount是本线程用来记录本次循环开始时inspectionMap的个数。
// 每当移出一个inspector时该值-1。
expectCount = inspectionMap.size();
// 开始检查
for (Map.Entry<WrapperEndingInspector, InspectorNode> entry : inspectionMap.entrySet()) {
final WrapperEndingInspector inspector = entry.getKey();
final InspectorNode inspectorNode = entry.getValue();
// 直接抢锁轮询期间禁止修改inspector
inspector.modifyPollingLock.writeLock().lock();
try {
// 对一个inspector进行检查
if (PollingCenter.this.checkInspectorIsEnd(inspector, inspectorNode)) {
// inspector中的wrapper调用结束了
// 先要把wrapper给停了
inspector.wrappers.forEach((wrapper, wrapperNode) -> {
WorkerWrapper.TimeOutProperties timeOut = wrapper.getTimeOut();
if (timeOut != null) {
timeOut.checkTimeOut(true);
}
});
// 修改此inspector和expectCount的状态
if (inspector.endCDL.getCount() > 0) {
// 双重检查使endCDL原子性countDown。
synchronized (inspector.endCDL) {
if (inspector.endCDL.getCount() > 0) {
inspectionMap.remove(inspector);
expectCount--;
inspector.endCDL.countDown();
}
}
}
}
} finally {
inspector.modifyPollingLock.writeLock().unlock();
}
}
/*
* 根据 expectCount == inspectionMap.size() 的值在仅有本线程1个线程在轮询的情况下
* 1. 若值为true表示轮询过程中没有新的inspector被添加进set中。此时就可以break了。
* . 之所以可以break是因为这个inspection还没有调用结束在其结束前还会来催促轮询的。
* 2. 若值为false表示有新的inspector在本线程轮询时被加入到了set中且没有被我们迭代到。此时还要重新轮询一次。
*/
if (expectCount == inspectionMap.size()) {
break;
}
}
});
}
private boolean checkInspectorIsEnd(WrapperEndingInspector inspector, InspectorNode inspectorNode) {
// 判断一下inspector整组是否超时
if (inspector.latestFinishTime < SystemClock.now()) {
inspector.haveNotTimeOut.set(false);
inspector.wrappers.forEach(((wrapper, wrapperNode) -> {
wrapper.failNow();
wrapperNode.called.set(true);
}));
return true;
}
// 将延迟检查时间设为离现在最近的值。
// 此处判断的是inspector所代表整次任务的超时时间
inspectorNode.compareAndSetMinDelayTimeStamp(inspector.latestFinishTime);
// 判断inspector是否结束并顺便记录、判断、修改wrapper的超时信息
for (Map.Entry<WorkerWrapper, WrapperNode> entry : inspector.wrappers.entrySet()) {
WorkerWrapper wrapper = entry.getKey();
// 判断单个wrapper是否超时
WorkerWrapper.TimeOutProperties timeOutProperties = wrapper.getTimeOut();
if (timeOutProperties != null && timeOutProperties.isEnable()) {
// 将延迟检查时间设为离现在最近的值。
// 此处判断的是wrapper的超时时间
if (timeOutProperties.checkTimeOut(true)) {
inspector.haveNotTimeOut.set(false);
}
// 未超时但是设置了超时检查的话记录一下inspector延时轮询时间
else {
inspectorNode.compareAndSetMinDelayTimeStamp(
(timeOutProperties.isStarted() ? timeOutProperties.getStartWorkingTime() : SystemClock.now())
+ timeOutProperties.getUnit().toMillis(timeOutProperties.getTime())
);
}
}
// 判断wrapper是否执行完毕
WrapperNode node = entry.getValue();
if (wrapper.getState() == WorkerWrapper.INIT
// 上值如果为false表示该Wrapper要么还没来得及执行要么判断不需要执行但是还未被移出
|| !node.called.get()
// 上值如果为false表示该Wrapper正在工作或是刚刚结束/失败还未将所有下游Wrapper调用一遍。
) {
return false;
}
}
return true;
}
{
final String executorName = "asyncTool-pollingDelayCaller";
ScheduledThreadPoolExecutor delayPollingExecutor = new ScheduledThreadPoolExecutor(
1,
new ThreadFactory() {
private final AtomicLong threadCount = new AtomicLong(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, executorName + "-thread-" + threadCount.getAndIncrement());
t.setDaemon(true);
// 线程优先级不高
t.setPriority(1);
return t;
}
@Override
public String toString() {
return executorName + "-threadFactory";
}
}
) {
@Override
public String toString() {
return executorName + "{PollingCenter.this=" + PollingCenter.this + "}";
}
};
// 每毫秒判断一次map.value的每个延迟轮询队列的头号元素是否抵达当前时间如果到了则清除并调用轮询
delayPollingExecutor.scheduleAtFixedRate(() -> inspectionMap.values().stream()
.min(Comparator.comparingLong(InspectorNode::getDelayTimeStamp))
.ifPresent(node -> {
long delayTimeStamp = node.getDelayTimeStamp();
if (Long.MAX_VALUE != delayTimeStamp && SystemClock.now() > delayTimeStamp) {
tryPolling();
}
}), 1, 1, TimeUnit.MILLISECONDS);
}
// ========== static ==========
private final static PollingCenter instance = new PollingCenter();
public static PollingCenter getInstance() {
return instance;
}
/**
* 单线程的轮询线程池
*/
private static final ThreadPoolExecutor SINGLETON_POLLING_POOL;
static {
SINGLETON_POLLING_POOL = new ThreadPoolExecutor(
0,
// 轮询线程数必须为1
1,
15L,
TimeUnit.SECONDS,
// 必须保存至少一个轮询请求,以便在本线程轮询结束时,获取到已轮询过的线程提交的轮询请求
new ArrayBlockingQueue<>(1),
new ThreadFactory() {
private final AtomicLong threadCount = new AtomicLong(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "asyncTool-pollingCenterPool-thread-" + threadCount.getAndIncrement());
t.setDaemon(true);
// 线程优先级不高
t.setPriority(3);
return t;
}
@Override
public String toString() {
return "asyncTool-pollingCenterPool-threadFactory";
}
},
// 多的就丢了,反正都是催这一个线程去轮询
new ThreadPoolExecutor.DiscardPolicy()
) {
@Override
public String toString() {
return "asyncTool-pollingCenterPool";
}
};
}
}
}

View File

@@ -1,6 +1,6 @@
package com.jd.platform.async.wrapper.actionstrategy;
import com.jd.platform.async.executor.WrapperEndingInspector;
import com.jd.platform.async.wrapper.WrapperEndingInspector;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
@@ -232,4 +232,24 @@ public interface DependenceStrategy {
};
}
DependenceStrategy IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY = new DependenceStrategy() {
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
DependMustStrategyMapper mustMapper = thisWrapper.getWrapperStrategy().getDependMustStrategyMapper();
if (mustMapper != null && !mustMapper.getMustDependSet().isEmpty()) {
// 至少有一个must则因为must未完全完成而等待。
return DependenceAction.TAKE_REST.emptyProperty();
}
// 如果一个must也没有则认为应该是ANY模式。
return DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS.judgeAction(dependWrappers, thisWrapper, fromWrapper);
}
@Override
public String toString() {
return "IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY";
}
};
}

View File

@@ -26,15 +26,24 @@ class Test {
// ExecutorService pool = Executors.newFixedThreadPool(3);
ExecutorService pool = Async.getCommonPool();
try {
// 先随便找个任务让线程池跑一把,把线程用一下,后面的测试效果会明显一点
testNew2(pool);
System.out.println("\n\n\n");
testNew1(pool);
System.out.println("\n\n\n");
testNew2(pool);
System.out.println("\n\n\n");
testThreadPolling_Speed(pool);
System.out.println("\n\n\n");
testThreadPolling_V14Bug();
System.out.println("\n\n\n");
testTimeOut(pool);
} finally {
//Async.shutDownCommonPool();
pool.shutdown();
@@ -51,21 +60,22 @@ class Test {
* . \--> E2
*/
private static void testNew1(ExecutorService pool) throws ExecutionException, InterruptedException {
WorkerWrapper<Object, Object> a = builder("A")
System.out.println("测试新的builder Api");
WorkerWrapper<Object, Object> a = testBuilder("A")
.build();
WorkerWrapper<Object, Object> b1 = builder("B1").depends(a).build();
WorkerWrapper<Object, Object> b2 = builder("B2").depends(a).build();
WorkerWrapper<Object, Object> b3 = builder("B3").depends(a).build();
WorkerWrapper<Object, Object> b4 = builder("B4").depends(a).build();
WorkerWrapper<Object, Object> c1 = builder("C1")
WorkerWrapper<Object, Object> b1 = testBuilder("B1").depends(a).build();
WorkerWrapper<Object, Object> b2 = testBuilder("B2").depends(a).build();
WorkerWrapper<Object, Object> b3 = testBuilder("B3").depends(a).build();
WorkerWrapper<Object, Object> b4 = testBuilder("B4").depends(a).build();
WorkerWrapper<Object, Object> c1 = testBuilder("C1")
.depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, b1, b2, b3, b4)
.nextOf(builder("D1").build(),
builder("D2").build())
.nextOf(testBuilder("D1").build(),
testBuilder("D2").build())
.build();
WorkerWrapper<Object, Object> c2 = builder("C2")
WorkerWrapper<Object, Object> c2 = testBuilder("C2")
.depends(DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS, b1, b2, b3, b4)
.nextOf(builder("E1").depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, c1).build(),
builder("E2").build())
.nextOf(testBuilder("E1").depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, c1).build(),
testBuilder("E2").build())
.build();
Async.beginWork(2000, pool, a);
logAll();
@@ -80,12 +90,13 @@ class Test {
* </p>
*/
private static void testNew2(ExecutorService pool) throws ExecutionException, InterruptedException {
WorkerWrapper<Object, Object> a = builder("A").build();
System.out.println("测试10个B中成功三个才能执行C");
WorkerWrapper<Object, Object> a = testBuilder("A").build();
ArrayList<WorkerWrapper> bList = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
bList.add(builder("B" + i).depends(a).build());
bList.add(testBuilder("B" + i).depends(a).build());
}
WorkerWrapper<Object, Object> c = builder("C")
WorkerWrapper<Object, Object> c = testBuilder("C")
.setDepend().strategy((dependWrappers, thisWrapper, fromWrapper) -> {
if (dependWrappers.stream()
.filter(w -> w.getWorkResult().getResultState() == ResultState.SUCCESS).count() >= 3) {
@@ -101,7 +112,7 @@ class Test {
/**
* 测试线程轮询的效率
*/
private static void testThreadPolling_Speed(ExecutorService pool) throws ExecutionException, InterruptedException {
private static void testThreadPolling_Speed(ExecutorService pool) throws InterruptedException {
int MAX = 1000;
Collection<WorkerWrapper<?, ?>> wrappers = new ArrayList<>(MAX);
AtomicLong a = new AtomicLong(0);
@@ -169,23 +180,57 @@ class Test {
System.out.println(a.getNextWrappers());
}
/**
* 超时测试
*/
private static void testTimeOut(ExecutorService pool) throws ExecutionException, InterruptedException {
System.out.println("超时测试:");
System.err.println("如果抛出" + InterruptedException.class.getName() + "异常,则打断线程成功");
WorkerWrapper<Object, Object> a = testBuilder("A")
// B1、B2不超时
.nextOf(testBuilder("B1", 100).timeout(150, TimeUnit.MILLISECONDS).build())
.nextOf(testBuilder("B2", 100).build())
// B3单wrapper超时
.nextOf(testBuilder("B3", 200).timeout(150, TimeUnit.MILLISECONDS).build())
// B4、B5总任务超时
.nextOf(testBuilder("B4", 250).build())
.nextOf(testBuilder("B5", 250)
.setTimeOut().enableTimeOut(true).setTime(300, TimeUnit.MILLISECONDS).allowInterrupt(false).end()
.build())
// 测试打断B6线程
.nextOf(testBuilder("B6", 250).timeout(true, 150, TimeUnit.MILLISECONDS, true).build())
.build();
long t1 = SystemClock.now();
boolean success = Async.beginWork(200, pool, a);
System.out.println("time=" + (SystemClock.now() - t1) + ", success=" + success);
a.getNextWrappers().forEach(System.out::println);
logAll();
}
// ========== util method ==========
static final AtomicInteger count = new AtomicInteger(1);
static final AtomicReference<ConcurrentHashMap<Integer, String>> logger = new AtomicReference<>(new ConcurrentHashMap<>());
static WorkerWrapperBuilder<Object, Object> builder(String id) {
return builder(id, -1);
static WorkerWrapperBuilder<Object, Object> testBuilder(String id) {
return testBuilder(id, -1);
}
static WorkerWrapperBuilder<Object, Object> builder(String id, long sleepTime) {
static WorkerWrapperBuilder<Object, Object> testBuilder(String id, long sleepTime) {
return WorkerWrapper.builder()
.id(id)
.worker((param, allWrap) -> {
logger.get().put(count.getAndIncrement(), id + " working ");
if (sleepTime >= 0) {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(sleepTime / 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(sleepTime);
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}