v1.5魔改。

更新内容:
1.将WorkerWrapper执行结束由回调改成轮询,以防止线程耗尽bug。
2.修改线程编排模型,用策略器代替了僵硬的must开关与needCheckNextWrapperResult。
3.清理WorkerWrapper代码中的屎山。
4.以上魔改均兼容v1.4旧版本
This commit is contained in:
TcSnZh 2021-05-07 21:56:16 +08:00
parent f323f37ded
commit c240a1b075
42 changed files with 2638 additions and 643 deletions

View File

@ -54,7 +54,7 @@ public interface IWorker<T, V> {
* @param object
* object
*/
V action(T object, Map<String, WorkerWrapper> allWrappers);
V dependAction(T object, Map<String, WorkerWrapper> allWrappers);
/**
* 超时、异常时,返回的默认值
@ -113,7 +113,7 @@ wrapper的泛型和worker的一样决定了入参和结果的类型。
public class ParWorker1 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
public String dependAction(String object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {

View File

@ -8,7 +8,7 @@
If you just need to use this frame, please look down. If you need to have a deep understanding of how this framework is implemented step by step, from receiving the requirements to thinking about each step, why each class is so designed, why there are these methods, that is, how to develop this framework from 0 to 1, I opened a column in [CSDN] (https://blog.csdn.net/tianaleixiaowu/category_. HTML) to talk about how middleware is developed from 0, including and Not limited to this small frame. JD internal colleagues can search my ERP on CF and see it.
If you just need to use this frame, please look down. If you need to have a deep understanding of how this framework is implemented step by step, from receiving the requirements to thinking about each step, why each class is so designed, why there are these methods, that is, how to develop this framework from 0 to 1, I opened a column in [CSDN] (https://blog.csdn.net/tianaleixiaowu/category_. HTML) to talk about how middleware is developed from 0, including and Not limited to this small frame. JD internal colleagues can searchType my ERP on CF and see it.

View File

@ -1,21 +1,28 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.worker.WorkResult;
/**
* 默认回调类如果不设置的话会默认给这个回调
*
* @author wuweifeng wrote on 2019-11-19.
*/
public class DefaultCallback<T, V> implements ICallback<T, V> {
@Override
public void begin() {
}
/**
* 默认将打印存在的非{@link com.jd.platform.async.exception.SkippedException}的异常
*/
@Override
public void result(boolean success, T param, WorkResult<V> workResult) {
Exception ex = workResult.getEx();
if (ex != null && !(ex instanceof SkippedException)) {
ex.printStackTrace();
}
}
}

View File

@ -21,6 +21,8 @@ public interface ICallback<T, V> {
/**
* 耗时操作执行完毕后就给value注入值
* <p/>
* 只要Wrapper被调用后成功或失败/超时该方法都会被执行
*/
void result(boolean success, T param, WorkResult<V> workResult);
}

View File

@ -3,68 +3,63 @@ package com.jd.platform.async.executor;
import com.jd.platform.async.callback.DefaultGroupCallback;
import com.jd.platform.async.callback.IGroupCallback;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* 类入口可以根据自己情况调整core线程的数量
* 核心工具类
*
* @author wuweifeng wrote on 2019-12-18
* @version 1.0
*/
public class Async {
/**
* 默认线程池
*/
private static final ThreadPoolExecutor COMMON_POOL =
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024,
15L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
(ThreadFactory) Thread::new);
/**
* 注意这里是个static也就是只能有一个线程池用户自定义线程池时也只能定义一个
*/
private static ExecutorService executorService;
// ========================= 任务执行核心代码 =========================
/**
* 出发点
*
* @return 只要执行未超时就返回true
*/
public static boolean beginWork(long timeout, ExecutorService executorService, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException {
if(workerWrappers == null || workerWrappers.size() == 0) {
public static boolean beginWork(long timeout,
ExecutorService executorService,
Collection<? extends WorkerWrapper> workerWrappers)
throws ExecutionException, InterruptedException {
if (workerWrappers == null || workerWrappers.size() == 0) {
return false;
}
//保存线程池变量
Async.executorService = executorService;
//保存上次执行的线程池变量为了兼容以前的旧功能
Async.lastExecutorService = Objects.requireNonNull(executorService, "ExecutorService is null ! ");
//定义一个map存放所有的wrapperkey为wrapper的唯一idvalue是该wrapper可以从value中获取wrapper的result
Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>();
CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
for (int i = 0; i < workerWrappers.size(); i++) {
WorkerWrapper wrapper = workerWrappers.get(i);
futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService);
}
try {
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
return true;
} catch (TimeoutException e) {
Set<WorkerWrapper> set = new HashSet<>();
totalWorkers(workerWrappers, set);
for (WorkerWrapper wrapper : set) {
wrapper.stopNow();
final ConcurrentMap<String, WorkerWrapper> forParamUseWrappers =
new ConcurrentHashMap<>(Math.max(workerWrappers.size() * 3, 8));
final WrapperEndingInspector inspector = new WrapperEndingInspector(SystemClock.now() + timeout);
inspector.addWrapper(workerWrappers);
workerWrappers.forEach(wrapper -> {
if (wrapper == null) {
return;
}
return false;
}
executorService.submit(() -> wrapper.work(executorService, timeout, forParamUseWrappers, inspector));
});
inspector.registerToPollingCenter();
return inspector.await();
//处理超时的逻辑被移动到了WrapperEndingInspector中
}
/**
* 如果想自定义线程池请传pool不自定义的话就走默认的COMMON_POOL
*/
public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
if(workerWrapper == null || workerWrapper.length == 0) {
public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
throws ExecutionException, InterruptedException {
if (workerWrapper == null || workerWrapper.length == 0) {
return false;
}
List<WorkerWrapper> workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList());
Set<WorkerWrapper> workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet());
return beginWork(timeout, executorService, workerWrappers);
}
@ -72,11 +67,11 @@ public class Async {
* 同步阻塞,直到所有都完成,或失败
*/
public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
return beginWork(timeout, COMMON_POOL, workerWrapper);
return beginWork(timeout, getCommonPool(), workerWrapper);
}
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
beginWorkAsync(timeout, COMMON_POOL, groupCallback, workerWrapper);
beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper);
}
/**
@ -102,9 +97,10 @@ public class Async {
}
});
} else {
COMMON_POOL.submit(() -> {
final ExecutorService commonPool = getCommonPool();
commonPool.submit(() -> {
try {
boolean success = beginWork(timeout, COMMON_POOL, workerWrapper);
boolean success = beginWork(timeout, commonPool, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
@ -119,43 +115,107 @@ public class Async {
}
// ========================= 设置/属性选项 =========================
/**
* 总共多少个执行单元
* 默认线程池
* <p>
* 在v1.4及之前该COMMON_POLL是被写死的
* <p>
* 自v1.5后
* 该线程池会被懒加载
* 该线程池将会给线程取名为asyncTool-commonPool-thread-0数字不重复
* </p>
*/
@SuppressWarnings("unchecked")
private static void totalWorkers(List<WorkerWrapper> workerWrappers, Set<WorkerWrapper> set) {
set.addAll(workerWrappers);
for (WorkerWrapper wrapper : workerWrappers) {
if (wrapper.getNextWrappers() == null) {
continue;
private static ThreadPoolExecutor COMMON_POOL;
/**
* 在以前及现在的版本中
* 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时ExecutorService将会被记录下来
* <p/>
* 注意这里是个static也就是只能有一个线程池用户自定义线程池时也只能定义一个
*/
private static volatile ExecutorService lastExecutorService;
public static ThreadPoolExecutor getCommonPool() {
if (COMMON_POOL == null) {
synchronized (Async.class) {
if (COMMON_POOL == null) {
COMMON_POOL = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
1024,
15L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactory() {
private final AtomicInteger threadCount = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "asyncTool-commonPool-thread-" + threadCount.getAndIncrement());
t.setDaemon(true);
return t;
}
@Override
public String toString() {
return "asyncTool-commonPool-threadFactory";
}
}
) {
@Override
public String toString() {
return "asyncTool-commonPool";
}
};
}
}
List<WorkerWrapper> wrappers = wrapper.getNextWrappers();
totalWorkers(wrappers, set);
}
}
/**
* 关闭线程池
*/
public static void shutDown() {
shutDown(executorService);
}
/**
* 关闭线程池
*/
public static void shutDown(ExecutorService executorService) {
if (executorService != null) {
executorService.shutdown();
} else {
COMMON_POOL.shutdown();
}
return COMMON_POOL;
}
public static String getThreadCount() {
return "activeCount=" + COMMON_POOL.getActiveCount() +
" completedCount " + COMMON_POOL.getCompletedTaskCount() +
" largestCount " + COMMON_POOL.getLargestPoolSize();
",completedCount=" + COMMON_POOL.getCompletedTaskCount() +
",largestCount=" + COMMON_POOL.getLargestPoolSize();
}
public static synchronized void shutDownCommonPool(boolean now) {
if (!COMMON_POOL.isShutdown()) {
if (now) {
COMMON_POOL.shutdownNow();
} else {
COMMON_POOL.shutdown();
}
}
}
/**
* 关闭上次使用的线程池
*
* @deprecated 因此在v1.5时加上了废弃注解
* <p>
* 这是一个很迷的方法多线程时调用该方法的{@link #lastExecutorService}可能会被别的线程修改而引发不必要不可控的错误仅建议用来测试
* 另外该方法现在不会关闭默认线程池
* </p>
*/
@Deprecated
public static void shutDown() {
if (lastExecutorService != COMMON_POOL) {
shutDown(lastExecutorService);
}
}
/**
* 关闭指定的线程池
*
* @param executorService 指定的线程池传入null则会关闭默认线程池
* @deprecated 没啥用的方法要关闭线程池还不如直接调用线程池的关闭方法避免歧义
*/
@Deprecated
public static void shutDown(ExecutorService executorService) {
if (executorService != null) {
executorService.shutdown();
}
}
}

View File

