Files
asyncTool/QuickStart.md

42 KiB
Raw Blame History

如果只是需要用这个框架请往下看即可。如果需要深入了解这个框架是如何一步一步实现的从接到需求到每一步的思考每个类为什么这么设计为什么有这些方法也就是如何从0到1开发出这个框架作者在csdn开了专栏专门讲中间件如何从0开发包括并不限于这个小框架。京东内部同事可在cf上搜索erp也能看到。

安装教程

代码不多,直接拷贝包过去即可。

京东同事通过引用如下maven来使用。

<dependency>
    <groupId>com.jd.platform</groupId>
    <artifactId>asyncTool</artifactId>
    <version>1.4.1-SNAPSHOT</version>
</dependency>

外网请使用jitpack.io上打的包 先添加repositories节点

<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>

然后添加如下maven依赖

<dependency>
    <groupId>com.gitee.jd-platform-opensource</groupId>
    <artifactId>asyncTool</artifactId>
    <version>V1.4-SNAPSHOT</version>
</dependency>

使用说明

基本组件

worker 一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。

TV两个泛型分别是入参和出参类型。

譬如该耗时操作入参是String执行完毕的结果是Integer那么就可以用泛型来定义。

多个不同的worker之间没有关联分别可以有不同的入参、出参类型。

/**
 * 每个最小执行单元需要实现该接口
 *
 * @author wuweifeng wrote on 2019-11-19.
 */
@FunctionalInterface
public interface IWorker<T, V> {
    /**
     * 在这里做耗时操作如rpc请求、IO等
     *
     * @param object      object
     * @param allWrappers 任务包装
     */
    V action(T object, Map<String, WorkerWrapper> allWrappers);

    /**
     * 超时、异常时,返回的默认值
     *
     * @return 默认值
     */
    default V defaultValue() {
        return null;
    }
}

callBack对每个worker的回调。worker执行完毕后会回调该接口带着执行成功、失败、原始入参、和详细的结果。

/**
 * 每个执行单元执行完毕后,会回调该接口</p>
 * 需要监听执行结果的,实现该接口即可
 *
 * @author wuweifeng wrote on 2019-11-19.
 */
@FunctionalInterface
public interface ICallback<T, V> {

    /**
     * 任务开始的监听
     */
    default void begin() {

    }

    /**
     * 耗时操作执行完毕后就给value注入值
     * <p/>
     * 只要Wrapper被调用后成功或失败/超时,该方法都会被执行。
     */
    void result(boolean success, T param, WorkResult<V> workResult);
}

wrapper组合了worker和callback是一个 最小的调度单元 。通过编排wrapper之间的关系达到组合各个worker顺序的目的。

wrapper的泛型和worker的一样决定了入参和结果的类型。

// 创建一个WorkerWrapper
WorkerWrapper<String, String> w0 = WorkerWrapper.<String, String>builder()
                .id("0")
                .param("000")
                .worker((param, allWrappers) -> "hello : " + param)
                .build();

通过这一个类看一下action里就是你的耗时操作begin就是任务开始执行时的回调result就是worker执行完毕后的回调。当你组合了多个执行单元时每一步的执行都在掌控之内。失败了还会有自定义的默认值。这是CompleteableFuture无法做到的。

简单示例

  1. 3个任务并行

输入图片说明

class Test {
    static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    return null;
                });
    }

    public static void main(String[] args) {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> b = builder("B").build();
        WorkerWrapper<?, ?> c = builder("C").build();
        try {
            Async.beginWork(100, a, b, c);
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=B) is working
        wrapper(id=C) is working
        */
    }
}
  1. 1个执行完毕后开启另外两个另外两个执行完毕后开始第4个

输入图片说明

class Test {
    static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    return null;
                });
    }

    public static void main(String[] args) {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> b = builder("B").depends(a).build();
        WorkerWrapper<?, ?> c = builder("C").depends(a).build();
        WorkerWrapper<?, ?> f = builder("F").depends(b, c).build();
        try {
            Async.beginWork(100, a);
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=C) is working
        wrapper(id=B) is working
        wrapper(id=F) is working
        */
    }
}

