Files
asyncTool/QuickStart.md

50 KiB
Raw Blame History

如果只是需要用这个框架请往下看即可。如果需要深入了解这个框架是如何一步一步实现的从接到需求到每一步的思考每个类为什么这么设计为什么有这些方法也就是如何从0到1开发出这个框架作者在csdn开了专栏专门讲中间件如何从0开发包括并不限于这个小框架。京东内部同事可在cf上搜索erp也能看到。

安装教程

代码不多,直接拷贝包过去即可。

旧稳定版本v1.4

京东同事通过引用如下maven来使用。

<dependency>
    <groupId>com.jd.platform</groupId>
    <artifactId>asyncTool</artifactId>
    <version>1.4.1-SNAPSHOT</version>
</dependency>

外网请使用jitpack.io上打的包 先添加repositories节点

<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>

然后添加如下maven依赖

<dependency>
    <groupId>com.gitee.jd-platform-opensource</groupId>
    <artifactId>asyncTool</artifactId>
    <version>V1.4-SNAPSHOT</version>
</dependency>

最新版本v1.5(不稳定)

从gitee上下载仓库到本地切换到dev分支然后maven安装到本地仓库。

git clone https://gitee.com/jd-platform-opensource/asyncTool.git
cd ./asyncTool
git checkout dev
mvn install

在项目中引入依赖。

<!-- 任务编排核心包 -->
<dependency>
    <artifactId>asyncTool-core</artifactId>
    <groupId>com.jd.platform</groupId>
    <version>1.5.1-SNAPSHOT</version>
</dependency>

任务编排

asyncTool-core核心模块提供了核心功能——任务编排

以下文档基于版本:

<dependencies>
       <dependency>
           <groupId>com.jd.platform</groupId>
           <artifactId>asyncTool-core</artifactId>
           <version>1.5.1-SNAPSHOT</version>
       </dependency>
</dependencies>

基本组件

IWorker 一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。

TV两个泛型分别是入参和出参类型。

譬如该耗时操作入参是String执行完毕的结果是Integer那么就可以用泛型来定义。

多个不同的worker之间没有关联分别可以有不同的入参、出参类型。

/**
 * 每个最小执行单元需要实现该接口
 *
 * @author wuweifeng wrote on 2019-11-19.
 */
@FunctionalInterface
public interface IWorker<T, V> {
    /**
     * 在这里做耗时操作如rpc请求、IO等
     *
     * @param object      object
     * @param allWrappers 任务包装
     */
    V action(T object, Map<String, WorkerWrapper> allWrappers);

    /**
     * 超时、异常、跳过时,返回的默认值
     *
     * @return 默认值
     */
    default V defaultValue() {
        return null;
    }
}

ICallback对每个worker的回调。worker执行完毕后会回调该接口带着执行成功、失败、原始入参、和详细的结果。

/**
 * 每个执行单元执行完毕后,会回调该接口</p>
 * 需要监听执行结果的,实现该接口即可
 *
 * @author wuweifeng wrote on 2019-11-19.
 */
@FunctionalInterface
public interface ICallback<T, V> {

    /**
     * 任务开始的监听
     */
    default void begin() {

    }

    /**
     * 耗时操作执行完毕后就给value注入值
     * <p/>
     * 只要Wrapper被调用后成功或失败/超时,该方法都会被执行。
     */
    void result(boolean success, T param, WorkResult<V> workResult);
}

wrapper组合了worker和callback是一个 最小的调度单元 。通过编排wrapper之间的关系达到组合各个worker顺序的目的。

wrapper的泛型和worker的一样决定了入参和结果的类型。

// 创建一个WorkerWrapper
WorkerWrapper<String, String> w0 = WorkerWrapper.<String, String>builder()
                .id("0")
                .param("000")
                .worker((param, allWrappers) -> "hello : " + param)
                .build();

通过这一个类看一下action里就是你的耗时操作begin就是任务开始执行时的回调result就是worker执行完毕后的回调。当你组合了多个执行单元时每一步的执行都在掌控之内。失败了还会有自定义的默认值。这是CompleteableFuture无法做到的。

如何构造WorkerWrapper?

推荐Builder模式

如果刚开始使用这个框架,则推荐使用如下方式进行构造:

WorkerWrapper.<String, String>builder()
    .id()
    // 其他属性略。
    // 请在《简单示例》与《设置WorkerWrapper属性》中慢慢感受详细内容。
    // 因为这里地方小,写不下。
复杂的快速构造

不推荐新手使用。

不推荐在业务中使用使用Builder模式代码更加简洁且会检查参数不必节省这些性能。

该对象的构造方法不会检查属性。

在对WorkerWrapper属性有充足了解后可使用“直接设置属性 + 关系图”的方式快速构造wrapper。

建议在扩展功能的时候使用该构造器,以提高效率。但是请记得检查参数。