@ -0,0 +1,331 @@
package com.jd.platform.async.executor;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* 判断{@link WorkerWrapper}是否链路调用完成的轮询器
* =================================================================================
* <p>
* 在v1.4及以前的版本存在如下问题
* >
* 在使用线程数量较少的线程池进行beginWork时调用WorkerWrapper#beginNext方法时
* 会因为本线程等待下游Wrapper执行完成而存在线程耗尽bug线程池会死翘翘的僵住动弹不得
* >
* 例如仅有2个线程的线程池执行以下任务
* {@code
* <p>
* 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况在test/v15.dependnew中示例testThreadPolling_V14Bug说明了这个bug
* 线程数2
* A(5ms)--B1(10ms) ---|--> C1(5ms)
* . \ | (B1B2全部完成可执行C1C2)
* . ---> B2(20ms) --|--> C2(5ms)
* <p>
* }
* 线程1执行了A然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成
* 线程2执行了B1或B2中的一个也在allOf方法等待C1C2完成
* 结果没有线程执行C和B2了导致超时而死并且这个线程池线程有可能被耗尽
* >
* v1.5的解决方案是放弃使工作线程遭致阻塞的{@link java.util.concurrent.CompletableFuture}
* 而是让工作线程在工作前注册到本完成检查器{@link WrapperEndingInspector}然后交由轮询中心{@link PollingCenter}进行检查是否完成
* </p>
* =================================================================================
* <p>
* 本类的工作原理
* .
* 原理
* (1)首先在Async代码中将主动运行的wrapper都保存到一个inspector{@link #addWrapper(WorkerWrapper)}
* (2)主动运行的wrapper于FINISH/ERROR时先异步submit所有下游wrapper在其执行时将自身(下游wrapper)保存到inspector
* (3)然后在异步submit完所有下游wrapper后将调用{@link #setWrapperEndWithTryPolling(WorkerWrapper)}方法
* . 设置自己的{@link #wrapper2called}为true并呼叫轮询{@link PollingCenter#tryPolling()}
* (4)在下游wrapper中经过策略器判断后
* . 若是不需要运行则把本wrapper计数-1{@link Node#count}若是计数<1则将{@link Node}移出{@link #wrapper2called}
* . 若是需要运行则运行之然后跳转到 (2) 的情节如此递归执行链路上所有需要执行的wrapper最后都会存在于{@link #wrapper2called}
* .
* 因此若是存在任一其{@link Node#called}为false的wrapper则表示这条链路还没有调用完
* 若是在{@link #wrapper2called}中所有的{@link Node#called}为true时即可判断出链路执行完毕了
* </p>
*
* @author create by TcSnZh on 2021/5/5-下午3:22
*/
public class WrapperEndingInspector implements Comparable<WrapperEndingInspector> {
/**
* 最迟完成时间
*/
private final long latestFinishTime;
/**
* 保存 需要检查的wrapper--相关属性 的Map
*/
private final ConcurrentHashMap<WorkerWrapper, Node> wrapper2called = new ConcurrentHashMap<>();
/**
* 当全部wrapper都调用结束它会countDown
*/
private final CountDownLatch endCDL = new CountDownLatch(1);
/**
* 读锁用于修改数据写锁用于轮询使用公平锁让wrapper的时间波动不会太长
* <p/>
* 在轮询到本inspector时之所以要上写锁是因为
* 假如此时有个Wrapper正在调用{{@link #addWrapper(WorkerWrapper)}}则wrapper2called发生了改变
* 假如现在恰巧访问到的是{@link #wrapper2called}迭代器的最后一个但此时又加入了另一个且这另一个又是需要去执行的
* 那么假如在迭代器遍历到目前访问到的wrapper都是呼叫完毕的那么这新加入的一个就会被忽略从而判定为全部完成致使bug发生
* <p/>
* 此外即便轮询时上写锁对性能的影响也是有限的因为这只会在呼叫别人的时候发生工作线程与轮询线程的锁争抢
* 而在工作线程执行{@link com.jd.platform.async.callback.IWorker#action(Object, Map)}
* {@link com.jd.platform.async.callback.ICallback#result(boolean, Object, WorkResult)}并不会与轮询线程去
* 争抢锁而通常这个工作的时间才是最耗时的
*/
private final ReentrantReadWriteLock writePollingLock = new ReentrantReadWriteLock(true);
public WrapperEndingInspector(long latestFinishTime) {
this.latestFinishTime = latestFinishTime;
}
public void registerToPollingCenter() {
writePollingLock.readLock().lock();
try {
PollingCenter.getInstance().inspectionSet.add(this);
} finally {
writePollingLock.readLock().unlock();
}
}
public void addWrapper(WorkerWrapper wrapper) {
writePollingLock.readLock().lock();
try {
wrapper2called.computeIfAbsent(wrapper, k -> new Node()).count.incrementAndGet();
} finally {
writePollingLock.readLock().unlock();
}
}
public void addWrapper(Collection<? extends WorkerWrapper> wrappers) {
writePollingLock.readLock().lock();
try {
Objects.requireNonNull(wrappers).forEach(this::addWrapper);
} finally {
writePollingLock.readLock().unlock();
}
}
public void reduceWrapper(WorkerWrapper wrapper) {
writePollingLock.readLock().lock();
try {
/*
* 有可能发生这情况一个Wrapper刚被加进去执行了零//多次均不满足执行条件但是下次调用却应当使其启动
*/
if (wrapper.getState() != WorkerWrapper.INIT) {
Node node = wrapper2called.get(wrapper);
if (node == null) {
return;
}
synchronized (node) {
if (node.count.decrementAndGet() < 1) {
wrapper2called.remove(wrapper);
}
}
}
} finally {
writePollingLock.readLock().unlock();
}
}
/**
* 原子的设置这个Wrapper已经呼叫完成了
* <p/>
* 该方法会调用{@link PollingCenter#tryPolling()}呼叫轮询线程
*
* @return 如果为true表示设置成功为false表示已经被设置过了
*/
public boolean setWrapperEndWithTryPolling(WorkerWrapper wrapper) {
writePollingLock.readLock().lock();
try {
return !wrapper2called.get(wrapper).called.getAndSet(true);
} finally {
writePollingLock.readLock().unlock();
PollingCenter.getInstance().tryPolling();
}
}
/**
* 供外部调用的等待方法
*
* @return 在超时前完成返回true超时时间一到就会返回false就像人被杀就会死
* @throws InterruptedException 外部调用的当前线程被中断时会抛出这个异常
*/
public boolean await() throws InterruptedException {
return endCDL.await(latestFinishTime - SystemClock.now(), TimeUnit.MILLISECONDS);
}
/**
* {@link PollingCenter}会优先把最迟完成时间即开始时间+超时时间较早的Inspection放在前面
*/
@Override
public int compareTo(WrapperEndingInspector other) {
if (this.latestFinishTime - other.latestFinishTime < 0) {
return -1;
}
return 1;
}
@Override
public String toString() {
return "WrapperEndingInspector{" +
"remainTime=" + (latestFinishTime - SystemClock.now()) +
", wrapper2called=" +
wrapper2called.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue))
+
", endCDL.getCount()=" + endCDL.getCount() +
", writePollingLock={read=" + writePollingLock.getReadLockCount() + ",write=" + writePollingLock.getWriteHoldCount() +
"} }";
}
/**
* 节点对象保存属性信息于{@link #wrapper2called}
* <p/>
* 当试图把Node移出本Map时该Node对象自身将会被上锁
*/
public static class Node {
/**
* 是否已经呼叫完了下游wrapper
*/
AtomicBoolean called = new AtomicBoolean(false);
/**
* 本wrapper总共被呼叫次数的统计若小于1则会被移出map
*/
AtomicInteger count = new AtomicInteger(0);
@Override
public String toString() {
return "{" +
"called=" + called.get() +
", count=" + count.get() +
'}';
}
}
/**
* 轮询中心具体的轮询调度由其完成
* <p/>
* {@link #registerToPollingCenter()}调用时就会将inspector注册到本轮询中心以供轮询
*/
public static class PollingCenter {
/**
* 将被轮询的WrapperFinishInspection集合
*/
private final Set<WrapperEndingInspector> inspectionSet = new ConcurrentSkipListSet<>();
/**
* 请求轮询
*/
private void tryPolling() {
if (inspectionSet.size() < POLLING_POOL.getActiveCount()) {
// 线程数 > inspector数理论上已经各个线程都在忙活了不去新开线程
return;
}
POLLING_POOL.submit(() -> {
if (!inspectionSet.isEmpty()) {
for (WrapperEndingInspector inspector : inspectionSet) {
// 这个inspector的写锁被占用说明其他的轮询线程正在扫描这个inspector
// 那就让其他的轮询线程自己忙活去咱们找下一个
if (!inspector.writePollingLock.writeLock().tryLock()) {
continue;
}
try {
if (PollingCenter.this.inspectorIsEnd(inspector)) {
// inspector中的wrapper调用结束了
if (inspector.endCDL.getCount() > 0) {
// 双重检查使endCDL原子性countDown
synchronized (inspector.endCDL) {
if (inspector.endCDL.getCount() > 0) {
inspectionSet.remove(inspector);
inspector.endCDL.countDown();
}
}
}
}
} finally {
inspector.writePollingLock.writeLock().unlock();
}
}
}
});
}
private boolean inspectorIsEnd(WrapperEndingInspector inspector) {
if (inspector.latestFinishTime < SystemClock.now()) {
inspector.wrapper2called.forEach(((wrapper, node) -> {
wrapper.stopNow();
node.called.set(true);
}));
return true;
}
for (Map.Entry<WorkerWrapper, Node> entry : inspector.wrapper2called.entrySet()) {
WorkerWrapper wrapper = entry.getKey();
Node node = entry.getValue();
if (wrapper.getState() == WorkerWrapper.INIT
// 上值如果为false表示该Wrapper要么还没来得及执行要么判断不需要执行但是还未被移出
|| !node.called.get()
// 上值如果为false表示该Wrapper正在工作或是刚刚结束/失败还未将所有下游Wrapper调用一遍
) {
return false;
}
// 这里需要去判断一下超时
}
return true;
}
// ========== static ==========
private final static PollingCenter instance = new PollingCenter();
public static PollingCenter getInstance() {
return instance;
}
private static final ThreadPoolExecutor POLLING_POOL = new ThreadPoolExecutor(
0,
// 轮询线程数量尽可能少
Math.max(Runtime.getRuntime().availableProcessors() / 16, 1),
15L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadFactory() {
private final AtomicInteger threadCount = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "asyncTool-wrapperEndingInspectorPollingCenterPool-thread-" + threadCount.getAndIncrement());
t.setDaemon(true);
// 线程优先级不高
t.setPriority(1);
return t;
}
@Override
public String toString() {
return "asyncTool-wrapperEndingInspectorPollingCenterPool-threadFactory";
}
},
// 多的轮询请求就丢了
new ThreadPoolExecutor.DiscardPolicy()
) {
@Override
public String toString() {
return "asyncTool-wrapperEndingInspectorPollingCenterPool";
}
};
}
}

View File

@ -1,59 +0,0 @@
package com.jd.platform.async.worker;
import com.jd.platform.async.wrapper.WorkerWrapper;
/**
* 对依赖的wrapper的封装
* @author wuweifeng wrote on 2019-12-20
* @version 1.0
*/
public class DependWrapper {
private WorkerWrapper<?, ?> dependWrapper;
/**
* 是否该依赖必须完成后才能执行自己.<p>
* 因为存在一个任务依赖于多个任务是让这多个任务全部完成后才执行自己还是某几个执行完毕就可以执行自己
*
* 1
* ---3
* 2
*
* 1---3
* 2---3
* 这两种就不一样上面的就是必须12都完毕才能3
* 下面的就是1完毕就可以3
*/
private boolean must = true;
public DependWrapper(WorkerWrapper<?, ?> dependWrapper, boolean must) {
this.dependWrapper = dependWrapper;
this.must = must;
}
public DependWrapper() {
}
public WorkerWrapper<?, ?> getDependWrapper() {
return dependWrapper;
}
public void setDependWrapper(WorkerWrapper<?, ?> dependWrapper) {
this.dependWrapper = dependWrapper;
}
public boolean isMust() {
return must;
}
public void setMust(boolean must) {
this.must = must;
}
@Override
public String toString() {
return "DependWrapper{" +
"dependWrapper=" + dependWrapper +
", must=" + must +
'}';
}
}

View File

@ -0,0 +1,73 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.executor.WrapperEndingInspector;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.wrapper.actionstrategy.DependenceAction;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
* {@link WorkerWrapper}默认实现类将上下游Wrapper保存在自己的Set中
*
* @author create by TcSnZh on 2021/5/6-下午2:41
*/
class StableWorkerWrapper<T, V> extends WorkerWrapper<T, V> {
StableWorkerWrapper(String id, IWorker<T, V> worker, T param, ICallback<T, V> callback) {
super(id, worker, param, callback);
}
/**
* 依赖的wrappers其dependType字段决定了依赖策略
*
* <p>
* v1.5时将其抽取到本子类
* 且修改List为Set并默认使用LinkedHashSet以提高id索引效率且保持有序虽然有序也没什么用
* </p>
*/
private Set<WorkerWrapper<?, ?>> dependWrappers;
/**
* 在自己后面的wrapper如果没有自己就是末尾如果有一个就是串行如果有多个有几个就需要开几个线程</p>
* -------2
* 1
* -------3
* 如1后面有23
*
* <p>
* v1.5时将其抽取到本子类
* 且修改List为Set并在{@link StableWorkerWrapperBuilder}中默认使用LinkedHashSet以提高id索引效率且保持有序虽然有序也没什么用
* </p>
*/
private Set<WorkerWrapper<?, ?>> nextWrappers;
// ========== public impl ==========
@Override
public Set<WorkerWrapper<?, ?>> getNextWrappers() {
return nextWrappers;
}
// ========== package impl ==========
@Override
void setNextWrappers(Set<WorkerWrapper<?, ?>> nextWrappers) {
this.nextWrappers = nextWrappers;
}
@Override
Set<WorkerWrapper<?, ?>> getDependWrappers() {
return dependWrappers;
}
@Override
void setDependWrappers(Set<WorkerWrapper<?, ?>> dependWrappers) {
this.dependWrappers = dependWrappers;
}
}

View File

@ -0,0 +1,443 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
import com.jd.platform.async.wrapper.actionstrategy.*;
import java.util.*;
/**
* 一个稳定的Builder兼容1.4版本之前的代码
* <p/>
* 效果等同于v1.4及之前的{@link WorkerWrapper.Builder}
* <p/>
* 考虑到由于废弃了must方式编排needCheckNextWrapperResult判断跳过转用策略器方式导致本类为了向上兼容保留了一些低效的功能
* <p/>
* 权限修饰符为default表示暂不对外开放
*
* @author create by TcSnZh on 2021/5/3-上午12:36
*/
class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS>>
implements WorkerWrapperBuilder<T, V> {
/**
* 该wrapper的唯一标识
* 如果不设置则使用{@code UUID.randomUUID().toString()}
*/
private String id;
/**
* worker将来要处理的param
*/
private T param;
private IWorker<T, V> worker;
private ICallback<T, V> callback;
/**
* 自己后面的所有
*/
private Set<WorkerWrapper<?, ?>> nextWrappers;
/**
* 自己依赖的所有
*/
private Set<WorkerWrapper<?, ?>> dependWrappers;
/**
* 旧版本的检查是否跳过的开关
*/
private Boolean needCheckNextWrapperResult = null;
/**
* 新版本的检查是否跳过的策略
*/
private SkipStrategy skipStrategy;
/**
* 基本依赖策略
* <p/>
* 如果在{@link #build()}调用时{@code dependenceStrategy==null}
* 则会给WorkerWrapper设置默认策略{@link DependenceStrategy#ALL_DEPENDENCIES_ALL_SUCCESS}
*/
private DependenceStrategy dependenceStrategy;
/**
* 存储自己需要特殊对待的dependWrapper集合
*/
private Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> dependWrapperActionStrategyMap;
/**
* 存储需要特殊对待自己的nextWrapper集合
*/
private Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> selfIsSpecialMap;
/**
* 一个保存以must=true方式传入的WorkerWrapper的集合
* <p/>
* 该Set将会加入到{@link WorkerWrapper.WrapperStrategy#getDependMustStrategyMapper().mustDependSet}之中
*/
private Set<WorkerWrapper<?, ?>> mustDependSet;
/**
* 存储强依赖于自己的wrapper集合
*/
private Set<WorkerWrapper<?, ?>> selfIsMustSet;
/**
* 是否使用了旧的编排模式Must开关
* <p/>
* 之所以需要以下两个属性是为了隔离旧api与新api的策略不兼容的情况<b>建议早日替换旧方法</b>
* 例如旧代码里调用{@link WorkerWrapper.Builder#depend(WorkerWrapper, boolean)}参数传入了false
*/
private boolean useV15DeprecatedMustDependApi = false;
/**
* 是否使用了新的编排模式
* <p/>
* {@link #useV15DeprecatedMustDependApi}
*/
private boolean useV15NewDependApi = false;
private boolean isBuilding = false;
@Override
public BUILDER_SUB_CLASS worker(IWorker<T, V> worker) {
this.worker = worker;
return returnThisBuilder();
}
@Override
public BUILDER_SUB_CLASS param(T t) {
this.param = t;
return returnThisBuilder();
}
@Override
public BUILDER_SUB_CLASS id(String id) {
if (id != null) {
this.id = id;
}
return returnThisBuilder();
}
@Override
public BUILDER_SUB_CLASS setSkipStrategy(SkipStrategy strategy) {
this.skipStrategy = strategy;
return returnThisBuilder();
}
@Override
public BUILDER_SUB_CLASS callback(ICallback<T, V> callback) {
this.callback = callback;
return returnThisBuilder();
}
@Override
public SetDependImpl setDepend() {
useV15NewDependApi = true;
checkCanNotCompatibleDeprecateMustDependApi(false);
return new SetDependImpl();
}
public class SetDependImpl implements SetDepend<T, V> {
@Override
public SetDependImpl wrapper(WorkerWrapper<?, ?> wrapper) {
if (wrapper == null) {
return this;
}
if (dependWrappers == null) {
dependWrappers = new LinkedHashSet<>();
}
dependWrappers.add(wrapper);
return this;
}
@Override
public SetDependImpl mustRequireWrapper(WorkerWrapper<?, ?> wrapper) {
if (wrapper == null) {
return this;
}
wrapper(wrapper);
if (mustDependSet == null) {
mustDependSet = new LinkedHashSet<>();
}
mustDependSet.add(wrapper);
return this;
}
@Override
public SetDependImpl specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper) {
if (strategy == null || wrapper == null) {
return this;
}
if (dependWrapperActionStrategyMap == null) {
dependWrapperActionStrategyMap = new LinkedHashMap<>();
}
dependWrapperActionStrategyMap.put(wrapper, strategy);
return this;
}
@Override
public SetDependImpl strategy(DependenceStrategy dependenceStrategy) {
if (dependenceStrategy == null) {
return this;
}
StableWorkerWrapperBuilder.this.dependenceStrategy = dependenceStrategy;
return this;
}
@Override
public BUILDER_SUB_CLASS end() {
return returnThisBuilder();
}
}
@Override
public SetNextImpl setNext() {
useV15NewDependApi = true;
checkCanNotCompatibleDeprecateMustDependApi(false);
return new SetNextImpl();
}
public class SetNextImpl implements SetNext<T, V> {
@Override
public SetNextImpl wrapper(WorkerWrapper<?, ?> wrapper) {
if (wrapper == null) {
return this;
}
if (nextWrappers == null) {
nextWrappers = new LinkedHashSet<>();
}
nextWrappers.add(wrapper);
return this;
}
@Override
public SetNextImpl mustToNextWrapper(WorkerWrapper<?, ?> wrapper) {
if (wrapper == null) {
return this;
}
wrapper(wrapper);
if (selfIsMustSet == null) {
selfIsMustSet = new LinkedHashSet<>();
}
selfIsMustSet.add(wrapper);
return this;
}
@Override
public SetNextImpl specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper) {
if (strategy == null || wrapper == null) {
return this;
}
wrapper(wrapper);
if (selfIsSpecialMap == null) {
selfIsSpecialMap = new LinkedHashMap<>();
}
selfIsSpecialMap.put(wrapper, strategy);
return this;
}
@Override
public BUILDER_SUB_CLASS end() {
return returnThisBuilder();
}
}
@Override
public WorkerWrapper<T, V> build() {
isBuilding = true;
WorkerWrapper<T, V> wrapper = new StableWorkerWrapper<>(
id == null ? UUID.randomUUID().toString() : id,
worker,
param,
callback
);
wrapper.setDependWrappers(new LinkedHashSet<>());
wrapper.setNextWrappers(new LinkedHashSet<>());
// ========== 设置依赖关系/策略 ==========
{
if (dependWrappers != null && dependWrappers.size() > 0) {
dependWrappers.forEach(dependWrapper -> {
wrapper.getDependWrappers().add(dependWrapper);
dependWrapper.getNextWrappers().add(wrapper);
});
}
if (nextWrappers != null && nextWrappers.size() > 0) {
nextWrappers.forEach(next -> {
wrapper.getNextWrappers().add(next);
next.getDependWrappers().add(wrapper);
});
}
if (useV15DeprecatedMustDependApi) {
// 适配旧api的must开关
if (mustDependSet != null && mustDependSet.size() > 0) {
wrapper.getWrapperStrategy().setDependMustStrategyMapper(new DependMustStrategyMapper()
.addDependMust(mustDependSet));
}
wrapper.getWrapperStrategy().setDependenceStrategy(new DependenceStrategy() {
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
DependMustStrategyMapper mustMapper = thisWrapper.getWrapperStrategy().getDependMustStrategyMapper();
if (mustMapper != null && mustMapper.getMustDependSet().size() > 0) {
// 至少有一个must则因为must未完全完成而等待
return DependenceAction.TAKE_REST.emptyProperty();
}
// 如果一个must也没有则认为应该是ANY模式
return DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS.judgeAction(dependWrappers, thisWrapper, fromWrapper);
}
@Override
public String toString() {
return "IF_HAS_MUST_ALL_MUST_ELSE_ANY";
}
});
} else {
if (mustDependSet != null && mustDependSet.size() > 0) {
wrapper.getWrapperStrategy().setDependMustStrategyMapper(new DependMustStrategyMapper().addDependMust(mustDependSet));
}
if (dependenceStrategy == null) {
setDepend().defaultStrategy();
}
wrapper.getWrapperStrategy().setDependenceStrategy(dependenceStrategy);
}
if (dependWrapperActionStrategyMap != null && dependWrapperActionStrategyMap.size() > 0) {
DependWrapperStrategyMapper mapper = new DependWrapperStrategyMapper();
dependWrapperActionStrategyMap.forEach(mapper::putMapping);
wrapper.getWrapperStrategy().setDependWrapperStrategyMapper(mapper);
}
if (selfIsMustSet != null && selfIsMustSet.size() > 0) {
selfIsMustSet.forEach(next -> Optional.ofNullable(next.getWrapperStrategy().getDependMustStrategyMapper())
.ifPresent(mustMapper -> mustMapper.addDependMust(wrapper)));
}
if (selfIsSpecialMap != null && selfIsSpecialMap.size() > 0) {
selfIsSpecialMap.forEach((next, strategy) -> Optional.ofNullable(next.getWrapperStrategy().getDependWrapperStrategyMapper())
.ifPresent(wrapperMapper -> wrapperMapper.putMapping(wrapper, strategy)));
}
}
// ========== 设置检查是否跳过策略 ==========
{
if (skipStrategy == null) {
wrapper.getWrapperStrategy().setSkipStrategy(needCheckNextWrapperResult != null && !needCheckNextWrapperResult ?
SkipStrategy.NOT_SKIP
: SkipStrategy.CHECK_ONE_LEVEL
);
} else {
wrapper.getWrapperStrategy().setSkipStrategy(skipStrategy);
}
}
return wrapper;
}
// ========== deprecated methods ==========
/**
* @deprecated 建议使用 {@link WorkerWrapperBuilder#depends(WorkerWrapper[])}
* {@link WorkerWrapperBuilder#setDepend()}设置更多选项例如{@link WorkerWrapperBuilder.SetDepend#wrapper(WorkerWrapper[])}
* 如果是想要必须依赖的功能则使用{@link WorkerWrapperBuilder.SetDepend#mustRequireWrapper(WorkerWrapper[])}
*/
@Deprecated
public BUILDER_SUB_CLASS depend(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return returnThisBuilder();
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
depend(wrapper);
}
return returnThisBuilder();
}
/**
* @deprecated 建议使用 {@link WorkerWrapperBuilder#depends(WorkerWrapper[])}
* {@link WorkerWrapperBuilder#setDepend()}设置更多选项例如{@link WorkerWrapperBuilder.SetDepend#wrapper(WorkerWrapper)}
* 如果是想要必须依赖的功能则使用{@link WorkerWrapperBuilder.SetDepend#mustRequireWrapper(WorkerWrapper[])}
*/
@Deprecated
public BUILDER_SUB_CLASS depend(WorkerWrapper<?, ?> wrapper) {
return depend(wrapper, true);
}
/**
* @deprecated 建议使用 {@link WorkerWrapperBuilder.SetDepend#requireWrapper(WorkerWrapper, boolean)}}
*/
@Deprecated
public BUILDER_SUB_CLASS depend(WorkerWrapper<?, ?> wrapper, boolean isMust) {
if (wrapper == null) {
return returnThisBuilder();
}
useV15DeprecatedMustDependApi = true;
checkCanNotCompatibleDeprecateMustDependApi(true);
if (dependWrappers == null) {
dependWrappers = new LinkedHashSet<>();
}
dependWrappers.add(wrapper);
if (isMust) {
if (mustDependSet == null) {
mustDependSet = new LinkedHashSet<>();
}
mustDependSet.add(wrapper);
}
return returnThisBuilder();
}
@Deprecated
public BUILDER_SUB_CLASS next(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return returnThisBuilder();
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
next(wrapper);
}
return returnThisBuilder();
}
@Deprecated
public BUILDER_SUB_CLASS next(WorkerWrapper<?, ?> wrapper) {
return next(wrapper, true);
}
/**
* 会将wrapper增加到{@link #nextWrappers}
* 如果selfIsMust为true还会将wrapper额外增加到{@link #selfIsMustSet}
*
* @param wrapper WorkerWrapper instance
* @param selfIsMust 是否强依赖自己强依赖是旧版本的叫法即是否必须在自己执行后才能执行
* @return 返回Builder
* @deprecated 不推荐使用Must开关去设置之后的Wrapper
*/
@Deprecated
public BUILDER_SUB_CLASS next(WorkerWrapper<?, ?> wrapper, boolean selfIsMust) {
useV15DeprecatedMustDependApi = true;
checkCanNotCompatibleDeprecateMustDependApi(true);
if (nextWrappers == null) {
nextWrappers = new LinkedHashSet<>();
}
nextWrappers.add(wrapper);
//强依赖自己
if (selfIsMust) {
if (selfIsMustSet == null) {
selfIsMustSet = new LinkedHashSet<>();
}
selfIsMustSet.add(wrapper);
}
return returnThisBuilder();
}
/**
* 设置是否要检查之后的Wrapper是否已经执行完毕
* <p/>
* 默认为true
*
* @param needCheckNextWrapperResult 设为true后如果之后的Wrapper已经执行完毕
* 则跳过本Wrapper并设置{@link WorkResult#getEx()}{@link com.jd.platform.async.exception.SkippedException}
* @deprecated v1.5中已经废弃请使用
*/
@Deprecated
public BUILDER_SUB_CLASS needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
return returnThisBuilder();
}
// util method
private BUILDER_SUB_CLASS returnThisBuilder() {
return (BUILDER_SUB_CLASS) this;
}
private void checkCanNotCompatibleDeprecateMustDependApi(boolean isOld) {
if (!isBuilding && (!isOld && useV15DeprecatedMustDependApi || isOld && useV15NewDependApi)) {
throw new UnsupportedOperationException("新旧api之间不可兼容请将v1.5之前废弃的方法升级为注释中建议的方法后再调用v1.5之后的新api");
}
}
}