如果觉得.depneds()方法的排序您不喜欢,也可以用.nextOf()这种方式:

class Test {
    static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    return null;
                });
    }

    public static void main(String[] args) {
        WorkerWrapper<?, ?> f = builder("F").build();
        WorkerWrapper<?, ?> a = builder("A")
                .nextOf(builder("B").nextOf(f).build())
                .nextOf(builder("C").nextOf(f).build())
                .build();
        try {
            Async.beginWork(100, a);
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=B) is working
        wrapper(id=C) is working
        wrapper(id=F) is working
        */
    }
}
  1. 复杂点的

输入图片说明

class Case1 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
    }

    public static void main(String[] args) {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> d;
        builder("H")
                .depends(
                        builder("F")
                                .depends(builder("B").depends(a).build())
                                .depends(builder("C").depends(a).build())
                                .build(),
                        builder("G")
                                .depends(builder("E")
                                        .depends(d = builder("D").build())
                                        .build())
                                .build()
                )
                .build();
        try {
            Async.beginWork(1000, a, d);
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
        /* 输出:
        wrapper(id=D) is working
        wrapper(id=A) is working
        wrapper(id=E) is working
        wrapper(id=B) is working
        wrapper(id=C) is working
        wrapper(id=G) is working
        wrapper(id=F) is working
        wrapper(id=H) is working
        */
    }
}
  1. 依赖别的worker执行结果作为入参

可以从action的参数中根据wrapper的id获取任意一个执行单元的执行结果。

但请注意执行顺序,如果尚未执行,则在调用WorkerResult.getResult()会得到null

class Case2 {
    static class AddWork implements IWorker<Integer, Integer> {
        private final String id1;
        private final String id2;

        public AddWork(String id1, String id2) {
            this.id1 = id1;
            this.id2 = id2;
        }

        public AddWork() {
            this(null, null);
        }

        @Override
        public Integer action(Integer param, Map<String, WorkerWrapper<?,?>> allWrappers) {
            // 传入的参数
            if (param != null) {
                return param;
            }
            // 将两个id所对应的wrapper的结果取出相加并返回
            Integer i1 = (Integer) allWrappers.get(id1).getWorkResult().getResult();
            Integer i2 = (Integer) allWrappers.get(id2).getWorkResult().getResult();
            return i1 + i2;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<Integer, Integer> wrapper100 = WorkerWrapper.<Integer, Integer>builder()
                .id("id:100").worker(new AddWork()).param(100).build();
        WorkerWrapper<Integer, Integer> wrapper200 = WorkerWrapper.<Integer, Integer>builder()
                .id("id:200").worker(new AddWork()).param(200).build();
        WorkerWrapper<Integer, Integer> add = WorkerWrapper.<Integer, Integer>builder().id("id:add")
                .worker(new AddWork("id:100", "id:200")).depends(wrapper100, wrapper200).build();
        Async.beginWork(20,wrapper100,wrapper200);
        System.out.println(add.getWorkResult());
        // 输出WorkResult{result=300, resultState=SUCCESS, ex=null}
    }
}
  1. 其他的详见test包下的测试类支持各种形式的组合、编排。

使用自定义线程池

Async工具类有多个方法可以使用自定义线程池

public static boolean beginWork(long timeout,
                                    ExecutorService executorService,
                                    Collection<? extends WorkerWrapper<?,?>> workerWrappers);
    
public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper);

另外,如果没有指定线程池,默认会使用COMMON_POOL,您可以调用这些方法获得/关闭此线程池:

此线程池将会在第一次使用时懒加载。

/**
 * 该方法将会返回{@link #COMMON_POOL},如果还未初始化则会懒加载初始化后再返回。
 */
public static ThreadPoolExecutor getCommonPool();

/**
 * @param now 是否立即关闭
 * @throws IllegalStateException 如果尚未调用过{@link #getCommonPool()},即没有使用过“使用默认线程池”的方法,该方法会抛出空指针异常。
 */
