diff --git a/QuickStart.md b/QuickStart.md
index 690e952..ec1e1db 100644
--- a/QuickStart.md
+++ b/QuickStart.md
@@ -56,13 +56,7 @@ mvn install
asyncTool-core
com.jd.platform
- 1.5.0-SNAPSHOT
-
-
-
- asyncTool-scheduling
- com.jd.platform
- 1.5.0-SNAPSHOT
+ 1.5.1-SNAPSHOT
```
@@ -77,7 +71,7 @@ mvn install
>
> com.jd.platform
> asyncTool-core
-> 1.5.0-SNAPSHOT
+> 1.5.1-SNAPSHOT
>
>
> ```
@@ -191,23 +185,6 @@ WorkerWrapper.builder()
以下为示例:
```java
-package v15.cases;
-
-import com.jd.platform.async.callback.DefaultCallback;
-import com.jd.platform.async.executor.Async;
-import com.jd.platform.async.openutil.collection.CommonDirectedGraph;
-import com.jd.platform.async.openutil.collection.DirectedGraph;
-import com.jd.platform.async.wrapper.QuickBuildWorkerWrapper;
-import com.jd.platform.async.wrapper.WorkerWrapper;
-import com.jd.platform.async.wrapper.strategy.WrapperStrategy;
-
-import java.util.concurrent.*;
-
-/**
- * 快速构造示例。
- *
- * @author create by TcSnZh on 2021/5/17-下午5:23
- */
class Case9 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DirectedGraph, Object> graph = DirectedGraph.synchronizedDigraph(new CommonDirectedGraph<>());
@@ -244,10 +221,16 @@ class Case9 {
// System.out.println(graph);
- Async.beginWork(200, w1);
+ Async.work(200, w1).awaitFinish();
System.out.println(" Begin work end .\n w1 : " + w1 + "\n w2 : " + w2 + "\n");
-
+ /* 输出:
+ I am IWorker 1
+ I am IWorker 2
+ Begin work end .
+ w1 : 省略
+ w2 : 省略
+ */
}
}
```
@@ -259,7 +242,7 @@ class Case9 {

```java
-class Test {
+class Case0 {
static WorkerWrapperBuilder, ?> builder(String id) {
return WorkerWrapper.builder()
.id(id)
@@ -274,15 +257,15 @@ class Test {
WorkerWrapper, ?> b = builder("B").build();
WorkerWrapper, ?> c = builder("C").build();
try {
- Async.beginWork(100, a, b, c);
- } catch (ExecutionException | InterruptedException e) {
+ Async.work(100, a, b, c).awaitFinish();
+ } catch (InterruptedException e) {
e.printStackTrace();
}
/* 输出:
wrapper(id=A) is working
wrapper(id=B) is working
wrapper(id=C) is working
- */
+ */
}
}
```
@@ -293,7 +276,7 @@ class Test {

```java
-class Test {
+class Case01 {
static WorkerWrapperBuilder, ?> builder(String id) {
return WorkerWrapper.builder()
.id(id)
@@ -309,8 +292,8 @@ class Test {
WorkerWrapper, ?> c = builder("C").depends(a).build();
WorkerWrapper, ?> f = builder("F").depends(b, c).build();
try {
- Async.beginWork(100, a);
- } catch (ExecutionException | InterruptedException e) {
+ Async.work(100, a).awaitFinish();
+ } catch (InterruptedException e) {
e.printStackTrace();
}
/* 输出:
@@ -326,7 +309,7 @@ class Test {
如果觉得`.depneds()`方法的排序您不喜欢,也可以用`.nextOf()`这种方式:
```java
-class Test {
+class Case02 {
static WorkerWrapperBuilder, ?> builder(String id) {
return WorkerWrapper.builder()
.id(id)
@@ -343,8 +326,8 @@ class Test {
.nextOf(builder("C").nextOf(f).build())
.build();
try {
- Async.beginWork(100, a);
- } catch (ExecutionException | InterruptedException e) {
+ Async.work(100, a).awaitFinish();
+ } catch (InterruptedException e) {
e.printStackTrace();
}
/* 输出:
@@ -396,8 +379,8 @@ class Case1 {
)
.build();
try {
- Async.beginWork(1000, a, d);
- } catch (ExecutionException | InterruptedException e) {
+ Async.work(1000, a, d).awaitFinish();
+ } catch (InterruptedException e) {
e.printStackTrace();
}
/* 输出:
@@ -455,7 +438,7 @@ class Case2 {
.id("id:200").worker(new AddWork()).param(200).build();
WorkerWrapper add = WorkerWrapper.builder().id("id:add")
.worker(new AddWork("id:100", "id:200")).depends(wrapper100, wrapper200).build();
- Async.beginWork(20,wrapper100,wrapper200);
+ Async.work(20,wrapper100,wrapper200).awaitFinish();
System.out.println(add.getWorkResult());
// 输出WorkResult{result=300, resultState=SUCCESS, ex=null}
}
@@ -469,11 +452,16 @@ class Case2 {
`Async`工具类有多个方法可以使用自定义线程池
```java
-public static boolean beginWork(long timeout,
+public static OnceWork work(long timeout,
ExecutorService executorService,
Collection extends WorkerWrapper,?>> workerWrappers);
-public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper);
+public static OnceWork work(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper);
+
+public static OnceWork work(long timeout,
+ ExecutorService executorService,
+ Collection extends WorkerWrapper, ?>> workerWrappers,
+ String workId);
```
另外,如果没有指定线程池,默认会使用`COMMON_POOL`,您可以调用这些方法获得/关闭此线程池:
@@ -496,7 +484,7 @@ public static synchronized boolean shutDownCommonPool(boolean now);
以下是一个使用自定义线程池的简单代码示例:
```java
-Async.beginWork(1000, Executors.newFixedThreadPool(2),a);
+Async.work(1000, Executors.newFixedThreadPool(2),a).awaitFinish();
```
## WorkerWrapper基本属性
@@ -548,6 +536,83 @@ V action(T object, Map> allWrappers);
> 其他属性都写在源码注释中,可下载源码慢慢查看。
+## `OnceWork`任务句柄详解
+
+`Async.work(...)`方法的返回值为`OnceWork`句柄。
+
+> 源码里有大量的注释,直接看源码效率更高
+
+#### 需要同步等待完成
+
+`OnceWork`被返回后,任务并没有同步完成,而是还在运行。
+
+如果我们需要同步完成,则调用`awaitFinish`方法即可同步等待。
+
+#### 取消任务
+
+调用`pleaseCancel`方法可取消任务,以下为示例:
+
+```java
+class Case10 {
+ private static WorkerWrapperBuilder, ?> builder(String id) {
+ return builder(id, -1L);
+ }
+
+ private static WorkerWrapperBuilder, ?> builder(String id, long sleepTime) {
+ return WorkerWrapper.builder()
+ .id(id)
+ .worker((param, allWrappers) -> {
+ System.out.println("\twrapper(id=" + id + ") is working");
+ if (sleepTime > 0) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return null;
+ })
+ .callback((new ICallback() {
+ @Override
+ public void begin() {
+ System.out.println("wrapper(id=" + id + ") has begin . ");
+ }
+
+ @Override
+ public void result(boolean success, String param, WorkResult workResult) {
+ System.out.println("\t\twrapper(id=" + id + ") callback "
+ + (success ? "success " : "fail ")
+ + ", workResult is " + workResult);
+ }
+ }))
+ .allowInterrupt(true);
+ }
+
+ /**
+ * A(10ms) ==> B(10ms) ==> C(10ms)
+ */
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ final WorkerWrapper, ?> c;
+ final WorkerWrapper, ?> b;
+ final WorkerWrapper, ?> a = builder("A", 10)
+ .nextOf(b = builder("B", 10)
+ .nextOf(c = builder("C", 10).build())
+ .build())
+ .build();
+ final OnceWork onceWork = Async.work(40, a);
+ Thread.sleep(25);
+ onceWork.pleaseCancelAndAwaitFinish();
+ System.out.println("任务b信息 " + b);
+ System.out.println("任务c信息 " + c);
+ System.out.println("OnceWork信息 " + onceWork);
+ /*
+ 可以看到C的state为SKIP,workResult.ex为CancelSkippedException,即被取消了。
+ 不过有时程序运行慢,导致B被取消了,那么C就不会执行,其状态就为INIT了。
+ */
+ }
+}
+```
+
## 设置WorkerWrapper属性
### 设置依赖策略
@@ -618,7 +683,7 @@ class Case3 {
// 这里用线程数较少的线程池做示例,对于ALL_DEPENDENCIES_ANY_SUCCESS“仅需一个”的效果会好一点
ExecutorService pool = Executors.newFixedThreadPool(2);
try {
- Async.beginWork(1000, pool, a);
+ Async.work(1000, pool, a).awaitFinish();
} finally {
pool.shutdown();
}
@@ -849,7 +914,7 @@ class Case4 {
}
ExecutorService pool = Executors.newFixedThreadPool(2);
try {
- Async.beginWork(1000, pool, a);
+ Async.work(1000, pool, a).awaitFinish();
} finally {
pool.shutdown();
}
@@ -938,7 +1003,7 @@ class Case5 {
WorkerWrapper, ?> start = builder("start").nextOf(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10).build();
ExecutorService pool = Executors.newFixedThreadPool(2);
try {
- Async.beginWork(1000, pool, start);
+ Async.work(1000, pool, start).awaitFinish();
} finally {
pool.shutdown();
}
@@ -1076,7 +1141,7 @@ class Case6 {
.specialToNextWrapper(fromWrapper -> DependenceAction.START_WORK.emptyProperty(), b)
.wrapper(b)
.end().build();
- Async.beginWork(1000, a);
+ Async.work(1000, a).awaitFinish();
System.out.println(a.getWorkResult());
System.out.println(b.getWorkResult());
/* 输出:
@@ -1181,7 +1246,7 @@ class Case7 {
.build(),
builder("E", 5).nextOf(d).build()
).build();
- Async.beginWork(1000, a);
+ Async.work(1000, a).awaitFinish();
/* 输出:
wrapper(id=A) is working
wrapper(id=E) is working
@@ -1302,7 +1367,7 @@ class Case8 {
.nextOf(builder("C", 20).build())
.build())
.build();
- Async.beginWork(15, a);
+ Async.work(20, a).awaitFinish();
/* 输出:
wrapper(id=A) has begin .
wrapper(id=A) is working
@@ -1347,7 +1412,6 @@ public interface WorkerWrapperBuilder {
* 总任务超时,但本wrapper在WORKING。
* 单wrapper超时,但本wrapper在WORKING。
* wrapper应当被跳过,但本wrapper在WORKING。
-
* 调用`WorkerWrapper#failNow()`方法,且wrapper在WORKING状态。
# 开放工具类
@@ -1361,14 +1425,14 @@ public interface WorkerWrapperBuilder {
>
> com.jd.platform
> asyncTool-openutil
-> 1.5.0-SNAPSHOT
+> 1.5.1-SNAPSHOT
>
>
> ```
### 集合类
-> `com.jd.platform.async.openutil.collection.*`
+> 普通集合包`com.jd.platform.async.openutil.collection.*`
这里不详述,要用的话源码里有注释。
@@ -1384,255 +1448,7 @@ public interface WorkerWrapperBuilder {
### 其他
-`com.jd.platform.async.openutil`
+> `com.jd.platform.async.openutil.*`
-* `BiInt` 一个表示两个int值的实体类,内含默认比较器、缓冲区间。
-
-# 动态任务调度
-
->引入依赖
->
->```xml
->
->
-> asyncTool-scheduling
-> com.jd.platform
-> 1.5.0-SNAPSHOT
->
->```
-
-动态任务调度,传个json就能让调度工厂随心所欲的构造wrapper与执行。
-
-如果您已经熟悉**《任务编排》**这一章的内容,则可以试试这玩意。
-
-## 基本组件
-
-## 模型属性
-
-### 字段属性详解
-
-以下是一个传入的json的格式示例,注释中描述了属性含义与格式:
-
-```json
-{
- // 图纸名称。可使用 字符串、null、不写。为null或不写,则会使用uuid
- drawingsName :"",
- // 提供的全部WorkerWrapper列表。
- // 内部格式请参考《wrappers》这章。
- wrappers:[],
- // wrapper顺序关系。
- // 内部格式请参考《relations》这章
- relations: [],
- // 任务启动参数
- // 内部格式请参考《beginWork》这章
- beginWork:{}
-}
-```
-
-#### wrappers
-
-wrappers数组中,只允许存入满足以下格式的对象:
-
-```json
-wrappers:[
- {
- // id,传入字符串。
- // 而且:不能与wrappers数组中其他的对象的id属性相同。即必须保证id唯一。
- // 不允许undefined,不允许null。
- "id": "first",
- // param,即参数。请看《param》
- // useObjectModel代表value的值是否是“对象模型”。
- // 允许undefined或null,视为{"useObjectModel": false,"value": null}
- "param": {
- "useObjectModel": false,
- "value": "JackMa"
- },
- // 传入对象模型,请看《对象模型ObjectModel》
- // 不允许undefined或null
- "worker": {
- "sameObjectId": 1,
- "className:": "schedulingtest.impl.SelectUserByName"
- },
- // 传入对象模型
- // 允许undefined与null。如果为两者则使用com.jd.platform.async.callback.DefaultCallback
- "callback": {
- "sameObjectId": 1
- },
- // wrapper策略
- // 允许undefined与null。如果为两者则使用com.jd.platform.async.wrapper.strategy.WrapperStrategy.DefaultWrapperStrategy
- // 即"ALL_DEPENDENCIES_ALL_SUCCESS"与"CHECK_ONE_LEVEL"
- "wrapperStrategy": {
- // 传入{}键值对,键名为即wrapper的id属性,值为对象模型。
- // 允许undefined和null,两者之意与空键值对{}并无二致
- "dependOnUpWrapperStrategyMapper": null,
- // -- 这里不再向历史妥协,舍弃了DependMustStrategyMapper
- // 基础策略器,传入对象模型
- // 允许undefined和null,如果是两者则使用"ALL_DEPENDENCIES_ALL_SUCCESS"
- "dependenceStrategy": {
- "constObjectName": "ALL_DEPENDENCIES_ALL_SUCCESS"
- },
- // 跳过策略,传入对象模型
- // 允许undefined和null,如果是两者则使用"CHECK_ONE_LEVEL"
- "skipStrategy": {
- "constObjectName": "CHECK_ONE_LEVEL"
- }
- },
- // 是否允许打断,传入boolean值。允许undefined和null,视为false
- "allowInterrupt": true,
- // 是否启动单wrapper计时,允许undefined和null,视为false
- "enableTimeout": true,
- // 单wrapper超时时间数值。传入long(int64)值
- // 在enableTimeout为true的情况下不允许为undefined或null或小于等于0的值,否则允许任何值
- "timeoutLength": 50,
- // 单wrapper超时时间单位,有以下几个取值:(即java.util.concurrent.TimeUnit的枚举值)
- // "NANOSECONDS"、"MICROSECONDS"、"MILLISECONDS"
- // "SECONDS"、"MINUTES"、"HOURS"、"DAYS"
- // 允许为undefined和null,视为"MILLISECONDS"
- // 但是除了undefined、null和以上7个值外,不允许任何值
- "timeoutUnit": "MILLISECONDS"
- },
- // 这是第二个wrapper属性,这里可以省略很多选用默认值的属性。
- {
- "id": "second",
- "param":{
- "useObjectModel": false,
- "value":"first"
- },
- "worker": {
- "className": "schedulingtest.impl.PayTaxes",
- "sameObjectId": 2
- },
- callback:{
- "sameObjectId": 2
- }
- },
- // 这是第三个属性,这里的"extendConfig"属性是个省力的好东西。
- {
- "id": "third",
- // 传入存在的id属性
- // 效果是:本wrapper配置将继承此id表示的wrapper配置的所有属性
- // 并且本wrapper可以有选择的覆写配置——只需设置某个对象的值即可。
- // 可以为null或undefined,表示不继承配置
- "extendConfig": "second"
- },
- /* , { ... } */
-]
-```
-
-##### param
-
-`useObjectModel`属性用于说明`value`属性的所代表的对象类型:
-
-* 为false:使用json所对应的类型。
-* 为true:使用《对象模型`ObjectModel`》中的我们自定义的对象模型。
-
-```json
-{
- "useObjectModel": false,
- "value": "JackMa"
-},
-```
-
-```json
-{
- "useObjectModel": true,
- "value": {
- sameObjectId: 3
- }
-}
-```
-
-#### relations
-
-```json
-relation: {
- // 可使用 数组(仅当to使用字符串时)、字符串
- // 不允许为null或undefined,如果两个wrapper之间无关系,宁可不写这整个对象。
- from: "",
- // 可使用 数组(仅当from使用字符串时)、字符串
- // 不允许为null或undefined,如果两个wrapper之间无关系,宁可不写这整个对象。
- to: ""
-}
-```
-
-`from`和`to`两个属性传入的字符串,必须是在`wrappers`数组中所含有的`id`属性。
-
-这两个属性只有在对方为字符串时,才能设置自己为数组,以表示“一对多”关系。
-
-#### beginWork
-
-```json
-beginWork: {
- // 设置全组超时时间数值,使用long(int64)值
- // 可以传入<=0的值表示不限制超时时间。
- // 允许为null、undefined,视为不限制超时时间。
- timeoutLength: 100,
- // 全组超时时间单位,有以下几个取值:(即java.util.concurrent.TimeUnit的枚举值)
- // "NANOSECONDS"、"MICROSECONDS"、"MILLISECONDS"
- // "SECONDS"、"MINUTES"、"HOURS"、"DAYS"
- // 允许为undefined和null,视为"MILLISECONDS"
- // 但是除了undefined、null和以上7个值外,不允许任何值
- timeoutUnit: "MILLISECONDS",
- // 传入的启动wrapper的id数组,不允许null、undefined。可以为空,但是这不符合常理。
- wrappers: [
- "first"
- ],
- // 执行的线程池
- // 可以传入"COMMON_POOL",代表使用asyncTool的默认线程池。
- // 可以为null或undefined,视为"COMMON_POOL"
- // 也可以以如下格式传入全限定名+字段/方法名字符串,则使用反射调取该字段的所指向的ExecutorService:
- // 全限定名#字段名 或 全限定名##方法名
- executor: "COMMON_POOL"
-}
-```
-
-### 其他特殊格式
-
-#### 对象模型`ObjectModel`
-
-* 有时我们需要指定接口实现类,或者是默认的接口实现对象,并自定义这些对象的属性。
-
-* 或是指定多个json字段代表一个共同的对象。
-
-那么,我们就需要高度自定义此对象的属性,并封装为`com.jd.platform.async.scheduling.model.ObjectModel`类。
-
-```json
-{
- // 如果需要使用asyncTool指名道姓的常量实现对象,则请见下方《常量对象规范》,并在这里传入名称字符串。
- // 当设置了该属性时,其他的属性均会被忽视。
- "constObjectName": "NOT_SKIP",
- // 如果希望指定多个json字段代表一个共同的对象:
- // 则将其设为相同的id。
- // 如果设置了该属性,其他的属性均会被忽视。(优先级低于constObjectName)
- "sameObjectId": 1,
- // 提供类的全限定名字符串,将调用无参构造方法进行初始化
- // 如果constObjectName设置了非null且非undefined值,则此值允许为null或undefined
- // 如果sameObjectId设置了非null且非undefined值,则id相同的对象模型中允许且只允许一个值为非null或非undefined,其他的都必须为null或undefined
- "className": "your.package.name.YourKlassName",
- // 初始化后,会根据该值来修改对象属性
- // 允许为null或undefined,表示不额外设置属性
-
- "properties": {
- // 其中的键值对为各字段名。这些字段需要有getter、setter方法。
- "myIntegerField": 123123
- }
-}
-```
-
-##### 常量对象大全
-
-在`constObjectName`属性中设置,用以下字符串代表如下对象:
-
-| 对象名字符串 | 常量值 |
-| -------------------------------- | ------------------------------------------------------- |
-| `"NOT_SKIP"` | `SkipStrategy.NOT_SKIP` |
-| `"CHECK_ONE_LEVEL"` | `SkipStrategy.CHECK_ONE_LEVEL` |
-| `"ALL_DEPENDENCIES_ALL_SUCCESS"` | `DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS` |
-| `"ALL_DEPENDENCIES_ANY_SUCCESS"` | `DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS` |
-| `"ALL_DEPENDENCIES_NONE_FAILED"` | ` DependenceStrategy.ALL_DEPENDENCIES_NONE_FAILED` |
-| `"SUCCESS_CONTINUE"` | `DependOnUpWrapperStrategy.SUCCESS_CONTINUE` |
-| `"SUCCESS_START_INIT_CONTINUE"` | `DependOnUpWrapperStrategy.SUCCESS_START_INIT_CONTINUE` |
-| `"PRINT_EXCEPTION_STACK_TRACE"` | `ICallback.PRINT_EXCEPTION_STACK_TRACE` |
-
-详见请参考`com.jd.platform.async.scheduling.model.Constants`类中的具体代码。
+* `BiInt` 一个表示两个int值的实体类,内含缓冲区间、默认比较器,适合拿来当2维索引。
diff --git a/asyncTool-core/pom.xml b/asyncTool-core/pom.xml
index 5ed85e0..0838a91 100644
--- a/asyncTool-core/pom.xml
+++ b/asyncTool-core/pom.xml
@@ -5,7 +5,7 @@
asyncTool
com.jd.platform
- 1.5.0-SNAPSHOT
+ 1.5.1-SNAPSHOT
4.0.0
@@ -20,7 +20,7 @@
com.jd.platform
asyncTool-openutil
- 1.5.0-SNAPSHOT
+ 1.5.1-SNAPSHOT
diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java b/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java
new file mode 100644
index 0000000..5532b74
--- /dev/null
+++ b/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java
@@ -0,0 +1,19 @@
+package com.jd.platform.async.exception;
+
+/**
+ * 整组取消,设置该异常。
+ *
+ * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午6:12
+ */
+public class CancelSkippedException extends SkippedException {
+ public CancelSkippedException() {
+ }
+
+ public CancelSkippedException(String message) {
+ super(message);
+ }
+
+ public CancelSkippedException(String message, long skipAt) {
+ super(message, skipAt);
+ }
+}
diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java b/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java
index 8d820e0..1c38e7a 100644
--- a/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java
+++ b/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java
@@ -1,5 +1,7 @@
package com.jd.platform.async.exception;
+import com.jd.platform.async.executor.timer.SystemClock;
+
/**
* 如果任务在执行之前,自己后面的任务已经执行完或正在被执行,则抛该exception
*
@@ -7,11 +9,22 @@ package com.jd.platform.async.exception;
* @version 1.0
*/
public class SkippedException extends RuntimeException {
+ private final long skipAt;
+
public SkippedException() {
- super();
+ this(null);
}
public SkippedException(String message) {
+ this(message, SystemClock.now());
+ }
+
+ public SkippedException(String message, long skipAt) {
super(message);
+ this.skipAt = skipAt;
+ }
+
+ public long getSkipAt() {
+ return skipAt;
}
}
diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java
index 3232946..c0c3d88 100644
--- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java
+++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java
@@ -4,15 +4,15 @@ package com.jd.platform.async.executor;
import com.jd.platform.async.callback.DefaultGroupCallback;
import com.jd.platform.async.callback.IGroupCallback;
import com.jd.platform.async.executor.timer.SystemClock;
+import com.jd.platform.async.worker.OnceWork;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperGroup;
+import com.sun.istack.internal.Nullable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
@@ -26,20 +26,65 @@ public class Async {
// ========================= 任务执行核心代码 =========================
/**
- * 出发点
- *
- * @return 只要执行未超时,就返回true。
+ * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。
+ * 使用uuid作为工作id。使用{@link #getCommonPool()}作为线程池。
*/
- public static boolean beginWork(long timeout,
- ExecutorService executorService,
- Collection extends WorkerWrapper, ?>> workerWrappers)
- throws InterruptedException {
- if (workerWrappers == null || workerWrappers.size() == 0) {
- return false;
+ public static OnceWork work(long timeout,
+ Collection extends WorkerWrapper, ?>> workerWrappers) {
+ return work(timeout, getCommonPool(), workerWrappers);
+ }
+
+ /**
+ * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。
+ * 可变参式传入。使用uuid作为工作id。使用{@link #getCommonPool()}作为线程池。
+ */
+ public static OnceWork work(long timeout,
+ WorkerWrapper, ?>... workerWrappers) {
+ return work(timeout, getCommonPool(), workerWrappers);
+ }
+
+ /**
+ * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。
+ * 可变参式传入。使用uuid作为工作id。
+ */
+ public static OnceWork work(long timeout,
+ ExecutorService executorService,
+ WorkerWrapper, ?>... workerWrappers) {
+ return work(timeout, executorService, Arrays.asList(
+ Objects.requireNonNull(workerWrappers, "workerWrappers array is null")));
+ }
+
+ /**
+ * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。
+ * 省略工作id,使用uuid。
+ */
+ public static OnceWork work(long timeout,
+ ExecutorService executorService,
+ Collection extends WorkerWrapper, ?>> workerWrappers) {
+ return work(timeout, executorService, workerWrappers, UUID.randomUUID().toString());
+ }
+
+ /**
+ * 核心方法。
+ * 该方法不是同步阻塞执行的。如果想要同步阻塞执行,则调用返回值的{@link OnceWork#awaitFinish()}即可。
+ *
+ * @param timeout 全组超时时间
+ * @param executorService 执行线程池
+ * @param workerWrappers 任务容器集合
+ * @param workId 本次工作id
+ * @return 返回 {@link OnceWork}封装对象。
+ */
+ public static OnceWork work(long timeout,
+ ExecutorService executorService,
+ Collection extends WorkerWrapper, ?>> workerWrappers,
+ String workId) {
+ if (workerWrappers == null || workerWrappers.isEmpty()) {
+ return OnceWork.emptyWork(workId);
}
//保存上次执行的线程池变量(为了兼容以前的旧功能)
- Async.lastExecutorService = Objects.requireNonNull(executorService, "ExecutorService is null ! ");
- WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout);
+ Async.lastExecutorService.set(Objects.requireNonNull(executorService, "ExecutorService is null ! "));
+ final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout);
+ final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId);
group.addWrapper(workerWrappers);
workerWrappers.forEach(wrapper -> {
if (wrapper == null) {
@@ -47,38 +92,24 @@ public class Async {
}
executorService.submit(() -> wrapper.work(executorService, timeout, group));
});
- return group.awaitFinish();
- //处理超时的逻辑被移动到了WrapperEndingInspector中。
+ return onceWork;
}
/**
- * 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL
+ * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
*/
- public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
- throws ExecutionException, InterruptedException {
- if (workerWrapper == null || workerWrapper.length == 0) {
- return false;
- }
- Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet());
- //noinspection unchecked
- return beginWork(timeout, executorService, workerWrappers);
- }
-
- /**
- * 同步阻塞,直到所有都完成,或失败
- */
- public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
- return beginWork(timeout, getCommonPool(), workerWrapper);
- }
-
@SuppressWarnings("unused")
+ @Deprecated
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper);
}
/**
* 异步执行,直到所有都完成,或失败后,发起回调
+ *
+ * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
*/
+ @Deprecated
public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
if (groupCallback == null) {
groupCallback = new DefaultGroupCallback();
@@ -129,7 +160,7 @@ public class Async {
* 该线程池将会给线程取名为asyncTool-commonPool-thread-0(数字不重复)。
*
*/
- private static ThreadPoolExecutor COMMON_POOL;
+ private static volatile ThreadPoolExecutor COMMON_POOL;
/**
* 在以前(及现在)的版本中:
@@ -137,7 +168,7 @@ public class Async {
*
* 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个
*/
- private static volatile ExecutorService lastExecutorService;
+ private static final AtomicReference lastExecutorService = new AtomicReference<>(null);
/**
* 该方法将会返回{@link #COMMON_POOL},如果还未初始化则会懒加载初始化后再返回。
@@ -155,9 +186,11 @@ public class Async {
new ThreadFactory() {
private final AtomicLong threadCount = new AtomicLong(0);
+ @SuppressWarnings("NullableProblems")
@Override
public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "asyncTool-commonPool-thread-" + threadCount.getAndIncrement());
+ Thread t = new Thread(r,
+ "asyncTool-commonPool-thread-" + threadCount.getAndIncrement());
t.setDaemon(true);
return t;
}
@@ -189,6 +222,7 @@ public class Async {
* @param now 是否立即关闭
* @return 如果尚未调用过{@link #getCommonPool()},即没有初始化默认线程池,返回false。否则返回true。
*/
+ @SuppressWarnings("unused")
public static synchronized boolean shutDownCommonPool(boolean now) {
if (COMMON_POOL == null) {
return false;
@@ -203,6 +237,51 @@ public class Async {
return true;
}
+ // ========================= deprecated =========================
+
+ /**
+ * 同步执行一次任务。
+ *
+ * @return 只要执行未超时,就返回true。
+ * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
+ */
+ @Deprecated
+ public static boolean beginWork(long timeout,
+ ExecutorService executorService,
+ Collection extends WorkerWrapper, ?>> workerWrappers)
+ throws InterruptedException {
+ final OnceWork work = work(timeout, executorService, workerWrappers);
+ work.awaitFinish();
+ return work.hasTimeout();
+ }
+
+ /**
+ * 同步执行一次任务。
+ * 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL
+ *
+ * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
+ */
+ @Deprecated
+ public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
+ throws ExecutionException, InterruptedException {
+ if (workerWrapper == null || workerWrapper.length == 0) {
+ return false;
+ }
+ Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet());
+ //noinspection unchecked
+ return beginWork(timeout, executorService, workerWrappers);
+ }
+
+ /**
+ * 同步阻塞,直到所有都完成,或失败
+ *
+ * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
+ */
+ @Deprecated
+ public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
+ return beginWork(timeout, getCommonPool(), workerWrapper);
+ }
+
/**
* 关闭上次使用的线程池
*
@@ -214,8 +293,9 @@ public class Async {
*/
@Deprecated
public static void shutDown() {
- if (lastExecutorService != COMMON_POOL) {
- shutDown(lastExecutorService);
+ final ExecutorService last = lastExecutorService.get();
+ if (last != COMMON_POOL) {
+ shutDown(last);
}
}
@@ -226,7 +306,7 @@ public class Async {
* @deprecated 没啥用的方法,要关闭线程池还不如直接调用线程池的关闭方法,避免歧义。
*/
@Deprecated
- public static void shutDown(ExecutorService executorService) {
+ public static void shutDown(@Nullable ExecutorService executorService) {
if (executorService != null) {
executorService.shutdown();
}
diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java
index 01778a2..2dc1829 100644
--- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java
+++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java
@@ -65,7 +65,7 @@ public class PollingCenter {
thread.setDaemon(true);
return thread;
},
- 4,
+ 1,
TimeUnit.MILLISECONDS,
1024);
diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java
new file mode 100644
index 0000000..611638b
--- /dev/null
+++ b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java
@@ -0,0 +1,375 @@
+package com.jd.platform.async.worker;
+
+import com.jd.platform.async.executor.timer.SystemClock;
+import com.jd.platform.async.wrapper.WorkerWrapper;
+import com.jd.platform.async.wrapper.WorkerWrapperGroup;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * 一次工作结果的总接口。
+ *
+ * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午3:22
+ */
+public interface OnceWork {
+ /**
+ * 返回唯一的workId
+ */
+ String workId();
+
+ /**
+ * 判断是否结束。因超时而结束也算结束。
+ */
+ boolean isFinish();
+
+ /**
+ * 同步等待到结束。
+ */
+ void awaitFinish() throws InterruptedException;
+
+ /**
+ * 判断是否超时
+ *
+ * @return 如果尚未结束或已结束但未超时,返回false。已结束且已经超时返回true。
+ */
+ boolean hasTimeout();
+
+ /**
+ * 判断是否全部wrapper都处于 执行成功 或 跳过。
+ *
+ * @return 如果已经结束,所有wrapper都成功或跳过返回true,否则返回false。如果尚未结束,返回false。
+ */
+ default boolean allSuccess() {
+ if (!isFinish()) {
+ return false;
+ }
+ return getWrappers().values().stream().allMatch(wrapper -> {
+ final ResultState state = wrapper.getWorkResult().getResultState();
+ return state == ResultState.SUCCESS || state == ResultState.DEFAULT;
+ });
+ }
+
+ /**
+ * 获取全部参与到工作中的wrapper。
+ */
+ Map> getWrappers();
+
+ /**
+ * 获取{@link WorkResult#getResultState()}为{@link ResultState#SUCCESS}的wrapper。
+ */
+ default Map> getSuccessWrappers() {
+ return getWrappersOfState(ResultState.SUCCESS);
+ }
+
+ /**
+ * 获取状态于这些state中的wrapper。
+ *
+ * @param ofState 状态列表
+ * @return 返回Map
+ */
+ default Map> getWrappersOfState(ResultState... ofState) {
+ final HashSet states = new HashSet<>(Arrays.asList(ofState));
+ return getWrappers().entrySet().stream()
+ .filter(entry -> states.contains(entry.getValue().getWorkResult().getResultState()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ /**
+ * 获取启动时间
+ */
+ long getStartTime();
+
+ /**
+ * 获取结束时间
+ *
+ * @return 如果超时,返回超时的时刻。如果尚未结束,则抛出异常。
+ * @throws IllegalStateException 尚未结束,抛出异常。
+ */
+ long getFinishTime();
+
+ /**
+ * @return 已经取消完成
+ */
+ boolean isCancelled();
+
+ /**
+ * @return 是否正在取消中
+ */
+ boolean isWaitingCancel();
+
+ /**
+ * 请求异步取消。
+ */
+ void pleaseCancel();
+
+ /**
+ * 同步等待取消完成。
+ */
+ default void pleaseCancelAndAwaitFinish() throws InterruptedException {
+ if (!isCancelled() && !isWaitingCancel()) {
+ pleaseCancel();
+ }
+ awaitFinish();
+ }
+
+ /**
+ * @return 返回 {@link AsFuture}封装对象。
+ */
+ default AsFuture asFuture() {
+ return new AsFuture(this, limitTime -> limitTime / 16);
+ }
+
+ /**
+ * @param sleepCheckInterval 为防止线程爆炸,在{@link Future#get(long, TimeUnit)}方法时使用隔一段时间检查一次。
+ * 该Function的参数为总超时毫秒值,返回值为检查时间间隔。
+ * @return 返回 {@link AsFuture}封装对象。
+ */
+ default AsFuture asFuture(Function sleepCheckInterval) {
+ return new AsFuture(this, sleepCheckInterval);
+ }
+
+ // static
+
+ /**
+ * 空任务
+ */
+ static OnceWork emptyWork(String workId) {
+ return new EmptyWork(workId);
+ }
+
+ // class
+
+ class AsFuture implements Future