View File

@ -4,81 +4,71 @@ import com.jd.platform.async.callback.DefaultCallback;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.executor.WrapperEndingInspector;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.DependWrapper;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.worker.*;
import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
import com.jd.platform.async.wrapper.actionstrategy.DependMustStrategyMapper;
import com.jd.platform.async.wrapper.actionstrategy.DependWrapperStrategyMapper;
import com.jd.platform.async.wrapper.actionstrategy.DependenceAction;
import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 对每个worker及callback进行包装一对一
* <p/>
* v1.5时将其抽取为抽象类以解耦并提高扩展性
*
* @author wuweifeng wrote on 2019-11-19.
*/
public class WorkerWrapper<T, V> {
public abstract class WorkerWrapper<T, V> {
/**
* 该wrapper的唯一标识
*/
private String id;
protected final String id;
/**
* worker将来要处理的param
*/
private T param;
private IWorker<T, V> worker;
private ICallback<T, V> callback;
/**
* 在自己后面的wrapper如果没有自己就是末尾如果有一个就是串行如果有多个有几个就需要开几个线程</p>
* -------2
* 1
* -------3
* 如1后面有23
*/
private List<WorkerWrapper<?, ?>> nextWrappers;
/**
* 依赖的wrappers有2种情况1:必须依赖的全部完成后才能执行自己 2:依赖的任何一个多个完成了就可以执行自己
* 通过must字段来控制是否依赖项必须完成
* 1
* -------3
* 2
* 12执行完毕后才能执行3
*/
private List<DependWrapper> dependWrappers;
protected T param;
protected IWorker<T, V> worker;
protected ICallback<T, V> callback;
/**
* 标记该事件是否已经被处理过了譬如已经超时返回false了后续rpc又收到返回值了则不再二次回调
* 经试验,volatile并不能保证"同一毫秒",多线程对该值的修改和拉取
* <p>
* 1-finish, 2-error, 3-working
*/
private AtomicInteger state = new AtomicInteger(0);
/**
* 该map存放所有wrapper的id和wrapper映射
*/
private Map<String, WorkerWrapper> forParamUseWrappers;
protected final AtomicInteger state = new AtomicInteger(0);
/**
* 也是个钩子变量用来存临时的结果
*/
private volatile WorkResult<V> workResult = WorkResult.defaultResult();
protected volatile WorkResult<V> workResult = WorkResult.defaultResult();
/**
* 是否在执行自己前去校验nextWrapper的执行结果<p>
* 1 4
* -------3
* 2
* 如这种在4执行前可能3已经执行完毕了被2执行完后触发的那么4就没必要执行了
* 注意该属性仅在nextWrapper数量<=1时有效>1时的情况是不存在的
* 该map存放所有wrapper的id和wrapper映射
* <p/>
* 需要线程安全
*/
private volatile boolean needCheckNextWrapperResult = true;
private Map<String, WorkerWrapper<?, ?>> forParamUseWrappers;
/**
* 各种策略的封装类
* <p/>
* 其实是因为加功能太多导致这个对象大小超过了128Byte所以强迫症的我不得不把几个字段丢到策略类里面去
* ps: 大小超过128Byte令我(TcSnZh)难受的一比就像走在草坪的格子上一步嫌小两步扯蛋
* IDEA可以使用JOL Java Object Layout插件查看对象大小
*/
private final WrapperStrategy wrapperStrategy = new WrapperStrategy();
private static final int FINISH = 1;
private static final int ERROR = 2;
private static final int WORKING = 3;
private static final int INIT = 0;
// ***** state属性的常量值 *****
private WorkerWrapper(String id, IWorker<T, V> worker, T param, ICallback<T, V> callback) {
public static final int FINISH = 1;
public static final int ERROR = 2;
public static final int WORKING = 3;
public static final int INIT = 0;
WorkerWrapper(String id, IWorker<T, V> worker, T param, ICallback<T, V> callback) {
if (worker == null) {
throw new NullPointerException("async.worker is null");
}
@ -92,66 +82,44 @@ public class WorkerWrapper<T, V> {
this.callback = callback;
}
// ========== public ==========
/**
* 开始工作
* fromWrapper代表这次work是由哪个上游wrapper发起的
* 外部调用本线程运行此Wrapper的入口方法
*
* @param executorService 该ExecutorService将成功运行后在nextWrapper有多个时被使用于多线程调用
* @param remainTime 剩下的时间
* @param forParamUseWrappers 用于保存经过的wrapper的信息的Mapkey为id
* @param inspector wrapper调度检查器
*/
private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
this.forParamUseWrappers = forParamUseWrappers;
//将自己放到所有wrapper的集合里去
forParamUseWrappers.put(id, this);
long now = SystemClock.now();
//总的已经超时了就快速失败进行下一个
if (remainTime <= 0) {
fastFail(INIT, null);
beginNext(executorService, now, remainTime);
return;
}
//如果自己已经执行过了
//可能有多个依赖其中的一个依赖已经执行完了并且自己也已开始执行或执行完毕当另一个依赖执行完毕又进来该方法时就不重复处理了
if (getState() == FINISH || getState() == ERROR) {
beginNext(executorService, now, remainTime);
return;
}
//如果在执行前需要校验nextWrapper的状态
if (needCheckNextWrapperResult) {
//如果自己的next链上有已经出结果或已经开始执行的任务了自己就不用继续了
if (!checkNextWrapperResult()) {
fastFail(INIT, new SkippedException());
beginNext(executorService, now, remainTime);
return;
}
}
//如果没有任何依赖说明自己就是第一批要执行的
if (dependWrappers == null || dependWrappers.size() == 0) {
fire();
beginNext(executorService, now, remainTime);
return;
}
/*如果有前方依赖存在两种情况
一种是前面只有一个wrapper A -> B
一种是前面有多个wrapperA C D -> B需要ACD都完成了才能轮到B但是无论是A执行完还是C执行完都会去唤醒B
所以需要B来做判断必须ACD都完成自己才能执行 */
//只有一个依赖
if (dependWrappers.size() == 1) {
doDependsOneJob(fromWrapper);
beginNext(executorService, now, remainTime);
} else {
//有多个依赖时
doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
}
public void work(ExecutorService executorService,
long remainTime,
Map<String, WorkerWrapper<?, ?>> forParamUseWrappers,
WrapperEndingInspector inspector) {
work(executorService, null, remainTime, forParamUseWrappers, inspector);
}
public void work(ExecutorService executorService, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
work(executorService, null, remainTime, forParamUseWrappers);
public String getId() {
return id;
}
public WorkResult<V> getWorkResult() {
return workResult;
}
public void setParam(T param) {
this.param = param;
}
public int getState() {
return state.get();
}
/**
* 获取之后的下游Wrapper
*/
public abstract Set<WorkerWrapper<?, ?>> getNextWrappers();
/**
* 总控制台超时停止所有任务
*/
@ -161,142 +129,12 @@ public class WorkerWrapper<T, V> {
}
}
/**
* 判断自己下游链路上是否存在已经出结果的或已经开始执行的
* 如果没有返回true如果有返回false
*/
private boolean checkNextWrapperResult() {
//如果自己就是最后一个或者后面有并行的多个就返回true
if (nextWrappers == null || nextWrappers.size() != 1) {
return getState() == INIT;
}
WorkerWrapper nextWrapper = nextWrappers.get(0);
boolean state = nextWrapper.getState() == INIT;
//继续校验自己的next的状态
return state && nextWrapper.checkNextWrapperResult();
}
/**
* 进行下一个任务
*/
private void beginNext(ExecutorService executorService, long now, long remainTime) {
//花费的时间
long costTime = SystemClock.now() - now;
if (nextWrappers == null) {
return;
}
if (nextWrappers.size() == 1) {
nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
return;
}
CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
for (int i = 0; i < nextWrappers.size(); i++) {
int finalI = i;
futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
.work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
}
try {
CompletableFuture.allOf(futures).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
private void doDependsOneJob(WorkerWrapper dependWrapper) {
if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultResult();
fastFail(INIT, null);
} else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
fastFail(INIT, null);
} else {
//前面任务正常完毕了该自己了
fire();
}
}
private synchronized void doDependsJobs(ExecutorService executorService, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
boolean nowDependIsMust = false;
//创建必须完成的上游wrapper集合
Set<DependWrapper> mustWrapper = new HashSet<>();
for (DependWrapper dependWrapper : dependWrappers) {
if (dependWrapper.isMust()) {
mustWrapper.add(dependWrapper);
}
if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
nowDependIsMust = dependWrapper.isMust();
}
}
//如果全部是不必须的条件那么只要到了这里就执行自己
if (mustWrapper.size() == 0) {
if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
fastFail(INIT, null);
} else {
fire();
}
beginNext(executorService, now, remainTime);
return;
}
//如果存在需要必须完成的且fromWrapper不是必须的就什么也不干
if (!nowDependIsMust) {
return;
}
//如果fromWrapper是必须的
boolean existNoFinish = false;
boolean hasError = false;
//先判断前面必须要执行的依赖任务的执行结果如果有任何一个失败那就不用走action了直接给自己设置为失败进行下一步就是了
for (DependWrapper dependWrapper : mustWrapper) {
WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
WorkResult tempWorkResult = workerWrapper.getWorkResult();
//为null或者isWorking说明它依赖的某个任务还没执行到或没执行完
if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {
existNoFinish = true;
break;
}
if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
workResult = defaultResult();
hasError = true;
break;
}
if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
hasError = true;
break;
}
}
//只要有失败的
if (hasError) {
fastFail(INIT, null);
beginNext(executorService, now, remainTime);
return;
}
//如果上游都没有失败分为两种情况一种是都finish了一种是有的在working
//都finish的话
if (!existNoFinish) {
//上游都finish了进行自己
fire();
beginNext(executorService, now, remainTime);
return;
}
}
/**
* 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
*/
private void fire() {
//阻塞取结果
workResult = workerDoJob();
}
/**
* 快速失败
*
* @return 已经失败则返回false如果刚才设置为失败了则返回true
*/
private boolean fastFail(int expect, Exception e) {
protected boolean fastFail(int expect, Exception e) {
//试图将它从expect状态,改成Error
if (!compareAndSetState(expect, ERROR)) {
return false;
@ -305,305 +143,382 @@ public class WorkerWrapper<T, V> {
//尚未处理过结果
if (checkIsNullResult()) {
if (e == null) {
workResult = defaultResult();
workResult.setResultState(ResultState.TIMEOUT);
} else {
workResult = defaultExResult(e);
workResult.setResultState(ResultState.EXCEPTION);
workResult.setEx(e);
}
workResult.setResult(worker.defaultValue());
}
callback.result(false, param, workResult);
return true;
}
/**
* 具体的单个worker执行任务
* 判断{@link #state}状态是否是初始值
*/
private WorkResult<V> workerDoJob() {
protected boolean checkIsNullResult() {
return ResultState.DEFAULT == workResult.getResultState();
}
protected boolean compareAndSetState(int expect, int update) {
return this.state.compareAndSet(expect, update);
}
/**
* 工作的核心方法
*
* @param fromWrapper 代表这次work是由哪个上游wrapper发起的如果是首个Wrapper则为null
* @param remainTime 剩余时间
*/
protected void work(ExecutorService executorService,
WorkerWrapper<?, ?> fromWrapper,
long remainTime,
Map<String, WorkerWrapper<?, ?>> forParamUseWrappers,
WrapperEndingInspector inspector) {
this.setForParamUseWrappers(forParamUseWrappers);
//将自己放到所有wrapper的集合里去
forParamUseWrappers.put(id, this);
long now = SystemClock.now();
//总的已经超时了就快速失败进行下一个
if (remainTime <= 0) {
fastFail(INIT, null);
beginNext(executorService, now, remainTime, inspector);
return;
}
//如果自己已经执行过了
//可能有多个依赖其中的一个依赖已经执行完了并且自己也已开始执行或执行完毕当另一个依赖执行完毕又进来该方法时就不重复处理了
if (getState() == FINISH || getState() == ERROR) {
beginNext(executorService, now, remainTime, inspector);
return;
}
// 判断是否要跳过自己该方法可能会跳过正在工作的自己
final WrapperStrategy wrapperStrategy = getWrapperStrategy();
if (wrapperStrategy.shouldSkip(getNextWrappers(), this, fromWrapper)) {
fastFail(INIT, new SkippedException());
beginNext(executorService, now, remainTime, inspector);
return;
}
//如果没有任何依赖说明自己就是第一批要执行的
final Set<WorkerWrapper<?, ?>> dependWrappers = getDependWrappers();
if (dependWrappers == null || dependWrappers.size() == 0) {
fire();
beginNext(executorService, now, remainTime, inspector);
return;
}
DependenceAction.WithProperty judge = wrapperStrategy.judgeAction(dependWrappers, this, fromWrapper);
switch (judge.getDependenceAction()) {
case TAKE_REST:
inspector.reduceWrapper(this);
return;
case FAST_FAIL:
switch (judge.getResultState()) {
case TIMEOUT:
fastFail(INIT, null);
break;
case EXCEPTION:
fastFail(INIT, judge.getFastFailException());
break;
default:
fastFail(INIT, new RuntimeException("ResultState " + judge.getResultState() + " set to FAST_FAIL"));
break;
}
beginNext(executorService, now, remainTime, inspector);
break;
case START_WORK:
fire();
beginNext(executorService, now, remainTime, inspector);
break;
case JUDGE_BY_AFTER:
default:
inspector.reduceWrapper(this);
throw new IllegalStateException("策略配置错误不应当在WorkerWrapper中返回JUDGE_BY_AFTER或其他无效值 : this=" + this + ",fromWrapper=" + fromWrapper);
}
}
/**
* 进行下一个任务
*/
protected void beginNext(ExecutorService executorService, long now, long remainTime, WrapperEndingInspector inspector) {
//花费的时间
final long costTime = SystemClock.now() - now;
final long nextRemainTIme = remainTime - costTime;
Set<WorkerWrapper<?, ?>> nextWrappers = getNextWrappers();
if (nextWrappers == null) {
inspector.setWrapperEndWithTryPolling(this);
return;
}
// nextWrappers只有一个就用本线程继续跑
if (nextWrappers.size() == 1) {
try {
WorkerWrapper<?, ?> next = nextWrappers.stream().findFirst().get();
inspector.addWrapper(next);
next.work(executorService, WorkerWrapper.this, nextRemainTIme, getForParamUseWrappers(), inspector);
} finally {
inspector.setWrapperEndWithTryPolling(this);
}
return;
}
// nextWrappers有多个
try {
inspector.addWrapper(nextWrappers);
nextWrappers.forEach(next -> {
executorService.submit(() -> next.work(executorService, this, nextRemainTIme, getForParamUseWrappers(), inspector));
});
} finally {
inspector.setWrapperEndWithTryPolling(this);
}
}
/**
* 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
*/
protected void fire() {
//阻塞取结果
//避免重复执行
if (!checkIsNullResult()) {
return workResult;
return;
}
try {
//如果已经不是init状态了说明正在被执行或已执行完毕这一步很重要可以保证任务不被重复执行
if (!compareAndSetState(INIT, WORKING)) {
return workResult;
return;
}
callback.begin();
//执行耗时操作
V resultValue = worker.action(param, forParamUseWrappers);
V resultValue = resultValue = (V) worker.action(param, (Map) getForParamUseWrappers());
//如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) {
return workResult;
return;
}
workResult.setResultState(ResultState.SUCCESS);
workResult.setResult(resultValue);
//回调成功
callback.result(true, param, workResult);
return workResult;
} catch (Exception e) {
//避免重复回调
if (!checkIsNullResult()) {
return workResult;
return;
}
fastFail(WORKING, e);
return workResult;
}
}
public WorkResult<V> getWorkResult() {
return workResult;
}
public List<WorkerWrapper<?, ?>> getNextWrappers() {
return nextWrappers;
}
public void setParam(T param) {
this.param = param;
}
private boolean checkIsNullResult() {
return ResultState.DEFAULT == workResult.getResultState();
}
private void addDepend(WorkerWrapper<?, ?> workerWrapper, boolean must) {
addDepend(new DependWrapper(workerWrapper, must));
}
private void addDepend(DependWrapper dependWrapper) {
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
//如果依赖的是重复的同一个就不重复添加了
for (DependWrapper wrapper : dependWrappers) {
if (wrapper.equals(dependWrapper)) {
return;
}
}
dependWrappers.add(dependWrapper);
}
private void addNext(WorkerWrapper<?, ?> workerWrapper) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
//避免添加重复
for (WorkerWrapper wrapper : nextWrappers) {
if (workerWrapper.equals(wrapper)) {
return;
}
}
nextWrappers.add(workerWrapper);
}
private void addNextWrappers(List<WorkerWrapper<?, ?>> wrappers) {
if (wrappers == null) {
return;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
addNext(wrapper);
}
}
private void addDependWrappers(List<DependWrapper> dependWrappers) {
if (dependWrappers == null) {
return;
}
for (DependWrapper wrapper : dependWrappers) {
addDepend(wrapper);
}
}
private WorkResult<V> defaultResult() {
workResult.setResultState(ResultState.TIMEOUT);
workResult.setResult(worker.defaultValue());
return workResult;
}
private WorkResult<V> defaultExResult(Exception ex) {
workResult.setResultState(ResultState.EXCEPTION);
workResult.setResult(worker.defaultValue());
workResult.setEx(ex);
return workResult;
}
private int getState() {
return state.get();
}
public String getId() {
return id;
}
private boolean compareAndSetState(int expect, int update) {
return this.state.compareAndSet(expect, update);
}
private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
}
// ========== hashcode and equals ==========
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkerWrapper<?, ?> that = (WorkerWrapper<?, ?>) o;
return needCheckNextWrapperResult == that.needCheckNextWrapperResult &&
Objects.equals(param, that.param) &&
Objects.equals(worker, that.worker) &&
Objects.equals(callback, that.callback) &&
Objects.equals(nextWrappers, that.nextWrappers) &&
Objects.equals(dependWrappers, that.dependWrappers) &&
Objects.equals(state, that.state) &&
Objects.equals(workResult, that.workResult);
return super.equals(o);
}
/**
* {@code return id.hashCode();}返回id值的hashcode
*/
@Override
public int hashCode() {
return Objects.hash(param, worker, callback, nextWrappers, dependWrappers, state, workResult, needCheckNextWrapperResult);
// final String id can use to .hashcode() .
return id.hashCode();
}
public static class Builder<W, C> {
// ========== Builder ==========
public static <T, V> WorkerWrapperBuilder<T, V> builder() {
return new Builder<>();
}
/**
* 自v1.5该类被抽取到{@link StableWorkerWrapperBuilder}抽象类兼容之前的版本
*/
public static class Builder<W, C> extends StableWorkerWrapperBuilder<W, C, Builder<W, C>> {
/**
* 该wrapper的唯一标识
* @deprecated 建议使用 {@link #builder()}返回{@link WorkerWrapperBuilder}接口以调用v1.5之后的规范api
*/
private String id = UUID.randomUUID().toString();
@Deprecated
public Builder() {
}
}
// ========== package access methods , for example , some getter/setter that doesn't want to be public ==========
T getParam() {
return param;
}
IWorker<T, V> getWorker() {
return worker;
}
void setWorker(IWorker<T, V> worker) {
this.worker = worker;
}
ICallback<T, V> getCallback() {
return callback;
}
void setCallback(ICallback<T, V> callback) {
this.callback = callback;
}
void setState(int state) {
this.state.set(state);
}
Map<String, WorkerWrapper<?, ?>> getForParamUseWrappers() {
return forParamUseWrappers;
}
void setForParamUseWrappers(Map<String, WorkerWrapper<?, ?>> forParamUseWrappers) {
this.forParamUseWrappers = forParamUseWrappers;
}
void setWorkResult(WorkResult<V> workResult) {
this.workResult = workResult;
}
abstract void setNextWrappers(Set<WorkerWrapper<?, ?>> nextWrappers);
abstract Set<WorkerWrapper<?, ?>> getDependWrappers();
abstract void setDependWrappers(Set<WorkerWrapper<?, ?>> dependWrappers);
WrapperStrategy getWrapperStrategy() {
return wrapperStrategy;
}
// ========== toString ==========
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(150)
.append("WorkerWrapper{id=").append(id)
.append(", param=").append(param)
.append(", worker=").append(worker)
.append(", callback=").append(callback)
.append(", state=").append(state)
.append(", workResult=").append(workResult)
// 防止循环引用这里只输出相关Wrapper的id
.append(", forParamUseWrappers::getId=");
getForParamUseWrappers().keySet().forEach(wrapperId -> sb.append(wrapperId).append(", "));
if (getForParamUseWrappers().keySet().size() > 0) {
sb.delete(sb.length() - 2, sb.length());
}
sb
.append(", dependWrappers::getId=[");
getDependWrappers().stream().map(WorkerWrapper::getId).forEach(wrapperId -> sb.append(wrapperId).append(", "));
if (getDependWrappers().size() > 0) {
sb.delete(sb.length() - 2, sb.length());
}
sb
.append("], nextWrappers::getId=[");
getNextWrappers().stream().map(WorkerWrapper::getId).forEach(wrapperId -> sb.append(wrapperId).append(", "));
if (getNextWrappers().size() > 0) {
sb.delete(sb.length() - 2, sb.length());
}
sb
.append("]")
.append(", wrapperStrategy=").append(getWrapperStrategy())
.append('}');
return sb.toString();
}
public static class WrapperStrategy implements DependenceStrategy, SkipStrategy {
// ========== 这三个属性用来判断是否要开始工作 ==========
// 从前往后依次判断的顺序为 dependWrapperStrategyMapper -> dependMustStrategyMapper -> dependenceStrategy
/**
* worker将来要处理的param
* 对特殊Wrapper专用的依赖响应策略
* <b>该值允许为null</b>
*/
private W param;
private IWorker<W, C> worker;
private ICallback<W, C> callback;
private DependWrapperStrategyMapper dependWrapperStrategyMapper;
/**
* 自己后面的所有
* 对必须完成的must的Wrapper的依赖响应策略
* <b>该值允许为null</b>
* <p/>
* 这是一个不得不向历史妥协的属性用于适配must开关方式
*/
private List<WorkerWrapper<?, ?>> nextWrappers;
private DependMustStrategyMapper dependMustStrategyMapper;
/**
* 自己依赖的所有
* 依赖响应全局策略
*/
private List<DependWrapper> dependWrappers;
/**
* 存储强依赖于自己的wrapper集合
*/
private Set<WorkerWrapper<?, ?>> selfIsMustSet;
private DependenceStrategy dependenceStrategy;
private boolean needCheckNextWrapperResult = true;
public Builder<W, C> worker(IWorker<W, C> worker) {
this.worker = worker;
return this;
}
public Builder<W, C> param(W w) {
this.param = w;
return this;
}
public Builder<W, C> id(String id) {
if (id != null) {
this.id = id;
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
// 如果存在依赖则调用三层依赖响应策略进行判断
DependenceStrategy strategy = dependWrapperStrategyMapper;
if (dependMustStrategyMapper != null) {
strategy = strategy == null ? dependMustStrategyMapper : strategy.thenJudge(dependMustStrategyMapper);
}
return this;
if (dependenceStrategy != null) {
strategy = strategy == null ? dependenceStrategy : strategy.thenJudge(dependenceStrategy);
}
if (strategy == null) {
throw new IllegalStateException("配置无效三层判断策略均为null请开发者检查自己的Builder是否逻辑错误");
}
return strategy.judgeAction(dependWrappers, thisWrapper, fromWrapper);
}
public Builder<W, C> needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
return this;
public DependWrapperStrategyMapper getDependWrapperStrategyMapper() {
return dependWrapperStrategyMapper;
}
public Builder<W, C> callback(ICallback<W, C> callback) {
this.callback = callback;
return this;
public void setDependWrapperStrategyMapper(DependWrapperStrategyMapper dependWrapperStrategyMapper) {
this.dependWrapperStrategyMapper = dependWrapperStrategyMapper;
}
public Builder<W, C> depend(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
depend(wrapper);
}
return this;
public DependMustStrategyMapper getDependMustStrategyMapper() {
return dependMustStrategyMapper;
}
public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper) {
return depend(wrapper, true);
public void setDependMustStrategyMapper(DependMustStrategyMapper dependMustStrategyMapper) {
this.dependMustStrategyMapper = dependMustStrategyMapper;
}
public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper, boolean isMust) {
if (wrapper == null) {
return this;
}
DependWrapper dependWrapper = new DependWrapper(wrapper, isMust);
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
dependWrappers.add(dependWrapper);
return this;
public DependenceStrategy getDependenceStrategy() {
return dependenceStrategy;
}
public Builder<W, C> next(WorkerWrapper<?, ?> wrapper) {
return next(wrapper, true);
public void setDependenceStrategy(DependenceStrategy dependenceStrategy) {
this.dependenceStrategy = dependenceStrategy;
}
public Builder<W, C> next(WorkerWrapper<?, ?> wrapper, boolean selfIsMust) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
nextWrappers.add(wrapper);
// ========== 跳过策略 ==========
//强依赖自己
if (selfIsMust) {
if (selfIsMustSet == null) {
selfIsMustSet = new HashSet<>();
}
selfIsMustSet.add(wrapper);
}
return this;
private SkipStrategy skipStrategy;
@Override
public boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
return skipStrategy != null && skipStrategy.shouldSkip(nextWrappers, thisWrapper, fromWrapper);
}
public Builder<W, C> next(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
next(wrapper);
}
return this;
public SkipStrategy getSkipStrategy() {
return skipStrategy;
}
public WorkerWrapper<W, C> build() {
WorkerWrapper<W, C> wrapper = new WorkerWrapper<>(id, worker, param, callback);
wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
if (dependWrappers != null) {
for (DependWrapper workerWrapper : dependWrappers) {
workerWrapper.getDependWrapper().addNext(wrapper);
wrapper.addDepend(workerWrapper);
}
}
if (nextWrappers != null) {
for (WorkerWrapper<?, ?> workerWrapper : nextWrappers) {
boolean must = false;
if (selfIsMustSet != null && selfIsMustSet.contains(workerWrapper)) {
must = true;
}
workerWrapper.addDepend(wrapper, must);
wrapper.addNext(workerWrapper);
}
}
return wrapper;
public void setSkipStrategy(SkipStrategy skipStrategy) {
this.skipStrategy = skipStrategy;
}
// ========== toString ==========
@Override
public String toString() {
return "WrapperStrategy{" +
"dependWrapperStrategyMapper=" + dependWrapperStrategyMapper +
", dependMustStrategyMapper=" + dependMustStrategyMapper +
", dependenceStrategy=" + dependenceStrategy +
", skipStrategy=" + skipStrategy +
'}';
}
}
}

View File

@ -0,0 +1,237 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.actionstrategy.DependWrapperActionStrategy;
import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
import java.util.Collection;
/**
* 作为优化编排依赖策略后新增的Builder接口
* <p/>
* 该接口中不再开放很多过时的api
*
* @author create by TcSnZh on 2021/5/4-下午1:26
*/
public interface WorkerWrapperBuilder<T, V> {
/**
* 设置唯一id
* 如果不设置{@link StableWorkerWrapperBuilder}会使用UUID
*/
WorkerWrapperBuilder<T, V> id(String id);
/**
* 设置{@link IWorker}执行方法
*
* @param worker 传入接口实现类/lambda
*/
WorkerWrapperBuilder<T, V> worker(IWorker<T, V> worker);
/**
* wrapper启动后的传入参数
*
* @param t 参数
*/
WorkerWrapperBuilder<T, V> param(T t);
/**
* 设置{@link ICallback}回调方法
*/
WorkerWrapperBuilder<T, V> callback(ICallback<T, V> callback);
/**
* 设置跳过策略通常用于检查下游Wrapper是否已经完成
* <p/>
* 允许不设置{@link StableWorkerWrapperBuilder}将会默认设置为检查深度为1的下游Wrapper是否执行完成
*
* @param strategy 跳过策略函数
*/
WorkerWrapperBuilder<T, V> setSkipStrategy(SkipStrategy strategy);
/**
* 设置上游Wrapper依赖关系的选项
*/
SetDepend<T, V> setDepend();
interface SetDepend<T, V> {
/**
* 设置在本Wrapper之前的上游Wrapper
*
* @param wrapper 允许传入null
*/
SetDepend<T, V> wrapper(WorkerWrapper<?, ?> wrapper);
default SetDepend<T, V> wrapper(WorkerWrapper... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
wrapper(wrapper);
}
return this;
}
default SetDepend<T, V> wrapper(Collection<? extends WorkerWrapper> wrappers) {
if (wrappers == null) {
return this;
}
wrappers.forEach(this::wrapper);
return this;
}
/**
* 设置必须要执行成功的Wrapper当所有被该方法设为的上游Wrapper执行成功时本Wrapper才能执行
*/
SetDepend<T, V> mustRequireWrapper(WorkerWrapper<?, ?> wrapper);
default SetDepend<T, V> mustRequireWrapper(WorkerWrapper... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
mustRequireWrapper(wrapper);
}
return this;
}
/**
* 一个用于动态判断是否must的方法与旧的{@code .depend(WorkerWrapper,boolean)}效果相同
*
* @param must 如果为true则等同于{@link #mustRequireWrapper(WorkerWrapper)}否则等同于{@link #wrapper(WorkerWrapper)}
*/
default SetDepend<T, V> requireWrapper(WorkerWrapper<?, ?> wrapper, boolean must) {
return must ? mustRequireWrapper(wrapper) : wrapper(wrapper);
}
/**
* 对单个Wrapper设置特殊策略
*
* @param wrapper 需要设置特殊策略的Wrapper
* @param strategy 特殊策略
*/
SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);
default SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper... wrappers) {
if (strategy == null || wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> workerWrapper : wrappers) {
specialDependWrapper(strategy, workerWrapper);
}
return this;
}
/**
* 设置基本策略并返回
* <p>
* 如果从未调用该方法则在{@link #build()}时使用{@link #defaultStrategy()}作为默认策略
* </p>
*
* @param dependenceStrategy 根据上游Wrapper判断本Wrapper是否启动的最终策略
*/
SetDepend<T, V> strategy(DependenceStrategy dependenceStrategy);
/**
* 默认策略为{@link DependenceStrategy#ALL_DEPENDENCIES_ALL_SUCCESS}
*/
default SetDepend<T, V> defaultStrategy() {
return strategy(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS);
}
/**
* 结束依赖关系设置返回到所属的{@link WorkerWrapperBuilder}
*/
WorkerWrapperBuilder<T, V> end();
}
/**
* 便捷式设置依赖的上游Wrapper
*
* @param wrappers 上游Wrapper
*/
default WorkerWrapperBuilder<T, V> depends(WorkerWrapper... wrappers) {
return setDepend().wrapper(wrappers).end();
}
default WorkerWrapperBuilder<T, V> depends(Collection<WorkerWrapper> wrappers) {
return setDepend().wrapper(wrappers).end();
}
default WorkerWrapperBuilder<T, V> depends(DependenceStrategy strategy, WorkerWrapper... wrappers) {
return setDepend().wrapper(wrappers).strategy(strategy).end();
}
default WorkerWrapperBuilder<T, V> depends(DependenceStrategy strategy, Collection<WorkerWrapper> wrappers) {
return setDepend().wrapper(wrappers).strategy(strategy).end();
}
/**
* 设置下游Wrapper依赖关系的选项
*/
SetNext<T, V> setNext();
interface SetNext<T, V> {
/**
* 设置在本Wrapper之后的下游Wrapper
*/
SetNext<T, V> wrapper(WorkerWrapper<?, ?> wrapper);
default SetNext<T, V> wrapper(WorkerWrapper... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
wrapper(wrapper);
}
return this;
}
default SetNext<T, V> wrapper(Collection<? extends WorkerWrapper> wrappers) {
if (wrappers == null) {
return this;
}
wrappers.forEach(this::wrapper);
return this;
}
/**
* 调用该方法将会让传入的此下游workerWrappers对本Wrapper强依赖(must)
*
* @param wrapper 下游Wrapper
*/
SetNext<T, V> mustToNextWrapper(WorkerWrapper<?, ?> wrapper);
default SetNext<T, V> requireToNextWrapper(WorkerWrapper<?, ?> wrapper, boolean must) {
return must ? mustToNextWrapper(wrapper) : wrapper(wrapper);
}
/**
* 调用该方法将会让传入的此下游workerWrappers对本Wrapper进行特殊策略判断
*
* @param strategy 对本Wrapper的特殊策略
* @param wrapper 依赖本Wrapper的下游Wrapper
* @return 返回Builder自身
*/
SetNext<T, V> specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);
WorkerWrapperBuilder<T, V> end();
}
/**
* 便捷式设置本Wrapper被依赖的下游Wrapper
*
* @param wrappers 下游Wrapper
*/
default WorkerWrapperBuilder<T, V> nextOf(WorkerWrapper... wrappers) {
return setNext().wrapper(wrappers).end();
}
default WorkerWrapperBuilder<T, V> nextOf(Collection<WorkerWrapper> wrappers) {
return setNext().wrapper(wrappers).end();
}
WorkerWrapper<T, V> build();
}