public static synchronized void shutDownCommonPool(boolean now);

以下是一个使用自定义线程池的简单代码示例:

Async.beginWork(1000, Executors.newFixedThreadPool(2),a);

WorkerWrapper基本属性

执行流程

WorkerWrapper会在这些情况被运行

  • Async.beginWork中传入wrapper
  • 上游wrapper完成后被调用

开始运行时,执行逻辑如下图所示:

wrapper执行流程

processOn流程图文件放在同仓库。

其他属性

设置WorkerWrapper属性

设置依赖策略

快速上手

WorkerWrapperBuilder提供了这些方法来设置依赖策略:

public interface WorkerWrapperBuilder<T, V> {
    // 略
    
    // 各种depends方法都是简便设置依赖的。
    default WorkerWrapperBuilder<T, V> depends(/* 略 */);
    
    // 切换到SetDepend模式
    SetDepend<T, V> setDepend();

    interface SetDepend<T, V> {
        // 其中各种方法都是用来设置策略的具体点开源码看注释就行。设置完end()回到Builder模式。
    }
    
    // 略
}

如果没有具体设置策略的话,默认使用DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS。具体效果可到《策略器组件默认实现》这一章查看。

例如:

A之后是B1、B2、B3、B4、B5

其中B1与B2全部成功后才能执行C1

B3、B4、B5任意一个成功后就能执行C2。

以下为代码实现:

class Case3 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> b1 = builder("B1").depends(a).build();
        WorkerWrapper<?, ?> b2 = builder("B2").depends(a).build();
        WorkerWrapper<?, ?> b3 = builder("B3").depends(a).build();
        WorkerWrapper<?, ?> b4 = builder("B4").depends(a).build();
        WorkerWrapper<?, ?> b5 = builder("B5").depends(a).build();
        WorkerWrapper<?, ?> c1 = builder("C1")
                .depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, b1, b2)
                .build();
        WorkerWrapper<?, ?> c2 = builder("C2")
                .depends(DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS, b3, b4, b5)
                .build();
        // 这里用线程数较少的线程池做示例对于ALL_DEPENDENCIES_ANY_SUCCESS“仅需一个”的效果会好一点
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Async.beginWork(1000, pool, a);
        } finally {
            pool.shutdown();
        }
        /* 输出:
        wrapper(id=A) is working
		wrapper(id=B3) is working
		wrapper(id=B1) is working
		wrapper(id=B2) is working
		wrapper(id=C2) is working
		wrapper(id=C1) is working
		*/
    }
}
策略器组件

wrapper每次被上游wrapper所调用时若是还没有结束且不应跳过都会去根据自己的上游wrapper的状态来决定自己这次该做什么。

DependenceStrategy这个函数式接口就是用来判断“这次该做什么的”。

package com.jd.platform.async.wrapper.dependaction;

//...

import com.jd.platform.async.wrapper.strategy.depend.DependenceAction;

@FunctionalInterface
public interface DependenceStrategy {
    /**
     * 核心判断策略
     *
     * @param dependWrappers thisWrapper.dependWrappers的属性值。
     * @param thisWrapper    thisWrapper即为“被催促”的WorkerWrapper
     * @param fromWrapper    调用来源Wrapper。
     *                       <p>
     *                       该参数不会为null。
     *                       因为在{@link WorkerWrapper#work(ExecutorService, long, Map, WrapperEndingInspector)}方法中传入的的第一批无依赖的Wrapper
     *                       不会被该策略器所判断,而是不论如何直接执行。
     *                       </p>
     * @return 返回枚举值内部类WorkerWrapper将会根据其值来决定自己如何响应这次调用。 {@link DependenceAction.WithProperty}
     */
    DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
                                              WorkerWrapper<?, ?> thisWrapper,
                                              WorkerWrapper<?, ?> fromWrapper);

    // consts 略

    // methods 略
}

其返回值DependenceAction.WithProperty是枚举DependenceAction的一个内部实体类,作用是让返回的枚举值可以多带几个参数。

