mirror of
https://gitee.com/jd-platform-opensource/asyncTool.git
synced 2026-03-22 04:27:15 +08:00
@@ -54,7 +54,7 @@ public interface IWorker<T, V> {
|
||||
* @param object
|
||||
* object
|
||||
*/
|
||||
V action(T object, Map<String, WorkerWrapper> allWrappers);
|
||||
V dependAction(T object, Map<String, WorkerWrapper> allWrappers);
|
||||
|
||||
/**
|
||||
* 超时、异常时,返回的默认值
|
||||
@@ -113,7 +113,7 @@ wrapper的泛型和worker的一样,决定了入参和结果的类型。
|
||||
public class ParWorker1 implements IWorker<String, String>, ICallback<String, String> {
|
||||
|
||||
@Override
|
||||
public String action(String object) {
|
||||
public String dependAction(String object) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
|
||||
|
||||
If you just need to use this frame, please look down. If you need to have a deep understanding of how this framework is implemented step by step, from receiving the requirements to thinking about each step, why each class is so designed, why there are these methods, that is, how to develop this framework from 0 to 1, I opened a column in [CSDN] (https://blog.csdn.net/tianaleixiaowu/category_. HTML) to talk about how middleware is developed from 0, including and Not limited to this small frame. JD internal colleagues can search my ERP on CF and see it.
|
||||
If you just need to use this frame, please look down. If you need to have a deep understanding of how this framework is implemented step by step, from receiving the requirements to thinking about each step, why each class is so designed, why there are these methods, that is, how to develop this framework from 0 to 1, I opened a column in [CSDN] (https://blog.csdn.net/tianaleixiaowu/category_. HTML) to talk about how middleware is developed from 0, including and Not limited to this small frame. JD internal colleagues can searchType my ERP on CF and see it.
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,21 +1,28 @@
|
||||
package com.jd.platform.async.callback;
|
||||
|
||||
|
||||
import com.jd.platform.async.exception.SkippedException;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
|
||||
/**
|
||||
* 默认回调类,如果不设置的话,会默认给这个回调
|
||||
*
|
||||
* @author wuweifeng wrote on 2019-11-19.
|
||||
*/
|
||||
public class DefaultCallback<T, V> implements ICallback<T, V> {
|
||||
@Override
|
||||
public void begin() {
|
||||
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 默认将打印存在的非{@link com.jd.platform.async.exception.SkippedException}的异常。
|
||||
*/
|
||||
@Override
|
||||
public void result(boolean success, T param, WorkResult<V> workResult) {
|
||||
|
||||
Exception ex = workResult.getEx();
|
||||
if (ex != null && !(ex instanceof SkippedException)) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -21,6 +21,8 @@ public interface ICallback<T, V> {
|
||||
|
||||
/**
|
||||
* 耗时操作执行完毕后,就给value注入值
|
||||
* <p/>
|
||||
* 只要Wrapper被调用后成功或失败/超时,该方法都会被执行。
|
||||
*/
|
||||
void result(boolean success, T param, WorkResult<V> workResult);
|
||||
}
|
||||
|
||||
@@ -3,68 +3,64 @@ package com.jd.platform.async.executor;
|
||||
|
||||
import com.jd.platform.async.callback.DefaultGroupCallback;
|
||||
import com.jd.platform.async.callback.IGroupCallback;
|
||||
import com.jd.platform.async.executor.timer.SystemClock;
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
import com.jd.platform.async.wrapper.WrapperEndingInspector;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 类入口,可以根据自己情况调整core线程的数量
|
||||
* 核心工具类。
|
||||
*
|
||||
* @author wuweifeng wrote on 2019-12-18
|
||||
* @version 1.0
|
||||
*/
|
||||
public class Async {
|
||||
/**
|
||||
* 默认线程池
|
||||
*/
|
||||
private static final ThreadPoolExecutor COMMON_POOL =
|
||||
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024,
|
||||
15L, TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<>(),
|
||||
(ThreadFactory) Thread::new);
|
||||
/**
|
||||
* 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个
|
||||
*/
|
||||
private static ExecutorService executorService;
|
||||
|
||||
// ========================= 任务执行核心代码 =========================
|
||||
|
||||
/**
|
||||
* 出发点
|
||||
*
|
||||
* @return 只要执行未超时,就返回true。
|
||||
*/
|
||||
public static boolean beginWork(long timeout, ExecutorService executorService, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException {
|
||||
if(workerWrappers == null || workerWrappers.size() == 0) {
|
||||
public static boolean beginWork(long timeout,
|
||||
ExecutorService executorService,
|
||||
Collection<? extends WorkerWrapper<?,?>> workerWrappers)
|
||||
throws InterruptedException {
|
||||
if (workerWrappers == null || workerWrappers.size() == 0) {
|
||||
return false;
|
||||
}
|
||||
//保存线程池变量
|
||||
Async.executorService = executorService;
|
||||
//保存上次执行的线程池变量(为了兼容以前的旧功能)
|
||||
Async.lastExecutorService = Objects.requireNonNull(executorService, "ExecutorService is null ! ");
|
||||
//定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result
|
||||
Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>();
|
||||
CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
|
||||
for (int i = 0; i < workerWrappers.size(); i++) {
|
||||
WorkerWrapper wrapper = workerWrappers.get(i);
|
||||
futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService);
|
||||
}
|
||||
try {
|
||||
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
|
||||
return true;
|
||||
} catch (TimeoutException e) {
|
||||
Set<WorkerWrapper> set = new HashSet<>();
|
||||
totalWorkers(workerWrappers, set);
|
||||
for (WorkerWrapper wrapper : set) {
|
||||
wrapper.stopNow();
|
||||
final ConcurrentMap<String, WorkerWrapper<?,?>> forParamUseWrappers =
|
||||
new ConcurrentHashMap<>(Math.max(workerWrappers.size() * 3, 8));
|
||||
final WrapperEndingInspector inspector = new WrapperEndingInspector(SystemClock.now() + timeout);
|
||||
inspector.addWrapper(workerWrappers);
|
||||
workerWrappers.forEach(wrapper -> {
|
||||
if (wrapper == null) {
|
||||
return;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
executorService.submit(() -> wrapper.work(executorService, timeout, forParamUseWrappers, inspector));
|
||||
});
|
||||
inspector.registerToPollingCenter();
|
||||
return inspector.await();
|
||||
//处理超时的逻辑被移动到了WrapperEndingInspector中。
|
||||
}
|
||||
|
||||
/**
|
||||
* 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL
|
||||
*/
|
||||
public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
|
||||
if(workerWrapper == null || workerWrapper.length == 0) {
|
||||
public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
|
||||
throws ExecutionException, InterruptedException {
|
||||
if (workerWrapper == null || workerWrapper.length == 0) {
|
||||
return false;
|
||||
}
|
||||
List<WorkerWrapper> workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList());
|
||||
Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet());
|
||||
return beginWork(timeout, executorService, workerWrappers);
|
||||
}
|
||||
|
||||
@@ -72,11 +68,11 @@ public class Async {
|
||||
* 同步阻塞,直到所有都完成,或失败
|
||||
*/
|
||||
public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
|
||||
return beginWork(timeout, COMMON_POOL, workerWrapper);
|
||||
return beginWork(timeout, getCommonPool(), workerWrapper);
|
||||
}
|
||||
|
||||
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
|
||||
beginWorkAsync(timeout, COMMON_POOL, groupCallback, workerWrapper);
|
||||
beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -102,9 +98,10 @@ public class Async {
|
||||
}
|
||||
});
|
||||
} else {
|
||||
COMMON_POOL.submit(() -> {
|
||||
final ExecutorService commonPool = getCommonPool();
|
||||
commonPool.submit(() -> {
|
||||
try {
|
||||
boolean success = beginWork(timeout, COMMON_POOL, workerWrapper);
|
||||
boolean success = beginWork(timeout, commonPool, workerWrapper);
|
||||
if (success) {
|
||||
finalGroupCallback.success(Arrays.asList(workerWrapper));
|
||||
} else {
|
||||
@@ -119,43 +116,107 @@ public class Async {
|
||||
|
||||
}
|
||||
|
||||
// ========================= 设置/属性选项 =========================
|
||||
|
||||
/**
|
||||
* 总共多少个执行单元
|
||||
* 默认线程池。
|
||||
* <p>
|
||||
* 在v1.4及之前,该COMMON_POLL是被写死的。
|
||||
* <p>
|
||||
* 自v1.5后:
|
||||
* 该线程池会被懒加载。
|
||||
* 该线程池将会给线程取名为asyncTool-commonPool-thread-0(数字不重复)。
|
||||
* </p>
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void totalWorkers(List<WorkerWrapper> workerWrappers, Set<WorkerWrapper> set) {
|
||||
set.addAll(workerWrappers);
|
||||
for (WorkerWrapper wrapper : workerWrappers) {
|
||||
if (wrapper.getNextWrappers() == null) {
|
||||
continue;
|
||||
private static ThreadPoolExecutor COMMON_POOL;
|
||||
|
||||
/**
|
||||
* 在以前(及现在)的版本中:
|
||||
* 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时,ExecutorService将会被记录下来。
|
||||
* <p/>
|
||||
* 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个
|
||||
*/
|
||||
private static volatile ExecutorService lastExecutorService;
|
||||
|
||||
public static ThreadPoolExecutor getCommonPool() {
|
||||
if (COMMON_POOL == null) {
|
||||
synchronized (Async.class) {
|
||||
if (COMMON_POOL == null) {
|
||||
COMMON_POOL = new ThreadPoolExecutor(
|
||||
Runtime.getRuntime().availableProcessors() * 2,
|
||||
1024,
|
||||
15L,
|
||||
TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<>(),
|
||||
new ThreadFactory() {
|
||||
private final AtomicLong threadCount = new AtomicLong(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r, "asyncTool-commonPool-thread-" + threadCount.getAndIncrement());
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "asyncTool-commonPool-threadFactory";
|
||||
}
|
||||
}
|
||||
) {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "asyncTool-commonPool";
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
List<WorkerWrapper> wrappers = wrapper.getNextWrappers();
|
||||
totalWorkers(wrappers, set);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭线程池
|
||||
*/
|
||||
public static void shutDown() {
|
||||
shutDown(executorService);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭线程池
|
||||
*/
|
||||
public static void shutDown(ExecutorService executorService) {
|
||||
if (executorService != null) {
|
||||
executorService.shutdown();
|
||||
} else {
|
||||
COMMON_POOL.shutdown();
|
||||
}
|
||||
return COMMON_POOL;
|
||||
}
|
||||
|
||||
public static String getThreadCount() {
|
||||
return "activeCount=" + COMMON_POOL.getActiveCount() +
|
||||
" completedCount " + COMMON_POOL.getCompletedTaskCount() +
|
||||
" largestCount " + COMMON_POOL.getLargestPoolSize();
|
||||
",completedCount=" + COMMON_POOL.getCompletedTaskCount() +
|
||||
",largestCount=" + COMMON_POOL.getLargestPoolSize();
|
||||
}
|
||||
|
||||
public static synchronized void shutDownCommonPool(boolean now) {
|
||||
if (!COMMON_POOL.isShutdown()) {
|
||||
if (now) {
|
||||
COMMON_POOL.shutdownNow();
|
||||
} else {
|
||||
COMMON_POOL.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭上次使用的线程池
|
||||
*
|
||||
* @deprecated 因此在v1.5时加上了废弃注解。
|
||||
* <p>
|
||||
* 这是一个很迷的方法,多线程时调用该方法的{@link #lastExecutorService}可能会被别的线程修改而引发不必要、不可控的错误。仅建议用来测试。
|
||||
* 另外,该方法现在不会关闭默认线程池。
|
||||
* </p>
|
||||
*/
|
||||
@Deprecated
|
||||
public static void shutDown() {
|
||||
if (lastExecutorService != COMMON_POOL) {
|
||||
shutDown(lastExecutorService);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭指定的线程池
|
||||
*
|
||||
* @param executorService 指定的线程池。传入null则会关闭默认线程池。
|
||||
* @deprecated 没啥用的方法,要关闭线程池还不如直接调用线程池的关闭方法,避免歧义。
|
||||
*/
|
||||
@Deprecated
|
||||
public static void shutDown(ExecutorService executorService) {
|
||||
if (executorService != null) {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,59 +0,0 @@
|
||||
package com.jd.platform.async.worker;
|
||||
|
||||
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
|
||||
/**
|
||||
* 对依赖的wrapper的封装
|
||||
* @author wuweifeng wrote on 2019-12-20
|
||||
* @version 1.0
|
||||
*/
|
||||
public class DependWrapper {
|
||||
private WorkerWrapper<?, ?> dependWrapper;
|
||||
/**
|
||||
* 是否该依赖必须完成后才能执行自己.<p>
|
||||
* 因为存在一个任务,依赖于多个任务,是让这多个任务全部完成后才执行自己,还是某几个执行完毕就可以执行自己
|
||||
* 如
|
||||
* 1
|
||||
* ---3
|
||||
* 2
|
||||
* 或
|
||||
* 1---3
|
||||
* 2---3
|
||||
* 这两种就不一样,上面的就是必须12都完毕,才能3
|
||||
* 下面的就是1完毕就可以3
|
||||
*/
|
||||
private boolean must = true;
|
||||
|
||||
public DependWrapper(WorkerWrapper<?, ?> dependWrapper, boolean must) {
|
||||
this.dependWrapper = dependWrapper;
|
||||
this.must = must;
|
||||
}
|
||||
|
||||
public DependWrapper() {
|
||||
}
|
||||
|
||||
public WorkerWrapper<?, ?> getDependWrapper() {
|
||||
return dependWrapper;
|
||||
}
|
||||
|
||||
public void setDependWrapper(WorkerWrapper<?, ?> dependWrapper) {
|
||||
this.dependWrapper = dependWrapper;
|
||||
}
|
||||
|
||||
public boolean isMust() {
|
||||
return must;
|
||||
}
|
||||
|
||||
public void setMust(boolean must) {
|
||||
this.must = must;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DependWrapper{" +
|
||||
"dependWrapper=" + dependWrapper +
|
||||
", must=" + must +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
package com.jd.platform.async.wrapper;
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
import com.jd.platform.async.callback.IWorker;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* {@link WorkerWrapper}默认实现类,将上下游Wrapper保存在自己的Set中。
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/6-下午2:41
|
||||
*/
|
||||
class StableWorkerWrapper<T, V> extends WorkerWrapper<T, V> {
|
||||
StableWorkerWrapper(String id, IWorker<T, V> worker, T param, ICallback<T, V> callback) {
|
||||
super(id, worker, param, callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* 依赖的wrappers,其dependType字段决定了依赖策略。
|
||||
*
|
||||
* <p>
|
||||
* v1.5时将其抽取到本子类。
|
||||
* 且修改List为Set,并默认使用LinkedHashSet,以提高id索引效率且保持有序(虽然有序也没什么用)。
|
||||
* </p>
|
||||
*/
|
||||
private Set<WorkerWrapper<?, ?>> dependWrappers;
|
||||
/**
|
||||
* 在自己后面的wrapper,如果没有,自己就是末尾;如果有一个,就是串行;如果有多个,有几个就需要开几个线程</p>
|
||||
* -------2
|
||||
* 1
|
||||
* -------3
|
||||
* 如1后面有2、3
|
||||
*
|
||||
* <p>
|
||||
* v1.5时将其抽取到本子类。
|
||||
* 且修改List为Set,并在{@link StableWorkerWrapperBuilder}中默认使用LinkedHashSet,以提高id索引效率且保持有序(虽然有序也没什么用)。
|
||||
* </p>
|
||||
*/
|
||||
private Set<WorkerWrapper<?, ?>> nextWrappers;
|
||||
|
||||
// ========== public impl ==========
|
||||
|
||||
@Override
|
||||
public Set<WorkerWrapper<?, ?>> getNextWrappers() {
|
||||
return nextWrappers;
|
||||
}
|
||||
|
||||
|
||||
// ========== package impl ==========
|
||||
|
||||
@Override
|
||||
void setNextWrappers(Set<WorkerWrapper<?, ?>> nextWrappers) {
|
||||
this.nextWrappers = nextWrappers;
|
||||
}
|
||||
|
||||
@Override
|
||||
Set<WorkerWrapper<?, ?>> getDependWrappers() {
|
||||
return dependWrappers;
|
||||
}
|
||||
|
||||
@Override
|
||||
void setDependWrappers(Set<WorkerWrapper<?, ?>> dependWrappers) {
|
||||
this.dependWrappers = dependWrappers;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,482 @@
|
||||
package com.jd.platform.async.wrapper;
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
import com.jd.platform.async.callback.IWorker;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
|
||||
import com.jd.platform.async.wrapper.actionstrategy.*;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 一个稳定的Builder,兼容1.4版本之前的代码。
|
||||
* <p/>
|
||||
* 效果等同于v1.4及之前的{@link WorkerWrapper.Builder}。
|
||||
* <p/>
|
||||
* 考虑到由于废弃了must方式编排、needCheckNextWrapperResult判断跳过,转用策略器方式,导致本类为了向上兼容保留了一些低效的功能。
|
||||
* <p/>
|
||||
* 权限修饰符为default,表示暂不对外开放。
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/3-上午12:36
|
||||
*/
|
||||
class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS>>
|
||||
implements WorkerWrapperBuilder<T, V> {
|
||||
/**
|
||||
* 该wrapper的唯一标识。
|
||||
* 如果不设置则使用{@code UUID.randomUUID().toString()}
|
||||
*/
|
||||
private String id;
|
||||
/**
|
||||
* worker将来要处理的param
|
||||
*/
|
||||
private T param;
|
||||
private IWorker<T, V> worker;
|
||||
private ICallback<T, V> callback;
|
||||
/**
|
||||
* 自己后面的所有
|
||||
*/
|
||||
private Set<WorkerWrapper<?, ?>> nextWrappers;
|
||||
/**
|
||||
* 自己依赖的所有
|
||||
*/
|
||||
private Set<WorkerWrapper<?, ?>> dependWrappers;
|
||||
/**
|
||||
* 旧版本的检查是否跳过的开关
|
||||
*/
|
||||
private Boolean needCheckNextWrapperResult = null;
|
||||
/**
|
||||
* 新版本的检查是否跳过的策略。
|
||||
*/
|
||||
private SkipStrategy skipStrategy;
|
||||
/**
|
||||
* 基本依赖策略。
|
||||
* <p/>
|
||||
* 如果在{@link #build()}调用时,{@code dependenceStrategy==null},
|
||||
* 则会给WorkerWrapper设置默认策略{@link DependenceStrategy#ALL_DEPENDENCIES_ALL_SUCCESS}。
|
||||
*/
|
||||
private DependenceStrategy dependenceStrategy;
|
||||
/**
|
||||
* 存储自己需要特殊对待的dependWrapper集合
|
||||
*/
|
||||
private Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> dependWrapperActionStrategyMap;
|
||||
/**
|
||||
* 存储需要特殊对待自己的nextWrapper集合。
|
||||
*/
|
||||
private Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> selfIsSpecialMap;
|
||||
/**
|
||||
* 一个保存以must=true方式传入的WorkerWrapper的集合。
|
||||
* <p/>
|
||||
* 该Set将会加入到{@link WorkerWrapper.WrapperStrategy#getDependMustStrategyMapper().mustDependSet}之中
|
||||
*/
|
||||
private Set<WorkerWrapper<?, ?>> mustDependSet;
|
||||
/**
|
||||
* 存储强依赖于自己的wrapper集合
|
||||
*/
|
||||
private Set<WorkerWrapper<?, ?>> selfIsMustSet;
|
||||
/**
|
||||
* 是否使用了旧的编排模式(Must开关)
|
||||
* <p/>
|
||||
* 之所以需要以下两个属性,是为了隔离旧api与新api的策略不兼容的情况。<b>建议早日替换旧方法</b>
|
||||
* 例如旧代码里调用{@link WorkerWrapper.Builder#depend(WorkerWrapper, boolean)},参数传入了false。
|
||||
*/
|
||||
private boolean useV15DeprecatedMustDependApi = false;
|
||||
/**
|
||||
* 是否使用了新的编排模式。
|
||||
* <p/>
|
||||
* {@link #useV15DeprecatedMustDependApi}
|
||||
*/
|
||||
private boolean useV15NewDependApi = false;
|
||||
/**
|
||||
* 单个Wrapper超时相关属性
|
||||
*/
|
||||
private boolean enableTimeOut = false;
|
||||
private long time = -1;
|
||||
private TimeUnit unit = null;
|
||||
private boolean allowInterrupt = false;
|
||||
|
||||
/**
|
||||
* 标记自己正在building
|
||||
*/
|
||||
private boolean isBuilding = false;
|
||||
|
||||
@Override
|
||||
public BUILDER_SUB_CLASS worker(IWorker<T, V> worker) {
|
||||
this.worker = worker;
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BUILDER_SUB_CLASS param(T t) {
|
||||
this.param = t;
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BUILDER_SUB_CLASS id(String id) {
|
||||
if (id != null) {
|
||||
this.id = id;
|
||||
}
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BUILDER_SUB_CLASS setSkipStrategy(SkipStrategy strategy) {
|
||||
this.skipStrategy = strategy;
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BUILDER_SUB_CLASS callback(ICallback<T, V> callback) {
|
||||
this.callback = callback;
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetDependImpl setDepend() {
|
||||
useV15NewDependApi = true;
|
||||
checkCanNotCompatibleDeprecateMustDependApi(false);
|
||||
return new SetDependImpl();
|
||||
}
|
||||
|
||||
public class SetDependImpl implements SetDepend<T, V> {
|
||||
@Override
|
||||
public SetDependImpl wrapper(WorkerWrapper<?, ?> wrapper) {
|
||||
if (wrapper == null) {
|
||||
return this;
|
||||
}
|
||||
if (dependWrappers == null) {
|
||||
dependWrappers = new LinkedHashSet<>();
|
||||
}
|
||||
dependWrappers.add(wrapper);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetDependImpl mustRequireWrapper(WorkerWrapper<?, ?> wrapper) {
|
||||
if (wrapper == null) {
|
||||
return this;
|
||||
}
|
||||
wrapper(wrapper);
|
||||
if (mustDependSet == null) {
|
||||
mustDependSet = new LinkedHashSet<>();
|
||||
}
|
||||
mustDependSet.add(wrapper);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetDependImpl specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper) {
|
||||
if (strategy == null || wrapper == null) {
|
||||
return this;
|
||||
}
|
||||
if (dependWrapperActionStrategyMap == null) {
|
||||
dependWrapperActionStrategyMap = new LinkedHashMap<>();
|
||||
}
|
||||
dependWrapperActionStrategyMap.put(wrapper, strategy);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetDependImpl strategy(DependenceStrategy dependenceStrategy) {
|
||||
if (dependenceStrategy == null) {
|
||||
return this;
|
||||
}
|
||||
StableWorkerWrapperBuilder.this.dependenceStrategy = dependenceStrategy;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BUILDER_SUB_CLASS end() {
|
||||
return returnThisBuilder();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetNextImpl setNext() {
|
||||
useV15NewDependApi = true;
|
||||
checkCanNotCompatibleDeprecateMustDependApi(false);
|
||||
return new SetNextImpl();
|
||||
}
|
||||
|
||||
public class SetNextImpl implements SetNext<T, V> {
|
||||
@Override
|
||||
public SetNextImpl wrapper(WorkerWrapper<?, ?> wrapper) {
|
||||
if (wrapper == null) {
|
||||
return this;
|
||||
}
|
||||
if (nextWrappers == null) {
|
||||
nextWrappers = new LinkedHashSet<>();
|
||||
}
|
||||
nextWrappers.add(wrapper);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetNextImpl mustToNextWrapper(WorkerWrapper<?, ?> wrapper) {
|
||||
if (wrapper == null) {
|
||||
return this;
|
||||
}
|
||||
wrapper(wrapper);
|
||||
if (selfIsMustSet == null) {
|
||||
selfIsMustSet = new LinkedHashSet<>();
|
||||
}
|
||||
selfIsMustSet.add(wrapper);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetNextImpl specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper) {
|
||||
if (strategy == null || wrapper == null) {
|
||||
return this;
|
||||
}
|
||||
wrapper(wrapper);
|
||||
if (selfIsSpecialMap == null) {
|
||||
selfIsSpecialMap = new LinkedHashMap<>();
|
||||
}
|
||||
selfIsSpecialMap.put(wrapper, strategy);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BUILDER_SUB_CLASS end() {
|
||||
return returnThisBuilder();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetTimeOut<T, V> setTimeOut() {
|
||||
return new SetTimeOutImpl();
|
||||
}
|
||||
|
||||
public class SetTimeOutImpl implements SetTimeOut<T, V> {
|
||||
@Override
|
||||
public SetTimeOutImpl enableTimeOut(boolean enableElseDisable) {
|
||||
StableWorkerWrapperBuilder.this.enableTimeOut = enableElseDisable;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetTimeOutImpl setTime(long time, TimeUnit unit) {
|
||||
if (time <= 0 || unit == null) {
|
||||
throw new IllegalStateException("Illegal argument : time=" + time + " must > 0, unit=" + unit + " must not null");
|
||||
}
|
||||
StableWorkerWrapperBuilder.this.time = time;
|
||||
StableWorkerWrapperBuilder.this.unit = unit;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetTimeOutImpl allowInterrupt(boolean allow) {
|
||||
StableWorkerWrapperBuilder.this.allowInterrupt = allow;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BUILDER_SUB_CLASS end() {
|
||||
return returnThisBuilder();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public WorkerWrapper<T, V> build() {
|
||||
isBuilding = true;
|
||||
WorkerWrapper<T, V> wrapper = new StableWorkerWrapper<>(
|
||||
id == null ? UUID.randomUUID().toString() : id,
|
||||
worker,
|
||||
param,
|
||||
callback
|
||||
);
|
||||
wrapper.setDependWrappers(new LinkedHashSet<>());
|
||||
wrapper.setNextWrappers(new LinkedHashSet<>());
|
||||
// ========== 设置依赖关系/策略 ==========
|
||||
{
|
||||
if (dependWrappers != null && dependWrappers.size() > 0) {
|
||||
dependWrappers.forEach(dependWrapper -> {
|
||||
wrapper.getDependWrappers().add(dependWrapper);
|
||||
dependWrapper.getNextWrappers().add(wrapper);
|
||||
});
|
||||
}
|
||||
if (nextWrappers != null && nextWrappers.size() > 0) {
|
||||
nextWrappers.forEach(next -> {
|
||||
wrapper.getNextWrappers().add(next);
|
||||
next.getDependWrappers().add(wrapper);
|
||||
});
|
||||
}
|
||||
if (useV15DeprecatedMustDependApi) {
|
||||
// 适配旧api的must开关
|
||||
if (mustDependSet != null && mustDependSet.size() > 0) {
|
||||
wrapper.getWrapperStrategy().setDependMustStrategyMapper(new DependMustStrategyMapper()
|
||||
.addDependMust(mustDependSet));
|
||||
}
|
||||
wrapper.getWrapperStrategy().setDependenceStrategy(DependenceStrategy.IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY);
|
||||
} else {
|
||||
if (mustDependSet != null && mustDependSet.size() > 0) {
|
||||
wrapper.getWrapperStrategy().setDependMustStrategyMapper(new DependMustStrategyMapper().addDependMust(mustDependSet));
|
||||
}
|
||||
if (dependenceStrategy == null) {
|
||||
setDepend().defaultStrategy();
|
||||
}
|
||||
wrapper.getWrapperStrategy().setDependenceStrategy(dependenceStrategy);
|
||||
}
|
||||
if (dependWrapperActionStrategyMap != null && dependWrapperActionStrategyMap.size() > 0) {
|
||||
DependWrapperStrategyMapper mapper = new DependWrapperStrategyMapper();
|
||||
dependWrapperActionStrategyMap.forEach(mapper::putMapping);
|
||||
wrapper.getWrapperStrategy().setDependWrapperStrategyMapper(mapper);
|
||||
}
|
||||
if (selfIsMustSet != null && selfIsMustSet.size() > 0) {
|
||||
selfIsMustSet.forEach(next -> Optional.ofNullable(next.getWrapperStrategy().getDependMustStrategyMapper())
|
||||
.ifPresent(mustMapper -> mustMapper.addDependMust(wrapper)));
|
||||
}
|
||||
if (selfIsSpecialMap != null && selfIsSpecialMap.size() > 0) {
|
||||
selfIsSpecialMap.forEach((next, strategy) -> Optional.ofNullable(next.getWrapperStrategy().getDependWrapperStrategyMapper())
|
||||
.ifPresent(wrapperMapper -> wrapperMapper.putMapping(wrapper, strategy)));
|
||||
}
|
||||
}
|
||||
// ========== 设置检查是否跳过策略 ==========
|
||||
{
|
||||
if (skipStrategy == null) {
|
||||
wrapper.getWrapperStrategy().setSkipStrategy(needCheckNextWrapperResult != null && !needCheckNextWrapperResult ?
|
||||
SkipStrategy.NOT_SKIP
|
||||
: SkipStrategy.CHECK_ONE_LEVEL
|
||||
);
|
||||
} else {
|
||||
wrapper.getWrapperStrategy().setSkipStrategy(skipStrategy);
|
||||
}
|
||||
}
|
||||
// ========== 设置单wrapper超时检查 ==========
|
||||
{
|
||||
if (enableTimeOut) {
|
||||
if (time <= 0) {
|
||||
throw new IllegalStateException("timeout time " + time + " must > " + 0);
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new IllegalStateException("timeout unit must not null");
|
||||
}
|
||||
wrapper.setTimeOut(new WorkerWrapper.TimeOutProperties(true, time, unit, allowInterrupt, wrapper));
|
||||
}
|
||||
}
|
||||
return wrapper;
|
||||
}
|
||||
|
||||
// ========== deprecated methods ==========
|
||||
|
||||
/**
|
||||
* @deprecated 建议使用 {@link WorkerWrapperBuilder#depends(WorkerWrapper[])}
|
||||
* 或{@link WorkerWrapperBuilder#setDepend()}设置更多选项,例如{@link WorkerWrapperBuilder.SetDepend#wrapper(WorkerWrapper[])}
|
||||
* 如果是想要“必须依赖”的功能,则使用{@link WorkerWrapperBuilder.SetDepend#mustRequireWrapper(WorkerWrapper[])}
|
||||
*/
|
||||
@Deprecated
|
||||
public BUILDER_SUB_CLASS depend(WorkerWrapper<?, ?>... wrappers) {
|
||||
if (wrappers == null) {
|
||||
return returnThisBuilder();
|
||||
}
|
||||
for (WorkerWrapper<?, ?> wrapper : wrappers) {
|
||||
depend(wrapper);
|
||||
}
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated 建议使用 {@link WorkerWrapperBuilder#depends(WorkerWrapper[])}。
|
||||
* 或{@link WorkerWrapperBuilder#setDepend()}设置更多选项,例如{@link WorkerWrapperBuilder.SetDepend#wrapper(WorkerWrapper)}
|
||||
* 如果是想要“必须依赖”的功能,则使用{@link WorkerWrapperBuilder.SetDepend#mustRequireWrapper(WorkerWrapper[])}
|
||||
*/
|
||||
@Deprecated
|
||||
public BUILDER_SUB_CLASS depend(WorkerWrapper<?, ?> wrapper) {
|
||||
return depend(wrapper, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated 建议使用 {@link WorkerWrapperBuilder.SetDepend#requireWrapper(WorkerWrapper, boolean)}}
|
||||
*/
|
||||
@Deprecated
|
||||
public BUILDER_SUB_CLASS depend(WorkerWrapper<?, ?> wrapper, boolean isMust) {
|
||||
if (wrapper == null) {
|
||||
return returnThisBuilder();
|
||||
}
|
||||
useV15DeprecatedMustDependApi = true;
|
||||
checkCanNotCompatibleDeprecateMustDependApi(true);
|
||||
if (dependWrappers == null) {
|
||||
dependWrappers = new LinkedHashSet<>();
|
||||
}
|
||||
dependWrappers.add(wrapper);
|
||||
if (isMust) {
|
||||
if (mustDependSet == null) {
|
||||
mustDependSet = new LinkedHashSet<>();
|
||||
}
|
||||
mustDependSet.add(wrapper);
|
||||
}
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public BUILDER_SUB_CLASS next(WorkerWrapper<?, ?>... wrappers) {
|
||||
if (wrappers == null) {
|
||||
return returnThisBuilder();
|
||||
}
|
||||
for (WorkerWrapper<?, ?> wrapper : wrappers) {
|
||||
next(wrapper);
|
||||
}
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public BUILDER_SUB_CLASS next(WorkerWrapper<?, ?> wrapper) {
|
||||
return next(wrapper, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 会将wrapper增加到{@link #nextWrappers}。
|
||||
* 如果selfIsMust为true,还会将wrapper额外增加到{@link #selfIsMustSet}。
|
||||
*
|
||||
* @param wrapper WorkerWrapper instance
|
||||
* @param selfIsMust 是否强依赖自己(“强依赖”是旧版本的叫法。即是否必须在自己执行后才能执行。)
|
||||
* @return 返回Builder。
|
||||
* @deprecated 不推荐使用Must开关去设置之后的Wrapper。
|
||||
*/
|
||||
@Deprecated
|
||||
public BUILDER_SUB_CLASS next(WorkerWrapper<?, ?> wrapper, boolean selfIsMust) {
|
||||
useV15DeprecatedMustDependApi = true;
|
||||
checkCanNotCompatibleDeprecateMustDependApi(true);
|
||||
if (nextWrappers == null) {
|
||||
nextWrappers = new LinkedHashSet<>();
|
||||
}
|
||||
nextWrappers.add(wrapper);
|
||||
//强依赖自己
|
||||
if (selfIsMust) {
|
||||
if (selfIsMustSet == null) {
|
||||
selfIsMustSet = new LinkedHashSet<>();
|
||||
}
|
||||
selfIsMustSet.add(wrapper);
|
||||
}
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置是否要检查之后的Wrapper是否已经执行完毕。
|
||||
* <p/>
|
||||
* 默认为true。
|
||||
*
|
||||
* @param needCheckNextWrapperResult 设为true后,如果之后的Wrapper已经执行完毕。
|
||||
* 则跳过本Wrapper并设置{@link WorkResult#getEx()}为{@link com.jd.platform.async.exception.SkippedException}。
|
||||
* @deprecated v1.5中已经废弃。请使用
|
||||
*/
|
||||
@Deprecated
|
||||
public BUILDER_SUB_CLASS needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
|
||||
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
|
||||
return returnThisBuilder();
|
||||
}
|
||||
|
||||
// util method
|
||||
|
||||
private BUILDER_SUB_CLASS returnThisBuilder() {
|
||||
return (BUILDER_SUB_CLASS) this;
|
||||
}
|
||||
|
||||
private void checkCanNotCompatibleDeprecateMustDependApi(boolean isOld) {
|
||||
if (!isBuilding && (!isOld && useV15DeprecatedMustDependApi || isOld && useV15NewDependApi)) {
|
||||
throw new UnsupportedOperationException("新旧api之间不可兼容,请将v1.5之前废弃的方法升级为注释中建议的方法后再调用v1.5之后的新api");
|
||||
}
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,290 @@
|
||||
package com.jd.platform.async.wrapper;
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
import com.jd.platform.async.callback.IWorker;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
import com.jd.platform.async.wrapper.actionstrategy.DependWrapperActionStrategy;
|
||||
import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
|
||||
import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 作为优化编排依赖策略后,新增的Builder接口。
|
||||
* <p/>
|
||||
* 该接口中不再开放很多过时的api。
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/4-下午1:26
|
||||
*/
|
||||
public interface WorkerWrapperBuilder<T, V> {
|
||||
/**
|
||||
* 设置唯一id。
|
||||
* 如果不设置,{@link StableWorkerWrapperBuilder}会使用UUID
|
||||
*/
|
||||
WorkerWrapperBuilder<T, V> id(String id);
|
||||
|
||||
/**
|
||||
* 设置{@link IWorker}执行方法。
|
||||
*
|
||||
* @param worker 传入接口实现类/lambda
|
||||
*/
|
||||
WorkerWrapperBuilder<T, V> worker(IWorker<T, V> worker);
|
||||
|
||||
/**
|
||||
* wrapper启动后的传入参数。
|
||||
*
|
||||
* @param t 参数
|
||||
*/
|
||||
WorkerWrapperBuilder<T, V> param(T t);
|
||||
|
||||
/**
|
||||
* 设置{@link ICallback}回调方法。
|
||||
*/
|
||||
WorkerWrapperBuilder<T, V> callback(ICallback<T, V> callback);
|
||||
|
||||
/**
|
||||
* 设置跳过策略。通常用于检查下游Wrapper是否已经完成。
|
||||
* <p/>
|
||||
* 允许不设置。{@link StableWorkerWrapperBuilder}将会默认设置为检查深度为1的下游Wrapper是否执行完成。
|
||||
*
|
||||
* @param strategy 跳过策略函数。
|
||||
*/
|
||||
WorkerWrapperBuilder<T, V> setSkipStrategy(SkipStrategy strategy);
|
||||
|
||||
/**
|
||||
* 设置上游Wrapper依赖关系的选项。
|
||||
*/
|
||||
SetDepend<T, V> setDepend();
|
||||
|
||||
interface SetDepend<T, V> {
|
||||
/**
|
||||
* 设置在本Wrapper之前的上游Wrapper。
|
||||
*
|
||||
* @param wrapper 允许传入null。
|
||||
*/
|
||||
SetDepend<T, V> wrapper(WorkerWrapper<?, ?> wrapper);
|
||||
|
||||
default SetDepend<T, V> wrapper(WorkerWrapper... wrappers) {
|
||||
if (wrappers == null) {
|
||||
return this;
|
||||
}
|
||||
for (WorkerWrapper<?, ?> wrapper : wrappers) {
|
||||
wrapper(wrapper);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
default SetDepend<T, V> wrapper(Collection<? extends WorkerWrapper> wrappers) {
|
||||
if (wrappers == null) {
|
||||
return this;
|
||||
}
|
||||
wrappers.forEach(this::wrapper);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置必须要执行成功的Wrapper,当所有被该方法设为的上游Wrapper执行成功时,本Wrapper才能执行
|
||||
*/
|
||||
SetDepend<T, V> mustRequireWrapper(WorkerWrapper<?, ?> wrapper);
|
||||
|
||||
default SetDepend<T, V> mustRequireWrapper(WorkerWrapper... wrappers) {
|
||||
if (wrappers == null) {
|
||||
return this;
|
||||
}
|
||||
for (WorkerWrapper<?, ?> wrapper : wrappers) {
|
||||
mustRequireWrapper(wrapper);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 一个用于动态判断是否must的方法,与旧的{@code .depend(WorkerWrapper,boolean)}效果相同。
|
||||
*
|
||||
* @param must 如果为true,则等同于{@link #mustRequireWrapper(WorkerWrapper)},否则等同于{@link #wrapper(WorkerWrapper)}
|
||||
*/
|
||||
default SetDepend<T, V> requireWrapper(WorkerWrapper<?, ?> wrapper, boolean must) {
|
||||
return must ? mustRequireWrapper(wrapper) : wrapper(wrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 对单个Wrapper设置特殊策略。
|
||||
*
|
||||
* @param wrapper 需要设置特殊策略的Wrapper。
|
||||
* @param strategy 特殊策略。
|
||||
*/
|
||||
SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);
|
||||
|
||||
default SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper... wrappers) {
|
||||
if (strategy == null || wrappers == null) {
|
||||
return this;
|
||||
}
|
||||
for (WorkerWrapper<?, ?> workerWrapper : wrappers) {
|
||||
specialDependWrapper(strategy, workerWrapper);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置基本策略并返回。
|
||||
* <p>
|
||||
* 如果从未调用该方法,则在{@link #build()}时使用{@link #defaultStrategy()}作为默认策略。
|
||||
* </p>
|
||||
*
|
||||
* @param dependenceStrategy 根据上游Wrapper判断本Wrapper是否启动的最终策略。
|
||||
*/
|
||||
SetDepend<T, V> strategy(DependenceStrategy dependenceStrategy);
|
||||
|
||||
/**
|
||||
* 默认策略为{@link DependenceStrategy#ALL_DEPENDENCIES_ALL_SUCCESS}
|
||||
*/
|
||||
default SetDepend<T, V> defaultStrategy() {
|
||||
return strategy(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 结束依赖关系设置。返回到所属的{@link WorkerWrapperBuilder}
|
||||
*/
|
||||
WorkerWrapperBuilder<T, V> end();
|
||||
}
|
||||
|
||||
/**
|
||||
* 便捷式设置依赖的上游Wrapper。
|
||||
*
|
||||
* @param wrappers 上游Wrapper
|
||||
*/
|
||||
default WorkerWrapperBuilder<T, V> depends(WorkerWrapper... wrappers) {
|
||||
return setDepend().wrapper(wrappers).end();
|
||||
}
|
||||
|
||||
default WorkerWrapperBuilder<T, V> depends(Collection<WorkerWrapper> wrappers) {
|
||||
return setDepend().wrapper(wrappers).end();
|
||||
}
|
||||
|
||||
default WorkerWrapperBuilder<T, V> depends(DependenceStrategy strategy, WorkerWrapper... wrappers) {
|
||||
return setDepend().wrapper(wrappers).strategy(strategy).end();
|
||||
}
|
||||
|
||||
default WorkerWrapperBuilder<T, V> depends(DependenceStrategy strategy, Collection<WorkerWrapper> wrappers) {
|
||||
return setDepend().wrapper(wrappers).strategy(strategy).end();
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置下游Wrapper依赖关系的选项。
|
||||
*/
|
||||
SetNext<T, V> setNext();
|
||||
|
||||
interface SetNext<T, V> {
|
||||
/**
|
||||
* 设置在本Wrapper之后的下游Wrapper。
|
||||
*/
|
||||
SetNext<T, V> wrapper(WorkerWrapper<?, ?> wrapper);
|
||||
|
||||
default SetNext<T, V> wrapper(WorkerWrapper... wrappers) {
|
||||
if (wrappers == null) {
|
||||
return this;
|
||||
}
|
||||
for (WorkerWrapper<?, ?> wrapper : wrappers) {
|
||||
wrapper(wrapper);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
default SetNext<T, V> wrapper(Collection<? extends WorkerWrapper> wrappers) {
|
||||
if (wrappers == null) {
|
||||
return this;
|
||||
}
|
||||
wrappers.forEach(this::wrapper);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 调用该方法将会让传入的此下游workerWrappers对本Wrapper强依赖(must)
|
||||
*
|
||||
* @param wrapper 下游Wrapper
|
||||
*/
|
||||
SetNext<T, V> mustToNextWrapper(WorkerWrapper<?, ?> wrapper);
|
||||
|
||||
default SetNext<T, V> requireToNextWrapper(WorkerWrapper<?, ?> wrapper, boolean must) {
|
||||
return must ? mustToNextWrapper(wrapper) : wrapper(wrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 调用该方法将会让传入的此下游workerWrappers对本Wrapper进行特殊策略判断,
|
||||
*
|
||||
* @param strategy 对本Wrapper的特殊策略。
|
||||
* @param wrapper 依赖本Wrapper的下游Wrapper。
|
||||
* @return 返回Builder自身。
|
||||
*/
|
||||
SetNext<T, V> specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);
|
||||
|
||||
WorkerWrapperBuilder<T, V> end();
|
||||
}
|
||||
|
||||
/**
|
||||
* 便捷式设置本Wrapper被依赖的下游Wrapper。
|
||||
*
|
||||
* @param wrappers 下游Wrapper
|
||||
*/
|
||||
default WorkerWrapperBuilder<T, V> nextOf(WorkerWrapper... wrappers) {
|
||||
return setNext().wrapper(wrappers).end();
|
||||
}
|
||||
|
||||
default WorkerWrapperBuilder<T, V> nextOf(Collection<WorkerWrapper> wrappers) {
|
||||
return setNext().wrapper(wrappers).end();
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置超时时间的具体属性
|
||||
*/
|
||||
SetTimeOut<T, V> setTimeOut();
|
||||
|
||||
interface SetTimeOut<T, V> {
|
||||
/**
|
||||
* 是否启动超时判断。
|
||||
* <p>
|
||||
* 默认为true
|
||||
*
|
||||
* @param enableElseDisable 是则true
|
||||
*/
|
||||
SetTimeOut<T, V> enableTimeOut(boolean enableElseDisable);
|
||||
|
||||
/**
|
||||
* 设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
|
||||
*
|
||||
* @param time 时间数值
|
||||
* @param unit 时间单位
|
||||
*/
|
||||
SetTimeOut<T, V> setTime(long time, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* 是否允许被试图中断线程
|
||||
*
|
||||
* @param allow 是则true
|
||||
*/
|
||||
SetTimeOut<T, V> allowInterrupt(boolean allow);
|
||||
|
||||
WorkerWrapperBuilder<T, V> end();
|
||||
}
|
||||
|
||||
/**
|
||||
* 便携式设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
|
||||
*
|
||||
* @param time 时间数值
|
||||
* @param unit 时间单位
|
||||
*/
|
||||
default WorkerWrapperBuilder<T, V> timeout(long time, TimeUnit unit) {
|
||||
return timeout(true, time, unit, false);
|
||||
}
|
||||
|
||||
default WorkerWrapperBuilder<T, V> timeout(boolean enableTimeOut, long time, TimeUnit unit, boolean allowInterrupt) {
|
||||
return setTimeOut().enableTimeOut(enableTimeOut).setTime(time, unit).allowInterrupt(allowInterrupt).end();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建Wrapper。
|
||||
*
|
||||
* @return 返回WorkerWrapper
|
||||
*/
|
||||
WorkerWrapper<T, V> build();
|
||||
}
|
||||
@@ -0,0 +1,486 @@
|
||||
package com.jd.platform.async.wrapper;
|
||||
|
||||
import com.jd.platform.async.executor.timer.SystemClock;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
/**
|
||||
* 判断{@link WorkerWrapper}是否链路调用完成的轮询器。
|
||||
* =================================================================================
|
||||
* <p>
|
||||
* 在v1.4及以前的版本,存在如下问题:
|
||||
* >
|
||||
* 在使用线程数量较少的线程池进行beginWork时,调用WorkerWrapper#beginNext方法时,
|
||||
* 会因为本线程等待下游Wrapper执行完成而存在线程耗尽bug。线程池会死翘翘的僵住、动弹不得。
|
||||
* >
|
||||
* 例如仅有2个线程的线程池,执行以下任务:
|
||||
* {@code
|
||||
* <p>
|
||||
* 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况,在test/v15.dependnew中示例testThreadPolling_V14Bug说明了这个bug
|
||||
* 线程数:2
|
||||
* A(5ms)--B1(10ms) ---|--> C1(5ms)
|
||||
* . \ | (B1、B2全部完成可执行C1、C2)
|
||||
* . ---> B2(20ms) --|--> C2(5ms)
|
||||
* <p>
|
||||
* }
|
||||
* 线程1执行了A,然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。
|
||||
* 线程2执行了B1或B2中的一个,也在allOf方法等待C1、C2完成。
|
||||
* 结果没有线程执行C和B2了,导致超时而死,并且这个线程池线程有可能被耗尽。
|
||||
* >
|
||||
* v1.5的解决方案是,放弃使工作线程遭致阻塞的{@link java.util.concurrent.CompletableFuture},
|
||||
* 而是让工作线程在工作前注册到本“完成检查器”{@link WrapperEndingInspector},然后交由轮询中心{@link PollingCenter}进行检查是否完成。
|
||||
* </p>
|
||||
* =================================================================================
|
||||
* <p>
|
||||
* 本类的工作原理:
|
||||
* .
|
||||
* 原理:
|
||||
* (1)首先在Async代码中,将主动运行的wrapper都保存到一个inspector{@link #addWrapper(WorkerWrapper)},
|
||||
* (2)主动运行的wrapper于FINISH/ERROR时,先异步submit所有下游wrapper,在其执行时将自身(下游wrapper)保存到inspector,
|
||||
* (3)然后在异步submit完所有下游wrapper后,将调用{@link #setWrapperEndWithTryPolling(WorkerWrapper)}方法,
|
||||
* . 设置自己的{@link #wrappers}为true,并呼叫轮询{@link PollingCenter#tryPolling()}。
|
||||
* (4)在下游wrapper中,经过策略器判断后,
|
||||
* . 若是不需要运行,则把本wrapper计数-1{@link WrapperNode#count},若是计数<1则将{@link WrapperNode}移出{@link #wrappers}。
|
||||
* . 若是需要运行,则运行之,然后跳转到 (2) 的情节。如此递归,执行链路上所有需要执行的wrapper最后都会存在于{@link #wrappers}中。
|
||||
* .
|
||||
* 因此,若是存在任一其{@link WrapperNode#called}为false的wrapper,则表示这条链路还没有调用完。
|
||||
* 若是在{@link #wrappers}中所有的{@link WrapperNode#called}为true时,即可判断出链路执行完毕了。
|
||||
* </p>
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/5-下午3:22
|
||||
*/
|
||||
public class WrapperEndingInspector implements Comparable<WrapperEndingInspector> {
|
||||
/**
|
||||
* 最迟完成时间
|
||||
*/
|
||||
private final long latestFinishTime;
|
||||
|
||||
/**
|
||||
* 保存 需要检查的wrapper--相关属性 的Map。
|
||||
*/
|
||||
private final ConcurrentHashMap<WorkerWrapper, WrapperNode> wrappers = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 当全部wrapper都调用结束,它会countDown
|
||||
*/
|
||||
private final CountDownLatch endCDL = new CountDownLatch(1);
|
||||
|
||||
/**
|
||||
* 读锁用于修改数据,写锁用于轮询。使用公平锁让wrapper的时间波动不会太长。
|
||||
* <p/>
|
||||
* 在轮询到本inspector时,之所以要上写锁,是因为:
|
||||
* 假如此时有个Wrapper正在调用{@link #addWrapper(WorkerWrapper)},则wrappers发生了改变。
|
||||
* 假如现在恰巧访问到的是{@link #wrappers}迭代器的最后一个,但此时又加入了另一个,且这另一个又是需要去执行的。
|
||||
* 那么假如在迭代器遍历到目前访问到的wrapper都是呼叫完毕的,那么这新加入的一个就会被忽略,从而判定为全部完成。致使bug发生。
|
||||
* <p/>
|
||||
* 此外,即便轮询时上写锁,对性能的影响也是有限的。因为这只会在“呼叫别人”的时候发生工作线程与轮询线程的锁争抢,
|
||||
* 而在工作线程执行{@link com.jd.platform.async.callback.IWorker#action(Object, Map)}或
|
||||
* {@link com.jd.platform.async.callback.ICallback#result(boolean, Object, WorkResult)}时,并不会与轮询线程去
|
||||
* 争抢锁,而通常这个工作的时间才是最耗时的。
|
||||
*/
|
||||
private final ReentrantReadWriteLock modifyPollingLock = new ReentrantReadWriteLock(true);
|
||||
|
||||
/**
|
||||
* 当轮询发现超时时,该值被设为false
|
||||
*/
|
||||
private final AtomicBoolean haveNotTimeOut = new AtomicBoolean(true);
|
||||
|
||||
public WrapperEndingInspector(long latestFinishTime) {
|
||||
this.latestFinishTime = latestFinishTime;
|
||||
}
|
||||
|
||||
public void registerToPollingCenter() {
|
||||
modifyPollingLock.readLock().lock();
|
||||
try {
|
||||
// 不重复put,以免InspectorNode被替换为另一个
|
||||
PollingCenter.getInstance().inspectionMap.putIfAbsent(this, new PollingCenter.InspectorNode());
|
||||
} finally {
|
||||
modifyPollingLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void addWrapper(WorkerWrapper wrapper) {
|
||||
modifyPollingLock.readLock().lock();
|
||||
try {
|
||||
wrappers.computeIfAbsent(wrapper, k -> new WrapperNode()).count.incrementAndGet();
|
||||
} finally {
|
||||
modifyPollingLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void addWrapper(Collection<? extends WorkerWrapper> wrappers) {
|
||||
modifyPollingLock.readLock().lock();
|
||||
try {
|
||||
Objects.requireNonNull(wrappers).forEach(this::addWrapper);
|
||||
} finally {
|
||||
modifyPollingLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void reduceWrapper(WorkerWrapper wrapper) {
|
||||
modifyPollingLock.readLock().lock();
|
||||
try {
|
||||
/*
|
||||
* 有可能发生这情况,一个Wrapper刚被加进去,执行了零/一/多次,均不满足执行条件,但是下次调用却应当使其启动。
|
||||
*/
|
||||
if (wrapper.getState() != WorkerWrapper.INIT) {
|
||||
final WrapperNode wrapperNode = wrappers.get(wrapper);
|
||||
if (wrapperNode == null) {
|
||||
return;
|
||||
}
|
||||
synchronized (wrapperNode) {
|
||||
if (wrapperNode.count.decrementAndGet() < 1) {
|
||||
wrappers.remove(wrapper);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
modifyPollingLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 原子的设置这个Wrapper已经呼叫完成了。
|
||||
* <p/>
|
||||
* 该方法会调用{@link PollingCenter#tryPolling()},呼叫轮询线程
|
||||
*
|
||||
* @return 如果为true,表示设置成功。为false表示已经被设置过了。
|
||||
*/
|
||||
public boolean setWrapperEndWithTryPolling(WorkerWrapper wrapper) {
|
||||
modifyPollingLock.readLock().lock();
|
||||
try {
|
||||
return !wrappers.get(wrapper).called.getAndSet(true);
|
||||
} finally {
|
||||
modifyPollingLock.readLock().unlock();
|
||||
PollingCenter.getInstance().tryPolling();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 供外部调用的等待方法
|
||||
*
|
||||
* @return 在超时前完成,返回true。超时时间一到,就会返回false。就像,人被杀,就会死。
|
||||
* @throws InterruptedException 外部调用的当前线程被中断时,会抛出这个异常。
|
||||
*/
|
||||
public boolean await() throws InterruptedException {
|
||||
endCDL.await();
|
||||
return haveNotTimeOut.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link PollingCenter}会优先把最迟完成时间(即开始时间+超时时间)较早的Inspection放在前面。
|
||||
*/
|
||||
@Override
|
||||
public int compareTo(WrapperEndingInspector other) {
|
||||
if (this.latestFinishTime - other.latestFinishTime < 0) {
|
||||
return -1;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "WrapperEndingInspector{" +
|
||||
"remainTime=" + (latestFinishTime - SystemClock.now()) +
|
||||
", wrappers=" +
|
||||
wrappers.entrySet().stream()
|
||||
.collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue))
|
||||
+
|
||||
", endCDL.getCount()=" + endCDL.getCount() +
|
||||
", writePollingLock={read=" + modifyPollingLock.getReadLockCount() + ",write=" + modifyPollingLock.getWriteHoldCount() +
|
||||
"} }";
|
||||
}
|
||||
|
||||
/**
|
||||
* 节点对象,保存属性信息于{@link #wrappers}中。
|
||||
* <p/>
|
||||
* 当试图把Node移出本Map时,该Node对象自身将会被上锁。
|
||||
*/
|
||||
public static class WrapperNode {
|
||||
/**
|
||||
* 是否已经呼叫完了下游wrapper
|
||||
*/
|
||||
AtomicBoolean called = new AtomicBoolean(false);
|
||||
/**
|
||||
* 本wrapper总共被呼叫次数的统计。若小于1则会被移出map。
|
||||
*/
|
||||
AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "{" +
|
||||
"called=" + called.get() +
|
||||
", count=" + count.get() +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 轮询中心。具体的轮询调度由其完成。
|
||||
* <p/>
|
||||
* {@link #registerToPollingCenter()}调用时,就会将inspector注册到本轮询中心以供轮询。
|
||||
*/
|
||||
public static class PollingCenter {
|
||||
public static class InspectorNode {
|
||||
/**
|
||||
* 延迟轮询时间戳。
|
||||
*/
|
||||
private volatile long delayTimeStamp = Long.MAX_VALUE;
|
||||
|
||||
private final ReadWriteLock lockOfDelayTimeStamp = new ReentrantReadWriteLock();
|
||||
|
||||
/**
|
||||
* 比较传入时间戳与{@link #delayTimeStamp},并设置小的那个为{@link #delayTimeStamp}的值。
|
||||
*
|
||||
* @param otherDelayTimeStamp 试图用来比较的另一个时间戳
|
||||
*/
|
||||
public void compareAndSetMinDelayTimeStamp(long otherDelayTimeStamp) {
|
||||
lockOfDelayTimeStamp.writeLock().lock();
|
||||
try {
|
||||
long dif = otherDelayTimeStamp - delayTimeStamp;
|
||||
if (dif > 0) {
|
||||
return;
|
||||
}
|
||||
delayTimeStamp = otherDelayTimeStamp;
|
||||
} finally {
|
||||
lockOfDelayTimeStamp.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public long getDelayTimeStamp() {
|
||||
lockOfDelayTimeStamp.readLock().lock();
|
||||
try {
|
||||
return delayTimeStamp;
|
||||
} finally {
|
||||
lockOfDelayTimeStamp.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public long clearTimeStamp() {
|
||||
lockOfDelayTimeStamp.writeLock().lock();
|
||||
try {
|
||||
long old = this.delayTimeStamp;
|
||||
delayTimeStamp = Long.MAX_VALUE;
|
||||
return old;
|
||||
} finally {
|
||||
lockOfDelayTimeStamp.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "InspectorNode{" +
|
||||
"delayTimeStamp=" + delayTimeStamp +
|
||||
", lockOfDelayTimeStamp=" + lockOfDelayTimeStamp +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 将被轮询的WrapperFinishInspection集合。
|
||||
*/
|
||||
private final Map<WrapperEndingInspector, InspectorNode> inspectionMap = new ConcurrentSkipListMap<>();
|
||||
|
||||
/**
|
||||
* 请求轮询。
|
||||
*/
|
||||
private void tryPolling() {
|
||||
// 开始轮询
|
||||
SINGLETON_POLLING_POOL.submit(() -> {
|
||||
// 用来判断在轮询过程中是否有新增的inspector的值
|
||||
int expectCount;
|
||||
// 如果此值变化过,则在结束时让自己在此值后的时间再启动轮询
|
||||
while (!inspectionMap.isEmpty()) {
|
||||
// expectCount是本线程用来记录本次循环开始时inspectionMap的个数。
|
||||
// 每当移出一个inspector时,该值-1。
|
||||
expectCount = inspectionMap.size();
|
||||
// 开始检查
|
||||
for (Map.Entry<WrapperEndingInspector, InspectorNode> entry : inspectionMap.entrySet()) {
|
||||
final WrapperEndingInspector inspector = entry.getKey();
|
||||
final InspectorNode inspectorNode = entry.getValue();
|
||||
// 直接抢锁,轮询期间禁止修改inspector
|
||||
inspector.modifyPollingLock.writeLock().lock();
|
||||
try {
|
||||
// 对一个inspector进行检查
|
||||
if (PollingCenter.this.checkInspectorIsEnd(inspector, inspectorNode)) {
|
||||
// inspector中的wrapper调用结束了
|
||||
// 先要把wrapper给停了
|
||||
inspector.wrappers.forEach((wrapper, wrapperNode) -> {
|
||||
WorkerWrapper.TimeOutProperties timeOut = wrapper.getTimeOut();
|
||||
if (timeOut != null) {
|
||||
timeOut.checkTimeOut(true);
|
||||
}
|
||||
});
|
||||
// 修改此inspector和expectCount的状态
|
||||
if (inspector.endCDL.getCount() > 0) {
|
||||
// 双重检查使endCDL原子性countDown。
|
||||
synchronized (inspector.endCDL) {
|
||||
if (inspector.endCDL.getCount() > 0) {
|
||||
inspectionMap.remove(inspector);
|
||||
expectCount--;
|
||||
inspector.endCDL.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
inspector.modifyPollingLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
/*
|
||||
* 根据 expectCount == inspectionMap.size() 的值,在仅有本线程1个线程在轮询的情况下:
|
||||
* 1. 若值为true,表示轮询过程中没有新的inspector被添加进set中。此时就可以break了。
|
||||
* . 之所以可以break,是因为这个inspection还没有调用结束,在其结束前还会来催促轮询的。
|
||||
* 2. 若值为false,表示有新的inspector在本线程轮询时,被加入到了set中,且没有被我们迭代到。此时还要重新轮询一次。
|
||||
*/
|
||||
if (expectCount == inspectionMap.size()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private boolean checkInspectorIsEnd(WrapperEndingInspector inspector, InspectorNode inspectorNode) {
|
||||
// 判断一下inspector整组是否超时
|
||||
if (inspector.latestFinishTime < SystemClock.now()) {
|
||||
inspector.haveNotTimeOut.set(false);
|
||||
inspector.wrappers.forEach(((wrapper, wrapperNode) -> {
|
||||
wrapper.failNow();
|
||||
wrapperNode.called.set(true);
|
||||
}));
|
||||
return true;
|
||||
}
|
||||
// 将延迟检查时间设为离现在最近的值。
|
||||
// 此处判断的是inspector所代表整次任务的超时时间
|
||||
inspectorNode.compareAndSetMinDelayTimeStamp(inspector.latestFinishTime);
|
||||
// 判断inspector是否结束,并顺便记录、判断、修改wrapper的超时信息
|
||||
for (Map.Entry<WorkerWrapper, WrapperNode> entry : inspector.wrappers.entrySet()) {
|
||||
WorkerWrapper wrapper = entry.getKey();
|
||||
// 判断单个wrapper是否超时
|
||||
WorkerWrapper.TimeOutProperties timeOutProperties = wrapper.getTimeOut();
|
||||
if (timeOutProperties != null && timeOutProperties.isEnable()) {
|
||||
// 将延迟检查时间设为离现在最近的值。
|
||||
// 此处判断的是wrapper的超时时间
|
||||
if (timeOutProperties.checkTimeOut(true)) {
|
||||
inspector.haveNotTimeOut.set(false);
|
||||
}
|
||||
// 未超时但是设置了超时检查的话,记录一下inspector延时轮询时间
|
||||
else {
|
||||
inspectorNode.compareAndSetMinDelayTimeStamp(
|
||||
(timeOutProperties.isStarted() ? timeOutProperties.getStartWorkingTime() : SystemClock.now())
|
||||
+ timeOutProperties.getUnit().toMillis(timeOutProperties.getTime())
|
||||
);
|
||||
}
|
||||
}
|
||||
// 判断wrapper是否执行完毕
|
||||
WrapperNode node = entry.getValue();
|
||||
if (wrapper.getState() == WorkerWrapper.INIT
|
||||
// 上值如果为false,表示该Wrapper要么还没来得及执行,要么判断不需要执行但是还未被移出
|
||||
|| !node.called.get()
|
||||
// 上值如果为false,表示该Wrapper正在工作或是刚刚结束/失败,还未将所有下游Wrapper调用一遍。
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
{
|
||||
final String executorName = "asyncTool-pollingDelayCaller";
|
||||
ScheduledThreadPoolExecutor delayPollingExecutor = new ScheduledThreadPoolExecutor(
|
||||
1,
|
||||
new ThreadFactory() {
|
||||
private final AtomicLong threadCount = new AtomicLong(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r, executorName + "-thread-" + threadCount.getAndIncrement());
|
||||
t.setDaemon(true);
|
||||
// 线程优先级不高
|
||||
t.setPriority(1);
|
||||
return t;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return executorName + "-threadFactory";
|
||||
}
|
||||
}
|
||||
) {
|
||||
@Override
|
||||
public String toString() {
|
||||
return executorName + "{PollingCenter.this=" + PollingCenter.this + "}";
|
||||
}
|
||||
};
|
||||
// 每毫秒判断一次:map.value的每个延迟轮询队列的头号元素是否抵达当前时间,如果到了,则清除并调用轮询
|
||||
delayPollingExecutor.scheduleAtFixedRate(() -> inspectionMap.values().stream()
|
||||
.min(Comparator.comparingLong(InspectorNode::getDelayTimeStamp))
|
||||
.ifPresent(node -> {
|
||||
long delayTimeStamp = node.getDelayTimeStamp();
|
||||
if (Long.MAX_VALUE != delayTimeStamp && SystemClock.now() > delayTimeStamp) {
|
||||
tryPolling();
|
||||
}
|
||||
}), 1, 1, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
// ========== static ==========
|
||||
|
||||
private final static PollingCenter instance = new PollingCenter();
|
||||
|
||||
public static PollingCenter getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* 单线程的轮询线程池
|
||||
*/
|
||||
private static final ThreadPoolExecutor SINGLETON_POLLING_POOL;
|
||||
|
||||
static {
|
||||
SINGLETON_POLLING_POOL = new ThreadPoolExecutor(
|
||||
0,
|
||||
// 轮询线程数必须为1
|
||||
1,
|
||||
15L,
|
||||
TimeUnit.SECONDS,
|
||||
// 必须保存至少一个轮询请求,以便在本线程轮询结束时,获取到已轮询过的线程提交的轮询请求
|
||||
new ArrayBlockingQueue<>(1),
|
||||
new ThreadFactory() {
|
||||
private final AtomicLong threadCount = new AtomicLong(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r, "asyncTool-pollingCenterPool-thread-" + threadCount.getAndIncrement());
|
||||
t.setDaemon(true);
|
||||
// 线程优先级不高
|
||||
t.setPriority(3);
|
||||
return t;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "asyncTool-pollingCenterPool-threadFactory";
|
||||
}
|
||||
},
|
||||
// 多的就丢了,反正都是催这一个线程去轮询
|
||||
new ThreadPoolExecutor.DiscardPolicy()
|
||||
) {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "asyncTool-pollingCenterPool";
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
package com.jd.platform.async.wrapper.actionstrategy;
|
||||
|
||||
import com.jd.platform.async.worker.ResultState;
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 这是一个“向历史妥协”的策略器。以兼容must开关模式。
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/4-下午1:24
|
||||
*/
|
||||
public class DependMustStrategyMapper implements DependenceStrategy {
|
||||
|
||||
private final Set<WorkerWrapper<?, ?>> mustDependSet = new LinkedHashSet<>();
|
||||
|
||||
/**
|
||||
* 在{@link #mustDependSet} 中的must依赖。
|
||||
* <p>
|
||||
* 如果{@code mustDependSet == null || mustDependSet.size() < 1},返回{@link DependenceAction#JUDGE_BY_AFTER}
|
||||
* <p>
|
||||
* 如果所有的Wrapper已经完成,本Wrapper将会开始工作。
|
||||
* <p>
|
||||
* 如果任一{@link #mustDependSet}中的Wrapper失败,则返回{@link DependenceAction#FAST_FAIL}。
|
||||
* 具体超时/异常则根据{@link com.jd.platform.async.worker.ResultState}的值进行判断。
|
||||
* <p>
|
||||
* 如果存在Wrapper未完成 且 所有的Wrapper都未失败,则返回{@link DependenceAction#JUDGE_BY_AFTER}。
|
||||
* </p>
|
||||
*/
|
||||
@Override
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper) {
|
||||
if (mustDependSet.size() < 1) {
|
||||
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
|
||||
}
|
||||
boolean allSuccess = true;
|
||||
for (WorkerWrapper<?, ?> wrapper : mustDependSet) {
|
||||
switch (wrapper.getWorkResult().getResultState()) {
|
||||
case TIMEOUT:
|
||||
return DependenceAction.FAST_FAIL.fastFailException(ResultState.TIMEOUT, null);
|
||||
case EXCEPTION:
|
||||
return DependenceAction.FAST_FAIL.fastFailException(ResultState.EXCEPTION, wrapper.getWorkResult().getEx());
|
||||
case DEFAULT:
|
||||
allSuccess = false;
|
||||
case SUCCESS:
|
||||
default:
|
||||
}
|
||||
}
|
||||
if (allSuccess) {
|
||||
return DependenceAction.START_WORK.emptyProperty();
|
||||
}
|
||||
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
|
||||
}
|
||||
|
||||
/**
|
||||
* 新增must依赖。
|
||||
*
|
||||
* @param mustDependWrapper WorkerWrapper
|
||||
* @return 返回自身
|
||||
*/
|
||||
public DependMustStrategyMapper addDependMust(WorkerWrapper<?, ?> mustDependWrapper) {
|
||||
if (mustDependWrapper == null) {
|
||||
return this;
|
||||
}
|
||||
mustDependSet.add(mustDependWrapper);
|
||||
return this;
|
||||
}
|
||||
|
||||
public DependMustStrategyMapper addDependMust(Collection<WorkerWrapper<?, ?>> wrappers) {
|
||||
if (wrappers == null) {
|
||||
return this;
|
||||
}
|
||||
mustDependSet.addAll(wrappers);
|
||||
return this;
|
||||
}
|
||||
|
||||
public DependMustStrategyMapper addDependMust(WorkerWrapper<?, ?>... wrappers) {
|
||||
if (wrappers == null) {
|
||||
return this;
|
||||
}
|
||||
return addDependMust(Arrays.asList(wrappers));
|
||||
}
|
||||
|
||||
public Set<WorkerWrapper<?, ?>> getMustDependSet() {
|
||||
return mustDependSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DependMustStrategyMapper{" +
|
||||
"mustDependSet::getId=" + mustDependSet.stream().map(WorkerWrapper::getId).collect(Collectors.toList()) +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package com.jd.platform.async.wrapper.actionstrategy;
|
||||
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
|
||||
/**
|
||||
* 单参数策略。
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/1-下午11:16
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface DependWrapperActionStrategy {
|
||||
/**
|
||||
* 仅使用一个参数的判断方法
|
||||
*
|
||||
* @param fromWrapper 调用本Wrapper的上游Wrapper
|
||||
* @return 返回 {@link DependenceAction.WithProperty}
|
||||
*/
|
||||
DependenceAction.WithProperty judge(WorkerWrapper<?, ?> fromWrapper);
|
||||
|
||||
// ========== 送几个供链式调用的默认值 ==========
|
||||
|
||||
/**
|
||||
* 成功时,交给下一个策略器判断。
|
||||
* 未运行时,休息。
|
||||
* 失败时,失败。
|
||||
*/
|
||||
DependWrapperActionStrategy SUCCESS_CONTINUE = new DependWrapperActionStrategy() {
|
||||
@Override
|
||||
public DependenceAction.WithProperty judge(WorkerWrapper<?, ?> ww) {
|
||||
switch (ww.getWorkResult().getResultState()) {
|
||||
case SUCCESS:
|
||||
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
|
||||
case DEFAULT:
|
||||
return DependenceAction.TAKE_REST.emptyProperty();
|
||||
case EXCEPTION:
|
||||
case TIMEOUT:
|
||||
return DependenceAction.FAST_FAIL.fastFailException(ww.getWorkResult().getResultState(), ww.getWorkResult().getEx());
|
||||
default:
|
||||
}
|
||||
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + ww.getWorkResult().getResultState());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SUCCESS_CONTINUE";
|
||||
}
|
||||
};
|
||||
/**
|
||||
* 成功时,开始工作。
|
||||
* 未运行时,交给下一个策略器判断。
|
||||
* 失败时,失败。
|
||||
*/
|
||||
DependWrapperActionStrategy SUCCESS_START_INIT_CONTINUE = new DependWrapperActionStrategy() {
|
||||
@Override
|
||||
public DependenceAction.WithProperty judge(WorkerWrapper<?, ?> ww) {
|
||||
switch (ww.getWorkResult().getResultState()) {
|
||||
case SUCCESS:
|
||||
return DependenceAction.START_WORK.emptyProperty();
|
||||
case DEFAULT:
|
||||
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
|
||||
case EXCEPTION:
|
||||
case TIMEOUT:
|
||||
return DependenceAction.FAST_FAIL.fastFailException(ww.getWorkResult().getResultState(), ww.getWorkResult().getEx());
|
||||
default:
|
||||
}
|
||||
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + ww.getWorkResult().getResultState());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SUCCESS_START_INIT_CONTINUE";
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
package com.jd.platform.async.wrapper.actionstrategy;
|
||||
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略。
|
||||
* <p/>
|
||||
* 使用{@link DependWrapperStrategyMapper}本实现类对{@link DependenceStrategy}进行增强,
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/1-下午11:12
|
||||
*/
|
||||
public class DependWrapperStrategyMapper implements DependenceStrategy {
|
||||
private final Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> mapper = new ConcurrentHashMap<>(4);
|
||||
|
||||
/**
|
||||
* 设置对应策略
|
||||
*
|
||||
* @param targetWrapper 要设置策略的WorkerWrapper
|
||||
* @param strategy 要设置的策略
|
||||
* @return 返回this,链式调用。
|
||||
*/
|
||||
public DependWrapperStrategyMapper putMapping(WorkerWrapper<?, ?> targetWrapper, DependWrapperActionStrategy strategy) {
|
||||
mapper.put(targetWrapper, strategy);
|
||||
toStringCache = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断方法。
|
||||
* <p/>
|
||||
* 如果fromWrapper在{@link #mapper}中,则返回{@link DependWrapperActionStrategy}的判断返回值。否则返回{@link DependenceAction#JUDGE_BY_AFTER}
|
||||
*
|
||||
* @param dependWrappers (这里不会使用该值)thisWrapper.dependWrappers的属性值。
|
||||
* @param thisWrapper (这里不会使用该值)thisWrapper,即为“被催促”的WorkerWrapper
|
||||
* @param fromWrapper 调用来源Wrapper。
|
||||
* @return 如果在mapper中有对fromWrapper的处理策略,则使用其进行判断。否则返回JUDGE_BY_AFTER交给下一个进行判断。
|
||||
*/
|
||||
@Override
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper) {
|
||||
DependWrapperActionStrategy strategy = mapper.get(fromWrapper);
|
||||
if (strategy == null) {
|
||||
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
|
||||
}
|
||||
return strategy.judge(fromWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 缓存toString
|
||||
*/
|
||||
private String toStringCache;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (toStringCache == null) {
|
||||
toStringCache = "DependWrapperStrategyMapper{mapper=" + mapper.entrySet().stream()
|
||||
.map(entry -> "{" + entry.getKey().getId() + ":" + entry.getValue() + "}")
|
||||
.collect(Collectors.toList())
|
||||
+ "}";
|
||||
}
|
||||
return toStringCache;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package com.jd.platform.async.wrapper.actionstrategy;
|
||||
|
||||
import com.jd.platform.async.worker.ResultState;
|
||||
|
||||
/**
|
||||
* 返回执行工作类型的枚举。
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/1-下午10:47
|
||||
*/
|
||||
public enum DependenceAction {
|
||||
/**
|
||||
* 开始工作。WorkerWrapper会执行工作方法。
|
||||
*/
|
||||
START_WORK,
|
||||
/**
|
||||
* 还没轮到,休息一下。WorkerWrapper中的调用栈会返回,以等待可能发生的下次调用。
|
||||
*/
|
||||
TAKE_REST,
|
||||
/**
|
||||
* 立即失败。WorkerWrapper会去执行快速失败的方法。
|
||||
*/
|
||||
FAST_FAIL,
|
||||
/**
|
||||
* 交给下层{@link DependenceStrategy}进行判断。
|
||||
* 在WorkerWrapper中不需要考虑此值,因为配置正常的情况下不会返回这个值。
|
||||
*/
|
||||
JUDGE_BY_AFTER;
|
||||
|
||||
// 空值单例
|
||||
|
||||
public WithProperty emptyProperty() {
|
||||
return empty;
|
||||
}
|
||||
|
||||
private final WithProperty empty = new WithProperty() {
|
||||
@Override
|
||||
public void setResultState(ResultState resultState) {
|
||||
throw new UnsupportedOperationException("empty not support modify");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFastFailException(Exception fastFailException) {
|
||||
throw new UnsupportedOperationException("empty not support modify");
|
||||
}
|
||||
|
||||
private final String toString = getDependenceAction() + ".empty";
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toString;
|
||||
}
|
||||
};
|
||||
|
||||
// 携带异常信息、ResultState的返回值
|
||||
|
||||
public WithProperty fastFailException(ResultState resultState, Exception e) {
|
||||
WithProperty withProperty = this.new WithProperty();
|
||||
withProperty.setResultState(resultState);
|
||||
withProperty.setFastFailException(e);
|
||||
return withProperty;
|
||||
}
|
||||
|
||||
/**
|
||||
* 有时需要封装一些参数来返回,则使用本内部类进行返回。
|
||||
* <p/>
|
||||
* 所有的构造方法权限均为private,请在父枚举类{@link DependenceAction}的方法中选择合适的模板生成内部类WithProperty。
|
||||
*/
|
||||
public class WithProperty {
|
||||
private ResultState resultState;
|
||||
private Exception fastFailException;
|
||||
|
||||
// getter setter
|
||||
|
||||
public ResultState getResultState() {
|
||||
return resultState;
|
||||
}
|
||||
|
||||
public void setResultState(ResultState resultState) {
|
||||
this.resultState = resultState;
|
||||
}
|
||||
|
||||
public Exception getFastFailException() {
|
||||
return fastFailException;
|
||||
}
|
||||
|
||||
public void setFastFailException(Exception fastFailException) {
|
||||
this.fastFailException = fastFailException;
|
||||
}
|
||||
|
||||
public DependenceAction getDependenceAction() {
|
||||
return DependenceAction.this;
|
||||
}
|
||||
|
||||
// constructor always private.
|
||||
|
||||
private WithProperty() {
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,255 @@
|
||||
package com.jd.platform.async.wrapper.actionstrategy;
|
||||
|
||||
import com.jd.platform.async.wrapper.WrapperEndingInspector;
|
||||
import com.jd.platform.async.worker.ResultState;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 依赖策略接口。
|
||||
* <p/>
|
||||
* 提供了多个默认值可以作为单例模式使用。
|
||||
* <p>
|
||||
* 工作原理示例:
|
||||
* <p>
|
||||
* ==== 一个简单示例 ====
|
||||
* 现有三个WorkerWrapper:A、B、C,其中 {@code A{dependWrappers=[B,C],} }
|
||||
* 当B执行完成后调用A时,根据依赖关系ALL_DEPENDENCIES_ALL_SUCCESS,还需等待C的结果。
|
||||
* 然后,当C执行完成后调用A时,根据依赖关系ALL_DEPENDENCIES_ALL_SUCCESS: 此时如果C成功了,A就开工,此时如果C失败了,A就失败。
|
||||
* ==== 简单示例2 ====
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/1-下午10:48
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface DependenceStrategy {
|
||||
/**
|
||||
* 核心判断策略
|
||||
*
|
||||
* @param dependWrappers thisWrapper.dependWrappers的属性值。
|
||||
* @param thisWrapper thisWrapper,即为“被催促”的WorkerWrapper
|
||||
* @param fromWrapper 调用来源Wrapper。
|
||||
* <p>
|
||||
* 该参数不会为null。
|
||||
* 因为在{@link WorkerWrapper#work(ExecutorService, long, Map, WrapperEndingInspector)}方法中传入的的第一批无依赖的Wrapper,
|
||||
* 不会被该策略器所判断,而是不论如何直接执行。
|
||||
* </p>
|
||||
* @return 返回枚举值内部类,WorkerWrapper将会根据其值来决定自己如何响应这次调用。 {@link DependenceAction.WithProperty}
|
||||
*/
|
||||
DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper);
|
||||
|
||||
/**
|
||||
* 如果本策略器的judge方法返回了JUDGE_BY_AFTER,则交给下一个策略器来判断。
|
||||
*
|
||||
* @param after 下层策略器
|
||||
* @return 返回一个“封装的多层策略器”
|
||||
*/
|
||||
default DependenceStrategy thenJudge(DependenceStrategy after) {
|
||||
DependenceStrategy that = this;
|
||||
return new DependenceStrategy() {
|
||||
@Override
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper) {
|
||||
DependenceAction.WithProperty judge = that.judgeAction(dependWrappers, thisWrapper, fromWrapper);
|
||||
if (judge.getDependenceAction() == DependenceAction.JUDGE_BY_AFTER) {
|
||||
return after.judgeAction(dependWrappers, thisWrapper, fromWrapper);
|
||||
}
|
||||
return judge;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return that + " ----> " + after;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// ========== 以下是一些默认实现 ==========
|
||||
|
||||
/**
|
||||
* 被依赖的所有Wrapper都必须成功才能开始工作。
|
||||
* 如果其中任一Wrapper还没有执行且不存在失败,则休息。
|
||||
* 如果其中任一Wrapper失败则立即失败。
|
||||
*/
|
||||
DependenceStrategy ALL_DEPENDENCIES_ALL_SUCCESS = new DependenceStrategy() {
|
||||
@Override
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper) {
|
||||
boolean hasWaiting = false;
|
||||
for (final WorkerWrapper<?, ?> dependWrapper : dependWrappers) {
|
||||
WorkResult<?> workResult = dependWrapper.getWorkResult();
|
||||
switch (workResult.getResultState()) {
|
||||
case DEFAULT:
|
||||
hasWaiting = true;
|
||||
break;
|
||||
case SUCCESS:
|
||||
break;
|
||||
case TIMEOUT:
|
||||
case EXCEPTION:
|
||||
return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
|
||||
default:
|
||||
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + workResult.getResultState());
|
||||
}
|
||||
}
|
||||
if (hasWaiting) {
|
||||
return DependenceAction.TAKE_REST.emptyProperty();
|
||||
}
|
||||
return DependenceAction.START_WORK.emptyProperty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ALL_DEPENDENCIES_ALL_SUCCESS";
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* 被依赖的Wrapper中任意一个成功了就可以开始工作。
|
||||
* 如果其中所有Wrapper还没有执行,则休息。
|
||||
* 如果其中一个Wrapper失败且不存在成功则立即失败。
|
||||
*/
|
||||
DependenceStrategy ALL_DEPENDENCIES_ANY_SUCCESS = new DependenceStrategy() {
|
||||
@Override
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper) {
|
||||
boolean hasFailed = false;
|
||||
Exception fastFailException = null;
|
||||
ResultState resultState = null;
|
||||
for (final WorkerWrapper<?, ?> dependWrapper : dependWrappers) {
|
||||
WorkResult<?> workResult = dependWrapper.getWorkResult();
|
||||
switch (workResult.getResultState()) {
|
||||
case DEFAULT:
|
||||
break;
|
||||
case SUCCESS:
|
||||
return DependenceAction.START_WORK.emptyProperty();
|
||||
case TIMEOUT:
|
||||
case EXCEPTION:
|
||||
resultState = !hasFailed ? workResult.getResultState() : resultState;
|
||||
fastFailException = !hasFailed ? workResult.getEx() : fastFailException;
|
||||
hasFailed = true;
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + workResult.getResultState());
|
||||
}
|
||||
}
|
||||
if (hasFailed) {
|
||||
return DependenceAction.FAST_FAIL.fastFailException(resultState, fastFailException);
|
||||
}
|
||||
return DependenceAction.TAKE_REST.emptyProperty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ALL_DEPENDENCIES_ANY_SUCCESS";
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* 如果被依赖的工作中任一失败,则立即失败。否则就开始工作(不论之前的工作有没有开始)。
|
||||
*/
|
||||
DependenceStrategy ALL_DEPENDENCIES_NONE_FAILED = new DependenceStrategy() {
|
||||
@Override
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper) {
|
||||
for (WorkerWrapper<?, ?> dependWrapper : dependWrappers) {
|
||||
WorkResult<?> workResult = dependWrapper.getWorkResult();
|
||||
switch (workResult.getResultState()) {
|
||||
case TIMEOUT:
|
||||
case EXCEPTION:
|
||||
return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
|
||||
default:
|
||||
}
|
||||
}
|
||||
return DependenceAction.START_WORK.emptyProperty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ALL_DEPENDENCIES_NONE_FAILED";
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* 只有当指定的这些Wrapper都成功时,才会开始工作。
|
||||
* 任一失败会快速失败。
|
||||
* 任一还没有执行且不存在失败,则休息。
|
||||
*
|
||||
* @param theseWrapper 该方法唯一有效参数。
|
||||
* @return 返回生成的 {@link DependenceAction.WithProperty)
|
||||
*/
|
||||
static DependenceStrategy theseWrapperAllSuccess(Set<WorkerWrapper<?,?>> theseWrapper) {
|
||||
return new DependenceStrategy() {
|
||||
private final Set<WorkerWrapper<?, ?>> theseWrappers;
|
||||
private final String toString;
|
||||
|
||||
{
|
||||
theseWrappers = Collections.unmodifiableSet(theseWrapper);
|
||||
toString = "THESE_WRAPPER_MUST_SUCCESS:" + theseWrappers.stream().map(WorkerWrapper::getId).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper) {
|
||||
boolean hasWaiting = false;
|
||||
for (WorkerWrapper<?, ?> wrapper : theseWrappers) {
|
||||
ResultState resultState = wrapper.getWorkResult().getResultState();
|
||||
switch (resultState) {
|
||||
case DEFAULT:
|
||||
hasWaiting = true;
|
||||
break;
|
||||
case SUCCESS:
|
||||
break;
|
||||
case TIMEOUT:
|
||||
case EXCEPTION:
|
||||
return DependenceAction.FAST_FAIL.fastFailException(resultState, wrapper.getWorkResult().getEx());
|
||||
default:
|
||||
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + resultState);
|
||||
}
|
||||
}
|
||||
if (hasWaiting) {
|
||||
return DependenceAction.TAKE_REST.emptyProperty();
|
||||
}
|
||||
return DependenceAction.START_WORK.emptyProperty();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toString;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
DependenceStrategy IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY = new DependenceStrategy() {
|
||||
@Override
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper) {
|
||||
DependMustStrategyMapper mustMapper = thisWrapper.getWrapperStrategy().getDependMustStrategyMapper();
|
||||
if (mustMapper != null && !mustMapper.getMustDependSet().isEmpty()) {
|
||||
// 至少有一个must,则因为must未完全完成而等待。
|
||||
return DependenceAction.TAKE_REST.emptyProperty();
|
||||
}
|
||||
// 如果一个must也没有,则认为应该是ANY模式。
|
||||
return DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS.judgeAction(dependWrappers, thisWrapper, fromWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY";
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
@@ -0,0 +1,183 @@
|
||||
package com.jd.platform.async.wrapper.skipstrategy;
|
||||
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author create by TcSnZh on 2021/5/6-下午3:02
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface SkipStrategy {
|
||||
/**
|
||||
* 跳过策略函数。返回true将会使WorkerWrapper跳过执行。
|
||||
*
|
||||
* @param nextWrappers 下游WrapperSet
|
||||
* @param thisWrapper 本WorkerWrapper
|
||||
* @param fromWrapper 呼叫本Wrapper的上游Wrapper
|
||||
* @return 返回true将会使WorkerWrapper跳过执行。
|
||||
*/
|
||||
boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper);
|
||||
|
||||
/**
|
||||
* 不跳过
|
||||
*/
|
||||
SkipStrategy NOT_SKIP = new SkipStrategy() {
|
||||
@Override
|
||||
public boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "NOT_SKIP";
|
||||
}
|
||||
};
|
||||
|
||||
SkipStrategy CHECK_ONE_LEVEL = new SkipStrategy() {
|
||||
private final SkipStrategy searchNextOneLevel = searchNextWrappers(SearchNextWrappers.SearchType.DFS, 1);
|
||||
|
||||
@Override
|
||||
public boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
|
||||
return searchNextOneLevel.shouldSkip(nextWrappers, thisWrapper, fromWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CHECK_ONE_LEVEL";
|
||||
}
|
||||
};
|
||||
|
||||
default SearchNextWrappers searchNextWrappers(SearchNextWrappers.SearchType searchType, int searchLevel) {
|
||||
return new SearchNextWrappers(searchType, searchLevel);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查之后的Wrapper是否不在INIT状态
|
||||
*/
|
||||
class SearchNextWrappers implements SkipStrategy {
|
||||
/**
|
||||
* 搜索策略
|
||||
*/
|
||||
enum SearchType {
|
||||
DFS, BFS;
|
||||
}
|
||||
|
||||
private final SearchType searchType;
|
||||
|
||||
/**
|
||||
* 搜索深度
|
||||
*/
|
||||
private final int searchLevel;
|
||||
|
||||
public SearchNextWrappers(SearchType searchType, int searchLevel) {
|
||||
this.searchType = Objects.requireNonNull(searchType);
|
||||
this.searchLevel = searchLevel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
|
||||
Set<WorkerWrapper<?, ?>> nextSet;
|
||||
if ((nextSet = nextWrappers) == null || nextSet.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
switch (searchType) {
|
||||
case DFS:
|
||||
return nextSet.stream().allMatch(next ->
|
||||
next.getState() != WorkerWrapper.INIT || dfsSearchShouldSkip(next, 1));
|
||||
case BFS:
|
||||
LinkedList<BfsNode> queue = nextSet.stream().map(ww -> new BfsNode(ww, 0)).collect(Collectors.toCollection(LinkedList::new));
|
||||
HashSet<WorkerWrapper<?, ?>> existed = new HashSet<>(nextSet);
|
||||
while (!queue.isEmpty()) {
|
||||
BfsNode node = queue.poll();
|
||||
if (node.atLevel > searchLevel) {
|
||||
continue;
|
||||
}
|
||||
if (node.wrapper.getState() != WorkerWrapper.INIT) {
|
||||
return true;
|
||||
}
|
||||
if (node.atLevel < searchLevel) {
|
||||
// 如果不是深度的最大值,则往队列里添加
|
||||
node.wrapper.getNextWrappers().forEach(nextWrapper -> {
|
||||
if (existed.contains(nextWrapper)) {
|
||||
return;
|
||||
}
|
||||
queue.offer(new BfsNode(nextWrapper, node.atLevel + 1));
|
||||
existed.add(nextWrapper);
|
||||
});
|
||||
}
|
||||
}
|
||||
return false;
|
||||
default:
|
||||
throw new IllegalStateException("searchType type illegal : " + searchType);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean dfsSearchShouldSkip(WorkerWrapper<?, ?> currentWrapper, int currentLevel) {
|
||||
if (currentLevel + 1 > searchLevel || currentWrapper == null) {
|
||||
return false;
|
||||
}
|
||||
for (WorkerWrapper<?, ?> nextWrapper : currentWrapper.getNextWrappers()) {
|
||||
if (nextWrapper != null &&
|
||||
(nextWrapper.getState() != WorkerWrapper.INIT
|
||||
|| dfsSearchShouldSkip(nextWrapper, currentLevel + 1))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static class BfsNode {
|
||||
final WorkerWrapper<?, ?> wrapper;
|
||||
final int atLevel;
|
||||
|
||||
public BfsNode(WorkerWrapper<?, ?> wrapper, int atLevel) {
|
||||
this.wrapper = wrapper;
|
||||
this.atLevel = atLevel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
BfsNode bfsNode = (BfsNode) o;
|
||||
return Objects.equals(wrapper, bfsNode.wrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return wrapper.hashCode();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
SearchNextWrappers that = (SearchNextWrappers) o;
|
||||
return searchLevel == that.searchLevel && searchType == that.searchType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return searchLevel ^ searchType.ordinal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CheckNextWrapper{" +
|
||||
"searchType=" + searchType +
|
||||
", searchLevel=" + searchLevel +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package dependnew;
|
||||
package beforev14.depend;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -11,7 +11,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class DeWorker implements IWorker<String, User>, ICallback<String, User> {
|
||||
class DeWorker implements IWorker<String, User>, ICallback<String, User> {
|
||||
|
||||
@Override
|
||||
public User action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package depend;
|
||||
package beforev14.depend;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -11,7 +11,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class DeWorker1 implements IWorker<WorkResult<User>, User>, ICallback<WorkResult<User>, User> {
|
||||
class DeWorker1 implements IWorker<WorkResult<User>, User>, ICallback<WorkResult<User>, User> {
|
||||
|
||||
@Override
|
||||
public User action(WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package depend;
|
||||
package beforev14.depend;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -11,7 +11,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class DeWorker2 implements IWorker<WorkResult<User>, String>, ICallback<WorkResult<User>, String> {
|
||||
class DeWorker2 implements IWorker<WorkResult<User>, String>, ICallback<WorkResult<User>, String> {
|
||||
|
||||
@Override
|
||||
public String action(WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package depend;
|
||||
package beforev14.depend;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@@ -10,7 +10,7 @@ import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
* @author sjsdfg
|
||||
* @since 2020/6/14
|
||||
*/
|
||||
public class LambdaTest {
|
||||
class LambdaTest {
|
||||
public static void main(String[] args) throws Exception {
|
||||
WorkerWrapper<WorkResult<User>, String> workerWrapper2 = new WorkerWrapper.Builder<WorkResult<User>, String>()
|
||||
.worker((WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) -> {
|
||||
@@ -1,4 +1,4 @@
|
||||
package depend;
|
||||
package beforev14.depend;
|
||||
|
||||
import com.jd.platform.async.executor.Async;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
@@ -12,7 +12,7 @@ import java.util.concurrent.ExecutionException;
|
||||
* @author wuweifeng wrote on 2019-12-26
|
||||
* @version 1.0
|
||||
*/
|
||||
public class Test {
|
||||
class Test {
|
||||
|
||||
public static void main(String[] args) throws ExecutionException, InterruptedException {
|
||||
DeWorker w = new DeWorker();
|
||||
@@ -1,11 +1,11 @@
|
||||
package depend;
|
||||
package beforev14.depend;
|
||||
|
||||
/**
|
||||
* 一个包装类
|
||||
* @author wuweifeng wrote on 2019-12-26
|
||||
* @version 1.0
|
||||
*/
|
||||
public class User {
|
||||
class User {
|
||||
private String name;
|
||||
|
||||
public User(String name) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package depend;
|
||||
package beforev14.dependnew;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -11,7 +11,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class DeWorker implements IWorker<String, User>, ICallback<String, User> {
|
||||
class DeWorker implements IWorker<String, User>, ICallback<String, User> {
|
||||
|
||||
@Override
|
||||
public User action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package dependnew;
|
||||
package beforev14.dependnew;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -11,7 +11,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class DeWorker1 implements IWorker<String, User>, ICallback<String, User> {
|
||||
class DeWorker1 implements IWorker<String, User>, ICallback<String, User> {
|
||||
|
||||
@Override
|
||||
public User action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package dependnew;
|
||||
package beforev14.dependnew;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -11,7 +11,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class DeWorker2 implements IWorker<User, String>, ICallback<User, String> {
|
||||
class DeWorker2 implements IWorker<User, String>, ICallback<User, String> {
|
||||
|
||||
@Override
|
||||
public String action(User object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package dependnew;
|
||||
package beforev14.dependnew;
|
||||
|
||||
import com.jd.platform.async.executor.Async;
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
@@ -11,7 +11,7 @@ import java.util.concurrent.ExecutionException;
|
||||
* @author wuweifeng wrote on 2019-12-26
|
||||
* @version 1.0
|
||||
*/
|
||||
public class Test {
|
||||
class Test {
|
||||
|
||||
public static void main(String[] args) throws ExecutionException, InterruptedException {
|
||||
DeWorker w = new DeWorker();
|
||||
@@ -1,11 +1,11 @@
|
||||
package dependnew;
|
||||
package beforev14.dependnew;
|
||||
|
||||
/**
|
||||
* 一个包装类
|
||||
* @author wuweifeng wrote on 2019-12-26
|
||||
* @version 1.0
|
||||
*/
|
||||
public class User {
|
||||
class User {
|
||||
private String name;
|
||||
|
||||
public User(String name) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package parallel;
|
||||
package beforev14.parallel;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class ParTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
|
||||
class ParTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
|
||||
|
||||
@Override
|
||||
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package parallel;
|
||||
package beforev14.parallel;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class ParWorker implements IWorker<String, String>, ICallback<String, String> {
|
||||
class ParWorker implements IWorker<String, String>, ICallback<String, String> {
|
||||
|
||||
@Override
|
||||
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package parallel;
|
||||
package beforev14.parallel;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class ParWorker1 implements IWorker<String, String>, ICallback<String, String> {
|
||||
class ParWorker1 implements IWorker<String, String>, ICallback<String, String> {
|
||||
private long sleepTime = 1000;
|
||||
|
||||
public void setSleepTime(long sleepTime) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package parallel;
|
||||
package beforev14.parallel;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class ParWorker2 implements IWorker<String, String>, ICallback<String, String> {
|
||||
class ParWorker2 implements IWorker<String, String>, ICallback<String, String> {
|
||||
private long sleepTime = 1000;
|
||||
|
||||
public void setSleepTime(long sleepTime) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package parallel;
|
||||
package beforev14.parallel;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class ParWorker3 implements IWorker<String, String>, ICallback<String, String> {
|
||||
class ParWorker3 implements IWorker<String, String>, ICallback<String, String> {
|
||||
private long sleepTime = 1000;
|
||||
|
||||
public void setSleepTime(long sleepTime) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package parallel;
|
||||
package beforev14.parallel;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class ParWorker4 implements IWorker<String, String>, ICallback<String, String> {
|
||||
class ParWorker4 implements IWorker<String, String>, ICallback<String, String> {
|
||||
|
||||
@Override
|
||||
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package parallel;
|
||||
package beforev14.parallel;
|
||||
|
||||
|
||||
import com.jd.platform.async.executor.Async;
|
||||
@@ -14,7 +14,7 @@ import java.util.concurrent.Executors;
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
@SuppressWarnings("ALL")
|
||||
public class TestPar {
|
||||
class TestPar {
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
// testNormal();
|
||||
@@ -1,4 +1,4 @@
|
||||
package seq;
|
||||
package beforev14.seq;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class SeqTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
|
||||
class SeqTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
|
||||
|
||||
@Override
|
||||
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package seq;
|
||||
package beforev14.seq;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class SeqWorker implements IWorker<String, String>, ICallback<String, String> {
|
||||
class SeqWorker implements IWorker<String, String>, ICallback<String, String> {
|
||||
|
||||
@Override
|
||||
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package seq;
|
||||
package beforev14.seq;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class SeqWorker1 implements IWorker<String, String>, ICallback<String, String> {
|
||||
class SeqWorker1 implements IWorker<String, String>, ICallback<String, String> {
|
||||
|
||||
@Override
|
||||
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package seq;
|
||||
package beforev14.seq;
|
||||
|
||||
|
||||
import com.jd.platform.async.callback.ICallback;
|
||||
@@ -12,7 +12,7 @@ import java.util.Map;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class SeqWorker2 implements IWorker<String, String>, ICallback<String, String> {
|
||||
class SeqWorker2 implements IWorker<String, String>, ICallback<String, String> {
|
||||
|
||||
@Override
|
||||
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package seq;
|
||||
package beforev14.seq;
|
||||
|
||||
|
||||
import com.jd.platform.async.executor.Async;
|
||||
@@ -11,7 +11,7 @@ import java.util.concurrent.ExecutionException;
|
||||
* 串行测试
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
public class TestSequential {
|
||||
class TestSequential {
|
||||
public static void main(String[] args) throws InterruptedException, ExecutionException {
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package seq;
|
||||
package beforev14.seq;
|
||||
|
||||
|
||||
import com.jd.platform.async.executor.Async;
|
||||
@@ -12,7 +12,7 @@ import java.util.concurrent.ExecutionException;
|
||||
* @author wuweifeng wrote on 2019-11-20.
|
||||
*/
|
||||
@SuppressWarnings("Duplicates")
|
||||
public class TestSequentialTimeout {
|
||||
class TestSequentialTimeout {
|
||||
public static void main(String[] args) throws InterruptedException, ExecutionException {
|
||||
testFirstTimeout();
|
||||
}
|
||||
272
src/test/java/v15/dependnew/Test.java
Normal file
272
src/test/java/v15/dependnew/Test.java
Normal file
@@ -0,0 +1,272 @@
|
||||
package v15.dependnew;
|
||||
|
||||
import com.jd.platform.async.callback.IWorker;
|
||||
import com.jd.platform.async.executor.Async;
|
||||
import com.jd.platform.async.executor.timer.SystemClock;
|
||||
import com.jd.platform.async.worker.ResultState;
|
||||
import com.jd.platform.async.wrapper.actionstrategy.DependenceAction;
|
||||
import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
|
||||
import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
/**
|
||||
* @author create by TcSnZh on 2021/5/2-下午9:25
|
||||
*/
|
||||
class Test {
|
||||
public static void main(String[] args) throws ExecutionException, InterruptedException {
|
||||
// ExecutorService pool = Executors.newFixedThreadPool(3);
|
||||
ExecutorService pool = Async.getCommonPool();
|
||||
try {
|
||||
// 先随便找个任务让线程池跑一把,把线程用一下,后面的测试效果会明显一点
|
||||
testNew2(pool);
|
||||
System.out.println("\n\n\n");
|
||||
|
||||
testNew1(pool);
|
||||
System.out.println("\n\n\n");
|
||||
|
||||
testNew2(pool);
|
||||
System.out.println("\n\n\n");
|
||||
|
||||
testThreadPolling_Speed(pool);
|
||||
System.out.println("\n\n\n");
|
||||
|
||||
testThreadPolling_V14Bug();
|
||||
System.out.println("\n\n\n");
|
||||
|
||||
testTimeOut(pool);
|
||||
|
||||
} finally {
|
||||
//Async.shutDownCommonPool();
|
||||
pool.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 简简单单的测试一下新的编排方式
|
||||
* <p>
|
||||
* .A ===> B1 ===> C1 ----> D1
|
||||
* . ||> B2 | || \--> D2
|
||||
* . ||> B3 | ``========v
|
||||
* . ||> B4 |---> C2 ====> E1
|
||||
* . \--> E2
|
||||
*/
|
||||
private static void testNew1(ExecutorService pool) throws ExecutionException, InterruptedException {
|
||||
System.out.println("测试新的builder Api");
|
||||
WorkerWrapper<Object, Object> a = testBuilder("A")
|
||||
.build();
|
||||
WorkerWrapper<Object, Object> b1 = testBuilder("B1").depends(a).build();
|
||||
WorkerWrapper<Object, Object> b2 = testBuilder("B2").depends(a).build();
|
||||
WorkerWrapper<Object, Object> b3 = testBuilder("B3").depends(a).build();
|
||||
WorkerWrapper<Object, Object> b4 = testBuilder("B4").depends(a).build();
|
||||
WorkerWrapper<Object, Object> c1 = testBuilder("C1")
|
||||
.depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, b1, b2, b3, b4)
|
||||
.nextOf(testBuilder("D1").build(),
|
||||
testBuilder("D2").build())
|
||||
.build();
|
||||
WorkerWrapper<Object, Object> c2 = testBuilder("C2")
|
||||
.depends(DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS, b1, b2, b3, b4)
|
||||
.nextOf(testBuilder("E1").depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, c1).build(),
|
||||
testBuilder("E2").build())
|
||||
.build();
|
||||
Async.beginWork(2000, pool, a);
|
||||
logAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试船新的编排方式的花里胡哨的玩法。
|
||||
* A => {B1 ~ B10} >>> C
|
||||
* <p>
|
||||
* C仅需要b1-b10中任意3个Worker工作完成即可启动。
|
||||
* (不过C不一定一定在3个完成后启动,具体还要看线程池属性与线程抢占的顺序,线程池线程数小一点的话更容易让C早日执行)
|
||||
* </p>
|
||||
*/
|
||||
private static void testNew2(ExecutorService pool) throws ExecutionException, InterruptedException {
|
||||
System.out.println("测试10个B中成功三个才能执行C");
|
||||
WorkerWrapper<Object, Object> a = testBuilder("A").build();
|
||||
ArrayList<WorkerWrapper> bList = new ArrayList<>();
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
bList.add(testBuilder("B" + i).depends(a).build());
|
||||
}
|
||||
WorkerWrapper<Object, Object> c = testBuilder("C")
|
||||
.setDepend().strategy((dependWrappers, thisWrapper, fromWrapper) -> {
|
||||
if (dependWrappers.stream()
|
||||
.filter(w -> w.getWorkResult().getResultState() == ResultState.SUCCESS).count() >= 3) {
|
||||
return DependenceAction.START_WORK.emptyProperty();
|
||||
} else {
|
||||
return DependenceAction.TAKE_REST.emptyProperty();
|
||||
}
|
||||
}).wrapper(bList).end().build();
|
||||
Async.beginWork(2000, pool, a);
|
||||
logAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试线程轮询的效率
|
||||
*/
|
||||
private static void testThreadPolling_Speed(ExecutorService pool) throws InterruptedException {
|
||||
int MAX = 1000;
|
||||
Collection<WorkerWrapper<?, ?>> wrappers = new ArrayList<>(MAX);
|
||||
AtomicLong a = new AtomicLong(0);
|
||||
for (int i = 0; i < MAX; i++) {
|
||||
WorkerWrapperBuilder<Void, Void> builder = WorkerWrapper.<Void, Void>builder()
|
||||
.id(String.valueOf(i))
|
||||
// 拷贝数组测试,每次在数组最后加一个递增的值+1的数
|
||||
.worker((object, allWrappers) -> {
|
||||
for (int j = 0; j < 100000; j++) {
|
||||
a.incrementAndGet();
|
||||
}
|
||||
return null;
|
||||
})
|
||||
.setSkipStrategy(SkipStrategy.NOT_SKIP);
|
||||
wrappers.add(builder.build());
|
||||
}
|
||||
long t1 = SystemClock.now();
|
||||
PrintStream out = Async.beginWork(10000, pool, wrappers) ? System.out : System.err;
|
||||
out.println("无依赖任务的测试:\n1000个wrapper对AtomicLong分别自增100000次,耗时 : " + (SystemClock.now() - t1) + "ms a=" + a.get());
|
||||
WorkerWrapper.<Void, Integer>builder();
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况:
|
||||
* <p>
|
||||
* A(5ms)--B1(10ms) ---|--> C1(5ms)
|
||||
* . \ | (B1、B2全部完成可执行C1、C2)
|
||||
* . ---> B2(20ms) --|--> C2(5ms)
|
||||
*/
|
||||
private static void testThreadPolling_V14Bug() throws ExecutionException, InterruptedException {
|
||||
System.out.println("以下代码可复制到v1.4,复现线程耗尽bug : ");
|
||||
BiFunction<String, Long, IWorker<Void, Void>> sleepWork = (id, time) -> (IWorker<Void, Void>) (object, allWrappers) -> {
|
||||
try {
|
||||
System.out.println("wrapper.id=" + id + " before sleep");
|
||||
Thread.sleep(time);
|
||||
System.out.println("wrapper.id=" + id + " after sleep");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
};
|
||||
WorkerWrapper<Void, Void> a = new WorkerWrapper.Builder<Void, Void>()
|
||||
.id("A")
|
||||
.worker(sleepWork.apply("A", 5L))
|
||||
.build();
|
||||
WorkerWrapper.Builder<Void, Void> cBuilder = new WorkerWrapper.Builder<Void, Void>()
|
||||
.depend(new WorkerWrapper.Builder<Void, Void>()
|
||||
.id("B1")
|
||||
.worker(sleepWork.apply("B1", 10L))
|
||||
.depend(a)
|
||||
.build())
|
||||
.depend(new WorkerWrapper.Builder<Void, Void>()
|
||||
.id("B2")
|
||||
.worker(sleepWork.apply("B2", 10L))
|
||||
.depend(a)
|
||||
.build());
|
||||
cBuilder.id("C1").worker(sleepWork.apply("C1", 5L)).build();
|
||||
cBuilder.id("C2").worker(sleepWork.apply("C2", 5L)).build();
|
||||
ExecutorService pool = Executors.newFixedThreadPool(2);
|
||||
try {
|
||||
Async.beginWork(100, pool, a);
|
||||
} finally {
|
||||
pool.shutdown();
|
||||
}
|
||||
System.out.println(a.getNextWrappers());
|
||||
}
|
||||
|
||||
/**
|
||||
* 超时测试
|
||||
*/
|
||||
private static void testTimeOut(ExecutorService pool) throws ExecutionException, InterruptedException {
|
||||
System.out.println("超时测试:");
|
||||
System.err.println("如果抛出" + InterruptedException.class.getName() + "异常,则打断线程成功");
|
||||
WorkerWrapper<Object, Object> a = testBuilder("A")
|
||||
// B1、B2不超时
|
||||
.nextOf(testBuilder("B1", 100).timeout(150, TimeUnit.MILLISECONDS).build())
|
||||
.nextOf(testBuilder("B2", 100).build())
|
||||
// B3单wrapper超时
|
||||
.nextOf(testBuilder("B3", 200).timeout(150, TimeUnit.MILLISECONDS).build())
|
||||
// B4、B5总任务超时
|
||||
.nextOf(testBuilder("B4", 250).build())
|
||||
.nextOf(testBuilder("B5", 250)
|
||||
.setTimeOut().enableTimeOut(true).setTime(300, TimeUnit.MILLISECONDS).allowInterrupt(false).end()
|
||||
.build())
|
||||
// 测试打断B6线程
|
||||
.nextOf(testBuilder("B6", 250).timeout(true, 150, TimeUnit.MILLISECONDS, true).build())
|
||||
.build();
|
||||
long t1 = SystemClock.now();
|
||||
boolean success = Async.beginWork(200, pool, a);
|
||||
System.out.println("time=" + (SystemClock.now() - t1) + ", success=" + success);
|
||||
a.getNextWrappers().forEach(System.out::println);
|
||||
logAll();
|
||||
}
|
||||
|
||||
// ========== util method ==========
|
||||
|
||||
static final AtomicInteger count = new AtomicInteger(1);
|
||||
static final AtomicReference<ConcurrentHashMap<Integer, String>> logger = new AtomicReference<>(new ConcurrentHashMap<>());
|
||||
|
||||
static WorkerWrapperBuilder<Object, Object> testBuilder(String id) {
|
||||
return testBuilder(id, -1);
|
||||
}
|
||||
|
||||
static WorkerWrapperBuilder<Object, Object> testBuilder(String id, long sleepTime) {
|
||||
return WorkerWrapper.builder()
|
||||
.id(id)
|
||||
.worker((param, allWrap) -> {
|
||||
logger.get().put(count.getAndIncrement(), id + " working ");
|
||||
if (sleepTime >= 0) {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
Thread.sleep(sleepTime / 10);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
return "I am success";
|
||||
}).callback((success, param, workResult) -> {
|
||||
String str = " " + id + " callback " + workResult.getResultState();
|
||||
switch (workResult.getResultState()) {
|
||||
case SUCCESS:
|
||||
str += " getResult = " + workResult.getResult();
|
||||
break;
|
||||
case TIMEOUT:
|
||||
case EXCEPTION:
|
||||
str += " getEx = " + workResult.getEx();
|
||||
break;
|
||||
case DEFAULT:
|
||||
throw new RuntimeException();
|
||||
}
|
||||
logger.get().put(count.getAndIncrement(), str);
|
||||
});
|
||||
}
|
||||
|
||||
static void logAll() {
|
||||
TreeMap<Integer, String> map = new TreeMap<>(Integer::compare);
|
||||
map.putAll(logger.get());
|
||||
StringBuilder sb = new StringBuilder(30);
|
||||
map.forEach((count, str) -> {
|
||||
sb.append('(').append(count).append(')');
|
||||
if (count < 10) {
|
||||
sb.append(' ');
|
||||
}
|
||||
sb.append(" ").append(str).append('\n');
|
||||
});
|
||||
System.out.println("--------------------------------\n" + sb);
|
||||
logger.set(new ConcurrentHashMap<>());
|
||||
count.set(1);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user