View File

@ -0,0 +1,99 @@
package com.jd.platform.async.wrapper.actionstrategy;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 这是一个向历史妥协的策略器以兼容must开关模式
*
* @author create by TcSnZh on 2021/5/4-下午1:24
*/
public class DependMustStrategyMapper implements DependenceStrategy {
private final Set<WorkerWrapper<?, ?>> mustDependSet = new LinkedHashSet<>();
/**
* {@link #mustDependSet} 中的must依赖
* <p>
* 如果{@code mustDependSet == null || mustDependSet.size() < 1}返回{@link DependenceAction#JUDGE_BY_AFTER}
* <p>
* 如果所有的Wrapper已经完成本Wrapper将会开始工作
* <p>
* 如果任一{@link #mustDependSet}中的Wrapper失败则返回{@link DependenceAction#FAST_FAIL}
* 具体超时/异常则根据{@link com.jd.platform.async.worker.ResultState}的值进行判断
* <p>
* 如果存在Wrapper未完成 所有的Wrapper都未失败则返回{@link DependenceAction#JUDGE_BY_AFTER}
* </p>
*/
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
if (mustDependSet.size() < 1) {
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
}
boolean allSuccess = true;
for (WorkerWrapper<?, ?> wrapper : mustDependSet) {
switch (wrapper.getWorkResult().getResultState()) {
case TIMEOUT:
return DependenceAction.FAST_FAIL.fastFailException(ResultState.TIMEOUT, null);
case EXCEPTION:
return DependenceAction.FAST_FAIL.fastFailException(ResultState.EXCEPTION, wrapper.getWorkResult().getEx());
case DEFAULT:
allSuccess = false;
case SUCCESS:
default:
}
}
if (allSuccess) {
return DependenceAction.START_WORK.emptyProperty();
}
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
}
/**
* 新增must依赖
*
* @param mustDependWrapper WorkerWrapper
* @return 返回自身
*/
public DependMustStrategyMapper addDependMust(WorkerWrapper<?, ?> mustDependWrapper) {
if (mustDependWrapper == null) {
return this;
}
mustDependSet.add(mustDependWrapper);
return this;
}
public DependMustStrategyMapper addDependMust(Collection<WorkerWrapper<?, ?>> wrappers) {
if (wrappers == null) {
return this;
}
mustDependSet.addAll(wrappers);
return this;
}
public DependMustStrategyMapper addDependMust(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return this;
}
return addDependMust(Arrays.asList(wrappers));
}
public Set<WorkerWrapper<?, ?>> getMustDependSet() {
return mustDependSet;
}
@Override
public String toString() {
return "DependMustStrategyMapper{" +
"mustDependSet::getId=" + mustDependSet.stream().map(WorkerWrapper::getId).collect(Collectors.toList()) +
'}';
}
}