public enum DependenceAction {
    START_WORK,
    TAKE_REST,
    FAST_FAIL,
    JUDGE_BY_AFTER;
    
    // methods ...
    
    public class WithProperty {/* ... */}
}
枚举值 含义
START_WORK 开始工作。WorkerWrapper会执行工作方法。
TAKE_REST 还没轮到休息一下。WorkerWrapper中的调用栈会返回以等待其他上游wrapper调用它或是会一生无缘被调用。
FAST_FAIL 立即失败。WorkerWrapper会去执行快速失败的方法。
JUDGE_BY_AFTER 交给下层{@link DependenceStrategy}进行判断。 由于{@link DependenceStrategy#thenJudge(DependenceStrategy)}的责任链设计模式,该返回值的意义就是调用责任链上下一个策略。
策略器组件默认实现
  • DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS该值为默认值若builder未设置则默认使用这个。
    1. 被依赖的所有Wrapper都必须成功才能开始工作。
    2. 如果其中任一Wrapper还没有执行且不存在失败则休息。
    3. 如果其中任一Wrapper失败则立即失败。
  • DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS
    1. 被依赖的Wrapper中任意一个成功了就可以开始工作。
    2. 如果其中所有Wrapper还没有执行则休息。
    3. 如果其中一个Wrapper失败且不存在成功则立即失败。
  • DependenceStrategy.ALL_DEPENDENCIES_NONE_FAILED
    • 如果被依赖的工作中任一失败,则立即失败。否则就开始工作(不论之前的工作有没有开始)。
  • DependenceStrategy.theseWrapperAllSuccess(Set<WorkerWrapper<?,?>>)
    • 该方法传入一个Set指定wrapper只有当指定的这些Wrapper都成功时才会开始工作。任一失败会快速失败。任一还没有执行且不存在失败则休息。
  • 不建议使用:DependenceStrategy.IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY
    • 此值用于适配v1.4及之前的must开关模式wrapperStrategydependMustStrategyMappermustDependSet不为空时则休息因为能判断到这个责任链说明set中存在不满足的值。为空时则任一成功则执行。
WorkerWrapper的三层策略器责任链

WorkerWrapper在判断时,并不是只使用一个策略进行判断的,而是在WorkerWrapper.WrapperStrategy进行了最多三层的判断:

public abstract class WorkerWrapper<T, V> {
    
    // 略
    
	public static class WrapperStrategy implements DependenceStrategy, SkipStrategy {
    	// ========== 这三个策略器用于链式判断是否要开始工作 ==========

        // 从前往后依次判断的顺序为 dependWrapperStrategyMapper -> dependMustStrategyMapper -> dependenceStrategy

        /**
         * 对特殊Wrapper专用的依赖响应策略。
         * <b>该值允许为null</b>
         */
        private DependWrapperStrategyMapper dependWrapperStrategyMapper;
        /**
         * 对必须完成的must的Wrapper的依赖响应策略。
         * <b>该值允许为null</b>
         * <p/>
         * 这是一个不得不向历史妥协的属性。用于适配must开关方式。
         */
        private DependMustStrategyMapper dependMustStrategyMapper;
        /**
         * 底层全局策略。
         */
        private DependenceStrategy dependenceStrategy;
 
    	// 略
	}
}

正如注释所言,三个策略器将依次调用DependenceStrategy.judgeAction(Set,WorkerWrapper,WorkerWrapper)方法进行判断,每次判断会返回DependenceAction.WithProperty类型。

前两个策略器的返回值,即DependenceAction.WithProperty类型,若调用getDependenceAction()方法返回的枚举值不为JUDGE_BY_AFTER时,整个三层责任链将返回此返回值;若为JUDGE_BY_AFTER,则交给下个策略器进行判断。该方法具体由以下方法实现:

public interface DependenceStrategy {
    // 略
    
    /**
     * 如果本策略器的judge方法返回了JUDGE_BY_AFTER则交给下一个策略器来判断。
     *
     * @param after 下层策略器
     * @return 返回一个“封装的多层策略器”
     */
    default DependenceStrategy thenJudge(DependenceStrategy after) {
        DependenceStrategy that = this;
        return new DependenceStrategy() {
            @Override
            public DependenceAction.WithProperty judgeAction(
                Set<WorkerWrapper<?, ?>> dependWrappers,
                WorkerWrapper<?, ?> thisWrapper,
                WorkerWrapper<?, ?> fromWrapper
            ) {
                DependenceAction.WithProperty judge = that.judgeAction(dependWrappers, thisWrapper, fromWrapper);
                if (judge.getDependenceAction() == DependenceAction.JUDGE_BY_AFTER) {
                    return after.judgeAction(dependWrappers, thisWrapper, fromWrapper);
                }
                return judge;
            }

            @Override
            public String toString() {
                return that + " ----> " + after;
            }
        };
    }
    
    // 略
}

自定义依赖策略

自定义全局策略

以下是一个自定义依赖策略的示例。

效果是在B1~B10共10个wrapper只需3个wrapper成功即可执行C

class Case4 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> a = builder("A").build();
        WorkerWrapper<?, ?> c = builder("C")
                .setDepend().strategy(new DependenceStrategy() {
                    @Override
                    public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
                        return dependWrappers.stream()
                                .filter(workerWrapper -> workerWrapper.getWorkResult().getResultState() == ResultState.SUCCESS)
                                .count() > 3 ?
                                DependenceAction.START_WORK.emptyProperty()
                                : DependenceAction.TAKE_REST.emptyProperty();
                    }
                }).end()
                .build();
        for (int i = 1; i < 10; i++) {
            builder("B" + i).depends(a).nextOf(c).build();
        }
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Async.beginWork(1000, pool, a);
        } finally {
            pool.shutdown();
        }
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=B2) is working
        wrapper(id=B1) is working
        wrapper(id=B4) is working
        wrapper(id=B3) is working
        wrapper(id=B5) is working
        wrapper(id=C) is working
        由于B1-B10是并行的所以正好仅有3个wrapper成功在多线程环境中是比较难遇到的。
         */
    }
}
设置一组必须完成的wrapper不推荐使用