以下为示例:

class Case9 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        DirectedGraph<WorkerWrapper<?, ?>, Object> graph = DirectedGraph.synchronizedDigraph(new CommonDirectedGraph<>());
        QuickBuildWorkerWrapper<Object, Object> w1 = new QuickBuildWorkerWrapper<>("id1",
                null,
                (object, allWrappers) -> {
                    System.out.println("I am IWorker 1");
                    return null;
                },
                new DefaultCallback<>(),
                false,
                true,
                100,
                TimeUnit.MILLISECONDS,
                new WrapperStrategy.DefaultWrapperStrategy(),
                graph
        );
        QuickBuildWorkerWrapper<Object, Object> w2 = new QuickBuildWorkerWrapper<>("id2",
                null,
                (object, allWrappers) -> {
                    System.out.println("I am IWorker 2");
                    return null;
                },
                new DefaultCallback<>(),
                false,
                true,
                100,
                TimeUnit.MILLISECONDS,
                new WrapperStrategy.DefaultWrapperStrategy(),
                graph
        );
        graph.addNode(w1, w2);
        graph.putRelation(w1, new Object(), w2);

//        System.out.println(graph);

        Async.work(200, w1).awaitFinish();

        System.out.println("    Begin work end .\n    w1 : " + w1 + "\n    w2 : " + w2 + "\n");
        /* 输出:
        I am IWorker 1
        I am IWorker 2
            Begin work end .
            w1 : 省略
            w2 : 省略
         */
    }
}

简单示例

  1. 3个任务并行

输入图片说明

class Case0 {
    static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    return null;
                });
    }

    public static void main(String[] args) {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> b = builder("B").build();
        WorkerWrapper<?, ?> c = builder("C").build();
        try {
            Async.work(100, a, b, c).awaitFinish();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=B) is working
        wrapper(id=C) is working
         */
    }
}
  1. 1个执行完毕后开启另外两个另外两个执行完毕后开始第4个

输入图片说明

class Case01 {
    static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    return null;
                });
    }

    public static void main(String[] args) {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> b = builder("B").depends(a).build();
        WorkerWrapper<?, ?> c = builder("C").depends(a).build();
        WorkerWrapper<?, ?> f = builder("F").depends(b, c).build();
        try {
            Async.work(100, a).awaitFinish();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=C) is working
        wrapper(id=B) is working
        wrapper(id=F) is working
        */
    }
}

如果觉得.depneds()方法的排序您不喜欢,也可以用.nextOf()这种方式:

class Case02 {
    static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    return null;
                });
    }

    public static void main(String[] args) {
        WorkerWrapper<?, ?> f = builder("F").build();
        WorkerWrapper<?, ?> a = builder("A")
                .nextOf(builder("B").nextOf(f).build())
                .nextOf(builder("C").nextOf(f).build())
                .build();
        try {
            Async.work(100, a).awaitFinish();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=B) is working
        wrapper(id=C) is working
        wrapper(id=F) is working
        */
    }
}
  1. 复杂点的

输入图片说明

class Case1 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
    }

    public static void main(String[] args) {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> d;
        builder("H")
                .depends(
                        builder("F")
                                .depends(builder("B").depends(a).build())
                                .depends(builder("C").depends(a).build())
                                .build(),
                        builder("G")
                                .depends(builder("E")
                                        .depends(d = builder("D").build())
                                        .build())
                                .build()
                )
                .build();
        try {
            Async.work(1000, a, d).awaitFinish();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /* 输出:
        wrapper(id=D) is working
        wrapper(id=A) is working
        wrapper(id=E) is working
        wrapper(id=B) is working
        wrapper(id=C) is working
        wrapper(id=G) is working
        wrapper(id=F) is working
        wrapper(id=H) is working
        */
    }
}
  1. 依赖别的worker执行结果作为入参

可以从action的参数中根据wrapper的id获取任意一个执行单元的执行结果。

但请注意执行顺序,如果尚未执行,则在调用WorkerResult.getResult()会得到null

class Case2 {
    static class AddWork implements IWorker<Integer, Integer> {
        private final String id1;
        private final String id2;

        public AddWork(String id1, String id2) {
            this.id1 = id1;
            this.id2 = id2;
        }

        public AddWork() {
            this(null, null);
        }