View File

@ -0,0 +1,74 @@
package com.jd.platform.async.wrapper.actionstrategy;
import com.jd.platform.async.wrapper.WorkerWrapper;
/**
* 单参数策略
*
* @author create by TcSnZh on 2021/5/1-下午11:16
*/
@FunctionalInterface
public interface DependWrapperActionStrategy {
/**
* 仅使用一个参数的判断方法
*
* @param fromWrapper 调用本Wrapper的上游Wrapper
* @return 返回 {@link DependenceAction.WithProperty}
*/
DependenceAction.WithProperty judge(WorkerWrapper<?, ?> fromWrapper);
// ========== 送几个供链式调用的默认值 ==========
/**
* 成功时交给下一个策略器判断
* 未运行时休息
* 失败时失败
*/
DependWrapperActionStrategy SUCCESS_CONTINUE = new DependWrapperActionStrategy() {
@Override
public DependenceAction.WithProperty judge(WorkerWrapper<?, ?> ww) {
switch (ww.getWorkResult().getResultState()) {
case SUCCESS:
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
case DEFAULT:
return DependenceAction.TAKE_REST.emptyProperty();
case EXCEPTION:
case TIMEOUT:
return DependenceAction.FAST_FAIL.fastFailException(ww.getWorkResult().getResultState(), ww.getWorkResult().getEx());
default:
}
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + ww.getWorkResult().getResultState());
}
@Override
public String toString() {
return "SUCCESS_CONTINUE";
}
};
/**
* 成功时开始工作
* 未运行时交给下一个策略器判断
* 失败时失败
*/
DependWrapperActionStrategy SUCCESS_START_INIT_CONTINUE = new DependWrapperActionStrategy() {
@Override
public DependenceAction.WithProperty judge(WorkerWrapper<?, ?> ww) {
switch (ww.getWorkResult().getResultState()) {
case SUCCESS:
return DependenceAction.START_WORK.emptyProperty();
case DEFAULT:
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
case EXCEPTION:
case TIMEOUT:
return DependenceAction.FAST_FAIL.fastFailException(ww.getWorkResult().getResultState(), ww.getWorkResult().getEx());
default:
}
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + ww.getWorkResult().getResultState());
}
@Override
public String toString() {
return "SUCCESS_START_INIT_CONTINUE";
}
};
}