使用以下两个方法指定的上游wrapper必须全部执行成功本Wrapper才能执行。

public interface WorkerWrapperBuilder<T, V> {
    // 略
    
    SetDepend<T, V> setDepend();
    
    interface SetDepend<T, V> {
        // 略
        
        /**
         * 设置必须要执行成功的Wrapper当所有被该方法设为的上游Wrapper执行成功时本Wrapper才能执行
         */
        SetDepend<T, V> mustRequireWrapper(WorkerWrapper<?, ?> wrapper);

        default SetDepend<T, V> mustRequireWrapper(WorkerWrapper... wrappers){
            /*...*/}
        
  		// 略
    }
    
    // 略
}

以下是一个不推荐使用的示例:

class Case5 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
    }

    @Deprecated
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        WorkerWrapper<?, ?> a1 = builder("A1").build();
        WorkerWrapper<?, ?> a2 = builder("A2").build();
        WorkerWrapper<?, ?> a3 = builder("A3").build();
        WorkerWrapper<?, ?> a4 = builder("A4").build();
        WorkerWrapper<?, ?> a5 = builder("A5").build();
        WorkerWrapper<?, ?> a6 = builder("A6").build();
        WorkerWrapper<?, ?> a7 = builder("A7").build();
        WorkerWrapper<?, ?> a8 = builder("A8").build();
        WorkerWrapper<?, ?> a9 = builder("A9").build();
        WorkerWrapper<?, ?> a10 = builder("A10").build();
        builder("B")
                .setDepend()
                // 必须a3、a4成功才能执行
                .mustRequireWrapper(a3, a4)
                // 如果a3、a4没有成功则休息
                .strategy((dependWrappers, thisWrapper, fromWrapper) -> DependenceAction.TAKE_REST.emptyProperty())
                .wrapper(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10)
                .end()
                .build();
        WorkerWrapper<?, ?> start = builder("start").nextOf(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10).build();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Async.beginWork(1000, pool, start);
        } finally {
            pool.shutdown();
        }
        /* 输出:
        wrapper(id=A1) is working
        wrapper(id=A2) is working
        wrapper(id=A4) is working
        wrapper(id=A3) is working
        wrapper(id=A5) is working
        wrapper(id=B) is working
        wrapper(id=A6) is working
        我们可以看到A3、A4执行后B也执行了之后的wrapper被跳过了
        这里之所以a5、a6还在执行只是因为他两正好在WORKING所以没发现后面的B已经可以跳过了
        */
    }
}