        @Override
        public Integer action(Integer param, Map<String, WorkerWrapper<?,?>> allWrappers) {
            // 传入的参数
            if (param != null) {
                return param;
            }
            // 将两个id所对应的wrapper的结果取出相加并返回
            Integer i1 = (Integer) allWrappers.get(id1).getWorkResult().getResult();
            Integer i2 = (Integer) allWrappers.get(id2).getWorkResult().getResult();
            return i1 + i2;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<Integer, Integer> wrapper100 = WorkerWrapper.<Integer, Integer>builder()
                .id("id:100").worker(new AddWork()).param(100).build();
        WorkerWrapper<Integer, Integer> wrapper200 = WorkerWrapper.<Integer, Integer>builder()
                .id("id:200").worker(new AddWork()).param(200).build();
        WorkerWrapper<Integer, Integer> add = WorkerWrapper.<Integer, Integer>builder().id("id:add")
                .worker(new AddWork("id:100", "id:200")).depends(wrapper100, wrapper200).build();
        Async.work(20,wrapper100,wrapper200).awaitFinish();
        System.out.println(add.getWorkResult());
        // 输出WorkResult{result=300, resultState=SUCCESS, ex=null}
    }
}
  1. 其他的详见test包下的测试类支持各种形式的组合、编排。

使用自定义线程池

Async工具类有多个方法可以使用自定义线程池

public static OnceWork work(long timeout,
                                    ExecutorService executorService,
                                    Collection<? extends WorkerWrapper<?,?>> workerWrappers);
    
public static OnceWork work(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper);

public static OnceWork work(long timeout,
                                ExecutorService executorService,
                                Collection<? extends WorkerWrapper<?, ?>> workerWrappers,
                                String workId);

另外,如果没有指定线程池,默认会使用COMMON_POOL,您可以调用这些方法获得/关闭此线程池:

此线程池将会在第一次使用时懒加载。

/**
 * 该方法将会返回{@link #COMMON_POOL},如果还未初始化则会懒加载初始化后再返回。
 */
public static ThreadPoolExecutor getCommonPool();

/**
 * @param now 是否立即关闭
 * @return 如果尚未调用过{@link #getCommonPool()}即没有初始化默认线程池返回false。否则返回true。
 */
public static synchronized boolean shutDownCommonPool(boolean now);

以下是一个使用自定义线程池的简单代码示例:

Async.work(1000, Executors.newFixedThreadPool(2),a).awaitFinish();

WorkerWrapper基本属性

执行流程

WorkerWrapper会在这些情况被运行

  • Async.beginWork中传入wrapper
  • 上游wrapper完成后被调用

开始运行时,执行逻辑如下图所示:

wrapper执行流程

processOn流程图文件放在同仓库。

属性

id

WorkerWrapper的id属性非常重要。

可在builder的该属性设置id如果不设置默认使用UUID。

public interface WorkerWrapperBuilder<T, V> {
    /**
     * 设置唯一id。
     * 如果不设置,{@link StableWorkerWrapperBuilder}会使用UUID
     */
    WorkerWrapperBuilder<T, V> id(String id);
    
    // 略
}

例如如果你需要在IWorker中调用上游wrapper则可以根据id来获取到。

该map的键即为WorkerWrapper的id。

V action(T object, Map<String, WorkerWrapper<?,?>> allWrappers);

请程序员确保在一次任务执行的一组wrapper中id不会重复。在执行过程中不会进行检查。

其他省略

其他属性都写在源码注释中,可下载源码慢慢查看。

OnceWork任务句柄详解

Async.work(...)方法的返回值为OnceWork句柄。

源码里有大量的注释,直接看源码效率更高

需要同步等待完成

OnceWork被返回后,任务并没有同步完成,而是还在运行。

如果我们需要同步完成,则调用awaitFinish方法即可同步等待。

取消任务

调用pleaseCancel方法可取消任务,以下为示例:

class Case10 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return builder(id, -1L);
    }

    private static WorkerWrapperBuilder<?, ?> builder(String id, long sleepTime) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("\twrapper(id=" + id + ") is working");
                    if (sleepTime > 0) {
                        try {
                            Thread.sleep(sleepTime);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return null;
                })
                .callback((new ICallback<String, String>() {
                    @Override
                    public void begin() {
                        System.out.println("wrapper(id=" + id + ") has begin . ");
                    }

                    @Override
                    public void result(boolean success, String param, WorkResult<String> workResult) {
                        System.out.println("\t\twrapper(id=" + id + ") callback "
                                + (success ? "success " : "fail ")
                                + ", workResult is " + workResult);
                    }
                }))
                .allowInterrupt(true);
    }

    /**
     * A(10ms) ==> B(10ms) ==> C(10ms)
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final WorkerWrapper<?, ?> c;
        final WorkerWrapper<?, ?> b;
        final WorkerWrapper<?, ?> a = builder("A", 10)
                .nextOf(b = builder("B", 10)
                        .nextOf(c = builder("C", 10).build())
                        .build())
                .build();
        final OnceWork onceWork = Async.work(40, a);
        Thread.sleep(25);
        onceWork.pleaseCancelAndAwaitFinish();
        System.out.println("任务b信息 " + b);
        System.out.println("任务c信息 " + c);
        System.out.println("OnceWork信息 " + onceWork);
        /*
            可以看到C的state为SKIPworkResult.ex为CancelSkippedException即被取消了。
            不过有时程序运行慢导致B被取消了那么C就不会执行其状态就为INIT了。
         */
    }
}