View File

@ -0,0 +1,69 @@
package com.jd.platform.async.wrapper.actionstrategy;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略
* <p/>
* 使用{@link DependWrapperStrategyMapper}本实现类对{@link DependenceStrategy}进行增强
*
* @author create by TcSnZh on 2021/5/1-下午11:12
*/
public class DependWrapperStrategyMapper implements DependenceStrategy {
private final Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> mapper = new ConcurrentHashMap<>(4);
/**
* 设置对应策略
*
* @param targetWrapper 要设置策略的WorkerWrapper
* @param strategy 要设置的策略
* @return 返回this链式调用
*/
public DependWrapperStrategyMapper putMapping(WorkerWrapper<?, ?> targetWrapper, DependWrapperActionStrategy strategy) {
mapper.put(targetWrapper, strategy);
toStringCache = null;
return this;
}
/**
* 判断方法
* <p/>
* 如果fromWrapper在{@link #mapper}则返回{@link DependWrapperActionStrategy}的判断返回值否则返回{@link DependenceAction#JUDGE_BY_AFTER}
*
* @param dependWrappers 这里不会使用该值thisWrapper.dependWrappers的属性值
* @param thisWrapper 这里不会使用该值thisWrapper即为被催促的WorkerWrapper
* @param fromWrapper 调用来源Wrapper
* @return 如果在mapper中有对fromWrapper的处理策略则使用其进行判断否则返回JUDGE_BY_AFTER交给下一个进行判断
*/
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
DependWrapperActionStrategy strategy = mapper.get(fromWrapper);
if (strategy == null) {
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
}
return strategy.judge(fromWrapper);
}
/**
* 缓存toString
*/
private String toStringCache;
@Override
public String toString() {
if (toStringCache == null) {
toStringCache = "DependWrapperStrategyMapper{mapper=" + mapper.entrySet().stream()
.map(entry -> "{" + entry.getKey().getId() + ":" + entry.getValue() + "}")
.collect(Collectors.toList())
+ "}";
}
return toStringCache;
}
}

