v1.5.3 模块化代码,初步构思了动态任务调度的配置文件格式

This commit is contained in:
TcSnZh
2021-05-19 22:53:43 +08:00
parent 13be645314
commit 980dc41f02
106 changed files with 3629 additions and 363 deletions

40
.gitignore vendored
View File

@@ -1,32 +1,32 @@
HELP.md
target/
**/HELP.md
**/target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
**/.apt_generated
**/.classpath
**/.factorypath
**/.project
**/.settings
**/.springBeans
**/.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
**/.idea
**/*.iws
**/*.iml
**/*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
**/nbproject/private/
**/nbbuild/
**/dist/
**/nbdist/
**/.nb-gradle/
**/build/
### VS Code ###
.vscode/
**/.vscode/

View File

@@ -4,6 +4,8 @@
代码不多,直接拷贝包过去即可。
#### 旧稳定版本v1.4
京东同事通过引用如下maven来使用。
```xml
@@ -34,11 +36,55 @@
</dependency>
```
# 使用说明
#### 最新版本v1.5(不稳定)
从gitee上下载仓库到本地切换到`dev`分支然后maven安装到本地仓库。
```bash
git clone https://gitee.com/jd-platform-opensource/asyncTool.git
cd ./asyncTool
git checkout dev
mvn install
```
在项目中引入依赖。
```xml
<!-- 任务编排核心包 -->
<dependency>
<artifactId>asyncTool-core</artifactId>
<groupId>com.jd.platform</groupId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<!-- 动态任务调度有需要的话可以引入。依赖fastjson请自行解决版本冲突 -->
<dependency>
<artifactId>asyncTool-scheduling</artifactId>
<groupId>com.jd.platform</groupId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
```
# 任务编排
> `asyncTool-core`核心模块提供了核心功能——任务编排
>
> 以下文档基于版本:
>
> ```xml
> <dependencies>
> <dependency>
> <groupId>com.jd.platform</groupId>
> <artifactId>asyncTool-core</artifactId>
> <version>1.5.0-SNAPSHOT</version>
> </dependency>
> </dependencies>
> ```
### 基本组件
worker 一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。
`IWorker` 一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。
TV两个泛型分别是入参和出参类型。
@@ -63,7 +109,7 @@ public interface IWorker<T, V> {
V action(T object, Map<String, WorkerWrapper> allWrappers);
/**
* 超时、异常时,返回的默认值
* 超时、异常、跳过时,返回的默认值
*
* @return 默认值
*/
@@ -73,8 +119,7 @@ public interface IWorker<T, V> {
}
```
callBack对每个worker的回调。worker执行完毕后会回调该接口带着执行成功、失败、原始入参、和详细的结果。
`ICallback`对每个worker的回调。worker执行完毕后会回调该接口带着执行成功、失败、原始入参、和详细的结果。
```java
/**
@@ -117,6 +162,96 @@ WorkerWrapper<String, String> w0 = WorkerWrapper.<String, String>builder()
通过这一个类看一下action里就是你的耗时操作begin就是任务开始执行时的回调result就是worker执行完毕后的回调。当你组合了多个执行单元时每一步的执行都在掌控之内。失败了还会有自定义的默认值。这是CompleteableFuture无法做到的。
### 如何构造WorkerWrapper?
##### 推荐Builder模式
如果刚开始使用这个框架,则推荐使用如下方式进行构造:
```java
WorkerWrapper.<String, String>builder()
.id()
// 其他属性略。
// 请在《简单示例》与《设置WorkerWrapper属性》中慢慢感受详细内容。
// 因为这里地方小,写不下。
```
##### 复杂的快速构造
> 不推荐新手使用。
>
> 不推荐在业务中使用使用Builder模式代码更加简洁且会检查参数不必节省这些性能。
>
> 该对象的构造方法不会检查属性。
在对WorkerWrapper属性有充足了解后可使用“直接设置属性 + 关系图”的方式快速构造wrapper。
建议在扩展功能的时候使用该构造器,以提高效率。但是请记得检查参数。
以下为示例:
```java
package v15.cases;
import com.jd.platform.async.callback.DefaultCallback;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.openutil.collection.CommonDirectedGraph;
import com.jd.platform.async.openutil.collection.DirectedGraph;
import com.jd.platform.async.wrapper.QuickBuildWorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.strategy.WrapperStrategy;
import java.util.concurrent.*;
/**
* 快速构造示例。
*
* @author create by TcSnZh on 2021/5/17-下午5:23
*/
class Case9 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DirectedGraph<WorkerWrapper<?, ?>, Object> graph = DirectedGraph.synchronizedDigraph(new CommonDirectedGraph<>());
QuickBuildWorkerWrapper<Object, Object> w1 = new QuickBuildWorkerWrapper<>("id1",
null,
(object, allWrappers) -> {
System.out.println("I am IWorker 1");
return null;
},
new DefaultCallback<>(),
false,
true,
100,
TimeUnit.MILLISECONDS,
new WrapperStrategy.DefaultWrapperStrategy(),
graph
);
QuickBuildWorkerWrapper<Object, Object> w2 = new QuickBuildWorkerWrapper<>("id2",
null,
(object, allWrappers) -> {
System.out.println("I am IWorker 2");
return null;
},
new DefaultCallback<>(),
false,
true,
100,
TimeUnit.MILLISECONDS,
new WrapperStrategy.DefaultWrapperStrategy(),
graph
);
graph.addNode(w1, w2);
graph.putRelation(w1, new Object(), w2);
// System.out.println(graph);
Async.beginWork(200, w1);
System.out.println(" Begin work end .\n w1 : " + w1 + "\n w2 : " + w2 + "\n");
}
}
```
### 简单示例
1. 3个任务并行
@@ -353,9 +488,9 @@ public static ThreadPoolExecutor getCommonPool();
/**
* @param now 是否立即关闭
* @throws IllegalStateException 如果尚未调用过{@link #getCommonPool()},即没有使用过“使用默认线程池”的方法,该方法会抛出空指针异常
* @return 如果尚未调用过{@link #getCommonPool()},即没有初始化默认线程池返回false。否则返回true
*/
public static synchronized void shutDownCommonPool(boolean now);
public static synchronized boolean shutDownCommonPool(boolean now);
```
以下是一个使用自定义线程池的简单代码示例:
@@ -364,8 +499,6 @@ public static synchronized void shutDownCommonPool(boolean now);
Async.beginWork(1000, Executors.newFixedThreadPool(2),a);
```
## WorkerWrapper基本属性
### 执行流程
@@ -381,9 +514,39 @@ WorkerWrapper会在这些情况被运行
> processOn流程图文件放在同仓库。
### 其他属性
### 属性
#### id
`WorkerWrapper`的id属性非常重要。
可在builder的该属性设置id如果不设置默认使用UUID。
```java
public interface WorkerWrapperBuilder<T, V> {
/**
* 设置唯一id。
* 如果不设置,{@link StableWorkerWrapperBuilder}会使用UUID
*/
WorkerWrapperBuilder<T, V> id(String id);
// 略
}
```
例如如果你需要在`IWorker`中调用上游wrapper则可以根据id来获取到。
> 该map的键即为`WorkerWrapper`的id。
```java
V action(T object, Map<String, WorkerWrapper<?,?>> allWrappers);
```
请程序员确保在一次任务执行的一组wrapper中id不会重复。在执行过程中不会进行检查。
#### 其他省略
> 其他属性都写在源码注释中,可下载源码慢慢查看。
## 设置WorkerWrapper属性
@@ -466,6 +629,8 @@ class Case3 {
wrapper(id=B2) is working
wrapper(id=C2) is working
wrapper(id=C1) is working
wrapper(id=B4) is working
// 我们看到B5被跳过了没有执行callback
*/
}
}
@@ -531,62 +696,76 @@ public enum DependenceAction {
| `FAST_FAIL` | 立即失败。WorkerWrapper会去执行快速失败的方法。 |
| `JUDGE_BY_AFTER` | 交给下层`{@link DependenceStrategy}`进行判断。 由于`{@link DependenceStrategy#thenJudge(DependenceStrategy)}`的责任链设计模式,该返回值的意义就是调用责任链上下一个策略。 |
> 如果wrapper被跳过ResultState将为`DEFAULT`。
>
>
##### 策略器组件默认实现
* `DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS`该值为默认值若builder未设置则默认使用这个。
1. 被依赖的所有Wrapper都必须成功才能开始工作。
2. 如果其中任一Wrapper还没有执行且不存在失败则休息。
3. 如果其中任一Wrapper失败则立即失败。
3. 如果其中任一Wrapper失败则立即失败。*(跳过不算失败)*
* `DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS`
1. 被依赖的Wrapper中任意一个成功了就可以开始工作。
2. 如果其中所有Wrapper还没有执行则休息。
3. 如果其中一个Wrapper失败且不存在成功则立即失败。
3. 如果其中一个Wrapper失败且不存在成功则立即失败。*(跳过不算失败)*
* `DependenceStrategy.ALL_DEPENDENCIES_NONE_FAILED`
* 如果被依赖的工作中任一失败,则立即失败。否则就开始工作(不论之前的工作有没有开始)。
* 如果被依赖的工作中任一失败,则立即失败。*(跳过不算失败)*
* 否则就开始工作(不论之前的工作有没有开始)。
* `DependenceStrategy.theseWrapperAllSuccess(Set<WorkerWrapper<?,?>>)`
* 该方法传入一个`Set`指定wrapper只有当指定的这些Wrapper都成功时才会开始工作。任一失败会快速失败。任一还没有执行且不存在失败则休息。
* 不建议使用:~~`DependenceStrategy.IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY`~~
* 此值用于适配v1.4及之前的must开关模式`wrapperStrategy``dependMustStrategyMapper``mustDependSet`不为空时则休息因为能判断到这个责任链说明set中存在不满足的值。为空时则任一成功则执行。
##### `WorkerWrapper`的三层策略器责任链
##### `WorkerWrapper`的策略器责任链
`WorkerWrapper`在判断时,并不是只使用一个策略进行判断的,而是在`WorkerWrapper.WrapperStrategy`进行了最多三层的判断:
`WorkerWrapper`在判断时,并不是只使用一个策略进行判断的,而是在`WrapperStrategy`进行了最多三层的判断:
```java
public abstract class WorkerWrapper<T, V> {
// 略
public static class WrapperStrategy implements DependenceStrategy, SkipStrategy {
// ========== 这三个策略器用于链式判断是否要开始工作 ==========
public interface WrapperStrategy extends DependenceStrategy, SkipStrategy {
// ========== 这三个策略器用于链式判断是否要开始工作 ==========
// 从前往后依次判断的顺序为 dependWrapperStrategyMapper -> dependMustStrategyMapper -> dependenceStrategy
// 从前往后依次判断的顺序为 dependWrapperStrategyMapper -> dependMustStrategyMapper -> dependenceStrategy
/**
* 对特殊Wrapper专用的依赖响应策略。
* <b>该值允许为null</b>
*/
private DependWrapperStrategyMapper dependWrapperStrategyMapper;
/**
* 对必须完成的must的Wrapper的依赖响应策略。
* <b>该值允许为null</b>
* <p/>
* 这是一个不得不向历史妥协的属性。用于适配must开关方式。
*/
private DependMustStrategyMapper dependMustStrategyMapper;
/**
* 底层全局策略。
*/
private DependenceStrategy dependenceStrategy;
// 略
}
/**
* 设置对特殊Wrapper专用的依赖响应策略。
*
* @return 该值允许为null
*/
DependOnUpWrapperStrategyMapper getDependWrapperStrategyMapper();
/**
* 对必须完成的must的Wrapper的依赖响应策略。
* 这是一个不得不向历史妥协的属性。用于适配must开关方式。
*
* @return 该值允许为null
*/
DependMustStrategyMapper getDependMustStrategyMapper();
/**
* 底层全局策略。
*
* @return 该值不允许为null
*/
DependenceStrategy getDependenceStrategy();
// ========== 这是跳过策略 ==========
/**
* 跳过策略
*
* @return 不允许为null
*/
SkipStrategy getSkipStrategy();
// 其他属性略,自行查看源码即可
}
```
正如注释所言,三个策略器将依次调用`DependenceStrategy.judgeAction(Set,WorkerWrapper,WorkerWrapper)`方法进行判断,每次判断会返回`DependenceAction.WithProperty`类型。
正如注释所言,三个策略器将依次调用`judgeAction(Set,WorkerWrapper,WorkerWrapper)`方法进行判断,每次判断会返回`DependenceAction.WithProperty`类型。
前两个策略器的返回值,即`DependenceAction.WithProperty`类型,若调用`getDependenceAction()`方法返回的枚举值不为`JUDGE_BY_AFTER`时,整个三层责任链将返回此返回值;若为`JUDGE_BY_AFTER`,则交给下个策略器进行判断。该方法具体由以下方法实现:
前两个策略器的返回值若不为枚举`JUDGE_BY_AFTER`的内部类时,整个三层责任链将返回此返回值;若为`JUDGE_BY_AFTER`,则交给下个策略器进行判断。该方法具体由以下方法实现:
```java
public interface DependenceStrategy {
@@ -788,74 +967,51 @@ class Case5 {
那么能不能让上游wrapper根据自己的状态独自决定下游wrapper响应呢
因此,三层策略器的`DependWrapperStrategyMapper`便是用于设置此功能
可以。`DependOnUpWrapperStrategy`函数式接口 与 `DependOnUpWrapperStrategyMapper`这两个类即可完成这个功能。
```java
// 示例版本v1.5
/**
* 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略。
* <p/>
* 使用{@link DependWrapperStrategyMapper}本实现类对{@link DependenceStrategy}进行增强,
* 由上游wrapper决定本wrapper行为的单参数策略。
*
* @author create by TcSnZh on 2021/5/1-下午11:12
* @author create by TcSnZh on 2021/5/1-下午11:16
*/
public class DependWrapperStrategyMapper implements DependenceStrategy {
private final Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> mapper = new ConcurrentHashMap<>(4);
/**
* 设置对应策略
*
* @param targetWrapper 要设置策略的WorkerWrapper
* @param strategy 要设置的策略
* @return 返回this链式调用。
*/
public DependWrapperStrategyMapper putMapping(WorkerWrapper<?, ?> targetWrapper, DependWrapperActionStrategy strategy) {/* 略 */}
/**
* 判断方法。
* <p/>
* 如果fromWrapper在{@link #mapper}中,则返回{@link DependWrapperActionStrategy}的判断返回值。否则返回{@link DependenceAction#JUDGE_BY_AFTER}
*
* @param dependWrappers 这里不会使用该值thisWrapper.dependWrappers的属性值。
* @param thisWrapper 这里不会使用该值thisWrapper即为“被催促”的WorkerWrapper
* @param fromWrapper 调用来源Wrapper。
* @return 如果在mapper中有对fromWrapper的处理策略则使用其进行判断。否则返回JUDGE_BY_AFTER交给下一个进行判断。
*/
@Override
public DependenceAction.WithProperty judgeAction(
Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper // 仅判断该属性
) {/* 略 */}
// 略
}
```
`mapper`属性中,每个`WorkerWrapper<?, ?>`对应了一个`DependWrapperActionStrategy`这个接口便是用于让上游wrapper决定下游响应的
```java
@FunctionalInterface
public interface DependWrapperActionStrategy {
public interface DependOnUpWrapperStrategy {
/**
* 仅使用一个参数的判断方法
* 仅使用一个参数即调用自身的上游wrapper的判断方法
*
* @param fromWrapper 调用本Wrapper的上游Wrapper
* @return 返回 {@link DependenceAction.WithProperty}
*/
DependenceAction.WithProperty judge(WorkerWrapper<?, ?> fromWrapper);
// 常量略
// ========== 送几个供链式调用的默认值 ==========
/**
* 成功时,交给下一个策略器判断。
* 未运行时,休息。
* 失败时,失败。
*/
DependOnUpWrapperStrategy SUCCESS_CONTINUE = /*略*/ ;
/**
* 成功时,开始工作。
* 未运行时,交给下一个策略器判断。
* 失败时,失败。
*/
DependOnUpWrapperStrategy SUCCESS_START_INIT_CONTINUE = /*略*/ ;
}
```
###### 提供常量
`DependOnUpWrapperStrategyMapper``mapper`属性中,每个`WorkerWrapper<?, ?>`对应了一个`DependOnUpWrapperStrategy`实现了让wrapper对不同的上游做出不同的响应策略。
* `DependWrapperActionStrategy.SUCCESS_CONTINUE`
* 成功时,交给下一个策略器判断。未运行时,休息。失败时,失败。
* `DependWrapperActionStrategy SUCCESS_START_INIT_CONTINUE`
* 成功时,开始工作。未运行时,交给下一个策略器判断。失败时,失败。
```java
public class DependOnUpWrapperStrategyMapper implements DependenceStrategy {
private final Map<WorkerWrapper<?, ?>, DependOnUpWrapperStrategy> mapper = new ConcurrentHashMap<>(4);
// 以下略
}
```
在《`WorkerWrapper`的三层策略器责任链》这一章中,我们可以看到,第一层策略器就是此`DependOnUpWrapperStrategyMapper`
###### 简单使用与示例
@@ -932,7 +1088,12 @@ class Case6 {
}
```
###### 提供常量
* `DependWrapperActionStrategy.SUCCESS_CONTINUE`
* 成功时,交给下一个策略器判断。未运行时,休息。失败时,失败。
* `DependWrapperActionStrategy SUCCESS_START_INIT_CONTINUE`
* 成功时,开始工作。未运行时,交给下一个策略器判断。失败时,失败。
### 设置跳过策略
@@ -943,8 +1104,9 @@ class Case6 {
```json
{
result: null,
resultState: ResultState.EXCEPTION,
ex: com.jd.platform.async.exception.SkippedException
// 注意如果wrapper被跳过ResultState将为DEFAULT
resultState: "ResultState.DEFAULT",
ex: "com.jd.platform.async.exception.SkippedException"
}
```
@@ -1151,14 +1313,13 @@ class Case8 {
wrapper(id=C) callback fail , workResult is WorkResult{result=null, resultState=TIMEOUT, ex=null}
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
...
以下异常信息省略
*/
}
}
```
### 设置是否允许被打断线程
可通过该选项去设置允许线程被打断:
@@ -1178,6 +1339,8 @@ public interface WorkerWrapperBuilder<T, V> {
}
```
#### 线程会被打断的具体情况
开启之后在以下情况会试图打断正处于WORKING状态的工作线程。
* 总任务超时但本wrapper在WORKING。
@@ -1186,7 +1349,284 @@ public interface WorkerWrapperBuilder<T, V> {
* 调用`WorkerWrapper#failNow()`方法且wrapper在WORKING状态。
###
# 开放工具类
> `asyncTool-openutil`工具模块提供了一些便于开发的工具类。
>
> 可单独引入依赖:
>
> ```xml
> <dependencies>
> <dependency>
> <groupId>com.jd.platform</groupId>
> <artifactId>asyncTool-openutil</artifactId>
> <version>1.5.0-SNAPSHOT</version>
> </dependency>
> </dependencies>
> ```
### 集合类
> `com.jd.platform.async.openutil.collection.*`
这里不详述,要用的话源码里有注释。
* `SparseArray2D` 稀疏矩阵。
* `CommonDirectedGraph` 有向图。
* `CommonStoreArk` id储物柜。
### 定时器
> `com.jd.platform.async.openutil.timer.*`
* `HashedWheelTimer` 从netty里抄来的时间轮工具类。
### 其他
`com.jd.platform.async.openutil`
* `BiInt` 一个表示两个int值的实体类内含默认比较器、缓冲区间。
# 动态任务调度
>引入依赖
>
>```xml
><!-- 动态任务调度有需要的话可以引入。依赖fastjson请自行解决版本冲突 -->
><dependency>
> <artifactId>asyncTool-scheduling</artifactId>
> <groupId>com.jd.platform</groupId>
> <version>1.5.0-SNAPSHOT</version>
></dependency>
>```
动态任务调度传个json就能让调度工厂随心所欲的构造wrapper与执行。
如果您已经熟悉**《任务编排》**这一章的内容,则可以试试这玩意。
## 基本组件
## 模型属性
### 字段属性详解
以下是一个传入的json的格式示例注释中描述了属性含义与格式
```json
{
// 图纸名称。可使用 字符串、null、不写。为null或不写则会使用uuid
drawingsName :"",
// 提供的全部WorkerWrapper列表。
// 内部格式请参考《wrappers》这章。
wrappers:[],
// wrapper顺序关系。
// 内部格式请参考《relations》这章
relations: [],
// 任务启动参数
// 内部格式请参考《beginWork》这章
beginWork:{}
}
```
#### wrappers
wrappers数组中只允许存入满足以下格式的对象
```json
wrappers:[
{
// id传入字符串。
// 而且不能与wrappers数组中其他的对象的id属性相同。即必须保证id唯一。
// 不允许undefined不允许null。
"id": "first",
// param即参数。请看《param属性格式》
// useObjectModel代表value的值是否是“对象模型”。
// 允许undefined如果undefined则使用null。显然允许null。
"param": {
"useObjectModel": false,
"value": "JackMa"
},
// 传入对象模型,请看《对象模型》
// 不允许undefined或null
"worker": {
"sameObjectId": 1,
"className:": "schedulingtest.impl.SelectUserByName"
},
// 传入对象模型
// 允许undefined与null。如果为两者则使用com.jd.platform.async.callback.DefaultCallback
"callback": {
"sameObjectId": 1
},
// wrapper依赖策略
// 允许undefined与null。如果为两者则使用com.jd.platform.async.wrapper.strategy.WrapperStrategy.DefaultWrapperStrategy
"wrapperStrategy": {
// 传入{}键值对键名为即wrapper的id属性值为对象模型。
// 允许undefined和null两者之意与空键值对{}并无二致
"dependOnUpWrapperStrategyMapper": null,
// -- 这里不再向历史妥协舍弃了DependMustStrategyMapper
// 基础策略器,传入对象模型
// 允许undefined和null如果是两者则使用"ALL_DEPENDENCIES_ALL_SUCCESS"
"dependenceStrategy": {
"constObjectName": "ALL_DEPENDENCIES_ALL_SUCCESS"
},
// 跳过策略,传入对象模型
// 允许undefined和null如果是两者则使用"CHECK_ONE_LEVEL"
"skipStrategy": {
"constObjectName": "CHECK_ONE_LEVEL"
}
},
// 是否允许打断传入boolean值。允许undefined和null视为false
"allowInterrupt": true,
// 是否启动单wrapper计时允许undefined和null视为false
"enableTimeout": true,
// 单wrapper超时时间数值。传入long(int64)值
// 在enableTimeout为true的情况下不允许为undefined或null或小于等于0的值否则允许任何值
"timeoutLength": 50,
// 单wrapper超时时间单位有以下几个取值即java.util.concurrent.TimeUnit的枚举值
// "NANOSECONDS"、"MICROSECONDS"、"MILLISECONDS"
// "SECONDS"、"MINUTES"、"HOURS"、"DAYS"
// 允许为undefined和null视为"MILLISECONDS"
// 但是除了undefined、null和以上7个值外不允许任何值
"timeoutUnit": "MILLISECONDS"
},
// 这是第二个wrapper属性这里可以省略很多选用默认值的属性。
{
"id": "second",
"param":{
"useObjectModel": false,
"value":"first"
},
"worker": {
"className": "schedulingtest.impl.PayTaxes",
"sameObjectId": 2
},
callback:{
"sameObjectId": 2
}
},
// 这是第三个属性,这里的"extendConfig"属性是个省力的好东西。
{
"id": "third",
// 传入存在的id属性
// 效果是本wrapper配置将继承此id表示的wrapper配置的所有属性
// 并且本wrapper可以有选择的覆写配置——只需设置某个对象的值即可。
// 可以为null或undefined表示不继承配置
"extendConfig": "second"
},
/* , { ... } */
]
```
##### param
`useObjectModel`属性用于说明`value`属性的所代表的对象类型:
* 为false使用json所对应的类型。
* 为true使用《实现类对象规范》中的我们自定义的对象模型。
```json
{
"useObjectModel": false,
"value": "JackMa"
},
```
```json
{
"useObjectModel": true,
"value": {
sameObjectId: 3
}
}
```
#### relations
```json
relation: {
// 可使用 数组(仅当to使用字符串时)、字符串
// 不允许为null或undefined如果两个wrapper之间无关系宁可不写这整个对象。
from: "",
// 可使用 数组(仅当from使用字符串时)、字符串
// 不允许为null或undefined如果两个wrapper之间无关系宁可不写这整个对象。
to: ""
}
```
`from`和`to`两个属性传入的字符串,必须是在`wrappers`数组中所含有的`id`属性。
这两个属性只有在对方为字符串时,才能设置自己为数组,以表示“一对多”关系。
#### beginWork
```json
beginWork: {
// 设置全组超时时间数值使用long(int64)值
// 可以传入<=0的值表示不限制超时时间。
// 允许为null、undefined视为不限制超时时间。
timeoutLength: 100,
// 全组超时时间单位有以下几个取值即java.util.concurrent.TimeUnit的枚举值
// "NANOSECONDS"、"MICROSECONDS"、"MILLISECONDS"
// "SECONDS"、"MINUTES"、"HOURS"、"DAYS"
// 允许为undefined和null视为"MILLISECONDS"
// 但是除了undefined、null和以上7个值外不允许任何值
timeoutUnit: "MILLISECONDS",
// 传入的启动wrapper的id数组不允许null、undefined。可以为空但是这不符合常理。
wrappers: [
"first"
],
// 执行的线程池
// 可以传入"COMMON_POOL"代表使用asyncTool的默认线程池。
// 可以为null或undefined视为"COMMON_POOL"
// 也可以以如下格式传入全限定名+字段/方法名字符串则使用反射调取该字段的所指向的ExecutorService
// 全限定名#字段名 或 全限定名##方法名
executor: "COMMON_POOL"
}
```
### 其他特殊格式
#### 对象模型`ObjectModel`
* 有时我们需要指定接口实现类,或者是默认的接口实现对象,并自定义这些对象的属性。
* 或是指定多个json字段代表一个共同的对象。
那么,我们就需要高度自定义此对象的属性,并封装为`com.jd.platform.async.scheduling.model.ObjectModel`类。
```json
{
// 如果需要使用asyncTool指名道姓的常量实现对象则请见下方《常量对象规范》并在这里传入名称字符串。
// 当设置了该属性时,其他的属性均会被忽视。
"constObjectName": "NOT_SKIP",
// 如果希望指定多个json字段代表一个共同的对象
// 则将其设为相同的id。
// 如果设置了该属性其他的属性均会被忽视。优先级低于constObjectName
"sameObjectId": 1,
// 提供类的全限定名字符串,将调用无参构造方法进行初始化
"className": "your.package.name.YourKlassName",
// 初始化后,会根据该值来修改对象属性
"properties": {
// 其中的键值对为各字段名。这些字段需要有getter、setter方法。
"myIntegerField": 123123
}
}
```
##### 常量对象大全
在`constObjectName`属性中设置,用以下字符串代表如下对象:
| 对象名字符串 | 常量值 |
| -------------------------------- | ------------------------------------------------------- |
| `"NOT_SKIP"` | `SkipStrategy.NOT_SKIP` |
| `"CHECK_ONE_LEVEL"` | `SkipStrategy.CHECK_ONE_LEVEL` |
| `"ALL_DEPENDENCIES_ALL_SUCCESS"` | `DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS` |
| `"ALL_DEPENDENCIES_ANY_SUCCESS"` | `DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS` |
| `"ALL_DEPENDENCIES_NONE_FAILED"` | ` DependenceStrategy.ALL_DEPENDENCIES_NONE_FAILED` |
| `"SUCCESS_CONTINUE"` | `DependOnUpWrapperStrategy.SUCCESS_CONTINUE` |
| `"SUCCESS_START_INIT_CONTINUE"` | `DependOnUpWrapperStrategy.SUCCESS_START_INIT_CONTINUE` |
| `"PRINT_EXCEPTION_STACK_TRACE"` | `ICallback.PRINT_EXCEPTION_STACK_TRACE` |
详见请参考`com.jd.platform.async.scheduling.model.Constants`类中的具体代码。

View File

@@ -89,8 +89,16 @@
在V1.3后框架支持在worker的action的入参Map<String, WorkerWrapper>中获取任意一个执行单元的执行结果当然可以取其中的1个、多个执行结果作为自己的入参。Key就是在定义wrapper时通过id传进来的唯一id标识。详情demo可以查看test包下dependnew包案例。
## 并发场景可能存在的需求之——全组任务的超时
一组任务虽然内部的各个执行单元的时间不可控但是我可以控制全组的执行时间不超过某个值。通过设置timeOut来控制全组的执行阈值。
## 并发场景可能存在的需求之——任务的超时
> 在v1.4中:
>
> 一组任务虽然内部的各个执行单元的时间不可控但是我可以控制全组的执行时间不超过某个值。通过设置timeOut来控制全组的执行阈值。
在v1.5中:
每个wrapper可以设置自己的超时时间同时也可以设置整组任务的超时时间。
且可以设置一旦超时则打断线程(默认不启用打断线程)
## 并发场景可能存在的需求之——高性能、低线程数
该框架全程无锁,不依靠线程锁来保证顺序。
@@ -114,4 +122,3 @@
## 快速开始
[点此开启实战](https://gitee.com/jd-platform-opensource/asyncTool/blob/master/QuickStart.md)

27
asyncTool-core/pom.xml Normal file
View File

@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>asyncTool</artifactId>
<groupId>com.jd.platform</groupId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>asyncTool-core</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.jd.platform</groupId>
<artifactId>asyncTool-openutil</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@@ -15,14 +15,10 @@ public class DefaultCallback<T, V> implements ICallback<T, V> {
}
/**
* 默认将打印存在的非{@link com.jd.platform.async.exception.SkippedException}的异常
* 默认情况啥回调都没有而且将吞掉所有异常显示只保存在{@link WorkResult}
*/
@Override
public void result(boolean success, T param, WorkResult<V> workResult) {
Exception ex = workResult.getEx();
if (ex != null && !(ex instanceof SkippedException)) {
ex.printStackTrace();
}
// do nothing
}
}

View File

@@ -0,0 +1,46 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.worker.WorkResult;
/**
* 每个执行单元执行完毕后,会回调该接口</p>
* 需要监听执行结果的,实现该接口即可
*
* @author wuweifeng wrote on 2019-11-19.
*/
@FunctionalInterface
public interface ICallback<T, V> {
/**
* 任务开始的监听
*/
default void begin() {
}
/**
* 耗时操作执行完毕后就给value注入值
* <p/>
* 只要Wrapper被调用后成功或失败/超时,该方法都会被执行。
*/
void result(boolean success, T param, WorkResult<V> workResult);
/**
* 提供常量选项:打印异常信息,跳过时的异常{@link SkippedException}不会打印。
*/
ICallback<?, ?> PRINT_EXCEPTION_STACK_TRACE = new ICallback<Object, Object>() {
@Override
public void result(boolean success, Object param, WorkResult<Object> workResult) {
Exception ex = workResult.getEx();
if (ex != null && !(ex instanceof SkippedException)) {
ex.printStackTrace();
}
}
@Override
public String toString() {
return "PRINT_EXCEPTION_STACK_TRACE";
}
};
}

View File

@@ -1,9 +1,9 @@
package com.jd.platform.async.callback;
import java.util.Map;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* 每个最小执行单元需要实现该接口
*
@@ -20,7 +20,7 @@ public interface IWorker<T, V> {
V action(T object, Map<String, WorkerWrapper<?,?>> allWrappers);
/**
* 超时异常时返回的默认值
* 超时异常跳过返回的默认值
*
* @return 默认值
*/

View File

@@ -7,7 +7,10 @@ import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperGroup;
import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -51,13 +54,13 @@ public class Async {
/**
* 如果想自定义线程池请传pool不自定义的话就走默认的COMMON_POOL
*/
@SuppressWarnings("unchecked")
public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
throws ExecutionException, InterruptedException {
if (workerWrapper == null || workerWrapper.length == 0) {
return false;
}
Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet());
//noinspection unchecked
return beginWork(timeout, executorService, workerWrappers);
}
@@ -68,6 +71,7 @@ public class Async {
return beginWork(timeout, getCommonPool(), workerWrapper);
}
@SuppressWarnings("unused")
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper);
}
@@ -183,11 +187,11 @@ public class Async {
/**
* @param now 是否立即关闭
* @throws IllegalStateException 如果尚未调用过{@link #getCommonPool()}即没有使用过使用默认线程池的方法该方法会抛出空指针异常
* @return 如果尚未调用过{@link #getCommonPool()}即没有初始化默认线程池返回false否则返回true
*/
public static synchronized void shutDownCommonPool(boolean now) {
public static synchronized boolean shutDownCommonPool(boolean now) {
if (COMMON_POOL == null) {
throw new IllegalStateException("COMMON_POOL Not initialized yet");
return false;
}
if (!COMMON_POOL.isShutdown()) {
if (now) {
@@ -196,6 +200,7 @@ public class Async {
COMMON_POOL.shutdown();
}
}
return true;
}
/**

View File

@@ -1,9 +1,9 @@
package com.jd.platform.async.executor;
import com.jd.platform.async.util.timer.Timeout;
import com.jd.platform.async.util.timer.TimerTask;
import com.jd.platform.async.util.timer.HashedWheelTimer;
import com.jd.platform.async.util.timer.Timer;
import com.jd.platform.async.openutil.timer.HashedWheelTimer;
import com.jd.platform.async.openutil.timer.Timeout;
import com.jd.platform.async.openutil.timer.Timer;
import com.jd.platform.async.openutil.timer.TimerTask;
import com.jd.platform.async.wrapper.WorkerWrapperGroup;
import java.util.Set;
@@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit;
/**
* 检查{@link WorkerWrapperGroup}是否调用完成的轮询中心
* <p>
* 内部使用时间轮进行轮询
* <p>
* ===========================================================================================
* <p>
@@ -27,11 +27,11 @@ import java.util.concurrent.TimeUnit;
* 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况在test/v15.wrappertest中示例testThreadPolling_V14Bug说明了这个bug
* 线程数2
* A(5ms)--B1(10ms) ---|--> C1(5ms)
* . \ | (B1B2全部完成可执行C1C2)
* . \ | (B1B2任一完成可执行C1C2)
* . ---> B2(20ms) --|--> C2(5ms)
* <p>
* }
* 线程1执行了A然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成
* 线程1执行了A然后在{@link CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成
* 线程2执行了B1或B2中的一个也在allOf方法等待C1C2完成
* 结果没有线程执行C和B2了导致超时而死并且这个线程池线程有可能被耗尽
* >

View File

@@ -2,7 +2,6 @@ package com.jd.platform.async.executor.timer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

View File

@@ -1,7 +1,5 @@
package com.jd.platform.async.worker;
import java.util.concurrent.atomic.AtomicReference;
/**
* 执行结果
*/

View File

@@ -0,0 +1,100 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.openutil.collection.DirectedGraph;
import com.jd.platform.async.openutil.collection.Graph;
import com.jd.platform.async.wrapper.strategy.WrapperStrategy;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 快速构造{@link WorkerWrapper},少废话!
* <p>
* 直接设置属性不用麻烦Builder设置来设置去
* 请 注 意:构造方法不会检查参数合法性,请程序员自己保证参数合法。
* <p>
* 将关系存储于有向图中{@link DirectedGraph}以节省每个wrapper都要保存节点数据的开销。
* </p>
*
* @author create by TcSnZh on 2021/5/13-上午11:54
*/
public class QuickBuildWorkerWrapper<T, V> extends WorkerWrapper<T, V> {
private final DirectedGraph<WorkerWrapper<?, ?>, Object> graph;
private volatile Set<WorkerWrapper<?, ?>> nextWrappersCache;
private volatile Set<WorkerWrapper<?, ?>> dependWrappersCache;
/**
* 构造函数,传入所有属性
*
* @param id {@link WorkerWrapper#id}
* @param param {@link WorkerWrapper#param}
* @param worker {@link WorkerWrapper#worker}
* @param callback {@link WorkerWrapper#callback}
* @param allowInterrupt {@link WorkerWrapper#allowInterrupt}
* @param enableTimeout {@link WorkerWrapper#enableTimeout}
* @param timeoutLength {@link WorkerWrapper#timeoutLength}
* @param timeoutUnit {@link WorkerWrapper#timeoutLength}
* @param wrapperStrategy {@link WorkerWrapper#timeoutUnit}
* @param wrapperGraph 将节点信息保存在图中,而不是如{@link StableWorkerWrapper}在每个wrapper中都保存节点信息。
* <p>
* {@link WorkerWrapper#getDependWrappers()}与{@link WorkerWrapper#getNextWrappers()}方法
* 将从本图中读取依赖顺序。除此之外,本类不会对本图进行任何修改操作。
* 因此,传入的此图应当保证读取时的线程安全。
* </p>
*/
public QuickBuildWorkerWrapper(String id,
T param,
IWorker<T, V> worker,
ICallback<T, V> callback,
boolean allowInterrupt,
boolean enableTimeout,
long timeoutLength,
TimeUnit timeoutUnit,
WrapperStrategy wrapperStrategy,
DirectedGraph<WorkerWrapper<?, ?>, Object> wrapperGraph) {
super(id, worker, callback, allowInterrupt, enableTimeout, timeoutLength, timeoutUnit, wrapperStrategy);
graph = wrapperGraph;
super.param = param;
State.setState(state, State.BUILDING, State.INIT);
}
@Override
public Set<WorkerWrapper<?, ?>> getNextWrappers() {
if (nextWrappersCache == null) {
synchronized (this) {
if (nextWrappersCache == null) {
nextWrappersCache = graph.getRelationFrom(this).stream()
.map(Graph.Entry::getTo).collect(Collectors.toSet());
}
}
}
return nextWrappersCache;
}
@Override
public Set<WorkerWrapper<?, ?>> getDependWrappers() {
if (dependWrappersCache == null) {
synchronized (this) {
if (dependWrappersCache == null) {
dependWrappersCache = graph.getRelationTo(this).stream()
.map(Graph.Entry::getFrom).collect(Collectors.toSet());
}
}
}
return dependWrappersCache;
}
@Override
void setNextWrappers(Set<WorkerWrapper<?, ?>> nextWrappers) {
throw new UnsupportedOperationException();
}
@Override
void setDependWrappers(Set<WorkerWrapper<?, ?>> dependWrappers) {
throw new UnsupportedOperationException();
}
}

View File

@@ -1,11 +1,12 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.strategy.depend.DependMustStrategyMapper;
import com.jd.platform.async.wrapper.strategy.depend.DependWrapperActionStrategy;
import com.jd.platform.async.wrapper.strategy.depend.DependWrapperStrategyMapper;
import com.jd.platform.async.wrapper.strategy.depend.DependOnUpWrapperStrategy;
import com.jd.platform.async.wrapper.strategy.depend.DependOnUpWrapperStrategyMapper;
import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy;
import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
@@ -62,15 +63,15 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
/**
* 存储自己需要特殊对待的dependWrapper集合
*/
private Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> dependWrapperActionStrategyMap;
private Map<WorkerWrapper<?, ?>, DependOnUpWrapperStrategy> dependWrapperActionStrategyMap;
/**
* 存储需要特殊对待自己的nextWrapper集合
*/
private Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> selfIsSpecialMap;
private Map<WorkerWrapper<?, ?>, DependOnUpWrapperStrategy> selfIsSpecialMap;
/**
* 一个保存以must=true方式传入的WorkerWrapper的集合
* <p/>
* 该Set将会加入到{@link WorkerWrapper.WrapperStrategy#getDependMustStrategyMapper().mustDependSet}之中
* 该Set将会加入到{@link WorkerWrapper.StableWrapperStrategy#getDependMustStrategyMapper().mustDependSet}之中
*/
private Set<WorkerWrapper<?, ?>> mustDependSet;
/**
@@ -172,7 +173,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
}
@Override
public SetDependImpl specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper) {
public SetDependImpl specialDependWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper<?, ?> wrapper) {
if (strategy == null || wrapper == null) {
return this;
}
@@ -232,7 +233,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
}
@Override
public SetNextImpl specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper) {
public SetNextImpl specialToNextWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper<?, ?> wrapper) {
if (strategy == null || wrapper == null) {
return this;
}
@@ -330,6 +331,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
wrapper.getWrapperStrategy().setDependMustStrategyMapper(new DependMustStrategyMapper()
.addDependMust(mustDependSet));
}
//noinspection deprecation
wrapper.getWrapperStrategy().setDependenceStrategy(DependenceStrategy.IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY);
} else {
if (mustDependSet != null && mustDependSet.size() > 0) {
@@ -341,7 +343,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
wrapper.getWrapperStrategy().setDependenceStrategy(dependenceStrategy);
}
if (dependWrapperActionStrategyMap != null && dependWrapperActionStrategyMap.size() > 0) {
DependWrapperStrategyMapper mapper = new DependWrapperStrategyMapper();
DependOnUpWrapperStrategyMapper mapper = new DependOnUpWrapperStrategyMapper();
dependWrapperActionStrategyMap.forEach(mapper::putMapping);
wrapper.getWrapperStrategy().setDependWrapperStrategyMapper(mapper);
}
@@ -351,12 +353,12 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
}
if (selfIsSpecialMap != null && selfIsSpecialMap.size() > 0) {
selfIsSpecialMap.forEach((next, strategy) -> {
DependWrapperStrategyMapper dependWrapperStrategyMapper = next.getWrapperStrategy().getDependWrapperStrategyMapper();
if (dependWrapperStrategyMapper == null) {
DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper = next.getWrapperStrategy().getDependWrapperStrategyMapper();
if (dependOnUpWrapperStrategyMapper == null) {
next.getWrapperStrategy().setDependWrapperStrategyMapper(
dependWrapperStrategyMapper = new DependWrapperStrategyMapper());
dependOnUpWrapperStrategyMapper = new DependOnUpWrapperStrategyMapper());
}
dependWrapperStrategyMapper.putMapping(wrapper, strategy);
dependOnUpWrapperStrategyMapper.putMapping(wrapper, strategy);
});
}
}
@@ -381,8 +383,8 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
/**
* @deprecated 建议使用 {@link WorkerWrapperBuilder#depends(WorkerWrapper[])}
* {@link WorkerWrapperBuilder#setDepend()}设置更多选项例如{@link WorkerWrapperBuilder.SetDepend#wrapper(WorkerWrapper[])}
* 如果是想要必须依赖的功能则使用{@link WorkerWrapperBuilder.SetDepend#mustRequireWrapper(WorkerWrapper[])}
* {@link WorkerWrapperBuilder#setDepend()}设置更多选项例如{@link SetDepend#wrapper(WorkerWrapper[])}
* 如果是想要必须依赖的功能则使用{@link SetDepend#mustRequireWrapper(WorkerWrapper[])}
*/
@Deprecated
public BUILDER_SUB_CLASS depend(WorkerWrapper<?, ?>... wrappers) {
@@ -397,8 +399,8 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
/**
* @deprecated 建议使用 {@link WorkerWrapperBuilder#depends(WorkerWrapper[])}
* {@link WorkerWrapperBuilder#setDepend()}设置更多选项例如{@link WorkerWrapperBuilder.SetDepend#wrapper(WorkerWrapper)}
* 如果是想要必须依赖的功能则使用{@link WorkerWrapperBuilder.SetDepend#mustRequireWrapper(WorkerWrapper[])}
* {@link WorkerWrapperBuilder#setDepend()}设置更多选项例如{@link SetDepend#wrapper(WorkerWrapper)}
* 如果是想要必须依赖的功能则使用{@link SetDepend#mustRequireWrapper(WorkerWrapper[])}
*/
@Deprecated
public BUILDER_SUB_CLASS depend(WorkerWrapper<?, ?> wrapper) {
@@ -406,7 +408,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
}
/**
* @deprecated 建议使用 {@link WorkerWrapperBuilder.SetDepend#requireWrapper(WorkerWrapper, boolean)}}
* @deprecated 建议使用 {@link SetDepend#requireWrapper(WorkerWrapper, boolean)}}
*/
@Deprecated
public BUILDER_SUB_CLASS depend(WorkerWrapper<?, ?> wrapper, boolean isMust) {
@@ -477,7 +479,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
* 默认为true
*
* @param needCheckNextWrapperResult 设为true后如果之后的Wrapper已经执行完毕
* 则跳过本Wrapper并设置{@link WorkResult#getEx()}{@link com.jd.platform.async.exception.SkippedException}
* 则跳过本Wrapper并设置{@link WorkResult#getEx()}{@link SkippedException}
* @deprecated v1.5中已经废弃请使用
*/
@Deprecated
@@ -489,6 +491,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
// util method
private BUILDER_SUB_CLASS returnThisBuilder() {
//noinspection unchecked
return (BUILDER_SUB_CLASS) this;
}

View File

@@ -1,25 +1,40 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.callback.DefaultCallback;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.executor.PollingCenter;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.*;
import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
import com.jd.platform.async.wrapper.strategy.WrapperStrategy;
import com.jd.platform.async.wrapper.strategy.depend.DependMustStrategyMapper;
import com.jd.platform.async.wrapper.strategy.depend.DependWrapperStrategyMapper;
import com.jd.platform.async.wrapper.strategy.depend.DependOnUpWrapperStrategyMapper;
import com.jd.platform.async.wrapper.strategy.depend.DependenceAction;
import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy;
import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.AFTER_WORK;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.BUILDING;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.ERROR;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.INIT;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.SKIP;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.STARTED;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.SUCCESS;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.WORKING;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_all;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_checkTimeoutAllowStates;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_notWorked;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_skipOrAfterWork;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.*;
/**
@@ -29,6 +44,7 @@ import static com.jd.platform.async.wrapper.WorkerWrapper.State.*;
*
* @author wuweifeng wrote on 2019-11-19.
*/
@SuppressWarnings("AlibabaAbstractClassShouldStartWithAbstractNaming")
public abstract class WorkerWrapper<T, V> {
// ========== 固定属性 ==========
@@ -41,7 +57,7 @@ public abstract class WorkerWrapper<T, V> {
/**
* 各种策略的封装类
*/
private final WrapperStrategy wrapperStrategy = new WrapperStrategy();
private final WrapperStrategy wrapperStrategy;
/**
* 是否允许被打断
*/
@@ -87,7 +103,8 @@ public abstract class WorkerWrapper<T, V> {
boolean allowInterrupt,
boolean enableTimeout,
long timeoutLength,
TimeUnit timeoutUnit
TimeUnit timeoutUnit,
WrapperStrategy wrapperStrategy
) {
if (worker == null) {
throw new NullPointerException("async.worker is null");
@@ -103,7 +120,17 @@ public abstract class WorkerWrapper<T, V> {
this.enableTimeout = enableTimeout;
this.timeoutLength = timeoutLength;
this.timeoutUnit = timeoutUnit;
this.wrapperStrategy = wrapperStrategy;
}
WorkerWrapper(String id,
IWorker<T, V> worker,
ICallback<T, V> callback,
boolean allowInterrupt,
boolean enableTimeout,
long timeoutLength,
TimeUnit timeoutUnit) {
this(id, worker, callback, allowInterrupt, enableTimeout, timeoutLength, timeoutUnit, new StableWrapperStrategy());
}
// ========== public ==========
@@ -141,7 +168,7 @@ public abstract class WorkerWrapper<T, V> {
}
public State getState() {
return State.of(state.get());
return of(state.get());
}
/**
@@ -193,7 +220,7 @@ public abstract class WorkerWrapper<T, V> {
// 就在想CAS的时候出结果了就采用新的结果重新判断一次
continue;
}
fastFail(true, null);
fastFail(true, null, false);
}
return -1L;
}
@@ -228,37 +255,39 @@ public abstract class WorkerWrapper<T, V> {
// 因为抽取成方法反而不好传参污染类方法所以就这么干了
final Consumer<Boolean> __function__callbackResult =
success -> {
WorkResult<V> _workResult = getWorkResult();
try {
callback.result(success, param, getWorkResult());
callback.result(success, param, _workResult);
} catch (Exception e) {
if (State.setState(state, states_of_skipOrAfterWork, ERROR, null)) {
fastFail(false, e);
if (setState(state, states_of_skipOrAfterWork, ERROR, null)) {
fastFail(false, e, _workResult.getEx() instanceof SkippedException);
}
}
};
final Runnable __function__callbackResult_beginNext =
final Runnable __function__callbackResultOfFalse_beginNext =
() -> {
__function__callbackResult.accept(false);
beginNext(executorService, now, remainTime, group);
};
final BiConsumer<Boolean, Exception> __function__fastFail_callbackResult_beginNext =
final BiConsumer<Boolean, Exception> __function__fastFail_callbackResult$false_beginNext =
(fastFail_isTimeout, fastFail_exception) -> {
fastFail(fastFail_isTimeout, fastFail_exception);
__function__callbackResult_beginNext.run();
boolean isSkip = fastFail_exception instanceof SkippedException;
fastFail(fastFail_isTimeout && !isSkip, fastFail_exception, isSkip);
__function__callbackResultOfFalse_beginNext.run();
};
final Runnable __function__doWork =
() -> {
if (State.setState(state, STARTED, WORKING)) {
if (setState(state, STARTED, WORKING)) {
try {
fire(group);
} catch (Exception e) {
if (State.setState(state, WORKING, ERROR)) {
__function__fastFail_callbackResult_beginNext.accept(false, e);
if (setState(state, WORKING, ERROR)) {
__function__fastFail_callbackResult$false_beginNext.accept(false, e);
}
return;
}
}
if (State.setState(state, WORKING, AFTER_WORK)) {
if (setState(state, WORKING, AFTER_WORK)) {
__function__callbackResult.accept(true);
beginNext(executorService, now, remainTime, group);
}
@@ -266,20 +295,20 @@ public abstract class WorkerWrapper<T, V> {
// ================================================
// 开始执行
try {
if (State.isState(state, BUILDING)) {
if (isState(state, BUILDING)) {
throw new IllegalStateException("wrapper can't work because state is BUILDING ! wrapper is " + this);
}
//总的已经超时了就快速失败进行下一个
if (remainTime <= 0) {
if (State.setState(state, states_of_checkTimeoutAllowStates, ERROR, null)) {
__function__fastFail_callbackResult_beginNext.accept(true, null);
if (setState(state, states_of_checkTimeoutAllowStates, ERROR, null)) {
__function__fastFail_callbackResult$false_beginNext.accept(true, null);
}
return;
}
//如果自己已经执行过了
//可能有多个依赖其中的一个依赖已经执行完了并且自己也已开始执行或执行完毕当另一个依赖执行完毕又进来该方法时就不重复处理了
final AtomicReference<State> oldStateRef = new AtomicReference<>(null);
if (!State.setState(state, states_of_notWorked, STARTED, oldStateRef::set)) {
if (!setState(state, states_of_notWorked, STARTED, oldStateRef::set)) {
return;
}
// 如果wrapper是第一次要调用callback.begin
@@ -288,8 +317,8 @@ public abstract class WorkerWrapper<T, V> {
callback.begin();
} catch (Exception e) {
// callback.begin 发生异常
if (State.setState(state, states_of_checkTimeoutAllowStates, ERROR, null)) {
__function__fastFail_callbackResult_beginNext.accept(false, e);
if (setState(state, states_of_checkTimeoutAllowStates, ERROR, null)) {
__function__fastFail_callbackResult$false_beginNext.accept(false, e);
}
return;
}
@@ -305,8 +334,8 @@ public abstract class WorkerWrapper<T, V> {
// 每个线程都需要判断是否要跳过自己该方法可能会跳过正在工作的自己
final WrapperStrategy wrapperStrategy = getWrapperStrategy();
if (wrapperStrategy.shouldSkip(getNextWrappers(), this, fromWrapper)) {
if (State.setState(state, STARTED, SKIP)) {
__function__fastFail_callbackResult_beginNext.accept(false, new SkippedException());
if (setState(state, STARTED, SKIP)) {
__function__fastFail_callbackResult$false_beginNext.accept(false, new SkippedException());
}
return;
}
@@ -317,10 +346,10 @@ public abstract class WorkerWrapper<T, V> {
case TAKE_REST:
return;
case FAST_FAIL:
if (State.setState(state, STARTED, ERROR)) {
if (setState(state, STARTED, ERROR)) {
// 根据FAST_FAIL.fastFailException()设置的属性值来设置fastFail方法的参数
ResultState resultState = judge.getResultState();
__function__fastFail_callbackResult_beginNext.accept(
__function__fastFail_callbackResult$false_beginNext.accept(
resultState == ResultState.TIMEOUT,
judge.getFastFailException()
);
@@ -335,10 +364,10 @@ public abstract class WorkerWrapper<T, V> {
}
} catch (Exception e) {
// wrapper本身抛出了不该有的异常
State.setState(state, states_all, ERROR, null);
setState(state, states_all, ERROR, null);
NotExpectedException ex = new NotExpectedException(e, this);
workResult.set(new WorkResult<>(null, ResultState.EXCEPTION, ex));
__function__fastFail_callbackResult_beginNext.accept(false, ex);
__function__fastFail_callbackResult$false_beginNext.accept(false, ex);
}
}
@@ -367,10 +396,10 @@ public abstract class WorkerWrapper<T, V> {
* 快速失败
* 该方法不负责检查状态请自行控制
*
* @param timeout 是否是因为超时而快速失败
* @param e 设置异常信息到{@link WorkResult#getEx()}
* @param isTimeout 是否是因为超时而快速失败
* @param e 设置异常信息到{@link WorkResult#getEx()}
*/
protected void fastFail(boolean timeout, Exception e) {
protected void fastFail(boolean isTimeout, Exception e, boolean isSkip) {
// 试图打断正在执行{@link IWorker#action(Object, Map)}的线程
Thread _doWorkingThread;
if ((_doWorkingThread = doWorkingThread.get()) != null
@@ -381,7 +410,7 @@ public abstract class WorkerWrapper<T, V> {
// 尚未处理过结果则设置
workResult.compareAndSet(null, new WorkResult<>(
worker.defaultValue(),
timeout ? ResultState.TIMEOUT : ResultState.EXCEPTION,
isTimeout ? ResultState.TIMEOUT : (isSkip ? ResultState.DEFAULT : ResultState.EXCEPTION),
e
));
}
@@ -406,7 +435,7 @@ public abstract class WorkerWrapper<T, V> {
try {
next = nextWrappers.stream().findFirst().get();
group.addWrapper(next);
State.setState(state, AFTER_WORK, SUCCESS);
setState(state, AFTER_WORK, SUCCESS);
} finally {
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
if (next != null) {
@@ -421,7 +450,7 @@ public abstract class WorkerWrapper<T, V> {
nextWrappers.forEach(next -> executorService.submit(() ->
next.work(executorService, this, nextRemainTIme, group))
);
State.setState(state, AFTER_WORK, SUCCESS);
setState(state, AFTER_WORK, SUCCESS);
} finally {
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
}
@@ -460,6 +489,7 @@ public abstract class WorkerWrapper<T, V> {
/**
* @deprecated 建议使用 {@link #builder()}返回{@link WorkerWrapperBuilder}接口以调用v1.5之后的规范api
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
public Builder() {
}
@@ -477,7 +507,7 @@ public abstract class WorkerWrapper<T, V> {
public String toString() {
final StringBuilder sb = new StringBuilder(400)
.append("WorkerWrapper{id=").append(id)
.append(", state=").append(State.of(state.get()))
.append(", state=").append(of(state.get()))
.append(", param=").append(param)
.append(", workResult=").append(workResult)
.append(", allowInterrupt=").append(allowInterrupt)
@@ -507,99 +537,54 @@ public abstract class WorkerWrapper<T, V> {
return sb.toString();
}
public static class WrapperStrategy implements DependenceStrategy, SkipStrategy {
// ========== 这三个策略器用于链式判断是否要开始工作 ==========
// 从前往后依次判断的顺序为 dependWrapperStrategyMapper -> dependMustStrategyMapper -> dependenceStrategy
/**
* 对特殊Wrapper专用的依赖响应策略
* <b>该值允许为null</b>
*/
private DependWrapperStrategyMapper dependWrapperStrategyMapper;
/**
* 对必须完成的must的Wrapper的依赖响应策略
* <b>该值允许为null</b>
* <p/>
* 这是一个不得不向历史妥协的属性用于适配must开关方式
*/
/**
* 一个通用的策略器实现类提供了修改的功能并兼容之前的代码
*/
public static class StableWrapperStrategy extends WrapperStrategy.AbstractWrapperStrategy {
private DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper;
private DependMustStrategyMapper dependMustStrategyMapper;
/**
* 底层全局策略
*/
private DependenceStrategy dependenceStrategy;
private SkipStrategy skipStrategy;
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
// 如果存在依赖则调用三层依赖响应策略进行判断
DependenceStrategy strategy = dependWrapperStrategyMapper;
if (dependMustStrategyMapper != null) {
strategy = strategy == null ? dependMustStrategyMapper : strategy.thenJudge(dependMustStrategyMapper);
}
if (dependenceStrategy != null) {
strategy = strategy == null ? dependenceStrategy : strategy.thenJudge(dependenceStrategy);
}
if (strategy == null) {
throw new IllegalStateException("配置无效三层判断策略均为null请开发者检查自己的Builder是否逻辑错误");
}
return strategy.judgeAction(dependWrappers, thisWrapper, fromWrapper);
public DependOnUpWrapperStrategyMapper getDependWrapperStrategyMapper() {
return dependOnUpWrapperStrategyMapper;
}
public DependWrapperStrategyMapper getDependWrapperStrategyMapper() {
return dependWrapperStrategyMapper;
}
public void setDependWrapperStrategyMapper(DependWrapperStrategyMapper dependWrapperStrategyMapper) {
this.dependWrapperStrategyMapper = dependWrapperStrategyMapper;
@Override
public void setDependWrapperStrategyMapper(DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper) {
this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper;
}
@Override
public DependMustStrategyMapper getDependMustStrategyMapper() {
return dependMustStrategyMapper;
}
@Override
public void setDependMustStrategyMapper(DependMustStrategyMapper dependMustStrategyMapper) {
this.dependMustStrategyMapper = dependMustStrategyMapper;
}
@Override
public DependenceStrategy getDependenceStrategy() {
return dependenceStrategy;
}
@Override
public void setDependenceStrategy(DependenceStrategy dependenceStrategy) {
this.dependenceStrategy = dependenceStrategy;
}
// ========== 跳过策略 ==========
private SkipStrategy skipStrategy;
@Override
public boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
return skipStrategy != null && skipStrategy.shouldSkip(nextWrappers, thisWrapper, fromWrapper);
}
public SkipStrategy getSkipStrategy() {
return skipStrategy;
}
@Override
public void setSkipStrategy(SkipStrategy skipStrategy) {
this.skipStrategy = skipStrategy;
}
// ========== toString ==========
@Override
public String toString() {
return "WrapperStrategy{" +
"dependWrapperStrategyMapper=" + dependWrapperStrategyMapper +
", dependMustStrategyMapper=" + dependMustStrategyMapper +
", dependenceStrategy=" + dependenceStrategy +
", skipStrategy=" + skipStrategy +
'}';
}
}
/**
@@ -732,6 +717,7 @@ public abstract class WorkerWrapper<T, V> {
*
* @param excepts 范围
*/
@SuppressWarnings("unused")
static boolean inStates(AtomicInteger state, State... excepts) {
int current;
boolean inExcepts;
@@ -753,7 +739,7 @@ public abstract class WorkerWrapper<T, V> {
/**
* CAS的判断是否是某个状态
*/
static boolean isState(AtomicInteger state, State except) {
static boolean isState(AtomicInteger state, @SuppressWarnings("SameParameterValue") State except) {
return state.compareAndSet(except.id, except.id);
}

View File

@@ -2,7 +2,7 @@ package com.jd.platform.async.wrapper;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.wrapper.strategy.depend.DependWrapperActionStrategy;
import com.jd.platform.async.wrapper.strategy.depend.DependOnUpWrapperStrategy;
import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy;
import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
@@ -112,9 +112,11 @@ public interface WorkerWrapperBuilder<T, V> {
* @param wrapper 需要设置特殊策略的Wrapper
* @param strategy 特殊策略
*/
SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);
@SuppressWarnings("UnusedReturnValue")
SetDepend<T, V> specialDependWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper<?, ?> wrapper);
default SetDepend<T, V> specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper... wrappers) {
@SuppressWarnings("unused")
default SetDepend<T, V> specialDependWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper... wrappers) {
if (strategy == null || wrappers == null) {
return this;
}
@@ -156,6 +158,7 @@ public interface WorkerWrapperBuilder<T, V> {
return setDepend().wrapper(wrappers).end();
}
@SuppressWarnings("unused")
default WorkerWrapperBuilder<T, V> depends(Collection<WorkerWrapper> wrappers) {
return setDepend().wrapper(wrappers).end();
}
@@ -164,6 +167,7 @@ public interface WorkerWrapperBuilder<T, V> {
return setDepend().wrapper(wrappers).strategy(strategy).end();
}
@SuppressWarnings("unused")
default WorkerWrapperBuilder<T, V> depends(DependenceStrategy strategy, Collection<WorkerWrapper> wrappers) {
return setDepend().wrapper(wrappers).strategy(strategy).end();
}
@@ -204,6 +208,7 @@ public interface WorkerWrapperBuilder<T, V> {
*/
SetNext<T, V> mustToNextWrapper(WorkerWrapper<?, ?> wrapper);
@SuppressWarnings("unused")
default SetNext<T, V> requireToNextWrapper(WorkerWrapper<?, ?> wrapper, boolean must) {
return must ? mustToNextWrapper(wrapper) : wrapper(wrapper);
}
@@ -215,7 +220,7 @@ public interface WorkerWrapperBuilder<T, V> {
* @param wrapper 依赖本Wrapper的下游Wrapper
* @return 返回Builder自身
*/
SetNext<T, V> specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper<?, ?> wrapper);
SetNext<T, V> specialToNextWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper<?, ?> wrapper);
WorkerWrapperBuilder<T, V> end();
}
@@ -229,6 +234,7 @@ public interface WorkerWrapperBuilder<T, V> {
return setNext().wrapper(wrappers).end();
}
@SuppressWarnings("unused")
default WorkerWrapperBuilder<T, V> nextOf(Collection<WorkerWrapper> wrappers) {
return setNext().wrapper(wrappers).end();
}

View File

@@ -1,9 +1,6 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.executor.PollingCenter;
import com.jd.platform.async.util.timer.Timeout;
import com.jd.platform.async.util.timer.TimerTask;
import com.jd.platform.async.worker.ResultState;
import java.util.Collection;
import java.util.Map;
@@ -14,6 +11,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import com.jd.platform.async.openutil.timer.*;
/**
* @author create by TcSnZh on 2021/5/9-下午7:21
*/
@@ -48,6 +47,7 @@ public class WorkerWrapperGroup {
Objects.requireNonNull(wrapper).forEach(this::addWrapper);
}
@SuppressWarnings("unused")
public void addWrapper(WorkerWrapper<?, ?>... wrappers) {
for (WorkerWrapper<?, ?> wrapper : Objects.requireNonNull(wrappers)) {
addWrapper(wrapper);
@@ -76,6 +76,7 @@ public class WorkerWrapperGroup {
public class CheckFinishTask implements TimerTask {
@SuppressWarnings("RedundantThrows")
@Override
public void run(Timeout timeout) throws Exception {
// 已经完成了
@@ -142,7 +143,7 @@ public class WorkerWrapperGroup {
if (obj == this) {
return true;
}
if (!(obj instanceof WorkerWrapperGroup.CheckFinishTask)) {
if (!(obj instanceof CheckFinishTask)) {
return false;
}
return Objects.equals(WorkerWrapperGroup.this, ((CheckFinishTask) obj).getParent());

View File

@@ -0,0 +1,133 @@
package com.jd.platform.async.wrapper.strategy;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.strategy.depend.DependMustStrategyMapper;
import com.jd.platform.async.wrapper.strategy.depend.DependOnUpWrapperStrategyMapper;
import com.jd.platform.async.wrapper.strategy.depend.DependenceAction;
import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy;
import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
import java.util.Set;
/**
* @author create by TcSnZh on 2021/5/17-下午6:23
*/
public interface WrapperStrategy extends DependenceStrategy, SkipStrategy {
// ========== 这三个策略器用于链式判断是否要开始工作 ==========
// 从前往后依次判断的顺序为 dependWrapperStrategyMapper -> dependMustStrategyMapper -> dependenceStrategy
/**
* 设置对特殊Wrapper专用的依赖响应策略。
*
* @return 该值允许为null
*/
DependOnUpWrapperStrategyMapper getDependWrapperStrategyMapper();
/**
* 对必须完成的must的Wrapper的依赖响应策略。
* 这是一个不得不向历史妥协的属性。用于适配must开关方式。
*
* @return 该值允许为null
* @deprecated 不推荐使用,很有可能被遗弃
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
DependMustStrategyMapper getDependMustStrategyMapper();
/**
* 底层全局策略。
*
* @return 该值不允许为null
*/
DependenceStrategy getDependenceStrategy();
// ========== 这是跳过策略 ==========
/**
* 跳过策略
*
* @return 不允许为null
*/
SkipStrategy getSkipStrategy();
@Override
default DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
// 如果存在依赖,则调用三层依赖响应策略进行判断
DependenceStrategy strategy = getDependWrapperStrategyMapper();
if (getDependMustStrategyMapper() != null) {
strategy = strategy == null ? getDependMustStrategyMapper() : strategy.thenJudge(getDependenceStrategy());
}
if (getDependenceStrategy() != null) {
strategy = strategy == null ? getDependenceStrategy() : strategy.thenJudge(getDependenceStrategy());
}
if (strategy == null) {
throw new IllegalStateException("配置无效三层判断策略均为null请开发者检查自己的Builder是否逻辑错误");
}
return strategy.judgeAction(dependWrappers, thisWrapper, fromWrapper);
}
@Override
default boolean shouldSkip(Set<WorkerWrapper<?, ?>> nextWrappers, WorkerWrapper<?, ?> thisWrapper, WorkerWrapper<?, ?> fromWrapper) {
return getSkipStrategy() != null && getSkipStrategy().shouldSkip(nextWrappers, thisWrapper, fromWrapper);
}
default void setDependWrapperStrategyMapper(DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper) {
throw new UnsupportedOperationException();
}
default void setDependMustStrategyMapper(DependMustStrategyMapper dependMustStrategyMapper) {
throw new UnsupportedOperationException();
}
default void setDependenceStrategy(DependenceStrategy dependenceStrategy) {
throw new UnsupportedOperationException();
}
default void setSkipStrategy(SkipStrategy skipStrategy) {
throw new UnsupportedOperationException();
}
/**
* 抽象策略器实现了toString
*/
abstract class AbstractWrapperStrategy implements WrapperStrategy {
@Override
public String toString() {
return "WrapperStrategy{" +
"dependWrapperStrategyMapper=" + getDependWrapperStrategyMapper() +
", dependMustStrategyMapper=" + getDependMustStrategyMapper() +
", dependenceStrategy=" + getDependenceStrategy() +
", skipStrategy=" + getSkipStrategy() +
'}';
}
}
/**
* 默认策略器,用默认值实现了所有属性。
*/
class DefaultWrapperStrategy extends AbstractWrapperStrategy {
@Override
public DependOnUpWrapperStrategyMapper getDependWrapperStrategyMapper() {
return null;
}
@Override
public DependMustStrategyMapper getDependMustStrategyMapper() {
return null;
}
@Override
public DependenceStrategy getDependenceStrategy() {
return DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS;
}
@Override
public SkipStrategy getSkipStrategy() {
return SkipStrategy.CHECK_ONE_LEVEL;
}
}
}

View File

@@ -26,7 +26,7 @@ public class DependMustStrategyMapper implements DependenceStrategy {
* 如果所有的Wrapper已经完成本Wrapper将会开始工作
* <p>
* 如果任一{@link #mustDependSet}中的Wrapper失败则返回{@link DependenceAction#FAST_FAIL}
* 具体超时/异常则根据{@link com.jd.platform.async.worker.ResultState}的值进行判断
* 具体超时/异常则根据{@link ResultState}的值进行判断
* <p>
* 如果存在Wrapper未完成 所有的Wrapper都未失败则返回{@link DependenceAction#JUDGE_BY_AFTER}
* </p>

View File

@@ -3,14 +3,14 @@ package com.jd.platform.async.wrapper.strategy.depend;
import com.jd.platform.async.wrapper.WorkerWrapper;
/**
* 单参数策略
* 由上游wrapper决定本wrapper行为的单参数策略
*
* @author create by TcSnZh on 2021/5/1-下午11:16
*/
@FunctionalInterface
public interface DependWrapperActionStrategy {
public interface DependOnUpWrapperStrategy {
/**
* 仅使用一个参数的判断方法
* 仅使用一个参数即调用自身的上游wrapper的判断方法
*
* @param fromWrapper 调用本Wrapper的上游Wrapper
* @return 返回 {@link DependenceAction.WithProperty}
@@ -24,7 +24,7 @@ public interface DependWrapperActionStrategy {
* 未运行时休息
* 失败时失败
*/
DependWrapperActionStrategy SUCCESS_CONTINUE = new DependWrapperActionStrategy() {
DependOnUpWrapperStrategy SUCCESS_CONTINUE = new DependOnUpWrapperStrategy() {
@Override
public DependenceAction.WithProperty judge(WorkerWrapper<?, ?> ww) {
switch (ww.getWorkResult().getResultState()) {
@@ -50,7 +50,7 @@ public interface DependWrapperActionStrategy {
* 未运行时交给下一个策略器判断
* 失败时失败
*/
DependWrapperActionStrategy SUCCESS_START_INIT_CONTINUE = new DependWrapperActionStrategy() {
DependOnUpWrapperStrategy SUCCESS_START_INIT_CONTINUE = new DependOnUpWrapperStrategy() {
@Override
public DependenceAction.WithProperty judge(WorkerWrapper<?, ?> ww) {
switch (ww.getWorkResult().getResultState()) {

View File

@@ -10,12 +10,12 @@ import java.util.stream.Collectors;
/**
* 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略
* <p/>
* 使用{@link DependWrapperStrategyMapper}本实现类对{@link DependenceStrategy}进行增强
* 使用{@link DependOnUpWrapperStrategyMapper}本实现类对{@link DependenceStrategy}进行增强
*
* @author create by TcSnZh on 2021/5/1-下午11:12
*/
public class DependWrapperStrategyMapper implements DependenceStrategy {
private final Map<WorkerWrapper<?, ?>, DependWrapperActionStrategy> mapper = new ConcurrentHashMap<>(4);
public class DependOnUpWrapperStrategyMapper implements DependenceStrategy {
private final Map<WorkerWrapper<?, ?>, DependOnUpWrapperStrategy> mapper = new ConcurrentHashMap<>(4);
/**
* 设置对应策略
@@ -24,7 +24,8 @@ public class DependWrapperStrategyMapper implements DependenceStrategy {
* @param strategy 要设置的策略
* @return 返回this链式调用
*/
public DependWrapperStrategyMapper putMapping(WorkerWrapper<?, ?> targetWrapper, DependWrapperActionStrategy strategy) {
@SuppressWarnings("UnusedReturnValue")
public DependOnUpWrapperStrategyMapper putMapping(WorkerWrapper<?, ?> targetWrapper, DependOnUpWrapperStrategy strategy) {
mapper.put(targetWrapper, strategy);
toStringCache = null;
return this;
@@ -33,7 +34,7 @@ public class DependWrapperStrategyMapper implements DependenceStrategy {
/**
* 判断方法
* <p/>
* 如果fromWrapper在{@link #mapper}则返回{@link DependWrapperActionStrategy}的判断返回值否则返回{@link DependenceAction#JUDGE_BY_AFTER}
* 如果fromWrapper在{@link #mapper}则返回{@link DependOnUpWrapperStrategy}的判断返回值否则返回{@link DependenceAction#JUDGE_BY_AFTER}
*
* @param dependWrappers 这里不会使用该值thisWrapper.dependWrappers的属性值
* @param thisWrapper 这里不会使用该值thisWrapper即为被催促的WorkerWrapper
@@ -44,7 +45,7 @@ public class DependWrapperStrategyMapper implements DependenceStrategy {
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
DependWrapperActionStrategy strategy = mapper.get(fromWrapper);
DependOnUpWrapperStrategy strategy = mapper.get(fromWrapper);
if (strategy == null) {
return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
}

View File

@@ -1,10 +1,13 @@
package com.jd.platform.async.wrapper.strategy.depend;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperGroup;
import java.util.*;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@@ -35,7 +38,7 @@ public interface DependenceStrategy {
* @param fromWrapper 调用来源Wrapper
* <p>
* 该参数不会为null
* 因为在{@link WorkerWrapper#work(ExecutorService, long, Map, WrapperEndingInspector)}方法中传入的的第一批无依赖的Wrapper
* 因为在{@link WorkerWrapper#work(ExecutorService, long, WorkerWrapperGroup)}方法中传入的的第一批无依赖的Wrapper
* 不会被该策略器所判断而是不论如何直接执行
* </p>
* @return 返回枚举值内部类WorkerWrapper将会根据其值来决定自己如何响应这次调用 {@link DependenceAction.WithProperty}
@@ -135,6 +138,7 @@ public interface DependenceStrategy {
case EXCEPTION:
resultState = !hasFailed ? workResult.getResultState() : resultState;
fastFailException = !hasFailed ? workResult.getEx() : fastFailException;
// 跳过不算失败
hasFailed = true;
break;
default:
@@ -154,8 +158,10 @@ public interface DependenceStrategy {
};
/**
* 如果被依赖的工作中任一失败则立即失败否则就开始工作不论之前的工作有没有开始
* 如果被依赖的工作中任一失败则立即失败
* 否则就开始工作不论之前的工作有没有开始
*/
@SuppressWarnings("unused")
DependenceStrategy ALL_DEPENDENCIES_NONE_FAILED = new DependenceStrategy() {
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
@@ -166,7 +172,7 @@ public interface DependenceStrategy {
switch (workResult.getResultState()) {
case TIMEOUT:
case EXCEPTION:
return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
default:
}
}
@@ -187,6 +193,7 @@ public interface DependenceStrategy {
* @param theseWrapper 该方法唯一有效参数
* @return 返回生成的 {@link DependenceAction.WithProperty)
*/
@SuppressWarnings("unused")
static DependenceStrategy theseWrapperAllSuccess(Set<WorkerWrapper<?, ?>> theseWrapper) {
return new DependenceStrategy() {
private final Set<WorkerWrapper<?, ?>> theseWrappers;
@@ -239,6 +246,7 @@ public interface DependenceStrategy {
*
* @deprecated 不推荐使用must开关
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
DependenceStrategy IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY = new DependenceStrategy() {
@Override

View File

@@ -2,7 +2,7 @@ package com.jd.platform.async.wrapper.strategy.skip;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.*;
import java.util.Set;
/**
* @author create by TcSnZh on 2021/5/6-下午3:02

View File

@@ -56,6 +56,8 @@ class Case3 {
wrapper(id=B2) is working
wrapper(id=C2) is working
wrapper(id=C1) is working
wrapper(id=B4) is working
// 我们看到B5被跳过了没有执行callback
*/
}
}

View File

@@ -4,14 +4,9 @@ import com.jd.platform.async.executor.Async;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
import com.jd.platform.async.wrapper.strategy.depend.DependWrapperActionStrategy;
import com.jd.platform.async.wrapper.strategy.depend.DependenceAction;
import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 示例自定义依赖策略--对单个wrapper设置上克下策略--简单使用与示例

View File

@@ -67,6 +67,7 @@ class Case8 {
wrapper(id=C) callback fail , workResult is WorkResult{result=null, resultState=TIMEOUT, ex=null}
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
...
以下异常信息省略
*/
}

View File

@@ -0,0 +1,59 @@
package v15.cases;
import com.jd.platform.async.callback.DefaultCallback;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.openutil.collection.CommonDirectedGraph;
import com.jd.platform.async.openutil.collection.DirectedGraph;
import com.jd.platform.async.wrapper.QuickBuildWorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.strategy.WrapperStrategy;
import java.util.concurrent.*;
/**
* 快速构造示例。
*
* @author create by TcSnZh on 2021/5/17-下午5:23
*/
class Case9 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DirectedGraph<WorkerWrapper<?, ?>, Object> graph = DirectedGraph.synchronizedDigraph(new CommonDirectedGraph<>());
QuickBuildWorkerWrapper<Object, Object> w1 = new QuickBuildWorkerWrapper<>("id1",
null,
(object, allWrappers) -> {
System.out.println("I am IWorker 1");
return null;
},
new DefaultCallback<>(),
false,
true,
100,
TimeUnit.MILLISECONDS,
new WrapperStrategy.DefaultWrapperStrategy(),
graph
);
QuickBuildWorkerWrapper<Object, Object> w2 = new QuickBuildWorkerWrapper<>("id2",
null,
(object, allWrappers) -> {
System.out.println("I am IWorker 2");
return null;
},
new DefaultCallback<>(),
false,
true,
100,
TimeUnit.MILLISECONDS,
new WrapperStrategy.DefaultWrapperStrategy(),
graph
);
graph.addNode(w1, w2);
graph.putRelation(w1, new Object(), w2);
// System.out.println(graph);
Async.beginWork(200, w1);
System.out.println(" Begin work end .\n w1 : " + w1 + "\n w2 : " + w2 + "\n");
}
}

View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>asyncTool</artifactId>
<groupId>com.jd.platform</groupId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>asyncTool-openutil</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>

View File

@@ -0,0 +1,109 @@
package com.jd.platform.async.openutil;
import java.util.Comparator;
/**
* 两个int值。重写了{@link #hashCode()}与{@link #equals(Object)}
*
* @author create by TcSnZh on 2021/5/16-上午1:50
*/
public final class BiInt {
// properties
private final int m;
private final int n;
public static final Comparator<BiInt> cmp_m_asc = Comparator.comparingInt(BiInt::getM);
public static final Comparator<BiInt> cmp_n_asc = Comparator.comparingInt(BiInt::getN);
public static final Comparator<BiInt> cmp_m_desc = cmp_m_asc.reversed();
public static final Comparator<BiInt> cmp_n_desc = cmp_n_asc.reversed();
public static final Comparator<BiInt> cmp_m_asc_n_asc =
cmp_m_asc.thenComparing(cmp_n_asc);
public static final Comparator<BiInt> cmp_m_asc_n_desc =
cmp_m_asc.thenComparing(cmp_n_desc);
public static final Comparator<BiInt> cmp_m_desc_n_asc =
cmp_m_desc.thenComparing(cmp_n_asc);
public static final Comparator<BiInt> cmp_m_desc_n_desc =
cmp_m_desc.thenComparing(cmp_n_desc);
public static final Comparator<BiInt> cmp_n_asc_m_asc =
cmp_n_asc.thenComparing(cmp_m_asc);
public static final Comparator<BiInt> cmp_n_asc_m_desc =
cmp_n_asc.thenComparing(cmp_m_desc);
public static final Comparator<BiInt> cmp_n_desc_m_asc =
cmp_n_desc.thenComparing(cmp_m_asc);
public static final Comparator<BiInt> cmp_n_desc_m_desc =
cmp_n_desc.thenComparing(cmp_m_desc);
/**
* private constructor , please use {@link #of(int, int)} to build Idx object.
*/
private BiInt(int m, int n) {
this.m = m;
this.n = n;
}
// getter
public int getM() {
return m;
}
public int getN() {
return n;
}
// hashcode and equals
@Override
public int hashCode() {
return m ^ n;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof BiInt))
return false;
BiInt idx = (BiInt) o;
return m == idx.m && n == idx.n;
}
// toString
@Override
public String toString() {
return "(" + m + ',' + n + ')';
}
// ========== static ==========
// 工厂方法
public static BiInt of(int m, int n) {
if (m == Integer.MIN_VALUE && n == Integer.MAX_VALUE) {
return MIN_TO_MAX;
}
if (m >= 0 && m < CACHE_RANGE_M && n >= 0 && n < CACHE_RANGE_M) {
return cache[m * CACHE_RANGE_M + n];
}
return new BiInt(m, n);
}
// 缓存区间
private static final BiInt MIN_TO_MAX = new BiInt(Integer.MIN_VALUE, Integer.MAX_VALUE);
private static final BiInt[] cache; // m from 0 to 31 , n from 0 to 31 , total 1023 .
private static final int CACHE_RANGE_M = 32; // 0 to 31
private static final int CACHE_RANGE_N = 32; // 0 to 31
static {
cache = new BiInt[CACHE_RANGE_M * CACHE_RANGE_N];
for (int i = 0; i < CACHE_RANGE_M; i++) {
for (int j = 0; j < CACHE_RANGE_N; j++) {
cache[i * CACHE_RANGE_M + j] = new BiInt(i, j);
}
}
}
}

View File

@@ -0,0 +1,74 @@
package com.jd.platform.async.openutil.collection;
import com.jd.platform.async.openutil.BiInt;
import java.util.Iterator;
/**
* @author create by TcSnZh on 2021/5/14-下午9:51
*/
public abstract class AbstractArray2D<E> implements Array2D<E> {
/**
* 用于代替null
*/
protected static final Object NULL = new Object() {
@Override
public String toString() {
return "null";
}
@Override
public int hashCode() {
return 0;
}
@SuppressWarnings("EqualsDoesntCheckParameterClass")
@Override
public boolean equals(Object obj) {
//noinspection ConstantConditions
return obj == null || obj == NULL || obj.equals(null);
}
};
@Override
public String toString() {
StringBuilder sb = new StringBuilder(64).append(this.getClass().getSimpleName()).append('{');
Iterator<Point<E>> it = iterator();
if (it.hasNext()) {
while (true) {
Point<E> point = it.next();
sb.append('{').append(point.getIdx()).append(':').append(point.getElement()).append('}');
if (!it.hasNext()) {
break;
}
sb.append(", ");
}
}
return sb.append('}').toString();
}
public static class PointImpl<E> implements Point<E> {
private final BiInt idx;
private final E element;
public PointImpl(BiInt idx, E element) {
this.idx = idx;
this.element = element;
}
@Override
public BiInt getIdx() {
return idx;
}
@Override
public E getElement() {
return element;
}
@Override
public String toString() {
return "{" + idx + ":" + element + "}";
}
}
}

View File

@@ -0,0 +1,88 @@
package com.jd.platform.async.openutil.collection;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
/**
* 抽象有向图
*
* @author create by TcSnZh on 2021/5/13-上午11:37
*/
public abstract class AbstractDirectedGraph<N, R> implements DirectedGraph<N, R> {
@Override
public String toString() {
Set<N> nv = nodesView();
Set<? extends Entry<N, R>> rSet = getRelations();
StringBuilder sb = new StringBuilder(nv.size() * 10 + rSet.size() * 20)
.append(this.getClass().getSimpleName()).append("{nodes=[");
Iterator<N> nit = nodesView().iterator();
if (nit.hasNext()) {
for (; ; ) {
sb.append(nit.next());
if (!nit.hasNext()) {
break;
}
sb.append(", ");
}
}
sb.append("], relations=[");
Iterator<? extends Entry<N, R>> eit = rSet.iterator();
if (eit.hasNext()) {
for (; ; ) {
sb.append(eit.next());
if (!eit.hasNext()) {
break;
}
sb.append(", ");
}
}
return sb.append("]}").toString();
}
public abstract class AbstractNodesView extends AbstractSet<N> {
@Override
public boolean add(N n) {
return AbstractDirectedGraph.this.addNode(n);
}
@Override
public boolean remove(Object o) {
N o1;
//noinspection unchecked
if (!AbstractDirectedGraph.this.containsNode(o1 = (N) o)) {
return false;
}
AbstractDirectedGraph.this.removeNode(o1);
return true;
}
}
public static abstract class AbstractEntry<N, R> implements Entry<N, R> {
@Override
public int hashCode() {
return this.getFrom().hashCode() ^ this.getTo().hashCode() ^ this.getRelation().hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof Graph.Entry)) {
return false;
}
Entry obj1 = (Entry) obj;
return Objects.equals(this.getFrom(), obj1.getFrom())
&& Objects.equals(this.getTo(), obj1.getTo())
&& Objects.equals(this.getRelation(), obj1.getRelation());
}
@Override
public String toString() {
return "{from=" + getFrom() + ", relation=" + getRelation() + ", to=" + getTo() + "]";
}
}
}

View File

@@ -0,0 +1,22 @@
package com.jd.platform.async.openutil.collection;
/**
* @author create by TcSnZh on 2021/5/14-上午2:33
*/
public abstract class AbstractStoreArk<E> implements StoreArk<E> {
@Override
public boolean isEmpty() {
return size() <= 0;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(size() * 10).append(this.getClass().getSimpleName()).append("{");
if (!isEmpty()) {
stream().forEach(entry -> sb.append(entry.getKey()).append(":").append(entry.getValue()).append(", "));
sb.delete(sb.length() - 2, sb.length());
}
return sb.append("}").toString();
}
}

View File

@@ -0,0 +1,169 @@
package com.jd.platform.async.openutil.collection;
import com.jd.platform.async.openutil.BiInt;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* 二维数组
*
* @author create by TcSnZh on 2021/5/14-下午9:50
*/
@SuppressWarnings("unused")
public interface Array2D<E> extends Iterable<Array2D.Point<E>> {
/**
* 有多少行
*/
int lineLength();
/**
* 有多少列
*/
int columnLength();
/**
* 添加元素到指定位置
*
* @param line 行
* @param column 列
* @param element 元素
* @return 如果之前添加过元素,将返回替换掉的之前的元素
* @throws IndexOutOfBoundsException 行列超出范围
*/
E add(int line, int column, E element);
/**
* 如果不存在的话则添加元素
* <p>
* {@link #add(int, int, Object)}
*
* @return 不存在且成功添加返回true。
*/
default boolean addIfAbsent(int line, int column, E element) {
if (get(line, column) != null) {
return false;
}
add(line, column, element);
return true;
}
/**
* 删除元素
*
* @param line 行
* @param column 列
* @return 返回移出的元素
* @throws IndexOutOfBoundsException 行列超出返回
* @throws IllegalArgumentException 如果原本不存在元素
*/
E remove(int line, int column);
/**
* 存在则移除不存在则返回null
*
* @param line 行
* @param column 列
* @return 如果不存在返回null。存在则返回被移出的元素。
* @throws IndexOutOfBoundsException 行列超出范围
*/
default E removeIfAbsent(int line, int column) {
if (get(line, column) == null) {
return null;
}
return remove(line, column);
}
/**
* 获取元素
*
* @param line 行
* @param column 列
* @return 如果存在返回该元素。不存在则返回null。
* @throws IndexOutOfBoundsException 行列超出范围
*/
E get(int line, int column);
/**
* 是否包含元素
*
* @param element 元素
* @return 有这个元素就返回true。
*/
boolean containsElement(E element);
/**
* 获取整行的元素
*
* @param line 行号
* @return 返回key为列号value为元素的Map
* @throws IndexOutOfBoundsException 行号超出范围
*/
Map<Integer, E> fullLine(int line);
/**
* 获取整列的元素
*
* @param column 列号
* @return 返回key为行号value为元素的Map
* @throws IndexOutOfBoundsException 列号超出范围
*/
Map<Integer, E> fullColumn(int column);
/**
* 迭代器
*
* @param foreachOrder 遍历顺序
* @return 如果本容器不允许null值存在只需返回存在的元素的键即可。如果允许null值存在仅需返回包括人工放入的null值的键即可。
*/
Iterator<? extends Point<E>> iterator(Comparator<BiInt> foreachOrder);
@Override
default Iterator<Point<E>> iterator() {
//noinspection unchecked
return (Iterator) iterator(BiInt.cmp_m_asc_n_asc);
}
/**
* 流
*/
default Stream<? extends Point<E>> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<? extends Point<E>> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
default Spliterator<Point<E>> spliterator(Comparator<BiInt> foreachOrder) {
return Spliterators.spliteratorUnknownSize(iterator(foreachOrder), 0);
}
default Stream<? extends Point<E>> stream(Comparator<BiInt> foreachOrder) {
return StreamSupport.stream(spliterator(foreachOrder), false);
}
default Stream<? extends Point<E>> parallelStream(Comparator<BiInt> foreachOrder) {
return StreamSupport.stream(spliterator(foreachOrder), true);
}
/**
* 端点
*
* @param <E> 元素泛型
*/
interface Point<E> {
BiInt getIdx();
default int getLine() {
return getIdx().getM();
}
default int getColumn() {
return getIdx().getN();
}
E getElement();
}
}

View File

@@ -0,0 +1,76 @@
package com.jd.platform.async.openutil.collection;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Function;
/**
* 二叉树
*
* @author create by TcSnZh on 2021/5/15-下午8:00
*/
public interface BiTree extends Tree {
// todo
/**
* 二叉树节点
*/
interface BiNode extends Node {
@Override
default BiNode getParent() {
throw new UnsupportedOperationException("Get parent node is not supported.");
}
BiNode getLeft();
BiNode getRight();
@Override
default Collection<? extends BiNode> getChildren() {
return Arrays.asList(getLeft(), getRight());
}
}
/**
* 返回一个通俗易懂的字符画。
* <p/>
* 从leetcode上抄的。若有侵权请联系 {@code zh0u.he@qq.com},将会删除。
*
* @param node 根节点
* @param provideName 节点显示在图中的名字。
* @param <N> 根节点泛型
* @return 返回一个字符画
*/
static <N extends BiNode> String toPrettyString(N node, Function<N, String> provideName) {
StringBuilder sb = new StringBuilder();
//noinspection unchecked
_toPrettyString(node, "", true, sb, (Function) provideName);
return sb.toString();
}
/**
* jdk8没有private static只能加条下划线意思意思了。
*/
static <N extends BiNode> void _toPrettyString(N node,
String prefix,
boolean isLeft,
StringBuilder sb,
Function<BiNode, String> provideName) {
if (node == null) {
sb.append("(Empty tree)");
return;
}
BiNode right = node.getRight();
if (right != null) {
_toPrettyString(right, prefix + (isLeft ? "" : " "), false, sb, provideName);
}
sb.append(prefix).append(isLeft ? "└── " : "┌── ").append(provideName.apply(node)).append('\n');
BiNode left = node.getLeft();
if (left != null) {
_toPrettyString(left, prefix + (isLeft ? " " : ""), true, sb, provideName);
}
}
}

View File

@@ -0,0 +1,60 @@
package com.jd.platform.async.openutil.collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Supplier;
/**
* 一个缓存元素位置的储存柜。
*
* @author create by TcSnZh on 2021/5/14-上午2:37
*/
public class CachedStoreArk<E> extends AbstractStoreArk<E> {
private final StoreArk<E> inner;
private final Map<E, Integer> cacheMap = new HashMap<>();
public CachedStoreArk() {
this(CommonStoreArk::new);
}
private CachedStoreArk(Supplier<StoreArk<E>> sup) {
this.inner = sup.get();
}
@Override
public int store(E element) {
int id = inner.store(element);
cacheMap.put(element, id);
return id;
}
@Override
public E peek(int id) {
return inner.peek(id);
}
@Override
public E takeOut(int id) {
E e = inner.takeOut(id);
cacheMap.remove(e);
return e;
}
@Override
public int size() {
return inner.size();
}
@Override
public Iterator<Map.Entry<Integer, E>> iterator() {
return inner.iterator();
}
@Override
public int findId(E element) {
Integer idNullable = cacheMap.get(element);
return idNullable == null ? -1 : idNullable;
}
}

View File

@@ -0,0 +1,10 @@
package com.jd.platform.async.openutil.collection;
/**
* 冲突范围表
*
* @author create by TcSnZh on 2021/5/16-上午1:36
*/
public class CollisionRangeTable {
// todo
}

View File

@@ -0,0 +1,145 @@
package com.jd.platform.async.openutil.collection;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 线程不安全的有向图。
* <p/>
* 不允许放入null。
*
* @author create by TcSnZh on 2021/5/14-上午2:22
*/
public class CommonDirectedGraph<N, R> extends AbstractDirectedGraph<N, R> {
// ========== properties ==========
private final StoreArk<N> nodes = new CachedStoreArk<>();
private final Array2D<R> arr = new SparseArray2D<>();
// ========== methods ==========
@Override
public boolean addNode(N node) {
if (containsNode(Objects.requireNonNull(node))) {
return false;
}
nodes.store(node);
return true;
}
@Override
public boolean containsNode(N node) {
return node != null && findNodeId(node, false) >= 0;
}
@Override
public Set<? extends Entry<N, R>> removeNode(N node) {
int id = findNodeId(Objects.requireNonNull(node), true);
LinkedHashSet<Entry<N, R>> res = new LinkedHashSet<>();
// 查找node为from的键
arr.fullLine(id).forEach((toNodeId, relation) -> {
res.add(new OuterEntry<>(node, nodes.peek(toNodeId), relation));
arr.remove(id, toNodeId);
});
// 查找node为to的键
arr.fullColumn(id).forEach((fromNodeId, relation) -> {
// 在上一次遍历中fromNodeId为id
if (fromNodeId == id) {
return;
}
res.add(new OuterEntry<>(nodes.peek(fromNodeId), node, relation));
arr.remove(fromNodeId, id);
});
nodes.takeOut(id);
return res;
}
@Override
public R putRelation(N fromNode, R relation, N toNode) {
return arr.add(
findNodeId(Objects.requireNonNull(fromNode), true),
findNodeId(Objects.requireNonNull(toNode), true),
Objects.requireNonNull(relation)
);
}
@Override
public Set<? extends Entry<N, R>> getRelationFrom(N from) {
int id = findNodeId(Objects.requireNonNull(from), true);
LinkedHashSet<Entry<N, R>> res = new LinkedHashSet<>();
// 查找node为from的键
arr.fullLine(id).forEach((toNodeId, relation) -> res.add(new OuterEntry<>(from, nodes.peek(toNodeId), relation)));
return res;
}
@Override
public Set<? extends Entry<N, R>> getRelationTo(N to) {
int id = findNodeId(Objects.requireNonNull(to), true);
LinkedHashSet<Entry<N, R>> res = new LinkedHashSet<>();
// 查找node为to的键
arr.fullColumn(id).forEach((fromNodeId, relation) ->
res.add(new OuterEntry<>(nodes.peek(fromNodeId), to, relation)));
return res;
}
@Override
public Set<N> nodesView() {
return new AbstractNodesView() {
@Override
public Iterator<N> iterator() {
return nodes.stream().map(Map.Entry::getValue).iterator();
}
@Override
public int size() {
return nodes.size();
}
};
}
@Override
public Set<? extends Entry<N, R>> getRelations() {
return arr.stream().map((Function<Array2D.Point<R>, Entry<N, R>>) rPoint -> new OuterEntry<>(
nodes.peek(rPoint.getLine()),
nodes.peek(rPoint.getColumn()),
rPoint.getElement()
)).collect(Collectors.toSet());
}
private int findNodeId(N node, boolean mustExistElseThrowEx) {
int id = nodes.findId(Objects.requireNonNull(node));
if (mustExistElseThrowEx && id < 0) {
throw new IllegalArgumentException("No node exists : " + node);
}
return id;
}
private static class OuterEntry<N, R> extends AbstractEntry<N, R> {
private final N from;
private final N to;
private final R relation;
public OuterEntry(N from, N to, R relation) {
this.from = from;
this.to = to;
this.relation = relation;
}
@Override
public N getFrom() {
return from;
}
@Override
public N getTo() {
return to;
}
@Override
public R getRelation() {
return relation;
}
}
}

View File

@@ -0,0 +1,159 @@
package com.jd.platform.async.openutil.collection;
import java.util.*;
/**
* 自动扩容的id储物柜线程不安全。
*
* @author create by TcSnZh on 2021/5/13-下午1:24
*/
public class CommonStoreArk<E> extends AbstractStoreArk<E> {
private Object[] elements;
/**
* 已经分配的下标数
*/
private int allocSize = 0;
/**
* 保存着最小空元素的队列
*/
private final Queue<Integer> emptyPoints = new PriorityQueue<>(Integer::compareTo);
public CommonStoreArk(int initialCapacity) {
elements = new Object[initialCapacity];
}
public CommonStoreArk() {
this(10);
}
@Override
public int store(E element) {
int id;
elements[id = pollId()] = element;
return id;
}
@Override
public E peek(int id) {
if (id < 0) {
throw new IllegalArgumentException("id " + id + " can't be negative");
}
if (id >= elements.length) {
return null;
}
//noinspection unchecked
return (E) elements[id];
}
@Override
public E takeOut(int id) {
if (id < 0) {
throw new IllegalArgumentException("id " + id + " can't be negative");
}
if (id >= elements.length) {
return null;
}
//noinspection unchecked
E out = (E) elements[id];
elements[id] = null;
if (id == allocSize - 1) {
allocSize--;
} else {
emptyPoints.add(id);
}
return out;
}
@Override
public int size() {
return allocSize - emptyPoints.size();
}
@Override
public Iterator<Map.Entry<Integer, E>> iterator() {
return new Iterator<Map.Entry<Integer, E>>() {
private final Map.Entry<Integer, E>[] items;
private int idx = 0;
{
//noinspection unchecked
items = new Map.Entry[size()];
int itemsIdx = 0;
Iterator<Integer> emptyPointItr = emptyPoints.iterator();
for (int i = 0; i < allocSize; i++) {
Object element = elements[i];
if (element == null) {
continue;
}
final int _i = i;
//noinspection unchecked
items[itemsIdx++] = new Map.Entry<Integer, E>() {
private final int k = _i;
private E v = (E) element;
@Override
public Integer getKey() {
return k;
}
@Override
public E getValue() {
return v;
}
@Override
public E setValue(E value) {
E _v = this.v;
this.v = value;
return _v;
}
@Override
public String toString() {
return "{" + k + ':' + v + '}';
}
};
}
}
@Override
public boolean hasNext() {
return idx < items.length;
}
@Override
public Map.Entry<Integer, E> next() {
return items[idx++];
}
};
}
@Override
public int findId(E element) {
int i = 0;
for (Object o : elements) {
if (Objects.equals(o, element)) {
return i;
}
i++;
}
return -1;
}
private int pollId() {
if (!emptyPoints.isEmpty()) {
return emptyPoints.poll();
}
int id = allocSize++;
int length = elements.length;
if (id >= length) {
// 扩容
elements = Arrays.copyOf(elements, Math.max(length + 1, length + (length >> 1)));
}
return id;
}
}

View File

@@ -0,0 +1,185 @@
package com.jd.platform.async.openutil.collection;
import java.util.AbstractSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
/**
* @author create by TcSnZh on 2021/5/16-下午11:27
*/
public interface DirectedGraph<N, R> extends Graph<N, R> {
@Override
default boolean isDirected() {
return true;
}
static <N, R> DirectedGraph<N, R> readOnlyDigraph(DirectedGraph<N, R> source) {
return new ReadOnlyDirectedGraph<>(source);
}
static <N, R> DirectedGraph<N, R> synchronizedDigraph(DirectedGraph<N, R> source) {
return synchronizedDigraph(source, new Object());
}
static <N, R> DirectedGraph<N, R> synchronizedDigraph(DirectedGraph<N, R> source, Object mutex) {
return new SyncDirectedGraph<>(source, mutex);
}
class ReadOnlyDirectedGraph<N, R> extends AbstractDirectedGraph<N, R> {
private final DirectedGraph<N, R> source;
public ReadOnlyDirectedGraph(DirectedGraph<N, R> source) {
this.source = source;
}
private static UnsupportedOperationException readOnlyGraph() {
return new UnsupportedOperationException("readOnly graph");
}
@Override
public boolean addNode(N node) {
throw readOnlyGraph();
}
@Override
public boolean containsNode(N node) {
return source.containsNode(node);
}
@Override
public Set<? extends Entry<N, R>> removeNode(N node) {
throw readOnlyGraph();
}
@Override
public R putRelation(N fromNode, R relation, N toNode) {
throw readOnlyGraph();
}
@Override
public Set<? extends Entry<N, R>> getRelationFrom(N from) {
return source.getRelationFrom(from);
}
@Override
public Set<? extends Entry<N, R>> getRelationTo(N to) {
return source.getRelationTo(to);
}
@Override
public Set<N> nodesView() {
return new AbstractSet<N>() {
private final Set<N> nodesViewSource = source.nodesView();
@Override
public Iterator<N> iterator() {
return new Iterator<N>() {
private final Iterator<N> iteratorSource = nodesViewSource.iterator();
@Override
public boolean hasNext() {
return iteratorSource.hasNext();
}
@Override
public N next() {
return iteratorSource.next();
}
@Override
public void remove() {
throw readOnlyGraph();
}
};
}
@Override
public int size() {
return nodesViewSource.size();
}
@Override
public boolean add(N n) {
throw readOnlyGraph();
}
@Override
public boolean remove(Object o) {
throw readOnlyGraph();
}
};
}
@Override
public Set<? extends Entry<N, R>> getRelations() {
return source.getRelations();
}
}
class SyncDirectedGraph<N, R> extends AbstractDirectedGraph<N, R> {
private final DirectedGraph<N, R> source;
private final Object mutex;
public SyncDirectedGraph(DirectedGraph<N, R> source, Object mutex) {
this.source = source;
this.mutex = mutex;
}
@Override
public boolean addNode(N node) {
synchronized (mutex) {
return source.addNode(node);
}
}
@Override
public boolean containsNode(N node) {
synchronized (mutex) {
return source.containsNode(node);
}
}
@Override
public Set<? extends Entry<N, R>> removeNode(N node) {
synchronized (mutex) {
return source.removeNode(node);
}
}
@Override
public R putRelation(N fromNode, R relation, N toNode) {
synchronized (mutex) {
return source.putRelation(fromNode, relation, toNode);
}
}
@Override
public Set<? extends Entry<N, R>> getRelationFrom(N from) {
synchronized (mutex) {
return source.getRelationFrom(from);
}
}
@Override
public Set<? extends Entry<N, R>> getRelationTo(N to) {
synchronized (mutex) {
return source.getRelationTo(to);
}
}
@Override
public Set<N> nodesView() {
synchronized (mutex) {
return Collections.synchronizedSet(source.nodesView());
}
}
@Override
public Set<? extends Entry<N, R>> getRelations() {
synchronized (mutex) {
return source.getRelations();
}
}
}
}

View File

@@ -0,0 +1,106 @@
package com.jd.platform.async.openutil.collection;
import java.util.Set;
/**
* 图数据结构
*
* @author create by TcSnZh on 2021/5/13-上午11:37
*/
@SuppressWarnings("unused")
public interface Graph<N, R> {
/**
* 添加节点。
* 如果节点已经存在,则不会添加。
*
* @param node 添加进图的节点
* @return 添加成功返回true如果节点已经存在返回false。
*/
boolean addNode(N node);
/**
* 添加一堆Node任一成功返回true
*/
default boolean addNode(N... nodes) {
boolean success = false;
for (N node : nodes) {
if (addNode(node)) {
success = true;
}
}
return success;
}
/**
* 是否存在节点
*
* @param node 节点。
* @return 存在返回true否则返回false。
*/
boolean containsNode(N node);
/**
* 移除节点。
* 返回与该节点有关系的,被一并移出的键。
*
* @param node 节点
* @return 返回值不会为null。
* @throws IllegalArgumentException 如果两个节点任一不存在本图中,抛出异常。
*/
Set<? extends Entry<N, R>> removeNode(N node);
/**
* 添加关系
* 在无向图中fromNode与toNode参数的位置调换没有影响。
*
* @param fromNode 从这个节点开始
* @param relation 关系
* @param toNode 以那个节点为目标
* @return 如果之前存在关系则会替换之前的关系返回出被替换的之前存在的关系。如果之前没有关系返回null。
* @throws IllegalArgumentException 如果两个节点任一不存在本图中,抛出该异常。
*/
R putRelation(N fromNode, R relation, N toNode);
/**
* 获取“从这个节点开始”的所有关系
*
* @param from 关系开始的节点
* @return 返回 {@link Entry}键。
*/
Set<? extends Entry<N, R>> getRelationFrom(N from);
/**
* 获取“以这个节点为目标”的所有关系
*
* @param to 被关系的节点
* @return 返回 {@link Entry}键。
*/
Set<? extends Entry<N, R>> getRelationTo(N to);
/**
* 返回全部节点视图
*
* @return 视图
*/
Set<N> nodesView();
/**
* 返回全部关系返回的是新Set
*
* @return 与本类无关的Set
*/
Set<? extends Entry<N, R>> getRelations();
/**
* 是否有向
*/
boolean isDirected();
interface Entry<N, R> {
N getFrom();
N getTo();
R getRelation();
}
}

View File

@@ -0,0 +1,230 @@
package com.jd.platform.async.openutil.collection;
import com.jd.platform.async.openutil.BiInt;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 稀疏二维数组。
* <p/>
* 可以设置是否允许存入null。
*
* @author create by TcSnZh on 2021/5/14-下午9:45
*/
public class SparseArray2D<E> extends AbstractArray2D<E> {
// ========== properties ==========
/**
* 限制长宽默认为Integer.MAX_VALUE。稀疏数组不在乎这些。
*/
private final int maxLineLength;
private final int maxColumnLength;
private final boolean allowNull;
private final Map<BiInt, Object> items = new HashMap<>();
// ========== index cache properties ==========
/**
* 缓存行列索引
*/
private final NavigableMap<Integer, NavigableSet<Integer>> indexOfLine2columns = new TreeMap<>(Integer::compareTo);
private final NavigableMap<Integer, NavigableSet<Integer>> indexOfColumn2lines = new TreeMap<>(Integer::compareTo);
// ========== constructor ==========
public SparseArray2D() {
this(Integer.MAX_VALUE, Integer.MAX_VALUE);
}
public SparseArray2D(boolean allowNull) {
this(Integer.MAX_VALUE, Integer.MAX_VALUE, allowNull);
}
public SparseArray2D(int maxLineCapacity, int maxColumnCapacity) {
this(maxLineCapacity, maxColumnCapacity, false);
}
public SparseArray2D(int maxLineCapacity, int maxColumnCapacity, boolean allowNull) {
this.maxLineLength = maxLineCapacity;
this.maxColumnLength = maxColumnCapacity;
this.allowNull = allowNull;
}
// ========== public methods ==========
@Override
public int lineLength() {
return maxLineLength;
}
@Override
public int columnLength() {
return maxColumnLength;
}
@Override
public E add(int line, int column, E element) {
if (!allowNull && element == null) {
throw new NullPointerException("null is not allowed");
}
Object put = items.put(BiInt.of(checkLine(line), checkColumn(column)), element == null ? NULL : element);
addIndex(line, column);
//noinspection unchecked
return NULL.equals(put) ? null : (E) put;
}
@Override
public E remove(int line, int column) {
BiInt idx = BiInt.of(checkLine(line), checkColumn(column));
Object get = items.get(idx);
if (get == null) {
throw new IllegalArgumentException("There is no element in line " + line + " column " + column);
}
items.remove(idx);
removeIndex(line, column);
//noinspection unchecked
return NULL.equals(get) ? null : (E) get;
}
/**
* 该方法如果返回null则分不清 之前存入了null 还是 没有存入过
* <p>
* {@inheritDoc}
*/
@Override
public E get(int line, int column) {
Object get = items.get(BiInt.of(checkLine(line), checkColumn(column)));
//noinspection unchecked
return NULL.equals(get) ? null : (E) get;
}
@Override
public boolean containsElement(E element) {
if (NULL.equals(element)) {
if (!allowNull) {
return false;
}
return items.values().stream().anyMatch(v -> NULL.equals(element));
}
return items.values().stream().anyMatch(element::equals);
}
@Override
public Map<Integer, E> fullLine(int line) {
return Optional.ofNullable(indexOfLine2columns.get(line))
.map(set -> set.stream()
.collect(Collectors.toMap(column -> column, column -> {
//noinspection unchecked
return (E) items.get(BiInt.of(line, column));
})))
.orElse(Collections.emptyMap());
}
@Override
public Map<Integer, E> fullColumn(int column) {
return Optional.ofNullable(indexOfColumn2lines.get(column))
.map(set -> set.stream()
.collect(Collectors.toMap(line -> line, line -> {
//noinspection unchecked
return (E) items.get(BiInt.of(line, column));
})))
.orElse(Collections.emptyMap());
}
@Override
public Iterator<? extends Point<E>> iterator(Comparator<BiInt> foreachOrder) {
return new Iterator<Point<E>>() {
private final Iterator<Map.Entry<BiInt, Object>> it;
private Point<E> last = null;
private boolean removed = false;
{
it = items.entrySet().stream()
.sorted((o1, o2) -> foreachOrder.compare(o1.getKey(), o2.getKey()))
.iterator();
}
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Point<E> next() {
Map.Entry<BiInt, Object> next = it.next();
removed = false;
Object v = next.getValue();
//noinspection unchecked
return last = new PointImpl<>(next.getKey(), NULL.equals(v) ? null : (E) v);
}
@Override
public void remove() {
if (last == null || removed) {
throw new IllegalStateException(last == null
? "Iterator has not yet been called .next() ."
: "Iterator item already removed : " + last);
}
BiInt idx = last.getIdx();
SparseArray2D.this.remove(idx.getM(), idx.getN());
}
};
}
// ========== private methods ==========
private int checkLine(int line) {
int len = lineLength();
if (line < 0 || line >= len) {
throw new IndexOutOfBoundsException("Line " + line + " out of bound [0," + (len - 1) + "]");
}
return line;
}
private int checkColumn(int column) {
int len = columnLength();
if (column < 0 || column >= len) {
throw new IndexOutOfBoundsException("Column " + column + " out of bound [0," + (len - 1) + "]");
}
return column;
}
private void addIndex(int line, int column) {
indexOfLine2columns.computeIfAbsent(line, line1 -> new TreeSet<>(Integer::compareTo)).add(column);
indexOfColumn2lines.computeIfAbsent(column, column1 -> new TreeSet<>(Integer::compareTo)).add(line);
}
private void removeIndex(int line, int column) {
// remove line index
{
NavigableSet<Integer> columns = indexOfLine2columns.get(line);
if (columns == null || !columns.contains(column)) {
throw new ConcurrentModificationException(
"线程不安全导致索引异常 : lines " + columns + " is null or not contain line " + line);
}
if (columns.size() == 1) {
indexOfLine2columns.remove(line);
} else {
columns.remove(column);
}
}
// remove column index
{
NavigableSet<Integer> lines = indexOfColumn2lines.get(column);
if (lines == null || !lines.contains(line)) {
throw new ConcurrentModificationException(
"线程不安全导致索引异常 : lines " + lines + " is null or not contain column " + column);
}
if (lines.size() == 1) {
indexOfColumn2lines.remove(column);
} else {
lines.remove(column);
}
}
}
}

View File

@@ -0,0 +1,69 @@
package com.jd.platform.async.openutil.collection;
import java.util.Map;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* id存储柜。
* 每个元素的id是固定的除非取出后重新加入且id是大于等于0且分配到的id必须是未分配的id中最小的。
* <p>
* 类似于我们去游泳馆,里面的存放个人物品的柜子。
* 放进去元素后会分配一个id。然后凭借该id取出元素。
* 不过不同于这些现实中的柜子的是这个存储柜必定会提供最小的id并且必定>0。
* </p>
*
* @author create by TcSnZh on 2021/5/14-上午2:29
*/
public interface StoreArk<E> extends Iterable<Map.Entry<Integer, E>> {
/**
* 存入元素
*
* @param element 元素。
* @return 返回最小的id。从0开始。
*/
int store(E element);
/**
* 查看元素
*
* @param id id;
* @return 返回存在的元素。如果本id未被占用 或 原先存入null返回null。
* @throws IllegalArgumentException id为负数时抛出该异常
*/
E peek(int id);
/**
* 取出元素
*
* @param id id
* @return 返回被取出的元素。如果本id未被占用 或 原先存入null返回null。
* @throws IllegalArgumentException id为负数时抛出该异常
*/
E takeOut(int id);
/**
* 元素个数
*/
int size();
/**
* 是否为空
*/
boolean isEmpty();
/**
* 查找元素的id
*
* @param element 元素
* @return 如果存在返回id。不存在返回-1
*/
int findId(E element);
/**
* 返回流
*/
default Stream<Map.Entry<Integer, E>> stream() {
return StreamSupport.stream(spliterator(), false);
}
}

View File

@@ -0,0 +1,16 @@
package com.jd.platform.async.openutil.collection;
import java.util.Collection;
/**
* @author create by TcSnZh on 2021/5/15-下午7:58
*/
public interface Tree {
interface Node {
default Node getParent(){
throw new UnsupportedOperationException("Get parent node is not supported.");
}
Collection<? extends Node> getChildren();
}
}

View File

@@ -1,4 +1,4 @@
package com.jd.platform.async.util.collection;
package com.jd.platform.async.openutil.collection;
import java.util.Iterator;

View File

@@ -0,0 +1,46 @@
package com.jd.platform.async.openutil.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 范围锁 todo
*
* @author create by TcSnZh on 2021/5/15-下午6:23
*/
public interface RangeLock extends Lock {
void lock(int start, int end);
boolean tryLock(int start, int end);
boolean tryLock(int start, int end, long time, TimeUnit unit) throws InterruptedException;
void lockInterruptibly(int start, int end) throws InterruptedException;
@Override
Condition newCondition();
@Override
void unlock();
@Override
default void lock() {
lock(Integer.MIN_VALUE, Integer.MAX_VALUE);
}
@Override
default void lockInterruptibly() throws InterruptedException {
lockInterruptibly(Integer.MIN_VALUE, Integer.MAX_VALUE);
}
@Override
default boolean tryLock() {
return tryLock(Integer.MIN_VALUE, Integer.MAX_VALUE);
}
@Override
default boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return tryLock(Integer.MIN_VALUE, Integer.MAX_VALUE, time, unit);
}
}

View File

@@ -1,4 +1,4 @@
package com.jd.platform.async.util.timer;
package com.jd.platform.async.openutil.timer;
/**
* @author create by TcSnZh on 2021/5/12-下午6:36
@@ -10,6 +10,7 @@ public abstract class AbstractWheelTimer implements Timer, AutoCloseable {
public abstract void start();
@SuppressWarnings("RedundantThrows")
@Override
public void close() throws Exception {
stop();

View File

@@ -1,4 +1,4 @@
package com.jd.platform.async.util.timer;
package com.jd.platform.async.openutil.timer;
import java.util.*;
import java.util.concurrent.*;
@@ -10,12 +10,11 @@ import java.util.concurrent.atomic.AtomicLong;
* 从netty里抄来的删去了一些功能
* <p/>
* <b>
* 如果违反开源协议请联系作者 zh0u.he@qq.com
* 如果违反开源协议请联系作者 zh.jobs@foxmail.com
* If violate the open source agreement, please contact the author : zh0u.he@qq.com
* </b>
*
* @author create by TcSnZh on 2021/5/12-下午7:16
* @
*/
public class HashedWheelTimer extends AbstractWheelTimer {
@@ -42,6 +41,7 @@ public class HashedWheelTimer extends AbstractWheelTimer {
* ({@link Executors#defaultThreadFactory()}), default tick duration, and
* default number of ticks per wheel.
*/
@SuppressWarnings("unused")
public HashedWheelTimer() {
this(Executors.defaultThreadFactory());
}
@@ -56,6 +56,7 @@ public class HashedWheelTimer extends AbstractWheelTimer {
* @throws NullPointerException if {@code unit} is {@code null}
* @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
*/
@SuppressWarnings("unused")
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit);
}
@@ -70,6 +71,7 @@ public class HashedWheelTimer extends AbstractWheelTimer {
* @throws NullPointerException if {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
@SuppressWarnings("unused")
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
@@ -118,28 +120,7 @@ public class HashedWheelTimer extends AbstractWheelTimer {
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @param leakDetection {@code true} if leak detection should be enabled always,
* if false it will only be enabled if the worker thread is not
* a daemon thread.
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
}
/**
@@ -151,21 +132,19 @@ public class HashedWheelTimer extends AbstractWheelTimer {
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @param leakDetection {@code true} if leak detection should be enabled always,
* if false it will only be enabled if the worker thread is not
* a daemon thread.
* @param maxPendingTimeouts The maximum number of pending timeouts after which call to
* {@code newTimeout} will result in
* {@link java.util.concurrent.RejectedExecutionException}
* {@link RejectedExecutionException}
* being thrown. No maximum pending timeouts limit is assumed if
* this value is 0 or negative.
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
public HashedWheelTimer(ThreadFactory threadFactory,
long tickDuration,
TimeUnit unit,
int ticksPerWheel,
long maxPendingTimeouts) {
Objects.requireNonNull(threadFactory, "threadFactory must not null !");
Objects.requireNonNull(threadFactory, "unit must not null !");
@@ -427,6 +406,7 @@ public class HashedWheelTimer extends AbstractWheelTimer {
}
try {
//noinspection BusyWait
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (workerState.get() == WORKER_STATE_SHUTDOWN) {

View File

@@ -1,4 +1,4 @@
package com.jd.platform.async.util.timer;
package com.jd.platform.async.openutil.timer;
/**
* 借鉴netty
@@ -32,5 +32,6 @@ public interface Timeout {
*
* @return 如果取消成功完成则为true否则为false
*/
@SuppressWarnings("unused")
boolean cancel();
}

View File

@@ -1,7 +1,8 @@
package com.jd.platform.async.util.timer;
package com.jd.platform.async.openutil.timer;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -23,6 +24,7 @@ public interface Timer {
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
@SuppressWarnings("unused")
default Timeout newTimeout(Runnable runnable, long delay, TimeUnit unit) {
AtomicReference<Timeout> timeoutRef = new AtomicReference<>();
newTimeout(timeout -> {

View File

@@ -1,4 +1,4 @@
package com.jd.platform.async.util.timer;
package com.jd.platform.async.openutil.timer;
/**
* 类似于netty的TimerTask

View File

@@ -0,0 +1,33 @@
package com.jd.platform.async.openutil.unsafe;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
/**
* @author create by TcSnZh on 2021/5/16-下午4:36
*/
@SuppressWarnings({"AlibabaAbstractClassShouldStartWithAbstractNaming", "unused"})
abstract class UnsafeUtil {
private static final Unsafe unsafe;
static {
Field theUnsafe;
try {
theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
} catch (NoSuchFieldException e) {
throw new Error(e);
}
theUnsafe.setAccessible(true);
try {
unsafe = (Unsafe) theUnsafe.get(null);
} catch (IllegalAccessException e) {
throw new Error(e);
}
}
@SuppressWarnings("FinalStaticMethod")
public static final Unsafe getUnsafe() {
return unsafe;
}
}

View File

@@ -0,0 +1,51 @@
package openutiltest;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* 便于测试的jdk动态代理
*
* @author create by TcSnZh on 2021/5/16-下午11:38
*/
public class PrintProxy<I> {
public PrintProxy(Class<I> clazz) {
this.interfaceClazz = clazz;
}
private final Class<?> interfaceClazz;
public I proxyTo(I obj, String objNickName) {
//noinspection unchecked
return (I) Proxy.newProxyInstance(
obj.getClass().getClassLoader(),
new Class[]{interfaceClazz},
(proxy, method, args) -> {
String methodInfo = methodInfo(method);
try {
Object res = method.invoke(obj, args);
System.out.printf(objNickName + " 执行方法: %-40s --> 方法返回值: %-20s --> this.toString() = %-40s\n",
methodInfo, res, obj);
return res;
} catch (Exception e) {
System.err.printf(objNickName + " 执行方法: %-40s --> 异常信息: %-40s --> this.toString() = %-40s\n",
methodInfo, e.getClass().getSimpleName() + " : " + e.getMessage(), obj
);
throw e;
}
}
);
}
private static String methodInfo(Method method) {
StringBuilder sb = new StringBuilder().append(method.getName()).append('(');
for (Class<?> parameterType : method.getParameterTypes()) {
sb.append(parameterType.getSimpleName()).append(", ");
}
if (method.getParameterTypes().length > 0) {
sb.delete(sb.length() - 2, sb.length());
}
return sb.append(')').toString();
}
}

View File

@@ -0,0 +1,37 @@
package openutiltest;
import com.jd.platform.async.openutil.collection.CommonDirectedGraph;
import com.jd.platform.async.openutil.collection.DirectedGraph;
import java.util.Arrays;
/**
* 测试图工具类的使用
*
* @author create by TcSnZh on 2021/5/16-下午11:25
*/
public class TestGraph {
public static void main(String[] args) {
test_CommonDirectedGraph();
}
private static void test_CommonDirectedGraph() {
System.out.println("\n\n ==================== 测试正常使用 ==================");
//noinspection unchecked
DirectedGraph<String, String> graph =
new PrintProxy<>(DirectedGraph.class).proxyTo(new CommonDirectedGraph<>(), "graph");
graph.isDirected();
graph.addNode("胖虎");
graph.addNode("大雄");
graph.putRelation("胖虎", "", "大雄");
graph.addNode("静香");
graph.nodesView().addAll(Arrays.asList("小夫", "胖虎的妹妹", "哆啦A梦"));
graph.putRelation("胖虎", "是其哥", "胖虎的妹妹");
graph.putRelation("胖虎的妹妹", "是其妹", "胖虎");
graph.putRelation("胖虎的妹妹", "喜欢", "大雄");
graph.putRelation("胖虎", "????", "小夫");
graph.putRelation("大雄", "喜欢", "静香");
graph.removeNode("大雄");
graph.getRelations();
}
}

View File

@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>asyncTool</artifactId>
<groupId>com.jd.platform</groupId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>asyncTool-scheduling</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<artifactId>asyncTool-core</artifactId>
<groupId>com.jd.platform</groupId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,40 @@
package com.jd.platform.async.scheduling;
import com.jd.platform.async.scheduling.model.SchedulingDrawingsModel;
/**
* @author create by TcSnZh on 2021/5/17-上午1:22
*/
public class DefaultSchedulingJsonParser implements SchedulingJsonParser {
// ========== singleton instance ==========
/**
* 私有构造方法,需要通过{@link #getInstance()}方法获取单例。
*/
private DefaultSchedulingJsonParser() {
}
/**
* 获取单例
*/
public static DefaultSchedulingJsonParser getInstance() {
return instance;
}
private static final DefaultSchedulingJsonParser instance = new DefaultSchedulingJsonParser();
// ========== public methods ==========
@Override
public SchedulingDrawingsModel parse(String json) {
// todo
return null;
}
// ========== util methods ==========
}

View File

@@ -0,0 +1,35 @@
package com.jd.platform.async.scheduling;
import sun.misc.Unsafe;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* 调度工厂。传入图纸生成一组wrapper。
*
* @author create by TcSnZh on 2021/5/17-上午1:11
*/
public class SchedulingFactory {
private final String factoryName;
/**
* 无参构造,默认使用 {@code 栈信息<自增long值> } 作为工厂名
*/
public SchedulingFactory() {
this(Thread.currentThread().getStackTrace()[2] + "<" + defaultFactoryNameCount.getAndIncrement() + ">");
}
/**
* 指定工厂名
*
* @param factoryName 工厂名
*/
public SchedulingFactory(String factoryName) {
this.factoryName = factoryName;
}
// ========== static ==========
private static final AtomicLong defaultFactoryNameCount = new AtomicLong();
}

View File

@@ -0,0 +1,24 @@
package com.jd.platform.async.scheduling;
import com.jd.platform.async.scheduling.model.SchedulingDrawingsModel;
/**
* @author create by TcSnZh on 2021/5/17-下午7:22
*/
public interface SchedulingJsonParser {
/**
* 解析json为图纸对象
*
* @param json json
* @return 返回图纸对象接口
*/
SchedulingDrawingsModel parse(String json);
/**
* 默认实现
*/
static SchedulingJsonParser getDefaultInstance() {
return DefaultSchedulingJsonParser.getInstance();
}
}

View File

@@ -0,0 +1,25 @@
package com.jd.platform.async.scheduling.exception;
/**
* @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/19-下午7:20
*/
public class IllegalConfigException extends Exception{
public IllegalConfigException() {
}
public IllegalConfigException(String message) {
super(message);
}
public IllegalConfigException(String message, Throwable cause) {
super(message, cause);
}
public IllegalConfigException(Throwable cause) {
super(cause);
}
public IllegalConfigException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@@ -0,0 +1,80 @@
package com.jd.platform.async.scheduling.model;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.wrapper.strategy.depend.DependOnUpWrapperStrategy;
import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy;
import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
import java.util.HashMap;
import java.util.Map;
/**
* @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/19-下午7:51
*/
@SuppressWarnings("unused")
public class ObjectModel {
protected String constObjectName;
protected String className;
protected Long sameObjectId;
protected Map<String, Object> properties;
public ObjectModel() {
}
public String getConstObjectName() {
return constObjectName;
}
public void setConstObjectName(String constObjectName) {
this.constObjectName = constObjectName;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public Long getSameObjectId() {
return sameObjectId;
}
public void setSameObjectId(Long sameObjectId) {
this.sameObjectId = sameObjectId;
}
public Map<String, Object> getProperties() {
return properties;
}
public void setProperties(Map<String, Object> properties) {
this.properties = properties;
}
public static Map<String, Object> getConstObjects() {
return constObjects;
}
// static constants
private static final Map<String, Object> constObjects;
static {
constObjects = new HashMap<>(16);
constObjects.put("NOT_SKIP", SkipStrategy.NOT_SKIP);
constObjects.put("CHECK_ONE_LEVEL", SkipStrategy.CHECK_ONE_LEVEL);
constObjects.put("ALL_DEPENDENCIES_ALL_SUCCESS", DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS);
constObjects.put("ALL_DEPENDENCIES_ANY_SUCCESS", DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS);
constObjects.put("ALL_DEPENDENCIES_NONE_FAILED", DependenceStrategy.ALL_DEPENDENCIES_NONE_FAILED);
constObjects.put("SUCCESS_CONTINUE", DependOnUpWrapperStrategy.SUCCESS_CONTINUE);
constObjects.put("SUCCESS_START_INIT_CONTINUE", DependOnUpWrapperStrategy.SUCCESS_START_INIT_CONTINUE);
constObjects.put("PRINT_EXCEPTION_STACK_TRACE", ICallback.PRINT_EXCEPTION_STACK_TRACE);
}
public static <T> T getConstObject(String name) {
//noinspection unchecked
return (T) constObjects.get(name);
}
}

View File

@@ -0,0 +1,255 @@
package com.jd.platform.async.scheduling.model;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 工厂图纸模型对象。
* 具体参数含义,请查阅 QuickStart.md
*
* @author create by TcSnZh on 2021/5/17-上午1:16
*/
@SuppressWarnings("unused")
public class SchedulingDrawingsModel {
protected String drawingsName;
protected List<WrapperModel> wrappers;
protected List<RelationModel> relations;
protected BeginWorkModel beginWork;
public static class WrapperModel {
protected String id;
protected ParamModel param;
protected ObjectModel worker;
protected ObjectModel callback;
protected WrapperStrategyModel wrapperStrategy;
protected Boolean allowInterrupt;
protected Boolean enableTimeout;
protected Long timeoutLength;
protected TimeUnit timeUnit;
protected String extendConfig;
public static class ParamModel {
protected Boolean useObjectModel;
protected Object value; // true - ObjectModel ; false - the json converted to type, such as Map\List\String...
public Boolean getUseObjectModel() {
return useObjectModel;
}
public void setUseObjectModel(Boolean useObjectModel) {
this.useObjectModel = useObjectModel;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
}
public static class WrapperStrategyModel {
protected Map<String, ObjectModel> dependOnUpWrapperStrategyMapper;
protected ObjectModel dependenceStrategy;
protected ObjectModel skipStrategy;
public Map<String, ObjectModel> getDependOnUpWrapperStrategyMapper() {
return dependOnUpWrapperStrategyMapper;
}
public void setDependOnUpWrapperStrategyMapper(Map<String, ObjectModel> dependOnUpWrapperStrategyMapper) {
this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper;
}
public ObjectModel getDependenceStrategy() {
return dependenceStrategy;
}
public void setDependenceStrategy(ObjectModel dependenceStrategy) {
this.dependenceStrategy = dependenceStrategy;
}
public ObjectModel getSkipStrategy() {
return skipStrategy;
}
public void setSkipStrategy(ObjectModel skipStrategy) {
this.skipStrategy = skipStrategy;
}
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public ParamModel getParam() {
return param;
}
public void setParam(ParamModel param) {
this.param = param;
}
public ObjectModel getWorker() {
return worker;
}
public void setWorker(ObjectModel worker) {
this.worker = worker;
}
public ObjectModel getCallback() {
return callback;
}
public void setCallback(ObjectModel callback) {
this.callback = callback;
}
public WrapperStrategyModel getWrapperStrategy() {
return wrapperStrategy;
}
public void setWrapperStrategy(WrapperStrategyModel wrapperStrategy) {
this.wrapperStrategy = wrapperStrategy;
}
public Boolean getAllowInterrupt() {
return allowInterrupt;
}
public void setAllowInterrupt(Boolean allowInterrupt) {
this.allowInterrupt = allowInterrupt;
}
public Boolean getEnableTimeout() {
return enableTimeout;
}
public void setEnableTimeout(Boolean enableTimeout) {
this.enableTimeout = enableTimeout;
}
public Long getTimeoutLength() {
return timeoutLength;
}
public void setTimeoutLength(Long timeoutLength) {
this.timeoutLength = timeoutLength;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}
public String getExtendConfig() {
return extendConfig;
}
public void setExtendConfig(String extendConfig) {
this.extendConfig = extendConfig;
}
}
public static class RelationModel {
protected Object from; // from和to最多有一个数组剩下的都是String
protected Object to;
public Object getFrom() {
return from;
}
public void setFrom(Object from) {
this.from = from;
}
public Object getTo() {
return to;
}
public void setTo(Object to) {
this.to = to;
}
}
public static class BeginWorkModel {
protected Long timeoutLength;
protected TimeUnit timeoutUnit;
protected List<String> wrappers;
protected String executor;
public Long getTimeoutLength() {
return timeoutLength;
}
public void setTimeoutLength(Long timeoutLength) {
this.timeoutLength = timeoutLength;
}
public TimeUnit getTimeoutUnit() {
return timeoutUnit;
}
public void setTimeoutUnit(TimeUnit timeoutUnit) {
this.timeoutUnit = timeoutUnit;
}
public List<String> getWrappers() {
return wrappers;
}
public void setWrappers(List<String> wrappers) {
this.wrappers = wrappers;
}
public String getExecutor() {
return executor;
}
public void setExecutor(String executor) {
this.executor = executor;
}
}
public String getDrawingsName() {
return drawingsName;
}
public void setDrawingsName(String drawingsName) {
this.drawingsName = drawingsName;
}
public List<WrapperModel> getWrappers() {
return wrappers;
}
public void setWrappers(List<WrapperModel> wrappers) {
this.wrappers = wrappers;
}
public List<RelationModel> getRelations() {
return relations;
}
public void setRelations(List<RelationModel> relations) {
this.relations = relations;
}
public BeginWorkModel getBeginWork() {
return beginWork;
}
public void setBeginWork(BeginWorkModel beginWork) {
this.beginWork = beginWork;
}
}

View File

@@ -0,0 +1,16 @@
package com.jd.platform.async.scheduling.util;
/**
* 反射工具类
*
* @author tcsnzh[zh.jobs@foxmail.com]
*/
@SuppressWarnings("AlibabaAbstractClassShouldStartWithAbstractNaming")
public abstract class ReflectUtil {
private ReflectUtil() {
}
public static void foreachField(Class<?> clazz) {
}
}

View File

@@ -0,0 +1,41 @@
package schedulingtest;
import java.io.*;
import java.util.Objects;
/**
* @author create by TcSnZh on 2021/5/17-上午11:34
*/
public class FileStringReader {
public static String readFile(String resourceClasspath) {
InputStream is = null;
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(Objects.requireNonNull(
is = Objects.requireNonNull(
Thread.currentThread().getContextClassLoader(), "get classLoader is null"
).getResourceAsStream(resourceClasspath), () -> "get resource " + resourceClasspath + " is null"
)));
StringWriter stringWriter = new StringWriter();
char[] buf = new char[4096];
int len;
while ((len = reader.read(buf)) > 0) {
stringWriter.write(buf, 0, len);
}
return stringWriter.toString();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (is != null) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
System.out.println("test readFile : \n\n" + readFile("case1_1.json"));
}
}

View File

@@ -0,0 +1,17 @@
package schedulingtest;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author create by TcSnZh on 2021/5/17-上午2:31
*/
public class PrintParam implements IWorker<Object, Object> {
@Override
public Object action(Object object, Map<String, WorkerWrapper<?, ?>> allWrappers) {
System.out.println("print param : " + object);
return object;
}
}

View File

@@ -0,0 +1,14 @@
package schedulingtest.cases;
import com.jd.platform.async.scheduling.SchedulingJsonParser;
import schedulingtest.FileStringReader;
/**
* @author create by TcSnZh on 2021/5/17-上午11:49
*/
class Case1 {
public static void main(String[] args) {
String json = FileStringReader.readFile("test.json");
System.out.println(SchedulingJsonParser.getDefaultInstance().parse(json));
}
}

View File

@@ -0,0 +1,52 @@
package schedulingtest.entity;
/**
* @author create by TcSnZh on 2021/5/17-上午1:33
*/
public class User {
private String name;
private int age;
private double money;
public User() {
}
public User(String name, int age, double money) {
this.name = name;
this.age = age;
this.money = money;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public double getMoney() {
return money;
}
public void setMoney(double money) {
this.money = money;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
", money=" + money +
'}';
}
}

Some files were not shown because too many files have changed in this diff Show More