设置WorkerWrapper属性

设置依赖策略

快速上手

WorkerWrapperBuilder提供了这些方法来设置依赖策略:

public interface WorkerWrapperBuilder<T, V> {
    // 略
    
    // 各种depends方法都是简便设置依赖的。
    default WorkerWrapperBuilder<T, V> depends(/* 略 */);
    
    // 切换到SetDepend模式
    SetDepend<T, V> setDepend();

    interface SetDepend<T, V> {
        // 其中各种方法都是用来设置策略的具体点开源码看注释就行。设置完end()回到Builder模式。
    }
    
    // 略
}

如果没有具体设置策略的话,默认使用DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS。具体效果可到《策略器组件默认实现》这一章查看。

例如:

A之后是B1、B2、B3、B4、B5

其中B1与B2全部成功后才能执行C1

B3、B4、B5任意一个成功后就能执行C2。

以下为代码实现:

class Case3 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> b1 = builder("B1").depends(a).build();
        WorkerWrapper<?, ?> b2 = builder("B2").depends(a).build();
        WorkerWrapper<?, ?> b3 = builder("B3").depends(a).build();
        WorkerWrapper<?, ?> b4 = builder("B4").depends(a).build();
        WorkerWrapper<?, ?> b5 = builder("B5").depends(a).build();
        WorkerWrapper<?, ?> c1 = builder("C1")
                .depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, b1, b2)
                .build();
        WorkerWrapper<?, ?> c2 = builder("C2")
                .depends(DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS, b3, b4, b5)
                .build();
        // 这里用线程数较少的线程池做示例对于ALL_DEPENDENCIES_ANY_SUCCESS“仅需一个”的效果会好一点
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Async.work(1000, pool, a).awaitFinish();
        } finally {
            pool.shutdown();
        }
        /* 输出:
        wrapper(id=A) is working
		wrapper(id=B3) is working
		wrapper(id=B1) is working
		wrapper(id=B2) is working
		wrapper(id=C2) is working
		wrapper(id=C1) is working
		wrapper(id=B4) is working
		// 我们看到B5被跳过了没有执行callback
		*/
    }
}
策略器组件

wrapper每次被上游wrapper所调用时若是还没有结束且不应跳过都会去根据自己的上游wrapper的状态来决定自己这次该做什么。

DependenceStrategy这个函数式接口就是用来判断“这次该做什么的”。

package com.jd.platform.async.wrapper.dependaction;

//...

import com.jd.platform.async.wrapper.strategy.depend.DependenceAction;

@FunctionalInterface
public interface DependenceStrategy {
    /**
     * 核心判断策略
     *
     * @param dependWrappers thisWrapper.dependWrappers的属性值。
     * @param thisWrapper    thisWrapper即为“被催促”的WorkerWrapper
     * @param fromWrapper    调用来源Wrapper。
     *                       <p>
     *                       该参数不会为null。
     *                       因为在{@link WorkerWrapper#work(ExecutorService, long, Map, WrapperEndingInspector)}方法中传入的的第一批无依赖的Wrapper
     *                       不会被该策略器所判断,而是不论如何直接执行。
     *                       </p>
     * @return 返回枚举值内部类WorkerWrapper将会根据其值来决定自己如何响应这次调用。 {@link DependenceAction.WithProperty}
     */
    DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
                                              WorkerWrapper<?, ?> thisWrapper,
                                              WorkerWrapper<?, ?> fromWrapper);

    // consts 略

    // methods 略
}

其返回值DependenceAction.WithProperty是枚举DependenceAction的一个内部实体类,作用是让返回的枚举值可以多带几个参数。

public enum DependenceAction {
    START_WORK,
    TAKE_REST,
    FAST_FAIL,
    JUDGE_BY_AFTER;
    
    // methods ...
    