.mustRequireWrapper(a3, a4)方法设置的策略,优先级高于.strategy()设置的底层策略。

对单个wrapper设置“上克下”策略

我们面临着这种问题:

一般来说都是下游wrapper去根据上游wrapper的状态进行策略判断然后给出响应。

那么能不能让上游wrapper根据自己的状态独自决定下游wrapper响应呢

因此,三层策略器的DependWrapperStrategyMapper便是用于设置此功能的。

// 示例版本v1.5

/**
 * 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略。
 * <p/>
 * 使用{@link DependWrapperStrategyMapper}本实现类对{@link DependenceStrategy}进行增强,
 *
 * @author create by TcSnZh on 2021/5/1-下午11:12
 */
public class DependWrapperStrategyMapper implements DependenceStrategy {
    private final Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> mapper = new ConcurrentHashMap<>(4);

    /**
     * 设置对应策略
     *
     * @param targetWrapper 要设置策略的WorkerWrapper
     * @param strategy      要设置的策略
     * @return 返回this链式调用。
     */
    public DependWrapperStrategyMapper putMapping(WorkerWrapper<?, ?> targetWrapper, DependWrapperActionStrategy strategy) {/* 略 */}

    /**
     * 判断方法。
     * <p/>
     * 如果fromWrapper在{@link #mapper}中,则返回{@link DependWrapperActionStrategy}的判断返回值。否则返回{@link DependenceAction#JUDGE_BY_AFTER}
     *
     * @param dependWrappers 这里不会使用该值thisWrapper.dependWrappers的属性值。
     * @param thisWrapper    这里不会使用该值thisWrapper即为“被催促”的WorkerWrapper
     * @param fromWrapper    调用来源Wrapper。
     * @return 如果在mapper中有对fromWrapper的处理策略则使用其进行判断。否则返回JUDGE_BY_AFTER交给下一个进行判断。
     */
    @Override
    public DependenceAction.WithProperty judgeAction(
        Set<WorkerWrapper<?,?>> dependWrappers,
        WorkerWrapper<?, ?> thisWrapper,
        WorkerWrapper<?, ?> fromWrapper // 仅判断该属性
    ) {/* 略 */}

    // 略
}

mapper属性中,每个WorkerWrapper<?, ?>对应了一个DependWrapperActionStrategy这个接口便是用于让上游wrapper决定下游响应的

@FunctionalInterface
public interface DependWrapperActionStrategy {
    /**
     * 仅使用一个参数的判断方法
     *
     * @param fromWrapper 调用本Wrapper的上游Wrapper
     * @return 返回 {@link DependenceAction.WithProperty}
     */
    DependenceAction.WithProperty judge(WorkerWrapper<?, ?> fromWrapper);
    
    // 常量略
}
提供常量
  • DependWrapperActionStrategy.SUCCESS_CONTINUE
    • 成功时,交给下一个策略器判断。未运行时,休息。失败时,失败。
  • DependWrapperActionStrategy SUCCESS_START_INIT_CONTINUE
    • 成功时,开始工作。未运行时,交给下一个策略器判断。失败时,失败。
简单使用与示例

我们在SetDepend<T, V> setDepend();模式时,可以使用如下方法进行设置

/**
 * 对单个Wrapper设置特殊策略。
 *
 * @param wrapper  需要设置特殊策略的Wrapper。
 * @param strategy 特殊策略。
 */
SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);

default SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper... wrappers);

也可以在 SetNext<T, V> setNext();模式时进行设置。

/**
 * 调用该方法将会让传入的此下游workerWrappers对本Wrapper进行特殊策略判断
 *
 * @param strategy 对本Wrapper的特殊策略。
 * @param wrapper  依赖本Wrapper的下游Wrapper。
 * @return 返回Builder自身。
 */
SetNext<T, V> specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);

以下为示例:

class Case6 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> b = builder("B")
                // 这里设置了不论a怎么样b都会快速失败。但是a设置的对wrapper的特殊策略把它覆盖了。
                .depends((dependWrappers, thisWrapper, fromWrapper) ->
                        DependenceAction.FAST_FAIL
                                .fastFailException(ResultState.EXCEPTION, new RuntimeException("b 必定失败除非有上游wrapper救他"))
                )
                .build();
        WorkerWrapper<?, ?> a = builder("A")
                .setNext()
                // a将会使b直接开始工作
                // 若是去掉这行代码则b会失败
                .specialToNextWrapper(fromWrapper -> DependenceAction.START_WORK.emptyProperty(), b)
                .wrapper(b)
                .end().build();
        Async.beginWork(1000, a);
        System.out.println(a.getWorkResult());
        System.out.println(b.getWorkResult());
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=B) is working
        WorkResult{result=null, resultState=SUCCESS, ex=null}
        WorkResult{result=null, resultState=SUCCESS, ex=null}
        */
    }
}

设置跳过策略

当wrapper发现下游wrapper居然已经被执行了那是不是可以跳过自身呢?

of course当wrapper被跳过时getWorkResult返回的值通常是:

{
    result: null,
    resultState: ResultState.EXCEPTION,
    ex: com.jd.platform.async.exception.SkippedException
}

下面是跳过策略的设置接口:

@FunctionalInterface
public interface SkipStrategy {
    /**
     * 跳过策略函数。返回true将会使WorkerWrapper跳过执行。
     *
     * @param nextWrappers 下游WrapperSet
     * @param thisWrapper  本WorkerWrapper
     * @param fromWrapper  呼叫本Wrapper的上游Wrapper
     * @return 返回true将会使WorkerWrapper跳过执行。
     */
    boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper);
    
    // consts 略
    
}

有以下几个默认值:

  • SkipStrategy.NOT_SKIP
    • 不进行跳过检查,也不打算跳过
  • SkipStrategy.CHECK_ONE_LEVEL该值为默认值若builder未设置则默认使用这个。
    • 仅检查深度为1的下游wrapper。如果其全部不在初始化状态则自己会被跳过。

以下是一个示例:

/**
 * @author create by TcSnZh on 2021/5/9-下午4:12
 */
class Case7 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return builder(id, -1L);
    }

    private static WorkerWrapperBuilder<?, ?> builder(String id, long sleepTime) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("wrapper(id=" + id + ") is working");
                    if (sleepTime > 0) {
                        try {
                            Thread.sleep(sleepTime);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return null;
                });
    }

    /**
     * A ==> B(10ms) ==> C ==> D  (D可在E、C任意一个完成后执行)
     * . \====> E(5ms) ====/
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> d = builder("D").depends(DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS).build();
        WorkerWrapper<?, ?> a = builder("A")
                .nextOf(builder("B", 10)
                                .nextOf(builder("C")
                                        .nextOf(d)
                                        // 这里我们没有设置C的跳过策略因为默认使用CHECK_ONE_LEVEL可将下行代码注释去掉则C会执行
//                                        .setSkipStrategy(SkipStrategy.NOT_SKIP)
                                        .build())
                                .build(),
                        builder("E", 5).nextOf(d).build()
                ).build();
        Async.beginWork(1000, a);
        /* 输出:
        wrapper(id=A) is working
        wrapper(id=E) is working
        wrapper(id=B) is working
        wrapper(id=D) is working
        */
    }
}

设置超时