View File

@ -0,0 +1,99 @@
package com.jd.platform.async.wrapper.actionstrategy;
import com.jd.platform.async.worker.ResultState;
/**
* 返回执行工作类型的枚举
*
* @author create by TcSnZh on 2021/5/1-下午10:47
*/
public enum DependenceAction {
/**
* 开始工作WorkerWrapper会执行工作方法
*/
START_WORK,
/**
* 还没轮到休息一下WorkerWrapper中的调用栈会返回以等待可能发生的下次调用
*/
TAKE_REST,
/**
* 立即失败WorkerWrapper会去执行快速失败的方法
*/
FAST_FAIL,
/**
* 交给下层{@link DependenceStrategy}进行判断
* 在WorkerWrapper中不需要考虑此值因为配置正常的情况下不会返回这个值
*/
JUDGE_BY_AFTER;
// 空值单例
public WithProperty emptyProperty() {
return empty;
}
private final WithProperty empty = new WithProperty() {
@Override
public void setResultState(ResultState resultState) {
throw new UnsupportedOperationException("empty not support modify");
}
@Override
public void setFastFailException(Exception fastFailException) {
throw new UnsupportedOperationException("empty not support modify");
}
private final String toString = getDependenceAction() + ".empty";
@Override
public String toString() {
return toString;
}
};
// 携带异常信息ResultState的返回值
public WithProperty fastFailException(ResultState resultState, Exception e) {
WithProperty withProperty = this.new WithProperty();
withProperty.setResultState(resultState);
withProperty.setFastFailException(e);
return withProperty;
}
/**
* 有时需要封装一些参数来返回则使用本内部类进行返回
* <p/>
* 所有的构造方法权限均为private请在父枚举类{@link DependenceAction}的方法中选择合适的模板生成内部类WithProperty
*/
public class WithProperty {
private ResultState resultState;
private Exception fastFailException;
// getter setter
public ResultState getResultState() {
return resultState;
}
public void setResultState(ResultState resultState) {
this.resultState = resultState;
}
public Exception getFastFailException() {
return fastFailException;
}
public void setFastFailException(Exception fastFailException) {
this.fastFailException = fastFailException;
}
public DependenceAction getDependenceAction() {
return DependenceAction.this;
}
// constructor always private.
private WithProperty() {
}
}
}

View File

@ -0,0 +1,235 @@
package com.jd.platform.async.wrapper.actionstrategy;
import com.jd.platform.async.executor.WrapperEndingInspector;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
/**
* 依赖策略接口
* <p/>
* 提供了多个默认值可以作为单例模式使用
* <p>
* 工作原理示例
* <p>
* ==== 一个简单示例 ====
* 现有三个WorkerWrapperABC其中 {@code A{dependWrappers=[B,C],} }
* 当B执行完成后调用A时根据依赖关系ALL_DEPENDENCIES_ALL_SUCCESS还需等待C的结果
* 然后当C执行完成后调用A时根据依赖关系ALL_DEPENDENCIES_ALL_SUCCESS 此时如果C成功了A就开工此时如果C失败了A就失败
* ==== 简单示例2 ====
*
* </p>
*
* @author create by TcSnZh on 2021/5/1-下午10:48
*/
@FunctionalInterface
public interface DependenceStrategy {
/**
* 核心判断策略
*
* @param dependWrappers thisWrapper.dependWrappers的属性值
* @param thisWrapper thisWrapper即为被催促的WorkerWrapper
* @param fromWrapper 调用来源Wrapper
* <p>
* 该参数不会为null
* 因为在{@link WorkerWrapper#work(ExecutorService, long, Map, WrapperEndingInspector)}方法中传入的的第一批无依赖的Wrapper
* 不会被该策略器所判断而是不论如何直接执行
* </p>
* @return 返回枚举值内部类WorkerWrapper将会根据其值来决定自己如何响应这次调用 {@link DependenceAction.WithProperty}
*/
DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper);
/**
* 如果本策略器的judge方法返回了JUDGE_BY_AFTER则交给下一个策略器来判断
*
* @param after 下层策略器
* @return 返回一个封装的多层策略器
*/
default DependenceStrategy thenJudge(DependenceStrategy after) {
DependenceStrategy that = this;
return new DependenceStrategy() {
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
DependenceAction.WithProperty judge = that.judgeAction(dependWrappers, thisWrapper, fromWrapper);
if (judge.getDependenceAction() == DependenceAction.JUDGE_BY_AFTER) {
return after.judgeAction(dependWrappers, thisWrapper, fromWrapper);
}
return judge;
}
@Override
public String toString() {
return that + " ----> " + after;
}
};
}
// ========== 以下是一些默认实现 ==========
/**
* 被依赖的所有Wrapper都必须成功才能开始工作
* 如果其中任一Wrapper还没有执行且不存在失败则休息
* 如果其中任一Wrapper失败则立即失败
*/
DependenceStrategy ALL_DEPENDENCIES_ALL_SUCCESS = new DependenceStrategy() {
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
boolean hasWaiting = false;
for (final WorkerWrapper<?, ?> dependWrapper : dependWrappers) {
WorkResult<?> workResult = dependWrapper.getWorkResult();
switch (workResult.getResultState()) {
case DEFAULT:
hasWaiting = true;
break;
case SUCCESS:
break;
case TIMEOUT:
case EXCEPTION:
return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
default:
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + workResult.getResultState());
}
}
if (hasWaiting) {
return DependenceAction.TAKE_REST.emptyProperty();
}
return DependenceAction.START_WORK.emptyProperty();
}
@Override
public String toString() {
return "ALL_DEPENDENCIES_ALL_SUCCESS";
}
};
/**
* 被依赖的Wrapper中任意一个成功了就可以开始工作
* 如果其中所有Wrapper还没有执行则休息
* 如果其中一个Wrapper失败且不存在成功则立即失败
*/
DependenceStrategy ALL_DEPENDENCIES_ANY_SUCCESS = new DependenceStrategy() {
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
boolean hasFailed = false;
Exception fastFailException = null;
ResultState resultState = null;
for (final WorkerWrapper<?, ?> dependWrapper : dependWrappers) {
WorkResult<?> workResult = dependWrapper.getWorkResult();
switch (workResult.getResultState()) {
case DEFAULT:
break;
case SUCCESS:
return DependenceAction.START_WORK.emptyProperty();
case TIMEOUT:
case EXCEPTION:
resultState = !hasFailed ? workResult.getResultState() : resultState;
fastFailException = !hasFailed ? workResult.getEx() : fastFailException;
hasFailed = true;
break;
default:
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + workResult.getResultState());
}
}
if (hasFailed) {
return DependenceAction.FAST_FAIL.fastFailException(resultState, fastFailException);
}
return DependenceAction.TAKE_REST.emptyProperty();
}
@Override
public String toString() {
return "ALL_DEPENDENCIES_ANY_SUCCESS";
}
};
/**
* 如果被依赖的工作中任一失败则立即失败否则就开始工作不论之前的工作有没有开始
*/
DependenceStrategy ALL_DEPENDENCIES_NONE_FAILED = new DependenceStrategy() {
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
for (WorkerWrapper<?, ?> dependWrapper : dependWrappers) {
WorkResult<?> workResult = dependWrapper.getWorkResult();
switch (workResult.getResultState()) {
case TIMEOUT:
case EXCEPTION:
return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
default:
}
}
return DependenceAction.START_WORK.emptyProperty();
}
@Override
public String toString() {
return "ALL_DEPENDENCIES_NONE_FAILED";
}
};
/**
* 只有当指定的这些Wrapper都成功时才会开始工作
* 任一失败会快速失败
* 任一还没有执行且不存在失败则休息
*
* @param theseWrapper 该方法唯一有效参数
* @return 返回生成的 {@link DependenceAction.WithProperty)
*/
static DependenceStrategy theseWrapperAllSuccess(Set<WorkerWrapper<?,?>> theseWrapper) {
return new DependenceStrategy() {
private final Set<WorkerWrapper<?, ?>> theseWrappers;
private final String toString;
{
theseWrappers = Collections.unmodifiableSet(theseWrapper);
toString = "THESE_WRAPPER_MUST_SUCCESS:" + theseWrappers.stream().map(WorkerWrapper::getId).collect(Collectors.toList());
}
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
boolean hasWaiting = false;
for (WorkerWrapper<?, ?> wrapper : theseWrappers) {
ResultState resultState = wrapper.getWorkResult().getResultState();
switch (resultState) {
case DEFAULT:
hasWaiting = true;
break;
case SUCCESS:
break;
case TIMEOUT:
case EXCEPTION:
return DependenceAction.FAST_FAIL.fastFailException(resultState, wrapper.getWorkResult().getEx());
default:
throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + resultState);
}
}
if (hasWaiting) {
return DependenceAction.TAKE_REST.emptyProperty();
}
return DependenceAction.START_WORK.emptyProperty();
}
@Override
public String toString() {
return toString;
}
};
}
}

View File

@ -0,0 +1,183 @@
package com.jd.platform.async.wrapper.skipstrategy;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author create by TcSnZh on 2021/5/6-下午3:02
*/
@FunctionalInterface
public interface SkipStrategy {
/**
* 跳过策略函数返回true将会使WorkerWrapper跳过执行
*
* @param nextWrappers 下游WrapperSet
* @param thisWrapper 本WorkerWrapper
* @param fromWrapper 呼叫本Wrapper的上游Wrapper
* @return 返回true将会使WorkerWrapper跳过执行
*/
boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper);
/**
* 不跳过
*/
SkipStrategy NOT_SKIP = new SkipStrategy() {
@Override
public boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
return false;
}
@Override
public String toString() {
return "NOT_SKIP";
}
};
SkipStrategy CHECK_ONE_LEVEL = new SkipStrategy() {
private final SkipStrategy searchNextOneLevel = searchNextWrappers(SearchNextWrappers.SearchType.DFS, 1);
@Override
public boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
return searchNextOneLevel.shouldSkip(nextWrappers, thisWrapper, fromWrapper);
}
@Override
public String toString() {
return "CHECK_ONE_LEVEL";
}
};
default SearchNextWrappers searchNextWrappers(SearchNextWrappers.SearchType searchType, int searchLevel) {
return new SearchNextWrappers(searchType, searchLevel);
}
/**
* 检查之后的Wrapper是否不在INIT状态
*/
class SearchNextWrappers implements SkipStrategy {
/**
* 搜索策略
*/
enum SearchType {
DFS, BFS;
}
private final SearchType searchType;
/**
* 搜索深度
*/
private final int searchLevel;
public SearchNextWrappers(SearchType searchType, int searchLevel) {
this.searchType = Objects.requireNonNull(searchType);
this.searchLevel = searchLevel;
}
@Override
public boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
Set<WorkerWrapper<?, ?>> nextSet;
if ((nextSet = nextWrappers) == null || nextSet.isEmpty()) {
return false;
}
switch (searchType) {
case DFS:
return nextSet.stream().allMatch(next ->
next.getState() != WorkerWrapper.INIT || dfsSearchShouldSkip(next, 1));
case BFS:
LinkedList<BfsNode> queue = nextSet.stream().map(ww -> new BfsNode(ww, 0)).collect(Collectors.toCollection(LinkedList::new));
HashSet<WorkerWrapper<?, ?>> existed = new HashSet<>(nextSet);
while (!queue.isEmpty()) {
BfsNode node = queue.poll();
if (node.atLevel > searchLevel) {
continue;
}
if (node.wrapper.getState() != WorkerWrapper.INIT) {
return true;
}
if (node.atLevel < searchLevel) {
// 如果不是深度的最大值则往队列里添加
node.wrapper.getNextWrappers().forEach(nextWrapper -> {
if (existed.contains(nextWrapper)) {
return;
}
queue.offer(new BfsNode(nextWrapper, node.atLevel + 1));
existed.add(nextWrapper);
});
}
}
return false;
default:
throw new IllegalStateException("searchType type illegal : " + searchType);
}
}
private boolean dfsSearchShouldSkip(WorkerWrapper<?, ?> currentWrapper, int currentLevel) {
if (currentLevel + 1 > searchLevel || currentWrapper == null) {
return false;
}
for (WorkerWrapper<?, ?> nextWrapper : currentWrapper.getNextWrappers()) {
if (nextWrapper != null &&
(nextWrapper.getState() != WorkerWrapper.INIT
|| dfsSearchShouldSkip(nextWrapper, currentLevel + 1))) {
return true;
}
}
return false;
}
static class BfsNode {
final WorkerWrapper<?, ?> wrapper;
final int atLevel;
public BfsNode(WorkerWrapper<?, ?> wrapper, int atLevel) {
this.wrapper = wrapper;
this.atLevel = atLevel;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BfsNode bfsNode = (BfsNode) o;
return Objects.equals(wrapper, bfsNode.wrapper);
}
@Override
public int hashCode() {
return wrapper.hashCode();
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SearchNextWrappers that = (SearchNextWrappers) o;
return searchLevel == that.searchLevel && searchType == that.searchType;
}
@Override
public int hashCode() {
return searchLevel ^ searchType.ordinal();
}
@Override
public String toString() {
return "CheckNextWrapper{" +
"searchType=" + searchType +
", searchLevel=" + searchLevel +
'}';
}
}
}