    public class WithProperty {/* ... */}
}
枚举值 含义
START_WORK 开始工作。WorkerWrapper会执行工作方法。
TAKE_REST 还没轮到休息一下。WorkerWrapper中的调用栈会返回以等待其他上游wrapper调用它或是会一生无缘被调用。
FAST_FAIL 立即失败。WorkerWrapper会去执行快速失败的方法。
JUDGE_BY_AFTER 交给下层{@link DependenceStrategy}进行判断。 由于{@link DependenceStrategy#thenJudge(DependenceStrategy)}的责任链设计模式,该返回值的意义就是调用责任链上下一个策略。

如果wrapper被跳过ResultState将为DEFAULT

策略器组件默认实现
  • DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS该值为默认值若builder未设置则默认使用这个。
    1. 被依赖的所有Wrapper都必须成功才能开始工作。
    2. 如果其中任一Wrapper还没有执行且不存在失败则休息。
    3. 如果其中任一Wrapper失败则立即失败。(跳过不算失败)
  • DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS
    1. 被依赖的Wrapper中任意一个成功了就可以开始工作。
    2. 如果其中所有Wrapper还没有执行则休息。
    3. 如果其中一个Wrapper失败且不存在成功则立即失败。(跳过不算失败)
  • DependenceStrategy.ALL_DEPENDENCIES_NONE_FAILED
    • 如果被依赖的工作中任一失败,则立即失败。(跳过不算失败)
    • 否则就开始工作(不论之前的工作有没有开始)。
  • DependenceStrategy.theseWrapperAllSuccess(Set<WorkerWrapper<?,?>>)
    • 该方法传入一个Set指定wrapper只有当指定的这些Wrapper都成功时才会开始工作。任一失败会快速失败。任一还没有执行且不存在失败则休息。
  • 不建议使用:DependenceStrategy.IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY
    • 此值用于适配v1.4及之前的must开关模式wrapperStrategydependMustStrategyMappermustDependSet不为空时则休息因为能判断到这个责任链说明set中存在不满足的值。为空时则任一成功则执行。
WorkerWrapper的策略器责任链

WorkerWrapper在判断时,并不是只使用一个策略进行判断的,而是在WrapperStrategy进行了最多三层的判断:

public interface WrapperStrategy extends DependenceStrategy, SkipStrategy {
    // ========== 这三个策略器用于链式判断是否要开始工作 ==========

    // 从前往后依次判断的顺序为 dependWrapperStrategyMapper -> dependMustStrategyMapper -> dependenceStrategy

    /**
     * 设置对特殊Wrapper专用的依赖响应策略。
     *
     * @return 该值允许为null
     */
    DependOnUpWrapperStrategyMapper getDependWrapperStrategyMapper();

    /**
     * 对必须完成的must的Wrapper的依赖响应策略。
     * 这是一个不得不向历史妥协的属性。用于适配must开关方式。
     *
     * @return 该值允许为null
     */
    DependMustStrategyMapper getDependMustStrategyMapper();

    /**
     * 底层全局策略。
     *
     * @return 该值不允许为null
     */
    DependenceStrategy getDependenceStrategy();

    // ========== 这是跳过策略 ==========

    /**
     * 跳过策略
     *
     * @return 不允许为null
     */
    SkipStrategy getSkipStrategy();
    
    // 其他属性略,自行查看源码即可
}

正如注释所言,三个策略器将依次调用judgeAction(Set,WorkerWrapper,WorkerWrapper)方法进行判断,每次判断会返回DependenceAction.WithProperty类型。

前两个策略器的返回值若不为枚举JUDGE_BY_AFTER的内部类时,整个三层责任链将返回此返回值;若为JUDGE_BY_AFTER,则交给下个策略器进行判断。该方法具体由以下方法实现:

public interface DependenceStrategy {
    // 略
    
    /**
     * 如果本策略器的judge方法返回了JUDGE_BY_AFTER则交给下一个策略器来判断。
     *
     * @param after 下层策略器
     * @return 返回一个“封装的多层策略器”
     */
    default DependenceStrategy thenJudge(DependenceStrategy after) {
        DependenceStrategy that = this;
        return new DependenceStrategy() {
            @Override
            public DependenceAction.WithProperty judgeAction(
                Set<WorkerWrapper<?, ?>> dependWrappers,
                WorkerWrapper<?, ?> thisWrapper,
                WorkerWrapper<?, ?> fromWrapper
            ) {
                DependenceAction.WithProperty judge = that.judgeAction(dependWrappers, thisWrapper, fromWrapper);
                if (judge.getDependenceAction() == DependenceAction.JUDGE_BY_AFTER) {
                    return after.judgeAction(dependWrappers, thisWrapper, fromWrapper);
                }
                return judge;
            }

            @Override
            public String toString() {
                return that + " ----> " + after;
            }
        };
    }
    
    // 略
}

自定义依赖策略

自定义全局策略

以下是一个自定义依赖策略的示例。

效果是在B1~B10共10个wrapper只需3个wrapper成功即可执行C

class Case4 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> c = builder("C")
                .setDepend().strategy(new DependenceStrategy() {
                    @Override
                    public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
                        return dependWrappers.stream()
                                .filter(workerWrapper -> workerWrapper.getWorkResult().getResultState() == ResultState.SUCCESS)
                                .count() > 3 ?
                                DependenceAction.START_WORK.emptyProperty()
                                : DependenceAction.TAKE_REST.emptyProperty();
                    }
                }).end()
                .build();
        for (int i = 1; i < 10; i++) {
            builder("B" + i).depends(a).nextOf(c).build();
        }
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Async.work(1000, pool, a).awaitFinish();
        } finally {
            pool.shutdown();
        }
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=B2) is working
        wrapper(id=B1) is working
        wrapper(id=B4) is working
        wrapper(id=B3) is working
        wrapper(id=B5) is working
        wrapper(id=C) is working
        由于B1-B10是并行的所以正好仅有3个wrapper成功在多线程环境中是比较难遇到的。
         */
    }
}
设置一组必须完成的wrapper不推荐使用