可以在Async.beginWork(/* ... */)中传入总超时时间也可以对单个wrapper设置超时时间。

总任务时间超时

Async.beginWork方法中可以指定总超时时间。

单wrapper任务时间超时

可在Builder中设置超时选项以下是几个关键方法

public interface WorkerWrapperBuilder<T, V> {
    // 略
    
    /**
     * 设置超时时间的具体属性
     */
    SetTimeOut<T, V> setTimeOut();

    interface SetTimeOut<T, V> {
        /**
         * 是否启动超时判断。
         * <p>
         * 默认为true
         *
         * @param enableElseDisable 是则true
         */
        SetTimeOut<T, V> enableTimeOut(boolean enableElseDisable);

        /**
         * 设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
         *
         * @param time 时间数值
         * @param unit 时间单位
         */
        SetTimeOut<T, V> setTime(long time, TimeUnit unit);

        WorkerWrapperBuilder<T, V> end();
    }
    
    /**
     * 便携式设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
     *
     * @param time 时间数值
     * @param unit 时间单位
     */
    default WorkerWrapperBuilder<T, V> timeout(long time, TimeUnit unit) {
        return timeout(true, time, unit);
    }

    default WorkerWrapperBuilder<T, V> timeout(boolean enableTimeOut, long time, TimeUnit unit) {
        return setTimeOut().enableTimeOut(enableTimeOut).setTime(time, unit).end();
    }
    
    // 略
}

测试示例

示例:

class Case8 {
    private static WorkerWrapperBuilder<?, ?> builder(String id) {
        return builder(id, -1L);
    }

    private static WorkerWrapperBuilder<?, ?> builder(String id, long sleepTime) {
        return WorkerWrapper.<String, String>builder()
                .id(id)
                .worker((param, allWrappers) -> {
                    System.out.println("\twrapper(id=" + id + ") is working");
                    if (sleepTime > 0) {
                        try {
                            Thread.sleep(sleepTime);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return null;
                })
                .callback((new ICallback<String, String>() {
                    @Override
                    public void begin() {
                        System.out.println("wrapper(id=" + id + ") has begin . ");
                    }

                    @Override
                    public void result(boolean success, String param, WorkResult<String> workResult) {
                        System.out.println("\t\twrapper(id=" + id + ") callback "
                                + (success ? "success " : "fail ")
                                + ", workResult is " + workResult);
                    }
                }));
    }

    /**
     * A ==> B(10ms) ==> C(20ms)
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        WorkerWrapper<?, ?> a = builder("A")
                .nextOf(builder("B", 10)
                        .nextOf(builder("C", 20).build())
                        .build())
                .build();
        Async.beginWork(15, a);
        /* 输出:
        wrapper(id=A) has begin .
            wrapper(id=A) is working
                wrapper(id=A) callback success , workResult is WorkResult{result=null, resultState=SUCCESS, ex=null}
        wrapper(id=B) has begin .
            wrapper(id=B) is working
                wrapper(id=B) callback success , workResult is WorkResult{result=null, resultState=TIMEOUT, ex=null}
        wrapper(id=C) has begin .
                wrapper(id=C) callback fail , workResult is WorkResult{result=null, resultState=TIMEOUT, ex=null}
        java.lang.InterruptedException: sleep interrupted
            at java.lang.Thread.sleep(Native Method)
            以下异常信息省略
        */
    }
}

设置是否允许被打断线程

可通过该选项去设置允许线程被打断:

public interface WorkerWrapperBuilder<T, V> {
    // 略
    
    /**
     * 是否允许被试图中断线程
     *
     * @param allow 是则true
     */
    WorkerWrapperBuilder<T, V> allowInterrupt(boolean allow);
    
    // 略
}

开启之后在以下情况会试图打断正处于WORKING状态的工作线程。

  • 总任务超时但本wrapper在WORKING。

  • 单wrapper超时但本wrapper在WORKING。

  • wrapper应当被跳过但本wrapper在WORKING。

  • 调用WorkerWrapper#failNow()方法且wrapper在WORKING状态。