View File

@ -1,4 +1,4 @@
package dependnew;
package beforev14.depend;
import com.jd.platform.async.callback.ICallback;
@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker implements IWorker<String, User>, ICallback<String, User> {
class DeWorker implements IWorker<String, User>, ICallback<String, User> {
@Override
public User action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package depend;
package beforev14.depend;
import com.jd.platform.async.callback.ICallback;
@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker1 implements IWorker<WorkResult<User>, User>, ICallback<WorkResult<User>, User> {
class DeWorker1 implements IWorker<WorkResult<User>, User>, ICallback<WorkResult<User>, User> {
@Override
public User action(WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package depend;
package beforev14.depend;
import com.jd.platform.async.callback.ICallback;
@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker2 implements IWorker<WorkResult<User>, String>, ICallback<WorkResult<User>, String> {
class DeWorker2 implements IWorker<WorkResult<User>, String>, ICallback<WorkResult<User>, String> {
@Override
public String action(WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package depend;
package beforev14.depend;
import java.util.Map;
@ -10,7 +10,7 @@ import com.jd.platform.async.wrapper.WorkerWrapper;
* @author sjsdfg
* @since 2020/6/14
*/
public class LambdaTest {
class LambdaTest {
public static void main(String[] args) throws Exception {
WorkerWrapper<WorkResult<User>, String> workerWrapper2 = new WorkerWrapper.Builder<WorkResult<User>, String>()
.worker((WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) -> {

View File

@ -1,4 +1,4 @@
package depend;
package beforev14.depend;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.worker.WorkResult;
@ -12,7 +12,7 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
public class Test {
class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeWorker w = new DeWorker();

View File

@ -1,11 +1,11 @@
package depend;
package beforev14.depend;
/**
* 一个包装类
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
public class User {
class User {
private String name;
public User(String name) {

View File

@ -1,4 +1,4 @@
package depend;
package beforev14.dependnew;
import com.jd.platform.async.callback.ICallback;
@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker implements IWorker<String, User>, ICallback<String, User> {
class DeWorker implements IWorker<String, User>, ICallback<String, User> {
@Override
public User action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package dependnew;
package beforev14.dependnew;
import com.jd.platform.async.callback.ICallback;
@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker1 implements IWorker<String, User>, ICallback<String, User> {
class DeWorker1 implements IWorker<String, User>, ICallback<String, User> {
@Override
public User action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package dependnew;
package beforev14.dependnew;
import com.jd.platform.async.callback.ICallback;
@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker2 implements IWorker<User, String>, ICallback<User, String> {
class DeWorker2 implements IWorker<User, String>, ICallback<User, String> {
@Override
public String action(User object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package dependnew;
package beforev14.dependnew;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.wrapper.WorkerWrapper;
@ -11,7 +11,7 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
public class Test {
class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeWorker w = new DeWorker();

View File

@ -1,11 +1,11 @@
package dependnew;
package beforev14.dependnew;
/**
* 一个包装类
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
public class User {
class User {
private String name;
public User(String name) {

View File

@ -1,4 +1,4 @@
package parallel;
package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
class ParTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package parallel;
package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker implements IWorker<String, String>, ICallback<String, String> {
class ParWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package parallel;
package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker1 implements IWorker<String, String>, ICallback<String, String> {
class ParWorker1 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {

View File

@ -1,4 +1,4 @@
package parallel;
package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker2 implements IWorker<String, String>, ICallback<String, String> {
class ParWorker2 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {

View File

@ -1,4 +1,4 @@
package parallel;
package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker3 implements IWorker<String, String>, ICallback<String, String> {
class ParWorker3 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {

View File

@ -1,4 +1,4 @@
package parallel;
package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker4 implements IWorker<String, String>, ICallback<String, String> {
class ParWorker4 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package parallel;
package beforev14.parallel;
import com.jd.platform.async.executor.Async;
@ -14,7 +14,7 @@ import java.util.concurrent.Executors;
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("ALL")
public class TestPar {
class TestPar {
public static void main(String[] args) throws Exception {
// testNormal();

View File

@ -1,4 +1,4 @@
package seq;
package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
class SeqTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package seq;
package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqWorker implements IWorker<String, String>, ICallback<String, String> {
class SeqWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package seq;
package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqWorker1 implements IWorker<String, String>, ICallback<String, String> {
class SeqWorker1 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package seq;
package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqWorker2 implements IWorker<String, String>, ICallback<String, String> {
class SeqWorker2 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {

View File

@ -1,4 +1,4 @@
package seq;
package beforev14.seq;
import com.jd.platform.async.executor.Async;
@ -11,7 +11,7 @@ import java.util.concurrent.ExecutionException;
* 串行测试
* @author wuweifeng wrote on 2019-11-20.
*/
public class TestSequential {
class TestSequential {
public static void main(String[] args) throws InterruptedException, ExecutionException {

View File

@ -1,4 +1,4 @@
package seq;
package beforev14.seq;
import com.jd.platform.async.executor.Async;
@ -12,7 +12,7 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("Duplicates")
public class TestSequentialTimeout {
class TestSequentialTimeout {
public static void main(String[] args) throws InterruptedException, ExecutionException {
testFirstTimeout();
}

View File

@ -0,0 +1,227 @@
package v15.dependnew;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.wrapper.actionstrategy.DependenceAction;
import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
/**
* @author create by TcSnZh on 2021/5/2-下午9:25
*/
class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// ExecutorService pool = Executors.newFixedThreadPool(3);
ExecutorService pool = Async.getCommonPool();
try {
testNew2(pool);
System.out.println("\n\n\n");
testNew1(pool);
System.out.println("\n\n\n");
testNew2(pool);
System.out.println("\n\n\n");
testThreadPolling_Speed(pool);
System.out.println("\n\n\n");
testThreadPolling_V14Bug();
} finally {
//Async.shutDownCommonPool();
pool.shutdown();
}
}
/**
* 简简单单的测试一下新的编排方式
* <p>
* .A ===> B1 ===> C1 ----> D1
* . ||> B2 | || \--> D2
* . ||> B3 | ``========v
* . ||> B4 |---> C2 ====> E1
* . \--> E2
*/
private static void testNew1(ExecutorService pool) throws ExecutionException, InterruptedException {
WorkerWrapper<Object, Object> a = builder("A")
.build();
WorkerWrapper<Object, Object> b1 = builder("B1").depends(a).build();
WorkerWrapper<Object, Object> b2 = builder("B2").depends(a).build();
WorkerWrapper<Object, Object> b3 = builder("B3").depends(a).build();
WorkerWrapper<Object, Object> b4 = builder("B4").depends(a).build();
WorkerWrapper<Object, Object> c1 = builder("C1")
.depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, b1, b2, b3, b4)
.nextOf(builder("D1").build(),
builder("D2").build())
.build();
WorkerWrapper<Object, Object> c2 = builder("C2")
.depends(DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS, b1, b2, b3, b4)
.nextOf(builder("E1").depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, c1).build(),
builder("E2").build())
.build();
Async.beginWork(2000, pool, a);
logAll();
}
/**
* 测试船新的编排方式的花里胡哨的玩法
* A => {B1 ~ B10} >>> C
* <p>
* C仅需要b1-b10中任意3个Worker工作完成即可启动
* 不过C不一定一定在3个完成后启动具体还要看线程池属性与线程抢占的顺序线程池线程数小一点的话更容易让C早日执行
* </p>
*/
private static void testNew2(ExecutorService pool) throws ExecutionException, InterruptedException {
WorkerWrapper<Object, Object> a = builder("A").build();
ArrayList<WorkerWrapper> bList = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
bList.add(builder("B" + i).depends(a).build());
}
WorkerWrapper<Object, Object> c = builder("C")
.setDepend().strategy((dependWrappers, thisWrapper, fromWrapper) -> {
if (dependWrappers.stream()
.filter(w -> w.getWorkResult().getResultState() == ResultState.SUCCESS).count() >= 3) {
return DependenceAction.START_WORK.emptyProperty();
} else {
return DependenceAction.TAKE_REST.emptyProperty();
}
}).wrapper(bList).end().build();
Async.beginWork(2000, pool, a);
logAll();
}
/**
* 测试线程轮询的效率
*/
private static void testThreadPolling_Speed(ExecutorService pool) throws ExecutionException, InterruptedException {
int MAX = 1000;
Collection<WorkerWrapper<?, ?>> wrappers = new ArrayList<>(MAX);
AtomicLong a = new AtomicLong(0);
for (int i = 0; i < MAX; i++) {
WorkerWrapperBuilder<Void, Void> builder = WorkerWrapper.<Void, Void>builder()
.id(String.valueOf(i))
// 拷贝数组测试每次在数组最后加一个递增的值+1的数
.worker((object, allWrappers) -> {
for (int j = 0; j < 100000; j++) {
a.incrementAndGet();
}
return null;
})
.setSkipStrategy(SkipStrategy.NOT_SKIP);
wrappers.add(builder.build());
}
long t1 = SystemClock.now();
PrintStream out = Async.beginWork(10000, pool, wrappers) ? System.out : System.err;
out.println("无依赖任务的测试:\n1000个wrapper对AtomicLong分别自增100000次耗时 : " + (SystemClock.now() - t1) + "ms a=" + a.get());
WorkerWrapper.<Void, Integer>builder();
}
/**
* 测试旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况
* <p>
* A(5ms)--B1(10ms) ---|--> C1(5ms)
* . \ | (B1B2全部完成可执行C1C2)
* . ---> B2(20ms) --|--> C2(5ms)
*/
private static void testThreadPolling_V14Bug() throws ExecutionException, InterruptedException {
System.out.println("以下代码可复制到v1.4复现线程耗尽bug : ");
BiFunction<String, Long, IWorker<Void, Void>> sleepWork = (id, time) -> (IWorker<Void, Void>) (object, allWrappers) -> {
try {
System.out.println("wrapper.id=" + id + " before sleep");
Thread.sleep(time);
System.out.println("wrapper.id=" + id + " after sleep");
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
};
WorkerWrapper<Void, Void> a = new WorkerWrapper.Builder<Void, Void>()
.id("A")
.worker(sleepWork.apply("A", 5L))
.build();
WorkerWrapper.Builder<Void, Void> cBuilder = new WorkerWrapper.Builder<Void, Void>()
.depend(new WorkerWrapper.Builder<Void, Void>()
.id("B1")
.worker(sleepWork.apply("B1", 10L))
.depend(a)
.build())
.depend(new WorkerWrapper.Builder<Void, Void>()
.id("B2")
.worker(sleepWork.apply("B2", 10L))
.depend(a)
.build());
cBuilder.id("C1").worker(sleepWork.apply("C1", 5L)).build();
cBuilder.id("C2").worker(sleepWork.apply("C2", 5L)).build();
ExecutorService pool = Executors.newFixedThreadPool(2);
try {
Async.beginWork(100, pool, a);
} finally {
pool.shutdown();
}
System.out.println(a.getNextWrappers());
}
// ========== util method ==========
static final AtomicInteger count = new AtomicInteger(1);
static final AtomicReference<ConcurrentHashMap<Integer, String>> logger = new AtomicReference<>(new ConcurrentHashMap<>());
static WorkerWrapperBuilder<Object, Object> builder(String id) {
return builder(id, -1);
}
static WorkerWrapperBuilder<Object, Object> builder(String id, long sleepTime) {
return WorkerWrapper.builder()
.id(id)
.worker((param, allWrap) -> {
logger.get().put(count.getAndIncrement(), id + " working ");
if (sleepTime >= 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "I am success";
}).callback((success, param, workResult) -> {
String str = " " + id + " callback " + workResult.getResultState();
switch (workResult.getResultState()) {
case SUCCESS:
str += " getResult = " + workResult.getResult();
break;
case TIMEOUT:
case EXCEPTION:
str += " getEx = " + workResult.getEx();
break;
case DEFAULT:
throw new RuntimeException();
}
logger.get().put(count.getAndIncrement(), str);
});
}
static void logAll() {
TreeMap<Integer, String> map = new TreeMap<>(Integer::compare);
map.putAll(logger.get());
StringBuilder sb = new StringBuilder(30);
map.forEach((count, str) -> {
sb.append('(').append(count).append(')');
if (count < 10) {
sb.append(' ');
}
sb.append(" ").append(str).append('\n');
});
System.out.println("--------------------------------\n" + sb);
logger.set(new ConcurrentHashMap<>());
count.set(1);
}
}