使用以下两个方法指定的上游wrapper必须全部执行成功本Wrapper才能执行。

public interface WorkerWrapperBuilder<T, V> {
    // 略
    
    SetDepend<T, V> setDepend();
    
    interface SetDepend<T, V> {
        // 略
        
        /**
         * 设置必须要执行成功的Wrapper当所有被该方法设为的上游Wrapper执行成功时本Wrapper才能执行
         */
        SetDepend<T, V> mustRequireWrapper(WorkerWrapper<?, ?> wrapper);

        default SetDepend<T, V> mustRequireWrapper(WorkerWrapper... wrappers){
            /*...*/}
        
  		// 略
    }
    
    // 略
}

以下是一个不推荐使用的示例:

class Case5 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
    }

    @Deprecated
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        WorkerWrapper<?, ?> a1 = builder("A1").build();
        WorkerWrapper<?, ?> a2 = builder("A2").build();
        WorkerWrapper<?, ?> a3 = builder("A3").build();
        WorkerWrapper<?, ?> a4 = builder("A4").build();
        WorkerWrapper<?, ?> a5 = builder("A5").build();
        WorkerWrapper<?, ?> a6 = builder("A6").build();
        WorkerWrapper<?, ?> a7 = builder("A7").build();
        WorkerWrapper<?, ?> a8 = builder("A8").build();
        WorkerWrapper<?, ?> a9 = builder("A9").build();
        WorkerWrapper<?, ?> a10 = builder("A10").build();
        builder("B")
                .setDepend()
                // 必须a3、a4成功才能执行
                .mustRequireWrapper(a3, a4)
                // 如果a3、a4没有成功则休息
                .strategy((dependWrappers, thisWrapper, fromWrapper) -> DependenceAction.TAKE_REST.emptyProperty())
                .wrapper(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10)
                .end()
                .build();
        WorkerWrapper<?, ?> start = builder("start").nextOf(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10).build();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Async.work(1000, pool, start).awaitFinish();
        } finally {
            pool.shutdown();
        }
        /* 输出:
        wrapper(id=A1) is working
        wrapper(id=A2) is working
        wrapper(id=A4) is working
        wrapper(id=A3) is working
        wrapper(id=A5) is working
        wrapper(id=B) is working
        wrapper(id=A6) is working
        我们可以看到A3、A4执行后B也执行了之后的wrapper被跳过了
        这里之所以a5、a6还在执行只是因为他两正好在WORKING所以没发现后面的B已经可以跳过了
        */
    }
}

.mustRequireWrapper(a3, a4)方法设置的策略,优先级高于.strategy()设置的底层策略。

对单个wrapper设置“上克下”策略

我们面临着这种问题:

一般来说都是下游wrapper去根据上游wrapper的状态进行策略判断然后给出响应。

那么能不能让上游wrapper根据自己的状态独自决定下游wrapper响应呢

可以。DependOnUpWrapperStrategy函数式接口 与 DependOnUpWrapperStrategyMapper这两个类即可完成这个功能。

/**
 * 由上游wrapper决定本wrapper行为的单参数策略。
 *
 * @author create by TcSnZh on 2021/5/1-下午11:16
 */
@FunctionalInterface
public interface DependOnUpWrapperStrategy {
    /**
     * 仅使用一个参数即调用自身的上游wrapper的判断方法
     *
     * @param fromWrapper 调用本Wrapper的上游Wrapper
     * @return 返回 {@link DependenceAction.WithProperty}
     */
    DependenceAction.WithProperty judge(WorkerWrapper<?, ?> fromWrapper);

    // ========== 送几个供链式调用的默认值 ==========

    /**
     * 成功时,交给下一个策略器判断。
     * 未运行时,休息。
     * 失败时,失败。
     */
    DependOnUpWrapperStrategy SUCCESS_CONTINUE = /*略*/ ;
    /**
     * 成功时,开始工作。
     * 未运行时,交给下一个策略器判断。
     * 失败时,失败。
     */
    DependOnUpWrapperStrategy SUCCESS_START_INIT_CONTINUE = /*略*/ ;
}

DependOnUpWrapperStrategyMappermapper属性中,每个WorkerWrapper<?, ?>对应了一个DependOnUpWrapperStrategy实现了让wrapper对不同的上游做出不同的响应策略。

public class DependOnUpWrapperStrategyMapper implements DependenceStrategy {
    private final Map<WorkerWrapper<?, ?>, DependOnUpWrapperStrategy> mapper = new ConcurrentHashMap<>(4);
	// 以下略   
}

在《WorkerWrapper的三层策略器责任链》这一章中,我们可以看到,第一层策略器就是此DependOnUpWrapperStrategyMapper

简单使用与示例

我们在SetDepend<T, V> setDepend();模式时,可以使用如下方法进行设置

/**
 * 对单个Wrapper设置特殊策略。
 *
 * @param wrapper  需要设置特殊策略的Wrapper。
 * @param strategy 特殊策略。
 */
SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);

default SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper... wrappers);

也可以在 SetNext<T, V> setNext();模式时进行设置。

/**
 * 调用该方法将会让传入的此下游workerWrappers对本Wrapper进行特殊策略判断
 *
 * @param strategy 对本Wrapper的特殊策略。
 * @param wrapper  依赖本Wrapper的下游Wrapper。
 * @return 返回Builder自身。
 */
SetNext<T, V> specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);

以下为示例:

class Case6 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
            .id(id)
            .worker((param, allWrappers) -> {
                System.out.println("wrapper(id=" + id + ") is working");
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return null;
            });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> b = builder("B")
            // 这里设置了不论a怎么样b都会快速失败。但是a设置的对wrapper的特殊策略把它覆盖了。
            .depends((dependWrappers, thisWrapper, fromWrapper) ->
                     DependenceAction.FAST_FAIL
                     .fastFailException(ResultState.EXCEPTION, new RuntimeException("b 必定失败除非有上游wrapper救他"))
                    )
            .callback(ICallback.PRINT_EXCEPTION_STACK_TRACE)
            .build();
        WorkerWrapper<?, ?> a = builder("A")
            .setNext()
            // a将会使b直接开始工作
            // 若是去掉这行代码则b会失败
            .specialToNextWrapper(fromWrapper -> DependenceAction.START_WORK.emptyProperty(), b)
            .wrapper(b)
            .end().build();
        Async.work(1000, a).awaitFinish();
        System.out.println(a.getWorkResult());
        System.out.println(b.getWorkResult());
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=B) is working
        WorkResult{result=null, resultState=SUCCESS, ex=null}
        WorkResult{result=null, resultState=SUCCESS, ex=null}
        */
    }
}
提供常量
  • DependWrapperActionStrategy.SUCCESS_CONTINUE
    • 成功时,交给下一个策略器判断。未运行时,休息。失败时,失败。
  • DependWrapperActionStrategy SUCCESS_START_INIT_CONTINUE
    • 成功时,开始工作。未运行时,交给下一个策略器判断。失败时,失败。

设置跳过策略

当wrapper发现下游wrapper居然已经被执行了那是不是可以跳过自身呢?

of course当wrapper被跳过时getWorkResult返回的值通常是:

{
    result: null,
    // 注意如果wrapper被跳过ResultState将为DEFAULT
    resultState: "ResultState.DEFAULT",
    ex: "com.jd.platform.async.exception.SkippedException"
}

下面是跳过策略的设置接口:

@FunctionalInterface
public interface SkipStrategy {
    /**
     * 跳过策略函数。返回true将会使WorkerWrapper跳过执行。
     *
     * @param nextWrappers 下游WrapperSet
     * @param thisWrapper  本WorkerWrapper
     * @param fromWrapper  呼叫本Wrapper的上游Wrapper
     * @return 返回true将会使WorkerWrapper跳过执行。
     */
    boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper);
    
    // consts 略
    
}

有以下几个默认值:

  • SkipStrategy.NOT_SKIP
    • 不进行跳过检查,也不打算跳过
  • SkipStrategy.CHECK_ONE_LEVEL该值为默认值若builder未设置则默认使用这个。
    • 仅检查深度为1的下游wrapper。如果其全部不在初始化状态则自己会被跳过。

以下是一个示例:

/**
 * @author create by TcSnZh on 2021/5/9-下午4:12
 */
class Case7 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return builder(id, -1L);
    }

    private static WorkerWrapperBuilder<?, ?> builder(String id, long sleepTime) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    if (sleepTime > 0) {
                        try {
                            Thread.sleep(sleepTime);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return null;
                });
    }

    /**
     * A ==> B(10ms) ==> C ==> D  (D可在E、C任意一个完成后执行)
     * . \====> E(5ms) ====/
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> d = builder("D").depends(DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS).build();
        WorkerWrapper<?, ?> a = builder("A")
                .nextOf(builder("B", 10)
                                .nextOf(builder("C")
                                        .nextOf(d)
                                        // 这里我们没有设置C的跳过策略因为默认使用CHECK_ONE_LEVEL可将下行代码注释去掉则C会执行
//                                        .setSkipStrategy(SkipStrategy.NOT_SKIP)
                                        .build())
                                .build(),
                        builder("E", 5).nextOf(d).build()
                ).build();
        Async.work(1000, a).awaitFinish();
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=E) is working
        wrapper(id=B) is working
        wrapper(id=D) is working
        */
    }
}

设置超时

可以在Async.beginWork(/* ... */)中传入总超时时间也可以对单个wrapper设置超时时间。

总任务时间超时

Async.beginWork方法中可以指定总超时时间。

单wrapper任务时间超时

可在Builder中设置超时选项以下是几个关键方法

public interface WorkerWrapperBuilder<T, V> {
    // 略
    
    /**
     * 设置超时时间的具体属性
     */
    SetTimeOut<T, V> setTimeOut();

    interface SetTimeOut<T, V> {
        /**
         * 是否启动超时判断。
         * <p>
         * 默认为true
         *
         * @param enableElseDisable 是则true
         */
        SetTimeOut<T, V> enableTimeOut(boolean enableElseDisable);

        /**
         * 设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
         *
         * @param time 时间数值
         * @param unit 时间单位
         */
        SetTimeOut<T, V> setTime(long time, TimeUnit unit);

        WorkerWrapperBuilder<T, V> end();
    }
    
    /**
     * 便携式设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
     *
     * @param time 时间数值
     * @param unit 时间单位
     */
    default WorkerWrapperBuilder<T, V> timeout(long time, TimeUnit unit) {
        return timeout(true, time, unit);
    }

    default WorkerWrapperBuilder<T, V> timeout(boolean enableTimeOut, long time, TimeUnit unit) {
        return setTimeOut().enableTimeOut(enableTimeOut).setTime(time, unit).end();
    }
    
    // 略
}

测试示例

示例:

class Case8 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return builder(id, -1L);
    }

    private static WorkerWrapperBuilder<?, ?> builder(String id, long sleepTime) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("\twrapper(id=" + id + ") is working");
                    if (sleepTime > 0) {
                        try {
                            Thread.sleep(sleepTime);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return null;
                })
                .callback((new ICallback<String, String>() {
                    @Override
                    public void begin() {
                        System.out.println("wrapper(id=" + id + ") has begin . ");
                    }

                    @Override
                    public void result(boolean success, String param, WorkResult<String> workResult) {
                        System.out.println("\t\twrapper(id=" + id + ") callback "
                                + (success ? "success " : "fail ")
                                + ", workResult is " + workResult);
                    }
                }));
    }

    /**
     * A ==> B(10ms) ==> C(20ms)
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> a = builder("A")
                .nextOf(builder("B", 10)
                        .nextOf(builder("C", 20).build())
                        .build())
                .build();
        Async.work(20, a).awaitFinish();
        /* 输出:
        wrapper(id=A) has begin .
            wrapper(id=A) is working
                wrapper(id=A) callback success , workResult is WorkResult{result=null, resultState=SUCCESS, ex=null}
        wrapper(id=B) has begin .
            wrapper(id=B) is working
                wrapper(id=B) callback success , workResult is WorkResult{result=null, resultState=TIMEOUT, ex=null}
        wrapper(id=C) has begin .
                wrapper(id=C) callback fail , workResult is WorkResult{result=null, resultState=TIMEOUT, ex=null}
        java.lang.InterruptedException: sleep interrupted
            at java.lang.Thread.sleep(Native Method)
            ...
            以下异常信息省略
        */
    }
}

设置是否允许被打断线程

可通过该选项去设置允许线程被打断:

public interface WorkerWrapperBuilder<T, V> {
    // 略
    
    /**
     * 是否允许被试图中断线程
     *
     * @param allow 是则true
     */
    WorkerWrapperBuilder<T, V> allowInterrupt(boolean allow);
    
    // 略
}

线程会被打断的具体情况

开启之后在以下情况会试图打断正处于WORKING状态的工作线程。

  • 总任务超时但本wrapper在WORKING。
  • 单wrapper超时但本wrapper在WORKING。
  • wrapper应当被跳过但本wrapper在WORKING。
  • 调用WorkerWrapper#failNow()方法且wrapper在WORKING状态。

开放工具类

asyncTool-openutil工具模块提供了一些便于开发的工具类。

可单独引入依赖:

<dependencies>
    <dependency>
        <groupId>com.jd.platform</groupId>
        <artifactId>asyncTool-openutil</artifactId>
        <version>1.5.1-SNAPSHOT</version>
    </dependency>
</dependencies>

集合类

普通集合包com.jd.platform.async.openutil.collection.*

这里不详述,要用的话源码里有注释。

  • SparseArray2D 稀疏矩阵。
  • CommonDirectedGraph 有向图。
  • CommonStoreArk id储物柜。

定时器

com.jd.platform.async.openutil.timer.*

  • HashedWheelTimer 从netty里抄来的时间轮工具类。

其他

com.jd.platform.async.openutil.*

  • BiInt 一个表示两个int值的实体类内含缓冲区间、默认比较器适合拿来当2维索引。