From a0c7e2893196cbd60e1322d2cda5dec351290ed2 Mon Sep 17 00:00:00 2001 From: TcSnZh Date: Wed, 26 May 2021 16:54:26 +0800 Subject: [PATCH] =?UTF-8?q?v1.5.1-SNAPSHOT=20=E4=BD=BF=E7=94=A8OnceWork?= =?UTF-8?q?=E5=8F=A5=E6=9F=84=E6=9B=BF=E4=BB=A3=E4=BA=86=E6=97=A7=E7=9A=84?= =?UTF-8?q?boolean=E8=BF=94=E5=9B=9E=E5=80=BC=E3=80=82=E5=B9=B6=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E4=BA=86=E5=8F=96=E6=B6=88=E4=BB=BB=E5=8A=A1=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- QuickStart.md | 420 +++++------------- asyncTool-core/pom.xml | 4 +- .../exception/CancelSkippedException.java | 19 + .../async/exception/SkippedException.java | 15 +- .../com/jd/platform/async/executor/Async.java | 162 +++++-- .../async/executor/PollingCenter.java | 2 +- .../jd/platform/async/worker/OnceWork.java | 375 ++++++++++++++++ .../platform/async/wrapper/WorkerWrapper.java | 41 +- .../async/wrapper/WorkerWrapperGroup.java | 102 ++++- .../wrapper/strategy/WrapperStrategy.java | 4 +- .../src/test/java/v15/cases/Case0.java | 37 ++ .../src/test/java/v15/cases/Case01.java | 37 ++ .../src/test/java/v15/cases/Case02.java | 38 ++ .../src/test/java/v15/cases/Case1.java | 4 +- .../src/test/java/v15/cases/Case10.java | 72 +++ .../src/test/java/v15/cases/Case2.java | 2 +- .../src/test/java/v15/cases/Case3.java | 2 +- .../src/test/java/v15/cases/Case4.java | 2 +- .../src/test/java/v15/cases/Case5.java | 2 +- .../src/test/java/v15/cases/Case6.java | 2 +- .../src/test/java/v15/cases/Case7.java | 2 +- .../src/test/java/v15/cases/Case8.java | 2 +- .../src/test/java/v15/cases/Case9.java | 10 +- asyncTool-openutil/pom.xml | 2 +- .../com/jd/platform/async/openutil/BiInt.java | 2 +- .../async/openutil/collection/BiTree.java | 76 ---- .../collection/CollisionRangeTable.java | 10 - .../openutil/collection/DirectedGraph.java | 5 +- .../async/openutil/collection/Tree.java | 16 - .../openutil/collection/WheelIterator.java | 23 - .../async/openutil/concurrent/RangeLock.java | 46 -- .../async/openutil/unsafe/UnsafeUtil.java | 33 -- .../src/test/java/openutiltest/TestGraph.java | 2 +- asyncTool-scheduling/pom.xml | 31 -- .../async/scheduling/AsyncScheduling.java | 9 - .../DefaultSchedulingDrawingsParser.java | 219 --------- .../drawings/SchedulingDrawings.java | 10 - .../drawings/SchedulingDrawingsImpl.java | 16 - .../drawings/SchedulingDrawingsParser.java | 18 - .../exception/IllegalSchedulingException.java | 25 -- .../IllegalSchedulingPropertyException.java | 55 --- .../factory/AbstractSchedulingFactory.java | 32 -- .../DefaultSchedulingJsonModelParser.java | 33 -- .../async/scheduling/model/ObjectModel.java | 93 ---- .../model/SchedulingDrawingsModel.java | 300 ------------- .../model/SchedulingJsonModelParser.java | 23 - .../async/scheduling/util/ReflectUtil.java | 13 - .../v15/schedulingtest/FileStringReader.java | 37 -- .../v15/schedulingtest/cases/case1/Case1.java | 15 - .../schedulingtest/cases/case1/PayTaxes.java | 34 -- .../cases/case1/PrintParam.java | 17 - .../cases/case1/SelectUserByName.java | 48 -- .../v15/schedulingtest/cases/case1/User.java | 52 --- .../src/test/resources/v15/case1_1.json | 80 ---- pom.xml | 3 +- 55 files changed, 966 insertions(+), 1768 deletions(-) create mode 100644 asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java create mode 100644 asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java create mode 100644 asyncTool-core/src/test/java/v15/cases/Case0.java create mode 100644 asyncTool-core/src/test/java/v15/cases/Case01.java create mode 100644 asyncTool-core/src/test/java/v15/cases/Case02.java create mode 100644 asyncTool-core/src/test/java/v15/cases/Case10.java delete mode 100644 asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/BiTree.java delete mode 100644 asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/CollisionRangeTable.java delete mode 100644 asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/Tree.java delete mode 100644 asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/WheelIterator.java delete mode 100644 asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/concurrent/RangeLock.java delete mode 100644 asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/unsafe/UnsafeUtil.java delete mode 100644 asyncTool-scheduling/pom.xml delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/AsyncScheduling.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/DefaultSchedulingDrawingsParser.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawings.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawingsImpl.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawingsParser.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/exception/IllegalSchedulingException.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/exception/IllegalSchedulingPropertyException.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/factory/AbstractSchedulingFactory.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/DefaultSchedulingJsonModelParser.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/ObjectModel.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/SchedulingDrawingsModel.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/SchedulingJsonModelParser.java delete mode 100644 asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/util/ReflectUtil.java delete mode 100644 asyncTool-scheduling/src/test/java/v15/schedulingtest/FileStringReader.java delete mode 100644 asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/Case1.java delete mode 100644 asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/PayTaxes.java delete mode 100644 asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/PrintParam.java delete mode 100644 asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/SelectUserByName.java delete mode 100644 asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/User.java delete mode 100644 asyncTool-scheduling/src/test/resources/v15/case1_1.json diff --git a/QuickStart.md b/QuickStart.md index 690e952..ec1e1db 100644 --- a/QuickStart.md +++ b/QuickStart.md @@ -56,13 +56,7 @@ mvn install asyncTool-core com.jd.platform - 1.5.0-SNAPSHOT - - - - asyncTool-scheduling - com.jd.platform - 1.5.0-SNAPSHOT + 1.5.1-SNAPSHOT ``` @@ -77,7 +71,7 @@ mvn install > > com.jd.platform > asyncTool-core -> 1.5.0-SNAPSHOT +> 1.5.1-SNAPSHOT > > > ``` @@ -191,23 +185,6 @@ WorkerWrapper.builder() 以下为示例: ```java -package v15.cases; - -import com.jd.platform.async.callback.DefaultCallback; -import com.jd.platform.async.executor.Async; -import com.jd.platform.async.openutil.collection.CommonDirectedGraph; -import com.jd.platform.async.openutil.collection.DirectedGraph; -import com.jd.platform.async.wrapper.QuickBuildWorkerWrapper; -import com.jd.platform.async.wrapper.WorkerWrapper; -import com.jd.platform.async.wrapper.strategy.WrapperStrategy; - -import java.util.concurrent.*; - -/** - * 快速构造示例。 - * - * @author create by TcSnZh on 2021/5/17-下午5:23 - */ class Case9 { public static void main(String[] args) throws ExecutionException, InterruptedException { DirectedGraph, Object> graph = DirectedGraph.synchronizedDigraph(new CommonDirectedGraph<>()); @@ -244,10 +221,16 @@ class Case9 { // System.out.println(graph); - Async.beginWork(200, w1); + Async.work(200, w1).awaitFinish(); System.out.println(" Begin work end .\n w1 : " + w1 + "\n w2 : " + w2 + "\n"); - + /* 输出: + I am IWorker 1 + I am IWorker 2 + Begin work end . + w1 : 省略 + w2 : 省略 + */ } } ``` @@ -259,7 +242,7 @@ class Case9 { ![输入图片说明](https://images.gitee.com/uploads/images/2019/1226/140256_8c015621_303698.png "屏幕截图.png") ```java -class Test { +class Case0 { static WorkerWrapperBuilder builder(String id) { return WorkerWrapper.builder() .id(id) @@ -274,15 +257,15 @@ class Test { WorkerWrapper b = builder("B").build(); WorkerWrapper c = builder("C").build(); try { - Async.beginWork(100, a, b, c); - } catch (ExecutionException | InterruptedException e) { + Async.work(100, a, b, c).awaitFinish(); + } catch (InterruptedException e) { e.printStackTrace(); } /* 输出: wrapper(id=A) is working wrapper(id=B) is working wrapper(id=C) is working - */ + */ } } ``` @@ -293,7 +276,7 @@ class Test { ![输入图片说明](https://images.gitee.com/uploads/images/2019/1226/140405_93800bc7_303698.png "屏幕截图.png") ```java -class Test { +class Case01 { static WorkerWrapperBuilder builder(String id) { return WorkerWrapper.builder() .id(id) @@ -309,8 +292,8 @@ class Test { WorkerWrapper c = builder("C").depends(a).build(); WorkerWrapper f = builder("F").depends(b, c).build(); try { - Async.beginWork(100, a); - } catch (ExecutionException | InterruptedException e) { + Async.work(100, a).awaitFinish(); + } catch (InterruptedException e) { e.printStackTrace(); } /* 输出: @@ -326,7 +309,7 @@ class Test { 如果觉得`.depneds()`方法的排序您不喜欢,也可以用`.nextOf()`这种方式: ```java -class Test { +class Case02 { static WorkerWrapperBuilder builder(String id) { return WorkerWrapper.builder() .id(id) @@ -343,8 +326,8 @@ class Test { .nextOf(builder("C").nextOf(f).build()) .build(); try { - Async.beginWork(100, a); - } catch (ExecutionException | InterruptedException e) { + Async.work(100, a).awaitFinish(); + } catch (InterruptedException e) { e.printStackTrace(); } /* 输出: @@ -396,8 +379,8 @@ class Case1 { ) .build(); try { - Async.beginWork(1000, a, d); - } catch (ExecutionException | InterruptedException e) { + Async.work(1000, a, d).awaitFinish(); + } catch (InterruptedException e) { e.printStackTrace(); } /* 输出: @@ -455,7 +438,7 @@ class Case2 { .id("id:200").worker(new AddWork()).param(200).build(); WorkerWrapper add = WorkerWrapper.builder().id("id:add") .worker(new AddWork("id:100", "id:200")).depends(wrapper100, wrapper200).build(); - Async.beginWork(20,wrapper100,wrapper200); + Async.work(20,wrapper100,wrapper200).awaitFinish(); System.out.println(add.getWorkResult()); // 输出WorkResult{result=300, resultState=SUCCESS, ex=null} } @@ -469,11 +452,16 @@ class Case2 { `Async`工具类有多个方法可以使用自定义线程池 ```java -public static boolean beginWork(long timeout, +public static OnceWork work(long timeout, ExecutorService executorService, Collection> workerWrappers); -public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper); +public static OnceWork work(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper); + +public static OnceWork work(long timeout, + ExecutorService executorService, + Collection> workerWrappers, + String workId); ``` 另外,如果没有指定线程池,默认会使用`COMMON_POOL`,您可以调用这些方法获得/关闭此线程池: @@ -496,7 +484,7 @@ public static synchronized boolean shutDownCommonPool(boolean now); 以下是一个使用自定义线程池的简单代码示例: ```java -Async.beginWork(1000, Executors.newFixedThreadPool(2),a); +Async.work(1000, Executors.newFixedThreadPool(2),a).awaitFinish(); ``` ## WorkerWrapper基本属性 @@ -548,6 +536,83 @@ V action(T object, Map> allWrappers); > 其他属性都写在源码注释中,可下载源码慢慢查看。 +## `OnceWork`任务句柄详解 + +`Async.work(...)`方法的返回值为`OnceWork`句柄。 + +> 源码里有大量的注释,直接看源码效率更高 + +#### 需要同步等待完成 + +`OnceWork`被返回后,任务并没有同步完成,而是还在运行。 + +如果我们需要同步完成,则调用`awaitFinish`方法即可同步等待。 + +#### 取消任务 + +调用`pleaseCancel`方法可取消任务,以下为示例: + +```java +class Case10 { + private static WorkerWrapperBuilder builder(String id) { + return builder(id, -1L); + } + + private static WorkerWrapperBuilder builder(String id, long sleepTime) { + return WorkerWrapper.builder() + .id(id) + .worker((param, allWrappers) -> { + System.out.println("\twrapper(id=" + id + ") is working"); + if (sleepTime > 0) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return null; + }) + .callback((new ICallback() { + @Override + public void begin() { + System.out.println("wrapper(id=" + id + ") has begin . "); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("\t\twrapper(id=" + id + ") callback " + + (success ? "success " : "fail ") + + ", workResult is " + workResult); + } + })) + .allowInterrupt(true); + } + + /** + * A(10ms) ==> B(10ms) ==> C(10ms) + */ + public static void main(String[] args) throws ExecutionException, InterruptedException { + final WorkerWrapper c; + final WorkerWrapper b; + final WorkerWrapper a = builder("A", 10) + .nextOf(b = builder("B", 10) + .nextOf(c = builder("C", 10).build()) + .build()) + .build(); + final OnceWork onceWork = Async.work(40, a); + Thread.sleep(25); + onceWork.pleaseCancelAndAwaitFinish(); + System.out.println("任务b信息 " + b); + System.out.println("任务c信息 " + c); + System.out.println("OnceWork信息 " + onceWork); + /* + 可以看到C的state为SKIP,workResult.ex为CancelSkippedException,即被取消了。 + 不过有时程序运行慢,导致B被取消了,那么C就不会执行,其状态就为INIT了。 + */ + } +} +``` + ## 设置WorkerWrapper属性 ### 设置依赖策略 @@ -618,7 +683,7 @@ class Case3 { // 这里用线程数较少的线程池做示例,对于ALL_DEPENDENCIES_ANY_SUCCESS“仅需一个”的效果会好一点 ExecutorService pool = Executors.newFixedThreadPool(2); try { - Async.beginWork(1000, pool, a); + Async.work(1000, pool, a).awaitFinish(); } finally { pool.shutdown(); } @@ -849,7 +914,7 @@ class Case4 { } ExecutorService pool = Executors.newFixedThreadPool(2); try { - Async.beginWork(1000, pool, a); + Async.work(1000, pool, a).awaitFinish(); } finally { pool.shutdown(); } @@ -938,7 +1003,7 @@ class Case5 { WorkerWrapper start = builder("start").nextOf(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10).build(); ExecutorService pool = Executors.newFixedThreadPool(2); try { - Async.beginWork(1000, pool, start); + Async.work(1000, pool, start).awaitFinish(); } finally { pool.shutdown(); } @@ -1076,7 +1141,7 @@ class Case6 { .specialToNextWrapper(fromWrapper -> DependenceAction.START_WORK.emptyProperty(), b) .wrapper(b) .end().build(); - Async.beginWork(1000, a); + Async.work(1000, a).awaitFinish(); System.out.println(a.getWorkResult()); System.out.println(b.getWorkResult()); /* 输出: @@ -1181,7 +1246,7 @@ class Case7 { .build(), builder("E", 5).nextOf(d).build() ).build(); - Async.beginWork(1000, a); + Async.work(1000, a).awaitFinish(); /* 输出: wrapper(id=A) is working wrapper(id=E) is working @@ -1302,7 +1367,7 @@ class Case8 { .nextOf(builder("C", 20).build()) .build()) .build(); - Async.beginWork(15, a); + Async.work(20, a).awaitFinish(); /* 输出: wrapper(id=A) has begin . wrapper(id=A) is working @@ -1347,7 +1412,6 @@ public interface WorkerWrapperBuilder { * 总任务超时,但本wrapper在WORKING。 * 单wrapper超时,但本wrapper在WORKING。 * wrapper应当被跳过,但本wrapper在WORKING。 - * 调用`WorkerWrapper#failNow()`方法,且wrapper在WORKING状态。 # 开放工具类 @@ -1361,14 +1425,14 @@ public interface WorkerWrapperBuilder { > > com.jd.platform > asyncTool-openutil -> 1.5.0-SNAPSHOT +> 1.5.1-SNAPSHOT > > > ``` ### 集合类 -> `com.jd.platform.async.openutil.collection.*` +> 普通集合包`com.jd.platform.async.openutil.collection.*` 这里不详述,要用的话源码里有注释。 @@ -1384,255 +1448,7 @@ public interface WorkerWrapperBuilder { ### 其他 -`com.jd.platform.async.openutil` +> `com.jd.platform.async.openutil.*` -* `BiInt` 一个表示两个int值的实体类,内含默认比较器、缓冲区间。 - -# 动态任务调度 - ->引入依赖 -> ->```xml -> -> -> asyncTool-scheduling -> com.jd.platform -> 1.5.0-SNAPSHOT -> ->``` - -动态任务调度,传个json就能让调度工厂随心所欲的构造wrapper与执行。 - -如果您已经熟悉**《任务编排》**这一章的内容,则可以试试这玩意。 - -## 基本组件 - -## 模型属性 - -### 字段属性详解 - -以下是一个传入的json的格式示例,注释中描述了属性含义与格式: - -```json -{ - // 图纸名称。可使用 字符串、null、不写。为null或不写,则会使用uuid - drawingsName :"", - // 提供的全部WorkerWrapper列表。 - // 内部格式请参考《wrappers》这章。 - wrappers:[], - // wrapper顺序关系。 - // 内部格式请参考《relations》这章 - relations: [], - // 任务启动参数 - // 内部格式请参考《beginWork》这章 - beginWork:{} -} -``` - -#### wrappers - -wrappers数组中,只允许存入满足以下格式的对象: - -```json -wrappers:[ - { - // id,传入字符串。 - // 而且:不能与wrappers数组中其他的对象的id属性相同。即必须保证id唯一。 - // 不允许undefined,不允许null。 - "id": "first", - // param,即参数。请看《param》 - // useObjectModel代表value的值是否是“对象模型”。 - // 允许undefined或null,视为{"useObjectModel": false,"value": null} - "param": { - "useObjectModel": false, - "value": "JackMa" - }, - // 传入对象模型,请看《对象模型ObjectModel》 - // 不允许undefined或null - "worker": { - "sameObjectId": 1, - "className:": "schedulingtest.impl.SelectUserByName" - }, - // 传入对象模型 - // 允许undefined与null。如果为两者则使用com.jd.platform.async.callback.DefaultCallback - "callback": { - "sameObjectId": 1 - }, - // wrapper策略 - // 允许undefined与null。如果为两者则使用com.jd.platform.async.wrapper.strategy.WrapperStrategy.DefaultWrapperStrategy - // 即"ALL_DEPENDENCIES_ALL_SUCCESS"与"CHECK_ONE_LEVEL" - "wrapperStrategy": { - // 传入{}键值对,键名为即wrapper的id属性,值为对象模型。 - // 允许undefined和null,两者之意与空键值对{}并无二致 - "dependOnUpWrapperStrategyMapper": null, - // -- 这里不再向历史妥协,舍弃了DependMustStrategyMapper - // 基础策略器,传入对象模型 - // 允许undefined和null,如果是两者则使用"ALL_DEPENDENCIES_ALL_SUCCESS" - "dependenceStrategy": { - "constObjectName": "ALL_DEPENDENCIES_ALL_SUCCESS" - }, - // 跳过策略,传入对象模型 - // 允许undefined和null,如果是两者则使用"CHECK_ONE_LEVEL" - "skipStrategy": { - "constObjectName": "CHECK_ONE_LEVEL" - } - }, - // 是否允许打断,传入boolean值。允许undefined和null,视为false - "allowInterrupt": true, - // 是否启动单wrapper计时,允许undefined和null,视为false - "enableTimeout": true, - // 单wrapper超时时间数值。传入long(int64)值 - // 在enableTimeout为true的情况下不允许为undefined或null或小于等于0的值,否则允许任何值 - "timeoutLength": 50, - // 单wrapper超时时间单位,有以下几个取值:(即java.util.concurrent.TimeUnit的枚举值) - // "NANOSECONDS"、"MICROSECONDS"、"MILLISECONDS" - // "SECONDS"、"MINUTES"、"HOURS"、"DAYS" - // 允许为undefined和null,视为"MILLISECONDS" - // 但是除了undefined、null和以上7个值外,不允许任何值 - "timeoutUnit": "MILLISECONDS" - }, - // 这是第二个wrapper属性,这里可以省略很多选用默认值的属性。 - { - "id": "second", - "param":{ - "useObjectModel": false, - "value":"first" - }, - "worker": { - "className": "schedulingtest.impl.PayTaxes", - "sameObjectId": 2 - }, - callback:{ - "sameObjectId": 2 - } - }, - // 这是第三个属性,这里的"extendConfig"属性是个省力的好东西。 - { - "id": "third", - // 传入存在的id属性 - // 效果是:本wrapper配置将继承此id表示的wrapper配置的所有属性 - // 并且本wrapper可以有选择的覆写配置——只需设置某个对象的值即可。 - // 可以为null或undefined,表示不继承配置 - "extendConfig": "second" - }, - /* , { ... } */ -] -``` - -##### param - -`useObjectModel`属性用于说明`value`属性的所代表的对象类型: - -* 为false:使用json所对应的类型。 -* 为true:使用《对象模型`ObjectModel`》中的我们自定义的对象模型。 - -```json -{ - "useObjectModel": false, - "value": "JackMa" -}, -``` - -```json -{ - "useObjectModel": true, - "value": { - sameObjectId: 3 - } -} -``` - -#### relations - -```json -relation: { - // 可使用 数组(仅当to使用字符串时)、字符串 - // 不允许为null或undefined,如果两个wrapper之间无关系,宁可不写这整个对象。 - from: "", - // 可使用 数组(仅当from使用字符串时)、字符串 - // 不允许为null或undefined,如果两个wrapper之间无关系,宁可不写这整个对象。 - to: "" -} -``` - -`from`和`to`两个属性传入的字符串,必须是在`wrappers`数组中所含有的`id`属性。 - -这两个属性只有在对方为字符串时,才能设置自己为数组,以表示“一对多”关系。 - -#### beginWork - -```json -beginWork: { - // 设置全组超时时间数值,使用long(int64)值 - // 可以传入<=0的值表示不限制超时时间。 - // 允许为null、undefined,视为不限制超时时间。 - timeoutLength: 100, - // 全组超时时间单位,有以下几个取值:(即java.util.concurrent.TimeUnit的枚举值) - // "NANOSECONDS"、"MICROSECONDS"、"MILLISECONDS" - // "SECONDS"、"MINUTES"、"HOURS"、"DAYS" - // 允许为undefined和null,视为"MILLISECONDS" - // 但是除了undefined、null和以上7个值外,不允许任何值 - timeoutUnit: "MILLISECONDS", - // 传入的启动wrapper的id数组,不允许null、undefined。可以为空,但是这不符合常理。 - wrappers: [ - "first" - ], - // 执行的线程池 - // 可以传入"COMMON_POOL",代表使用asyncTool的默认线程池。 - // 可以为null或undefined,视为"COMMON_POOL" - // 也可以以如下格式传入全限定名+字段/方法名字符串,则使用反射调取该字段的所指向的ExecutorService: - // 全限定名#字段名 或 全限定名##方法名 - executor: "COMMON_POOL" -} -``` - -### 其他特殊格式 - -#### 对象模型`ObjectModel` - -* 有时我们需要指定接口实现类,或者是默认的接口实现对象,并自定义这些对象的属性。 - -* 或是指定多个json字段代表一个共同的对象。 - -那么,我们就需要高度自定义此对象的属性,并封装为`com.jd.platform.async.scheduling.model.ObjectModel`类。 - -```json -{ - // 如果需要使用asyncTool指名道姓的常量实现对象,则请见下方《常量对象规范》,并在这里传入名称字符串。 - // 当设置了该属性时,其他的属性均会被忽视。 - "constObjectName": "NOT_SKIP", - // 如果希望指定多个json字段代表一个共同的对象: - // 则将其设为相同的id。 - // 如果设置了该属性,其他的属性均会被忽视。(优先级低于constObjectName) - "sameObjectId": 1, - // 提供类的全限定名字符串,将调用无参构造方法进行初始化 - // 如果constObjectName设置了非null且非undefined值,则此值允许为null或undefined - // 如果sameObjectId设置了非null且非undefined值,则id相同的对象模型中允许且只允许一个值为非null或非undefined,其他的都必须为null或undefined - "className": "your.package.name.YourKlassName", - // 初始化后,会根据该值来修改对象属性 - // 允许为null或undefined,表示不额外设置属性 - - "properties": { - // 其中的键值对为各字段名。这些字段需要有getter、setter方法。 - "myIntegerField": 123123 - } -} -``` - -##### 常量对象大全 - -在`constObjectName`属性中设置,用以下字符串代表如下对象: - -| 对象名字符串 | 常量值 | -| -------------------------------- | ------------------------------------------------------- | -| `"NOT_SKIP"` | `SkipStrategy.NOT_SKIP` | -| `"CHECK_ONE_LEVEL"` | `SkipStrategy.CHECK_ONE_LEVEL` | -| `"ALL_DEPENDENCIES_ALL_SUCCESS"` | `DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS` | -| `"ALL_DEPENDENCIES_ANY_SUCCESS"` | `DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS` | -| `"ALL_DEPENDENCIES_NONE_FAILED"` | ` DependenceStrategy.ALL_DEPENDENCIES_NONE_FAILED` | -| `"SUCCESS_CONTINUE"` | `DependOnUpWrapperStrategy.SUCCESS_CONTINUE` | -| `"SUCCESS_START_INIT_CONTINUE"` | `DependOnUpWrapperStrategy.SUCCESS_START_INIT_CONTINUE` | -| `"PRINT_EXCEPTION_STACK_TRACE"` | `ICallback.PRINT_EXCEPTION_STACK_TRACE` | - -详见请参考`com.jd.platform.async.scheduling.model.Constants`类中的具体代码。 +* `BiInt` 一个表示两个int值的实体类,内含缓冲区间、默认比较器,适合拿来当2维索引。 diff --git a/asyncTool-core/pom.xml b/asyncTool-core/pom.xml index 5ed85e0..0838a91 100644 --- a/asyncTool-core/pom.xml +++ b/asyncTool-core/pom.xml @@ -5,7 +5,7 @@ asyncTool com.jd.platform - 1.5.0-SNAPSHOT + 1.5.1-SNAPSHOT 4.0.0 @@ -20,7 +20,7 @@ com.jd.platform asyncTool-openutil - 1.5.0-SNAPSHOT + 1.5.1-SNAPSHOT diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java b/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java new file mode 100644 index 0000000..5532b74 --- /dev/null +++ b/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java @@ -0,0 +1,19 @@ +package com.jd.platform.async.exception; + +/** + * 整组取消,设置该异常。 + * + * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午6:12 + */ +public class CancelSkippedException extends SkippedException { + public CancelSkippedException() { + } + + public CancelSkippedException(String message) { + super(message); + } + + public CancelSkippedException(String message, long skipAt) { + super(message, skipAt); + } +} diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java b/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java index 8d820e0..1c38e7a 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java @@ -1,5 +1,7 @@ package com.jd.platform.async.exception; +import com.jd.platform.async.executor.timer.SystemClock; + /** * 如果任务在执行之前,自己后面的任务已经执行完或正在被执行,则抛该exception * @@ -7,11 +9,22 @@ package com.jd.platform.async.exception; * @version 1.0 */ public class SkippedException extends RuntimeException { + private final long skipAt; + public SkippedException() { - super(); + this(null); } public SkippedException(String message) { + this(message, SystemClock.now()); + } + + public SkippedException(String message, long skipAt) { super(message); + this.skipAt = skipAt; + } + + public long getSkipAt() { + return skipAt; } } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 3232946..c0c3d88 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -4,15 +4,15 @@ package com.jd.platform.async.executor; import com.jd.platform.async.callback.DefaultGroupCallback; import com.jd.platform.async.callback.IGroupCallback; import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.OnceWork; import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperGroup; +import com.sun.istack.internal.Nullable; -import java.util.Arrays; -import java.util.Collection; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -26,20 +26,65 @@ public class Async { // ========================= 任务执行核心代码 ========================= /** - * 出发点 - * - * @return 只要执行未超时,就返回true。 + * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。 + * 使用uuid作为工作id。使用{@link #getCommonPool()}作为线程池。 */ - public static boolean beginWork(long timeout, - ExecutorService executorService, - Collection> workerWrappers) - throws InterruptedException { - if (workerWrappers == null || workerWrappers.size() == 0) { - return false; + public static OnceWork work(long timeout, + Collection> workerWrappers) { + return work(timeout, getCommonPool(), workerWrappers); + } + + /** + * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。 + * 可变参式传入。使用uuid作为工作id。使用{@link #getCommonPool()}作为线程池。 + */ + public static OnceWork work(long timeout, + WorkerWrapper... workerWrappers) { + return work(timeout, getCommonPool(), workerWrappers); + } + + /** + * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。 + * 可变参式传入。使用uuid作为工作id。 + */ + public static OnceWork work(long timeout, + ExecutorService executorService, + WorkerWrapper... workerWrappers) { + return work(timeout, executorService, Arrays.asList( + Objects.requireNonNull(workerWrappers, "workerWrappers array is null"))); + } + + /** + * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。 + * 省略工作id,使用uuid。 + */ + public static OnceWork work(long timeout, + ExecutorService executorService, + Collection> workerWrappers) { + return work(timeout, executorService, workerWrappers, UUID.randomUUID().toString()); + } + + /** + * 核心方法。 + * 该方法不是同步阻塞执行的。如果想要同步阻塞执行,则调用返回值的{@link OnceWork#awaitFinish()}即可。 + * + * @param timeout 全组超时时间 + * @param executorService 执行线程池 + * @param workerWrappers 任务容器集合 + * @param workId 本次工作id + * @return 返回 {@link OnceWork}封装对象。 + */ + public static OnceWork work(long timeout, + ExecutorService executorService, + Collection> workerWrappers, + String workId) { + if (workerWrappers == null || workerWrappers.isEmpty()) { + return OnceWork.emptyWork(workId); } //保存上次执行的线程池变量(为了兼容以前的旧功能) - Async.lastExecutorService = Objects.requireNonNull(executorService, "ExecutorService is null ! "); - WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout); + Async.lastExecutorService.set(Objects.requireNonNull(executorService, "ExecutorService is null ! ")); + final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout); + final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId); group.addWrapper(workerWrappers); workerWrappers.forEach(wrapper -> { if (wrapper == null) { @@ -47,38 +92,24 @@ public class Async { } executorService.submit(() -> wrapper.work(executorService, timeout, group)); }); - return group.awaitFinish(); - //处理超时的逻辑被移动到了WrapperEndingInspector中。 + return onceWork; } /** - * 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL + * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 */ - public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) - throws ExecutionException, InterruptedException { - if (workerWrapper == null || workerWrapper.length == 0) { - return false; - } - Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet()); - //noinspection unchecked - return beginWork(timeout, executorService, workerWrappers); - } - - /** - * 同步阻塞,直到所有都完成,或失败 - */ - public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException { - return beginWork(timeout, getCommonPool(), workerWrapper); - } - @SuppressWarnings("unused") + @Deprecated public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper); } /** * 异步执行,直到所有都完成,或失败后,发起回调 + * + * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 */ + @Deprecated public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { if (groupCallback == null) { groupCallback = new DefaultGroupCallback(); @@ -129,7 +160,7 @@ public class Async { * 该线程池将会给线程取名为asyncTool-commonPool-thread-0(数字不重复)。 *

*/ - private static ThreadPoolExecutor COMMON_POOL; + private static volatile ThreadPoolExecutor COMMON_POOL; /** * 在以前(及现在)的版本中: @@ -137,7 +168,7 @@ public class Async { *

* 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 */ - private static volatile ExecutorService lastExecutorService; + private static final AtomicReference lastExecutorService = new AtomicReference<>(null); /** * 该方法将会返回{@link #COMMON_POOL},如果还未初始化则会懒加载初始化后再返回。 @@ -155,9 +186,11 @@ public class Async { new ThreadFactory() { private final AtomicLong threadCount = new AtomicLong(0); + @SuppressWarnings("NullableProblems") @Override public Thread newThread(Runnable r) { - Thread t = new Thread(r, "asyncTool-commonPool-thread-" + threadCount.getAndIncrement()); + Thread t = new Thread(r, + "asyncTool-commonPool-thread-" + threadCount.getAndIncrement()); t.setDaemon(true); return t; } @@ -189,6 +222,7 @@ public class Async { * @param now 是否立即关闭 * @return 如果尚未调用过{@link #getCommonPool()},即没有初始化默认线程池,返回false。否则返回true。 */ + @SuppressWarnings("unused") public static synchronized boolean shutDownCommonPool(boolean now) { if (COMMON_POOL == null) { return false; @@ -203,6 +237,51 @@ public class Async { return true; } + // ========================= deprecated ========================= + + /** + * 同步执行一次任务。 + * + * @return 只要执行未超时,就返回true。 + * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 + */ + @Deprecated + public static boolean beginWork(long timeout, + ExecutorService executorService, + Collection> workerWrappers) + throws InterruptedException { + final OnceWork work = work(timeout, executorService, workerWrappers); + work.awaitFinish(); + return work.hasTimeout(); + } + + /** + * 同步执行一次任务。 + * 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL + * + * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 + */ + @Deprecated + public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) + throws ExecutionException, InterruptedException { + if (workerWrapper == null || workerWrapper.length == 0) { + return false; + } + Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet()); + //noinspection unchecked + return beginWork(timeout, executorService, workerWrappers); + } + + /** + * 同步阻塞,直到所有都完成,或失败 + * + * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 + */ + @Deprecated + public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException { + return beginWork(timeout, getCommonPool(), workerWrapper); + } + /** * 关闭上次使用的线程池 * @@ -214,8 +293,9 @@ public class Async { */ @Deprecated public static void shutDown() { - if (lastExecutorService != COMMON_POOL) { - shutDown(lastExecutorService); + final ExecutorService last = lastExecutorService.get(); + if (last != COMMON_POOL) { + shutDown(last); } } @@ -226,7 +306,7 @@ public class Async { * @deprecated 没啥用的方法,要关闭线程池还不如直接调用线程池的关闭方法,避免歧义。 */ @Deprecated - public static void shutDown(ExecutorService executorService) { + public static void shutDown(@Nullable ExecutorService executorService) { if (executorService != null) { executorService.shutdown(); } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java index 01778a2..2dc1829 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java @@ -65,7 +65,7 @@ public class PollingCenter { thread.setDaemon(true); return thread; }, - 4, + 1, TimeUnit.MILLISECONDS, 1024); diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java new file mode 100644 index 0000000..611638b --- /dev/null +++ b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java @@ -0,0 +1,375 @@ +package com.jd.platform.async.worker; + +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperGroup; + +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * 一次工作结果的总接口。 + * + * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午3:22 + */ +public interface OnceWork { + /** + * 返回唯一的workId + */ + String workId(); + + /** + * 判断是否结束。因超时而结束也算结束。 + */ + boolean isFinish(); + + /** + * 同步等待到结束。 + */ + void awaitFinish() throws InterruptedException; + + /** + * 判断是否超时 + * + * @return 如果尚未结束或已结束但未超时,返回false。已结束且已经超时返回true。 + */ + boolean hasTimeout(); + + /** + * 判断是否全部wrapper都处于 执行成功 或 跳过。 + * + * @return 如果已经结束,所有wrapper都成功或跳过返回true,否则返回false。如果尚未结束,返回false。 + */ + default boolean allSuccess() { + if (!isFinish()) { + return false; + } + return getWrappers().values().stream().allMatch(wrapper -> { + final ResultState state = wrapper.getWorkResult().getResultState(); + return state == ResultState.SUCCESS || state == ResultState.DEFAULT; + }); + } + + /** + * 获取全部参与到工作中的wrapper。 + */ + Map> getWrappers(); + + /** + * 获取{@link WorkResult#getResultState()}为{@link ResultState#SUCCESS}的wrapper。 + */ + default Map> getSuccessWrappers() { + return getWrappersOfState(ResultState.SUCCESS); + } + + /** + * 获取状态于这些state中的wrapper。 + * + * @param ofState 状态列表 + * @return 返回Map + */ + default Map> getWrappersOfState(ResultState... ofState) { + final HashSet states = new HashSet<>(Arrays.asList(ofState)); + return getWrappers().entrySet().stream() + .filter(entry -> states.contains(entry.getValue().getWorkResult().getResultState())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + /** + * 获取启动时间 + */ + long getStartTime(); + + /** + * 获取结束时间 + * + * @return 如果超时,返回超时的时刻。如果尚未结束,则抛出异常。 + * @throws IllegalStateException 尚未结束,抛出异常。 + */ + long getFinishTime(); + + /** + * @return 已经取消完成 + */ + boolean isCancelled(); + + /** + * @return 是否正在取消中 + */ + boolean isWaitingCancel(); + + /** + * 请求异步取消。 + */ + void pleaseCancel(); + + /** + * 同步等待取消完成。 + */ + default void pleaseCancelAndAwaitFinish() throws InterruptedException { + if (!isCancelled() && !isWaitingCancel()) { + pleaseCancel(); + } + awaitFinish(); + } + + /** + * @return 返回 {@link AsFuture}封装对象。 + */ + default AsFuture asFuture() { + return new AsFuture(this, limitTime -> limitTime / 16); + } + + /** + * @param sleepCheckInterval 为防止线程爆炸,在{@link Future#get(long, TimeUnit)}方法时使用隔一段时间检查一次。 + * 该Function的参数为总超时毫秒值,返回值为检查时间间隔。 + * @return 返回 {@link AsFuture}封装对象。 + */ + default AsFuture asFuture(Function sleepCheckInterval) { + return new AsFuture(this, sleepCheckInterval); + } + + // static + + /** + * 空任务 + */ + static OnceWork emptyWork(String workId) { + return new EmptyWork(workId); + } + + // class + + class AsFuture implements Future>> { + private final OnceWork onceWork; + private final Function sleepCheckInterval; + + private AsFuture(OnceWork onceWork, Function sleepCheckInterval) { + this.onceWork = onceWork; + this.sleepCheckInterval = sleepCheckInterval; + } + + /** + * 同步等待取消 + * + * @param ignore__mayInterruptIfRunning 该参数将被无视。因为暂未实现“修改允许打断属性”功能。 // todo 等待实现 + */ + @Override + public boolean cancel(boolean ignore__mayInterruptIfRunning) { + try { + onceWork.pleaseCancelAndAwaitFinish(); + } catch (InterruptedException e) { + throw new RuntimeException("", e); + } + return true; + } + + @Override + public boolean isCancelled() { + return onceWork.isCancelled(); + } + + @Override + public boolean isDone() { + return onceWork.isFinish(); + } + + @Override + public Map> get() throws InterruptedException, ExecutionException { + if (!onceWork.isFinish()) { + onceWork.awaitFinish(); + } + return onceWork.getWrappers(); + } + + /** + * 避免线程爆炸,该方法不予单独开线程,而是单线程{@link Thread#sleep(long)}每睡一段时间检查一次。 + */ + @Override + public Map> get(long timeout, + @SuppressWarnings("NullableProblems") TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + final long millis = Objects.requireNonNull(unit).toMillis(timeout); + final long interval = Math.max(1, Math.min(millis, sleepCheckInterval.apply(millis))); + for (int i = 0; interval * i < millis; i++) { + if (onceWork.isFinish()) { + return onceWork.getWrappers(); + } + Thread.sleep(interval); + } + throw new TimeoutException( + "onceWork.asFuture.get(long,TimeUnit) out of time limit(" + + timeout + "," + unit + ") , this is " + this); + } + + @Override + public String toString() { + return "(asFuture from " + this + ")"; + } + } + + abstract class AbstractOnceWork implements OnceWork { + protected final String workId; + + public AbstractOnceWork(String workId) { + this.workId = workId; + } + + @Override + public String workId() { + return workId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof OnceWork)) { + return false; + } + OnceWork _o = (OnceWork) o; + return Objects.equals(_o.workId(), this.workId()); + } + + @Override + public int hashCode() { + return workId().hashCode(); + } + + @Override + public String toString() { + final boolean finish; + final StringBuilder sb = new StringBuilder(48) + .append(this.getClass().getSimpleName()) + .append("{isFinish=").append(finish = isFinish()) + .append(", hasTimeout=").append(hasTimeout()) + .append(", allSuccess=").append(allSuccess()) + .append(", getStartTime=").append(getStartTime()) + .append(", isCancelled=").append(isCancelled()) + .append(", isWaitingCancel=").append(isWaitingCancel()); + if (finish) { + sb.append(", getFinishTime=").append(getFinishTime()); + } + return sb + .append(", wrappers::getId=").append(getWrappers().keySet()) + .append('}').toString(); + } + } + + class Impl extends AbstractOnceWork { + protected final WorkerWrapperGroup group; + + public Impl(WorkerWrapperGroup group, String workId) { + super(workId); + this.group = group; + } + + @Override + public boolean isFinish() { + return group.getEndCDL().getCount() > 0; + } + + @Override + public void awaitFinish() throws InterruptedException { + group.getEndCDL().await(); + } + + @Override + public boolean hasTimeout() { + return group.getAnyTimeout().get(); + } + + @Override + public Map> getWrappers() { + return group.getForParamUseWrappers(); + } + + @Override + public long getStartTime() { + return group.getGroupStartTime(); + } + + @Override + public long getFinishTime() { + if (isFinish()) { + throw new IllegalStateException("work not finish."); + } + return group.getFinishTime(); + } + + @Override + public boolean isCancelled() { + return group.isCancelled(); + } + + @Override + public boolean isWaitingCancel() { + return group.isWaitingCancel(); + } + + @Override + public void pleaseCancel() { + group.pleaseCancel(); + } + } + + class EmptyWork extends AbstractOnceWork { + private final long initTime = SystemClock.now(); + + public EmptyWork(String workId) { + super(workId); + } + + @Override + public boolean isFinish() { + return true; + } + + @Override + public void awaitFinish() { + // do nothing + } + + @Override + public boolean hasTimeout() { + return false; + } + + @Override + public Map> getWrappers() { + return Collections.emptyMap(); + } + + @Override + public long getStartTime() { + return initTime; + } + + @Override + public long getFinishTime() { + return initTime; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isWaitingCancel() { + return false; + } + + @Override + public void pleaseCancel() { + } + + @Override + public String toString() { + return "(it's empty work)"; + } + } +} diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index 64331d4..51fcdcc 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -1,5 +1,6 @@ package com.jd.platform.async.wrapper; +import com.jd.platform.async.exception.CancelSkippedException; import com.jd.platform.async.worker.ResultState; import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.callback.DefaultCallback; @@ -32,7 +33,7 @@ import static com.jd.platform.async.wrapper.WorkerWrapper.State.STARTED; import static com.jd.platform.async.wrapper.WorkerWrapper.State.SUCCESS; import static com.jd.platform.async.wrapper.WorkerWrapper.State.WORKING; import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_all; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_checkTimeoutAllowStates; +import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_beforeWorkingEnd; import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_notWorked; import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_skipOrAfterWork; import static com.jd.platform.async.wrapper.WorkerWrapper.State.*; @@ -96,7 +97,6 @@ public abstract class WorkerWrapper { */ protected final AtomicReference> workResult = new AtomicReference<>(null); - WorkerWrapper(String id, IWorker worker, ICallback callback, @@ -231,6 +231,16 @@ public abstract class WorkerWrapper { } while (true); } + /** + * 直接取消wrapper运行。 + * 如果状态在 {@link State#states_of_beforeWorkingEnd}中,则调用 {@link #fastFail(boolean, Exception, boolean)}。 + */ + public void cancel() { + if (State.setState(state, states_of_beforeWorkingEnd, SKIP, null)) { + fastFail(false, new CancelSkippedException(), true); + } + } + public WrapperStrategy getWrapperStrategy() { return wrapperStrategy; } @@ -298,15 +308,20 @@ public abstract class WorkerWrapper { if (isState(state, BUILDING)) { throw new IllegalStateException("wrapper can't work because state is BUILDING ! wrapper is " + this); } - //总的已经超时了,就快速失败,进行下一个 + // 判断是否整组取消 + if (group.isWaitingCancel() || group.isCancelled()) { + cancel(); + return; + } + // 总的已经超时了,就快速失败,进行下一个 if (remainTime <= 0) { - if (setState(state, states_of_checkTimeoutAllowStates, ERROR, null)) { + if (setState(state, states_of_beforeWorkingEnd, ERROR, null)) { __function__fastFail_callbackResult$false_beginNext.accept(true, null); } return; } - //如果自己已经执行过了。 - //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了 + // 如果自己已经执行过了。 + // 可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了 final AtomicReference oldStateRef = new AtomicReference<>(null); if (!setState(state, states_of_notWorked, STARTED, oldStateRef::set)) { return; @@ -317,7 +332,7 @@ public abstract class WorkerWrapper { callback.begin(); } catch (Exception e) { // callback.begin 发生异常 - if (setState(state, states_of_checkTimeoutAllowStates, ERROR, null)) { + if (setState(state, states_of_beforeWorkingEnd, ERROR, null)) { __function__fastFail_callbackResult$false_beginNext.accept(false, e); } return; @@ -341,7 +356,8 @@ public abstract class WorkerWrapper { } // 如果是由其他wrapper调用而运行至此,则使用策略器决定自己的行为 - DependenceAction.WithProperty judge = wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper); + DependenceAction.WithProperty judge = + wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper); switch (judge.getDependenceAction()) { case TAKE_REST: return; @@ -360,7 +376,9 @@ public abstract class WorkerWrapper { return; case JUDGE_BY_AFTER: default: - throw new Error("策略配置错误,不应当在WorkerWrapper中返回JUDGE_BY_AFTER或其他无效值 : this=" + this + ",fromWrapper=" + fromWrapper); + throw new IllegalStateException( + "策略配置错误,不应当在WorkerWrapper中返回JUDGE_BY_AFTER或其他无效值 : this=" + this + + ",fromWrapper=" + fromWrapper); } } catch (Exception e) { // wrapper本身抛出了不该有的异常 @@ -389,7 +407,6 @@ public abstract class WorkerWrapper { } finally { doWorkingThread.set(null); } - } /** @@ -458,8 +475,6 @@ public abstract class WorkerWrapper { } - // ========== private ========== - // ========== hashcode and equals ========== @Override @@ -645,7 +660,7 @@ public abstract class WorkerWrapper { static final State[] states_of_skipOrAfterWork = new State[]{SKIP, AFTER_WORK}; - static final State[] states_of_checkTimeoutAllowStates = new State[]{INIT, STARTED, WORKING}; + static final State[] states_of_beforeWorkingEnd = new State[]{INIT, STARTED, WORKING}; static final State[] states_all = new State[]{BUILDING, INIT, STARTED, WORKING, AFTER_WORK, SUCCESS, ERROR, SKIP}; diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperGroup.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperGroup.java index e3a0b3e..5ae630b 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperGroup.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperGroup.java @@ -1,5 +1,6 @@ package com.jd.platform.async.wrapper; +import com.jd.platform.async.exception.CancelSkippedException; import com.jd.platform.async.executor.PollingCenter; import java.util.Collection; @@ -8,9 +9,11 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; +import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.openutil.timer.*; /** @@ -35,8 +38,23 @@ public class WorkerWrapperGroup { * 当全部wrapper都调用结束,它会countDown */ private final CountDownLatch endCDL = new CountDownLatch(1); - + /** + * 检测到超时,此标记变量将为true。 + */ private final AtomicBoolean anyTimeout = new AtomicBoolean(false); + /** + * 结束时间 + */ + private volatile long finishTime = -1L; + /** + * 取消任务状态 + * 0 - not cancel , 1 - waiting cancel , 2 - already cancel + */ + private final AtomicInteger cancelState = new AtomicInteger(); + + public static final int NOT_CANCEL = 0; + public static final int WAITING_CANCEL = 1; + public static final int ALREADY_CANCEL = 2; public WorkerWrapperGroup(long groupStartTime, long timeoutLength) { this.groupStartTime = groupStartTime; @@ -47,7 +65,6 @@ public class WorkerWrapperGroup { Objects.requireNonNull(wrapper).forEach(this::addWrapper); } - @SuppressWarnings("unused") public void addWrapper(WorkerWrapper... wrappers) { for (WorkerWrapper wrapper : Objects.requireNonNull(wrappers)) { addWrapper(wrapper); @@ -64,14 +81,33 @@ public class WorkerWrapperGroup { return forParamUseWrappers; } - /** - * 同步等待这组wrapper执行完成 - * - * @return false代表有wrapper超时了。true代表全部wrapper没有超时。 - */ - public boolean awaitFinish() throws InterruptedException { - endCDL.await(); - return !anyTimeout.get(); + public CountDownLatch getEndCDL() { + return endCDL; + } + + public long getGroupStartTime() { + return groupStartTime; + } + + public AtomicBoolean getAnyTimeout() { + return anyTimeout; + } + + public long getFinishTime() { + return finishTime; + } + + public boolean isCancelled() { + return cancelState.get() == ALREADY_CANCEL; + } + + public boolean isWaitingCancel() { + return cancelState.get() == WAITING_CANCEL; + } + + @SuppressWarnings("UnusedReturnValue") + public boolean pleaseCancel() { + return cancelState.compareAndSet(NOT_CANCEL, WAITING_CANCEL); } public class CheckFinishTask implements TimerTask { @@ -85,11 +121,19 @@ public class WorkerWrapperGroup { } AtomicBoolean hasTimeout = new AtomicBoolean(false); // 记录正在运行中的wrapper里,最近的限时时间。 - AtomicLong minDaley = new AtomicLong(Long.MAX_VALUE); + final AtomicLong minDaley = new AtomicLong(Long.MAX_VALUE); final Collection> values = forParamUseWrappers.values(); - final Stream> stream = values.size() > 1024 ? values.parallelStream() : values.stream(); - boolean allFinish = stream - // 处理超时 + final Stream> stream = values.size() > 128 ? values.parallelStream() : values.stream(); + final boolean needCancel = cancelState.get() == WAITING_CANCEL; + boolean allFinish_and_notNeedCancel = stream + // 需要取消的话就取消 + .peek(wrapper -> { + if (needCancel) { + wrapper.cancel(); + } + }) + // 检查超时并保存最近一次限时时间 + // 当需要取消时,才会不断遍历。如果不需要取消,则计算一次(或并行流中多次)就因allMatch不满足而退出了。 .peek(wrapper -> { // time_diff : // -1 -> already timeout ; @@ -102,27 +146,39 @@ public class WorkerWrapperGroup { if (time_diff == 0) { return; } + // use CAS and SPIN for thread safety in parallelStream . do { long getMinDaley = minDaley.get(); - if (getMinDaley <= time_diff || minDaley.compareAndSet(getMinDaley, time_diff)) { - return; + // 需要设置最小时间,但是cas失败,则自旋 + if (getMinDaley <= time_diff && !minDaley.compareAndSet(getMinDaley, time_diff)) { + continue; } + return; } while (true); }) - // 判断是否结束,这里如果还有未结束的wrapper则会提前结束流。 - .allMatch(wrapper -> wrapper.getState().finished()); + // 判断是否不需要取消且全部结束 + // 在不需要取消时,这里如果还有未结束的wrapper则会提前结束流并返回false + // 在需要取消时,会全部遍历一遍并取消掉已经进入链路的wrapper + .allMatch(wrapper -> !needCancel && wrapper.getState().finished()); long getMinDaley = minDaley.get(); + // 如果本次取消掉了任务,或是所有wrapper都已经完成 + // ( ps : 前后两条件在这里是必定 一真一假 或 两者全假 ) + if (needCancel || allFinish_and_notNeedCancel) { + // 如果这次进行了取消,则设置取消状态为已完成 + if (needCancel) { + cancelState.set(ALREADY_CANCEL); + } + anyTimeout.set(hasTimeout.get()); + finishTime = SystemClock.now(); + endCDL.countDown(); + } // 如果有正在运行的wrapper - if (!allFinish) { + else { // 如果有正在WORKING的wrapper,则计算一下限时时间,限时完成后轮询它。 if (getMinDaley != Long.MAX_VALUE) { PollingCenter.getInstance().checkGroup(this, getMinDaley); } } - if (allFinish) { - anyTimeout.set(hasTimeout.get()); - endCDL.countDown(); - } } // hashCode and equals will called WorkerWrapperGroup.this diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/WrapperStrategy.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/WrapperStrategy.java index c1229fc..044c8d9 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/WrapperStrategy.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/WrapperStrategy.java @@ -33,7 +33,9 @@ public interface WrapperStrategy extends DependenceStrategy, SkipStrategy { */ @SuppressWarnings("DeprecatedIsStillUsed") @Deprecated - DependMustStrategyMapper getDependMustStrategyMapper(); + default DependMustStrategyMapper getDependMustStrategyMapper() { + return null; + } /** * 底层全局策略。 diff --git a/asyncTool-core/src/test/java/v15/cases/Case0.java b/asyncTool-core/src/test/java/v15/cases/Case0.java new file mode 100644 index 0000000..52fa6cf --- /dev/null +++ b/asyncTool-core/src/test/java/v15/cases/Case0.java @@ -0,0 +1,37 @@ +package v15.cases; + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperBuilder; + +import java.util.concurrent.ExecutionException; + +/** + * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午9:10 + */ +class Case0 { + static WorkerWrapperBuilder builder(String id) { + return WorkerWrapper.builder() + .id(id) + .worker((param, allWrappers) -> { + System.out.println("wrapper(id=" + id + ") is working"); + return null; + }); + } + + public static void main(String[] args) { + WorkerWrapper a = builder("A").build(); + WorkerWrapper b = builder("B").build(); + WorkerWrapper c = builder("C").build(); + try { + Async.work(100, a, b, c).awaitFinish(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + /* 输出: + wrapper(id=A) is working + wrapper(id=B) is working + wrapper(id=C) is working + */ + } +} diff --git a/asyncTool-core/src/test/java/v15/cases/Case01.java b/asyncTool-core/src/test/java/v15/cases/Case01.java new file mode 100644 index 0000000..cf1194f --- /dev/null +++ b/asyncTool-core/src/test/java/v15/cases/Case01.java @@ -0,0 +1,37 @@ +package v15.cases; + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperBuilder; + +/** + * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午9:13 + */ +class Case01 { + static WorkerWrapperBuilder builder(String id) { + return WorkerWrapper.builder() + .id(id) + .worker((param, allWrappers) -> { + System.out.println("wrapper(id=" + id + ") is working"); + return null; + }); + } + + public static void main(String[] args) { + WorkerWrapper a = builder("A").build(); + WorkerWrapper b = builder("B").depends(a).build(); + WorkerWrapper c = builder("C").depends(a).build(); + WorkerWrapper f = builder("F").depends(b, c).build(); + try { + Async.work(100, a).awaitFinish(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + /* 输出: + wrapper(id=A) is working + wrapper(id=C) is working + wrapper(id=B) is working + wrapper(id=F) is working + */ + } +} diff --git a/asyncTool-core/src/test/java/v15/cases/Case02.java b/asyncTool-core/src/test/java/v15/cases/Case02.java new file mode 100644 index 0000000..0c9a65d --- /dev/null +++ b/asyncTool-core/src/test/java/v15/cases/Case02.java @@ -0,0 +1,38 @@ +package v15.cases; + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperBuilder; + +/** + * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午9:15 + */ +class Case02 { + static WorkerWrapperBuilder builder(String id) { + return WorkerWrapper.builder() + .id(id) + .worker((param, allWrappers) -> { + System.out.println("wrapper(id=" + id + ") is working"); + return null; + }); + } + + public static void main(String[] args) { + WorkerWrapper f = builder("F").build(); + WorkerWrapper a = builder("A") + .nextOf(builder("B").nextOf(f).build()) + .nextOf(builder("C").nextOf(f).build()) + .build(); + try { + Async.work(100, a).awaitFinish(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + /* 输出: + wrapper(id=A) is working + wrapper(id=B) is working + wrapper(id=C) is working + wrapper(id=F) is working + */ + } +} diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index 11075ba..c0a1b74 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -43,8 +43,8 @@ class Case1 { ) .build(); try { - Async.beginWork(1000, a, d); - } catch (ExecutionException | InterruptedException e) { + Async.work(1000, a, d).awaitFinish(); + } catch (InterruptedException e) { e.printStackTrace(); } /* 输出: diff --git a/asyncTool-core/src/test/java/v15/cases/Case10.java b/asyncTool-core/src/test/java/v15/cases/Case10.java new file mode 100644 index 0000000..2f6fcc6 --- /dev/null +++ b/asyncTool-core/src/test/java/v15/cases/Case10.java @@ -0,0 +1,72 @@ +package v15.cases; + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.worker.OnceWork; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperBuilder; + +import java.util.concurrent.ExecutionException; + +/** + * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/26-下午4:07 + */ +class Case10 { + private static WorkerWrapperBuilder builder(String id) { + return builder(id, -1L); + } + + private static WorkerWrapperBuilder builder(String id, long sleepTime) { + return WorkerWrapper.builder() + .id(id) + .worker((param, allWrappers) -> { + System.out.println("\twrapper(id=" + id + ") is working"); + if (sleepTime > 0) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return null; + }) + .callback((new ICallback() { + @Override + public void begin() { + System.out.println("wrapper(id=" + id + ") has begin . "); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("\t\twrapper(id=" + id + ") callback " + + (success ? "success " : "fail ") + + ", workResult is " + workResult); + } + })) + .allowInterrupt(true); + } + + /** + * A(10ms) ==> B(10ms) ==> C(10ms) + */ + public static void main(String[] args) throws ExecutionException, InterruptedException { + final WorkerWrapper c; + final WorkerWrapper b; + final WorkerWrapper a = builder("A", 10) + .nextOf(b = builder("B", 10) + .nextOf(c = builder("C", 10).build()) + .build()) + .build(); + final OnceWork onceWork = Async.work(40, a); + Thread.sleep(25); + onceWork.pleaseCancelAndAwaitFinish(); + System.out.println("任务b信息 " + b); + System.out.println("任务c信息 " + c); + System.out.println("OnceWork信息 " + onceWork); + /* + 可以看到C的state为SKIP,workResult.ex为CancelSkippedException,即被取消了。 + 不过有时程序运行慢,导致B被取消了,那么C就不会执行,其状态就为INIT了。 + */ + } +} diff --git a/asyncTool-core/src/test/java/v15/cases/Case2.java b/asyncTool-core/src/test/java/v15/cases/Case2.java index bd2758d..1c930c7 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case2.java +++ b/asyncTool-core/src/test/java/v15/cases/Case2.java @@ -46,7 +46,7 @@ class Case2 { .id("id:200").worker(new AddWork()).param(200).build(); WorkerWrapper add = WorkerWrapper.builder().id("id:add") .worker(new AddWork("id:100", "id:200")).depends(wrapper100, wrapper200).build(); - Async.beginWork(20,wrapper100,wrapper200); + Async.work(20,wrapper100,wrapper200).awaitFinish(); System.out.println(add.getWorkResult()); // 输出WorkResult{result=300, resultState=SUCCESS, ex=null} } diff --git a/asyncTool-core/src/test/java/v15/cases/Case3.java b/asyncTool-core/src/test/java/v15/cases/Case3.java index 01e82a5..8c5dc98 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case3.java +++ b/asyncTool-core/src/test/java/v15/cases/Case3.java @@ -45,7 +45,7 @@ class Case3 { // 这里用线程数较少的线程池做示例,对于ALL_DEPENDENCIES_ANY_SUCCESS“仅需一个”的效果会好一点 ExecutorService pool = Executors.newFixedThreadPool(2); try { - Async.beginWork(1000, pool, a); + Async.work(1000, pool, a).awaitFinish(); } finally { pool.shutdown(); } diff --git a/asyncTool-core/src/test/java/v15/cases/Case4.java b/asyncTool-core/src/test/java/v15/cases/Case4.java index 657f7a7..e78a599 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case4.java +++ b/asyncTool-core/src/test/java/v15/cases/Case4.java @@ -51,7 +51,7 @@ class Case4 { } ExecutorService pool = Executors.newFixedThreadPool(2); try { - Async.beginWork(1000, pool, a); + Async.work(1000, pool, a).awaitFinish(); } finally { pool.shutdown(); } diff --git a/asyncTool-core/src/test/java/v15/cases/Case5.java b/asyncTool-core/src/test/java/v15/cases/Case5.java index 7a96e6d..5b3c99b 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case5.java +++ b/asyncTool-core/src/test/java/v15/cases/Case5.java @@ -56,7 +56,7 @@ class Case5 { WorkerWrapper start = builder("start").nextOf(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10).build(); ExecutorService pool = Executors.newFixedThreadPool(2); try { - Async.beginWork(1000, pool, start); + Async.work(1000, pool, start).awaitFinish(); } finally { pool.shutdown(); } diff --git a/asyncTool-core/src/test/java/v15/cases/Case6.java b/asyncTool-core/src/test/java/v15/cases/Case6.java index 2368220..9de0258 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case6.java +++ b/asyncTool-core/src/test/java/v15/cases/Case6.java @@ -46,7 +46,7 @@ class Case6 { .specialToNextWrapper(fromWrapper -> DependenceAction.START_WORK.emptyProperty(), b) .wrapper(b) .end().build(); - Async.beginWork(1000, a); + Async.work(1000, a).awaitFinish(); System.out.println(a.getWorkResult()); System.out.println(b.getWorkResult()); /* 输出: diff --git a/asyncTool-core/src/test/java/v15/cases/Case7.java b/asyncTool-core/src/test/java/v15/cases/Case7.java index 1be8b93..14c2e85 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case7.java +++ b/asyncTool-core/src/test/java/v15/cases/Case7.java @@ -50,7 +50,7 @@ class Case7 { .build(), builder("E", 5).nextOf(d).build() ).build(); - Async.beginWork(1000, a); + Async.work(1000, a).awaitFinish(); /* 输出: wrapper(id=A) is working wrapper(id=E) is working diff --git a/asyncTool-core/src/test/java/v15/cases/Case8.java b/asyncTool-core/src/test/java/v15/cases/Case8.java index c91b83a..bbc5def 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case8.java +++ b/asyncTool-core/src/test/java/v15/cases/Case8.java @@ -55,7 +55,7 @@ class Case8 { .nextOf(builder("C", 20).build()) .build()) .build(); - Async.beginWork(15, a); + Async.work(20, a).awaitFinish(); /* 输出: wrapper(id=A) has begin . wrapper(id=A) is working diff --git a/asyncTool-core/src/test/java/v15/cases/Case9.java b/asyncTool-core/src/test/java/v15/cases/Case9.java index a55fe08..d58c4d7 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case9.java +++ b/asyncTool-core/src/test/java/v15/cases/Case9.java @@ -51,9 +51,15 @@ class Case9 { // System.out.println(graph); - Async.beginWork(200, w1); + Async.work(200, w1).awaitFinish(); System.out.println(" Begin work end .\n w1 : " + w1 + "\n w2 : " + w2 + "\n"); - + /* 输出: + I am IWorker 1 + I am IWorker 2 + Begin work end . + w1 : 省略 + w2 : 省略 + */ } } diff --git a/asyncTool-openutil/pom.xml b/asyncTool-openutil/pom.xml index e5ef877..e0603b8 100644 --- a/asyncTool-openutil/pom.xml +++ b/asyncTool-openutil/pom.xml @@ -5,7 +5,7 @@ asyncTool com.jd.platform - 1.5.0-SNAPSHOT + 1.5.1-SNAPSHOT 4.0.0 diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/BiInt.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/BiInt.java index 5f7c749..5902bd6 100644 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/BiInt.java +++ b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/BiInt.java @@ -3,7 +3,7 @@ package com.jd.platform.async.openutil; import java.util.Comparator; /** - * 两个int值。重写了{@link #hashCode()}与{@link #equals(Object)} + * 两个int值包装类。重写了{@link #hashCode()}与{@link #equals(Object)} * * @author create by TcSnZh on 2021/5/16-上午1:50 */ diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/BiTree.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/BiTree.java deleted file mode 100644 index 0ed4d44..0000000 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/BiTree.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.jd.platform.async.openutil.collection; - -import java.util.Arrays; -import java.util.Collection; -import java.util.function.Function; - -/** - * 二叉树 - * - * @author create by TcSnZh on 2021/5/15-下午8:00 - */ -public interface BiTree extends Tree { - // todo - - /** - * 二叉树节点 - */ - interface BiNode extends Node { - @Override - default BiNode getParent() { - throw new UnsupportedOperationException("Get parent node is not supported."); - } - - BiNode getLeft(); - - BiNode getRight(); - - @Override - default Collection getChildren() { - return Arrays.asList(getLeft(), getRight()); - } - } - - /** - * 返回一个通俗易懂的字符画。 - *

- * 从leetcode上抄的。若有侵权请联系 {@code zh0u.he@qq.com},将会删除。 - * - * @param node 根节点 - * @param provideName 节点显示在图中的名字。 - * @param 根节点泛型 - * @return 返回一个字符画 - */ - static String toPrettyString(N node, Function provideName) { - StringBuilder sb = new StringBuilder(); - //noinspection unchecked - _toPrettyString(node, "", true, sb, (Function) provideName); - return sb.toString(); - } - - /** - * jdk8没有private static,只能加条下划线意思意思了。 - */ - static void _toPrettyString(N node, - String prefix, - boolean isLeft, - StringBuilder sb, - Function provideName) { - if (node == null) { - sb.append("(Empty tree)"); - return; - } - - BiNode right = node.getRight(); - if (right != null) { - _toPrettyString(right, prefix + (isLeft ? "│ " : " "), false, sb, provideName); - } - - sb.append(prefix).append(isLeft ? "└── " : "┌── ").append(provideName.apply(node)).append('\n'); - - BiNode left = node.getLeft(); - if (left != null) { - _toPrettyString(left, prefix + (isLeft ? " " : "│ "), true, sb, provideName); - } - } -} diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/CollisionRangeTable.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/CollisionRangeTable.java deleted file mode 100644 index f5e72dd..0000000 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/CollisionRangeTable.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.jd.platform.async.openutil.collection; - -/** - * 冲突范围表 - * - * @author create by TcSnZh on 2021/5/16-上午1:36 - */ -public class CollisionRangeTable { - // todo -} diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/DirectedGraph.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/DirectedGraph.java index 4f326d2..5ac6dc7 100644 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/DirectedGraph.java +++ b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/DirectedGraph.java @@ -1,9 +1,6 @@ package com.jd.platform.async.openutil.collection; -import java.util.AbstractSet; -import java.util.Collections; -import java.util.Iterator; -import java.util.Set; +import java.util.*; /** * @author create by TcSnZh on 2021/5/16-下午11:27 diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/Tree.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/Tree.java deleted file mode 100644 index b3d6ec3..0000000 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/Tree.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.jd.platform.async.openutil.collection; - -import java.util.Collection; - -/** - * @author create by TcSnZh on 2021/5/15-下午7:58 - */ -public interface Tree { - interface Node { - default Node getParent(){ - throw new UnsupportedOperationException("Get parent node is not supported."); - } - - Collection getChildren(); - } -} diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/WheelIterator.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/WheelIterator.java deleted file mode 100644 index 0c9ad40..0000000 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/collection/WheelIterator.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.jd.platform.async.openutil.collection; - -import java.util.Iterator; - -/** - * 一个反复循环的迭代器 - * - * @author create by TcSnZh on 2021/5/9-下午6:25 - */ -public interface WheelIterator extends Iterator { - @Override - E next(); - - /** - * 一轮的元素数 - */ - int cycle(); - - @Override - default boolean hasNext() { - return cycle() > 0; - } -} diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/concurrent/RangeLock.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/concurrent/RangeLock.java deleted file mode 100644 index 2d92fe2..0000000 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/concurrent/RangeLock.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.jd.platform.async.openutil.concurrent; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -/** - * 范围锁 todo - * - * @author create by TcSnZh on 2021/5/15-下午6:23 - */ -public interface RangeLock extends Lock { - void lock(int start, int end); - - boolean tryLock(int start, int end); - - boolean tryLock(int start, int end, long time, TimeUnit unit) throws InterruptedException; - - void lockInterruptibly(int start, int end) throws InterruptedException; - - @Override - Condition newCondition(); - - @Override - void unlock(); - - @Override - default void lock() { - lock(Integer.MIN_VALUE, Integer.MAX_VALUE); - } - - @Override - default void lockInterruptibly() throws InterruptedException { - lockInterruptibly(Integer.MIN_VALUE, Integer.MAX_VALUE); - } - - @Override - default boolean tryLock() { - return tryLock(Integer.MIN_VALUE, Integer.MAX_VALUE); - } - - @Override - default boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - return tryLock(Integer.MIN_VALUE, Integer.MAX_VALUE, time, unit); - } -} diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/unsafe/UnsafeUtil.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/unsafe/UnsafeUtil.java deleted file mode 100644 index 35ecf2a..0000000 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/unsafe/UnsafeUtil.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.jd.platform.async.openutil.unsafe; - -import sun.misc.Unsafe; - -import java.lang.reflect.Field; - -/** - * @author create by TcSnZh on 2021/5/16-下午4:36 - */ -@SuppressWarnings({"AlibabaAbstractClassShouldStartWithAbstractNaming", "unused"}) -abstract class UnsafeUtil { - private static final Unsafe unsafe; - - static { - Field theUnsafe; - try { - theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); - } catch (NoSuchFieldException e) { - throw new Error(e); - } - theUnsafe.setAccessible(true); - try { - unsafe = (Unsafe) theUnsafe.get(null); - } catch (IllegalAccessException e) { - throw new Error(e); - } - } - - @SuppressWarnings("FinalStaticMethod") - public static final Unsafe getUnsafe() { - return unsafe; - } -} diff --git a/asyncTool-openutil/src/test/java/openutiltest/TestGraph.java b/asyncTool-openutil/src/test/java/openutiltest/TestGraph.java index 533485b..bddf07b 100644 --- a/asyncTool-openutil/src/test/java/openutiltest/TestGraph.java +++ b/asyncTool-openutil/src/test/java/openutiltest/TestGraph.java @@ -10,7 +10,7 @@ import java.util.Arrays; * * @author create by TcSnZh on 2021/5/16-下午11:25 */ -public class TestGraph { +class TestGraph { public static void main(String[] args) { test_CommonDirectedGraph(); } diff --git a/asyncTool-scheduling/pom.xml b/asyncTool-scheduling/pom.xml deleted file mode 100644 index 7561f8d..0000000 --- a/asyncTool-scheduling/pom.xml +++ /dev/null @@ -1,31 +0,0 @@ - - - - asyncTool - com.jd.platform - 1.5.0-SNAPSHOT - - 4.0.0 - - asyncTool-scheduling - - - 8 - 8 - - - - - asyncTool-core - com.jd.platform - 1.5.0-SNAPSHOT - - - com.alibaba - fastjson - 1.2.76 - - - \ No newline at end of file diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/AsyncScheduling.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/AsyncScheduling.java deleted file mode 100644 index 413b609..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/AsyncScheduling.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.jd.platform.async.scheduling; - -/** - * 入口方法类 - * - * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/20-上午12:05 - */ -public class AsyncScheduling { -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/DefaultSchedulingDrawingsParser.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/DefaultSchedulingDrawingsParser.java deleted file mode 100644 index 030804c..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/DefaultSchedulingDrawingsParser.java +++ /dev/null @@ -1,219 +0,0 @@ -package com.jd.platform.async.scheduling.drawings; - -import com.jd.platform.async.scheduling.exception.IllegalSchedulingPropertyException; -import com.jd.platform.async.scheduling.model.ObjectModel; -import com.jd.platform.async.scheduling.model.SchedulingDrawingsModel; - -import java.util.*; -import java.util.function.BiConsumer; -import java.util.function.Supplier; - -/** - * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/20-下午4:38 - */ -class DefaultSchedulingDrawingsParser implements SchedulingDrawingsParser { - static final DefaultSchedulingDrawingsParser instance = new DefaultSchedulingDrawingsParser(); - - @Override - public SchedulingDrawings parseDrawings(final SchedulingDrawingsModel model) throws IllegalSchedulingPropertyException { - final SchedulingDrawingsImpl drawings = new SchedulingDrawingsImpl(); - final String drawingsName = model.getDrawingsName(); - drawings.setName(drawingsName == null ? UUID.randomUUID().toString() : drawingsName); - final List wrappers = - propRequireNotNull(model.getWrappers(), "wrappers"); - // 缓存WrapperModelMap并检查id是否唯一 - final LinkedHashMap wrapperId2ModelMap = - new LinkedHashMap<>(wrappers.size()); - for (int i = 0; i < wrappers.size(); i++) { - final SchedulingDrawingsModel.WrapperModel wrapper = wrappers.get(i); - // 检查id是否重复,并存入map - final String id = propRequireNotNull(wrapper.getId(), "wrappers[" + i + "].id"); - if (null != wrapperId2ModelMap.put(id, wrapper)) { - throw new IllegalSchedulingPropertyException("Wrapper id \"" + id + "\" duplicate ."); - } - } - // 缓存ObjectModelMap并检查 - final Map wrappersObjectModelMap = wrappersObjectModelMap(wrappers, wrapperId2ModelMap.keySet()); - // 构造wrapperFactory - // todo 还没写完 - return null; - } - - // ========== private method ========== - - - // ========== util static methods ========== - - /** - * 缓存sameObjectId有有效值的ObjectModel,并检查: - * 1. ObjectModel参数是否符合格式规范 - * 2. wrapperStrategy.dependOnUpWrapperStrategyMapper 的键id是否存在于wrappers的id属性集合中 - * - * @param wrappers wrappers属性值的列表 - * @param wrappersIdSet 已经包括wrappers列表中所有id属性的Set - * @return 返回Map的键为ObjectModel的sameObjectId,值为ObjectModel - * @throws IllegalSchedulingPropertyException 格式错误,抛出异常 - */ - protected static Map wrappersObjectModelMap( - List wrappers, - Set wrappersIdSet - ) throws IllegalSchedulingPropertyException { - final LinkedHashMap objectModelMap = new LinkedHashMap<>(); - for (final SchedulingDrawingsModel.WrapperModel wrapper : wrappers) { - final String prefixPropName = "wrappers[id=" + wrapper.getId() + "]"; - final SchedulingDrawingsModel.WrapperModel.ParamModel param = wrapper.getParam(); - // 将param参数中的objectModel检查并存入map - if (param != null && Boolean.TRUE.equals( - propRequireNotNull(param.getUseObjectModel(), prefixPropName + ".param.useObjectModel") - )) { - final Object value = param.getValue(); - if (value instanceof ObjectModel) { - checkAndPutObjectModelHasDifferentId( - (ObjectModel) value, - objectModelMap, - prefixPropName + ".param.value"); - } else { - throw IllegalSchedulingPropertyException.illegalFieldParameter( - prefixPropName + ".param.value", - value, - "it should instanceof ObjectModel." - ); - } - } - // 检查并缓存worker的objectModel - final String _workerPropName = prefixPropName + ".worker"; - checkAndPutObjectModelHasDifferentId(propRequireNotNull(wrapper.getWorker(), _workerPropName), - objectModelMap, _workerPropName); - // 检查并缓存callback的objectModel - checkAndPutObjectModelHasDifferentId(wrapper.getCallback(), objectModelMap, prefixPropName + ".callback"); - // 检查wrapperStrategy - final SchedulingDrawingsModel.WrapperModel.WrapperStrategyModel wrapperStrategy = wrapper.getWrapperStrategy(); - if (wrapperStrategy != null) { - // 检查并缓存dependOnUpWrapperStrategyMapper - final Map dependOnUpWrapperStrategyMapper = - wrapperStrategy.getDependOnUpWrapperStrategyMapper(); - if (dependOnUpWrapperStrategyMapper != null) { - for (Map.Entry entry : - dependOnUpWrapperStrategyMapper.entrySet()) { - final String wrapperId = entry.getKey(); - final String mapperPropName = prefixPropName + ".wrapperStrategy.dependOnUpWrapperStrategyMapper"; - if (!wrappersIdSet.contains(wrapperId)) { - throw IllegalSchedulingPropertyException.illegalFieldParameter( - mapperPropName, - dependOnUpWrapperStrategyMapper, - "the key(wrapperId) of \"" + wrapperId + "\n not in wrappers list" - ); - } - checkAndPutObjectModelHasDifferentId( - propRequireNotNull(entry.getValue(), mapperPropName), - objectModelMap, - mapperPropName - ); - } - } - // 检查并缓存dependenceStrategy - final ObjectModel dependenceStrategy = wrapperStrategy.getDependenceStrategy(); - final String dependenceStrategyPropName = prefixPropName + ".wrapperStrategy.dependenceStrategy"; - if (dependenceStrategy != null) { - checkAndPutObjectModelHasDifferentId( - dependenceStrategy, - objectModelMap, - dependenceStrategyPropName - ); - } - // 检查并缓存skipStrategy - final String skipStrategyPropName = prefixPropName + ".wrapperStrategy.skipStrategy"; - final ObjectModel skipStrategy = wrapperStrategy.getSkipStrategy(); - if (skipStrategy != null) { - checkAndPutObjectModelHasDifferentId( - skipStrategy, - objectModelMap, - skipStrategyPropName); - } - } - } - return objectModelMap; - } - - /** - * 本方法为{@link #wrappersObjectModelMap(List, Set)}的子方法。 - * 用于抽取ObjectModel格式的判断逻辑。 - * - * @param objectModel 对象模型 - * @param objectModelMap 要存入的Map - * @param propNameSup 属性名 - * @throws IllegalSchedulingPropertyException 格式错误,抛出异常 - */ - private static void checkAndPutObjectModelHasDifferentId(ObjectModel objectModel, - Map objectModelMap, - Supplier propNameSup) - throws IllegalSchedulingPropertyException { - final String constObjectName = objectModel.getConstObjectName(); - if (constObjectName != null) { - if (!ObjectModel.containConstObject(constObjectName)) { - final String propName; - throw new IllegalSchedulingPropertyException( - (propNameSup == null || (propName = propNameSup.get()) == null ? "" : "Property " + propName + " ") + - "ObjectModel doesn't have a constant object named \"" - + constObjectName + "\" . objectModel is " + objectModel); - } - return; - } - final Long sameObjectId = objectModel.getSameObjectId(); - if (sameObjectId != null) { - final Map properties = objectModel.getProperties(); - if (objectModelMap.containsKey(sameObjectId) && - // 同一id的对象,其他属性不允许重复设置 - (objectModel.getClassName() != null || properties != null && !properties.isEmpty())) { - throw new IllegalSchedulingPropertyException( - "The objectModel which sameObjectId=" + sameObjectId + - " cannot be set \"className\" or \"properties\" again . the two in conflict is " + - objectModel + " and " + objectModelMap.get(sameObjectId) + " ." - ); - } - objectModelMap.put(sameObjectId, objectModel); - return; - } - propRequireNotNull(objectModel.getClassName(), propNameSup.get()); - - } - - private static void checkAndPutObjectModelHasDifferentId(ObjectModel objectModel, - Map objectModelMap, - String propName) - throws IllegalSchedulingPropertyException { - checkAndPutObjectModelHasDifferentId(objectModel, objectModelMap, () -> propName); - } - - /** - * 检查属性是否为null - * - * @param prop 属性值 - * @param propName 属性名 - * @param reason 原因说明。 - * 默认值为{@code "it's not allow null"},用于{@link #propRequireNotNull(Object, String)}与 - * {@link #propRequireNotNull(Object, Supplier)}方法中。 - * @param 属性值泛型 - * @return 返回传入的属性值,供继续调用 - * @throws IllegalSchedulingPropertyException 如果属性为null,抛出异常 - */ - private static T propRequireNotNull(T prop, Supplier propName, Supplier reason) - throws IllegalSchedulingPropertyException { - if (prop == null) { - throw IllegalSchedulingPropertyException.illegalFieldParameter( - propName.get(), - null, - reason.get() - ); - } - return prop; - } - - private static T propRequireNotNull(T prop, String propName) throws IllegalSchedulingPropertyException { - return propRequireNotNull(prop, () -> propName); - } - - private static T propRequireNotNull(T prop, Supplier propName) throws IllegalSchedulingPropertyException { - return propRequireNotNull(prop, propName, () -> "it's not allow null"); - } -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawings.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawings.java deleted file mode 100644 index f0b5b4b..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawings.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.jd.platform.async.scheduling.drawings; - -/** - * 工厂图纸,可从模型转化而来 - * - * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/20-上午12:00 - */ -public interface SchedulingDrawings { - -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawingsImpl.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawingsImpl.java deleted file mode 100644 index 7cf3a4b..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawingsImpl.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.jd.platform.async.scheduling.drawings; - -/** - * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/20-下午4:39 - */ -class SchedulingDrawingsImpl implements SchedulingDrawings{ - protected String name; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawingsParser.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawingsParser.java deleted file mode 100644 index 178f47b..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/drawings/SchedulingDrawingsParser.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.jd.platform.async.scheduling.drawings; - -import com.jd.platform.async.scheduling.exception.IllegalSchedulingPropertyException; -import com.jd.platform.async.scheduling.model.SchedulingDrawingsModel; - -/** - * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/20-上午12:03 - */ -public interface SchedulingDrawingsParser { - /** - * 从模型转化为图纸 - */ - SchedulingDrawings parseDrawings(SchedulingDrawingsModel model) throws IllegalSchedulingPropertyException; - - default SchedulingDrawingsParser getDefaultInstance() { - return DefaultSchedulingDrawingsParser.instance; - } -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/exception/IllegalSchedulingException.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/exception/IllegalSchedulingException.java deleted file mode 100644 index 8a49582..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/exception/IllegalSchedulingException.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.jd.platform.async.scheduling.exception; - -/** - * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/19-下午7:20 - */ -public class IllegalSchedulingException extends Exception{ - public IllegalSchedulingException() { - } - - public IllegalSchedulingException(String message) { - super(message); - } - - public IllegalSchedulingException(String message, Throwable cause) { - super(message, cause); - } - - public IllegalSchedulingException(Throwable cause) { - super(cause); - } - - public IllegalSchedulingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/exception/IllegalSchedulingPropertyException.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/exception/IllegalSchedulingPropertyException.java deleted file mode 100644 index 459c65a..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/exception/IllegalSchedulingPropertyException.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.jd.platform.async.scheduling.exception; - -/** - * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/20-下午4:59 - */ -public class IllegalSchedulingPropertyException extends IllegalSchedulingException { - - public IllegalSchedulingPropertyException(String message) { - super(message); - } - - public IllegalSchedulingPropertyException(String message, Throwable cause) { - super(message, cause); - } - - /** - * json反序列化失败 - * - * @param json json - * @param cause 原因 - * @return 返回异常 - */ - public static IllegalSchedulingPropertyException deserializeJsonFailed(String json, - Throwable cause) { - return new IllegalSchedulingPropertyException( - "Json deserialize to model failed , please check properties and see QuickStart.md . the json is : " + json, - cause - ); - } - - /** - * 如果json反序列化有效,但是不符合规范。 - * - * @param illegalFieldName 无效的属性名 - * @param fieldValue 属性的值 - * @param reason 原因 - * @param cause 引发异常,可以为null - * @return 返回异常 - */ - public static IllegalSchedulingPropertyException illegalFieldParameter(String illegalFieldName, - Object fieldValue, - String reason, - Throwable cause) { - return new IllegalSchedulingPropertyException( - "Property" + illegalFieldName + " does not conform to specification. value is : " + fieldValue - + " . because " + reason, cause - ); - } - - public static IllegalSchedulingPropertyException illegalFieldParameter(String illegalFieldName, - Object fieldValue, - String reason) { - return illegalFieldParameter(illegalFieldName, fieldValue, reason, null); - } -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/factory/AbstractSchedulingFactory.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/factory/AbstractSchedulingFactory.java deleted file mode 100644 index 516b23a..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/factory/AbstractSchedulingFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.jd.platform.async.scheduling.factory; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * 调度工厂。传入图纸生成一组wrapper。 - * - * @author create by TcSnZh on 2021/5/17-上午1:11 - */ -public abstract class AbstractSchedulingFactory { - protected final String factoryName; - - /** - * 无参构造,默认使用 {@code 栈信息<自增long值> } 作为工厂名 - */ - public AbstractSchedulingFactory() { - this(Thread.currentThread().getStackTrace()[2] + "<" + defaultFactoryNameCount.getAndIncrement() + ">"); - } - - /** - * 指定工厂名 - * - * @param factoryName 工厂名 - */ - public AbstractSchedulingFactory(String factoryName) { - this.factoryName = factoryName; - } - - // ========== static ========== - - private static final AtomicLong defaultFactoryNameCount = new AtomicLong(); -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/DefaultSchedulingJsonModelParser.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/DefaultSchedulingJsonModelParser.java deleted file mode 100644 index 6e62218..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/DefaultSchedulingJsonModelParser.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.jd.platform.async.scheduling.model; - -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.parser.Feature; -import com.jd.platform.async.scheduling.exception.IllegalSchedulingPropertyException; - -import static com.alibaba.fastjson.parser.Feature.*; - -/** - * @author create by TcSnZh on 2021/5/17-上午1:22 - */ -class DefaultSchedulingJsonModelParser implements SchedulingJsonModelParser { - // ========== singleton instance ========== - - static final DefaultSchedulingJsonModelParser instance = new DefaultSchedulingJsonModelParser(); - - // ========== public methods ========== - - @Override - public SchedulingDrawingsModel parseToModel(String json) throws IllegalSchedulingPropertyException { - try { - return JSONObject.parseObject(json, SchedulingDrawingsModel.class, DEFAULT_FEATURES); - } catch (Exception e) { - throw IllegalSchedulingPropertyException.deserializeJsonFailed(json, e); - } - } - - static Feature[] DEFAULT_FEATURES = { - AllowComment, AllowUnQuotedFieldNames, AllowSingleQuotes, SafeMode, ErrorOnEnumNotMatch - }; - - // ========== util methods ========== -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/ObjectModel.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/ObjectModel.java deleted file mode 100644 index ea4783c..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/ObjectModel.java +++ /dev/null @@ -1,93 +0,0 @@ -package com.jd.platform.async.scheduling.model; - -import com.jd.platform.async.callback.ICallback; -import com.jd.platform.async.wrapper.strategy.depend.DependOnUpWrapperStrategy; -import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy; -import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy; - -import java.util.HashMap; -import java.util.Map; - -/** - * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/19-下午7:51 - */ -public class ObjectModel { - protected String constObjectName; - protected String className; - protected Long sameObjectId; - protected Map properties; - - public ObjectModel() { - } - - public String getConstObjectName() { - return constObjectName; - } - - public void setConstObjectName(String constObjectName) { - this.constObjectName = constObjectName; - } - - public String getClassName() { - return className; - } - - public void setClassName(String className) { - this.className = className; - } - - public Long getSameObjectId() { - return sameObjectId; - } - - public void setSameObjectId(Long sameObjectId) { - this.sameObjectId = sameObjectId; - } - - public Map getProperties() { - return properties; - } - - public void setProperties(Map properties) { - this.properties = properties; - } - - public static Map getConstObjects() { - return constObjects; - } - - @Override - public String toString() { - return "ObjectModel{" + - "constObjectName='" + constObjectName + '\'' + - ", className='" + className + '\'' + - ", sameObjectId=" + sameObjectId + - ", properties=" + properties + - '}'; - } - - // static constants - - private static final Map constObjects; - - static { - constObjects = new HashMap<>(16); - constObjects.put("NOT_SKIP", SkipStrategy.NOT_SKIP); - constObjects.put("CHECK_ONE_LEVEL", SkipStrategy.CHECK_ONE_LEVEL); - constObjects.put("ALL_DEPENDENCIES_ALL_SUCCESS", DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS); - constObjects.put("ALL_DEPENDENCIES_ANY_SUCCESS", DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS); - constObjects.put("ALL_DEPENDENCIES_NONE_FAILED", DependenceStrategy.ALL_DEPENDENCIES_NONE_FAILED); - constObjects.put("SUCCESS_CONTINUE", DependOnUpWrapperStrategy.SUCCESS_CONTINUE); - constObjects.put("SUCCESS_START_INIT_CONTINUE", DependOnUpWrapperStrategy.SUCCESS_START_INIT_CONTINUE); - constObjects.put("PRINT_EXCEPTION_STACK_TRACE", ICallback.PRINT_EXCEPTION_STACK_TRACE); - } - - public static boolean containConstObject(String name) { - return constObjects.containsKey(name); - } - - public static T getConstObject(String name) { - //noinspection unchecked - return (T) constObjects.get(name); - } -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/SchedulingDrawingsModel.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/SchedulingDrawingsModel.java deleted file mode 100644 index 6899622..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/SchedulingDrawingsModel.java +++ /dev/null @@ -1,300 +0,0 @@ -package com.jd.platform.async.scheduling.model; - -import java.util.List; -import java.util.Map; -import java.util.StringJoiner; -import java.util.concurrent.TimeUnit; - -/** - * 工厂图纸模型对象。 - * 具体参数含义,请查阅 QuickStart.md - * - * @author create by TcSnZh on 2021/5/17-上午1:16 - */ -@SuppressWarnings("unused") -public class SchedulingDrawingsModel { - protected String drawingsName; - protected List wrappers; - protected List relations; - protected BeginWorkModel beginWork; - - public static class WrapperModel { - protected String id; - protected ParamModel param; - protected ObjectModel worker; - protected ObjectModel callback; - protected WrapperStrategyModel wrapperStrategy; - protected Boolean allowInterrupt; - protected Boolean enableTimeout; - protected Long timeoutLength; - protected TimeUnit timeUnit; - protected String extendConfig; - - public static class ParamModel { - protected Boolean useObjectModel; - protected Object value; // true - ObjectModel ; false - the json converted to type, such as Map\List\String... - - public Boolean getUseObjectModel() { - return useObjectModel; - } - - public void setUseObjectModel(Boolean useObjectModel) { - this.useObjectModel = useObjectModel; - } - - public Object getValue() { - return value; - } - - public void setValue(Object value) { - this.value = value; - } - } - - public static class WrapperStrategyModel { - protected Map dependOnUpWrapperStrategyMapper; - protected ObjectModel dependenceStrategy; - protected ObjectModel skipStrategy; - - public Map getDependOnUpWrapperStrategyMapper() { - return dependOnUpWrapperStrategyMapper; - } - - public void setDependOnUpWrapperStrategyMapper(Map dependOnUpWrapperStrategyMapper) { - this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper; - } - - public ObjectModel getDependenceStrategy() { - return dependenceStrategy; - } - - public void setDependenceStrategy(ObjectModel dependenceStrategy) { - this.dependenceStrategy = dependenceStrategy; - } - - public ObjectModel getSkipStrategy() { - return skipStrategy; - } - - public void setSkipStrategy(ObjectModel skipStrategy) { - this.skipStrategy = skipStrategy; - } - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public ParamModel getParam() { - return param; - } - - public void setParam(ParamModel param) { - this.param = param; - } - - public ObjectModel getWorker() { - return worker; - } - - public void setWorker(ObjectModel worker) { - this.worker = worker; - } - - public ObjectModel getCallback() { - return callback; - } - - public void setCallback(ObjectModel callback) { - this.callback = callback; - } - - public WrapperStrategyModel getWrapperStrategy() { - return wrapperStrategy; - } - - public void setWrapperStrategy(WrapperStrategyModel wrapperStrategy) { - this.wrapperStrategy = wrapperStrategy; - } - - public Boolean getAllowInterrupt() { - return allowInterrupt; - } - - public void setAllowInterrupt(Boolean allowInterrupt) { - this.allowInterrupt = allowInterrupt; - } - - public Boolean getEnableTimeout() { - return enableTimeout; - } - - public void setEnableTimeout(Boolean enableTimeout) { - this.enableTimeout = enableTimeout; - } - - public Long getTimeoutLength() { - return timeoutLength; - } - - public void setTimeoutLength(Long timeoutLength) { - this.timeoutLength = timeoutLength; - } - - public TimeUnit getTimeUnit() { - return timeUnit; - } - - public void setTimeUnit(TimeUnit timeUnit) { - this.timeUnit = timeUnit; - } - - public String getExtendConfig() { - return extendConfig; - } - - public void setExtendConfig(String extendConfig) { - this.extendConfig = extendConfig; - } - - @Override - public String toString() { - return "WrapperModel{" + - "id='" + id + '\'' + - ", param=" + param + - ", worker=" + worker + - ", callback=" + callback + - ", wrapperStrategy=" + wrapperStrategy + - ", allowInterrupt=" + allowInterrupt + - ", enableTimeout=" + enableTimeout + - ", timeoutLength=" + timeoutLength + - ", timeUnit=" + timeUnit + - ", extendConfig='" + extendConfig + '\'' + - '}'; - } - } - - public static class RelationModel { - protected Object from; // from和to最多有一个数组,剩下的都是String - protected Object to; - - public Object getFrom() { - return from; - } - - public void setFrom(Object from) { - this.from = from; - } - - public Object getTo() { - return to; - } - - public void setTo(Object to) { - this.to = to; - } - - @Override - public String toString() { - return "RelationModel{" + - "from=" + from + - ", to=" + to + - '}'; - } - } - - public static class BeginWorkModel { - protected Long timeoutLength; - protected TimeUnit timeoutUnit; - protected List wrappers; - protected String executor; - - public Long getTimeoutLength() { - return timeoutLength; - } - - public void setTimeoutLength(Long timeoutLength) { - this.timeoutLength = timeoutLength; - } - - public TimeUnit getTimeoutUnit() { - return timeoutUnit; - } - - public void setTimeoutUnit(TimeUnit timeoutUnit) { - this.timeoutUnit = timeoutUnit; - } - - public List getWrappers() { - return wrappers; - } - - public void setWrappers(List wrappers) { - this.wrappers = wrappers; - } - - public String getExecutor() { - return executor; - } - - public void setExecutor(String executor) { - this.executor = executor; - } - - @Override - public String toString() { - return "BeginWorkModel{" + - "timeoutLength=" + timeoutLength + - ", timeoutUnit=" + timeoutUnit + - ", wrappers=" + wrappers + - ", executor='" + executor + '\'' + - '}'; - } - } - - public String getDrawingsName() { - return drawingsName; - } - - public void setDrawingsName(String drawingsName) { - this.drawingsName = drawingsName; - } - - public List getWrappers() { - return wrappers; - } - - public void setWrappers(List wrappers) { - this.wrappers = wrappers; - } - - public List getRelations() { - return relations; - } - - public void setRelations(List relations) { - this.relations = relations; - } - - public BeginWorkModel getBeginWork() { - return beginWork; - } - - public void setBeginWork(BeginWorkModel beginWork) { - this.beginWork = beginWork; - } - - @Override - public String toString() { - return "SchedulingDrawingsModel{" + - "drawingsName='" + drawingsName + '\'' + - ", wrappers=" + wrappers + - ", relations=" + relations + - ", beginWork=" + beginWork + - '}'; - } -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/SchedulingJsonModelParser.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/SchedulingJsonModelParser.java deleted file mode 100644 index f3cf69a..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/model/SchedulingJsonModelParser.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.jd.platform.async.scheduling.model; - -import com.jd.platform.async.scheduling.exception.IllegalSchedulingPropertyException; - -/** - * @author create by TcSnZh on 2021/5/17-下午7:22 - */ -public interface SchedulingJsonModelParser { - /** - * 解析json为配置模型对象 - * - * @param json json - * @return 返回图纸对象接口 - */ - SchedulingDrawingsModel parseToModel(String json) throws IllegalSchedulingPropertyException; - - /** - * 默认实现 - */ - static SchedulingJsonModelParser getDefaultInstance() { - return DefaultSchedulingJsonModelParser.instance; - } -} diff --git a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/util/ReflectUtil.java b/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/util/ReflectUtil.java deleted file mode 100644 index 62d1675..0000000 --- a/asyncTool-scheduling/src/main/java/com/jd/platform/async/scheduling/util/ReflectUtil.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.jd.platform.async.scheduling.util; - -/** - * 反射工具类 - * - * @author tcsnzh[zh.jobs@foxmail.com] - */ -@SuppressWarnings("AlibabaAbstractClassShouldStartWithAbstractNaming") -public abstract class ReflectUtil { - private ReflectUtil() { - } - -} diff --git a/asyncTool-scheduling/src/test/java/v15/schedulingtest/FileStringReader.java b/asyncTool-scheduling/src/test/java/v15/schedulingtest/FileStringReader.java deleted file mode 100644 index bda04e1..0000000 --- a/asyncTool-scheduling/src/test/java/v15/schedulingtest/FileStringReader.java +++ /dev/null @@ -1,37 +0,0 @@ -package v15.schedulingtest; - -import java.io.*; -import java.util.Objects; - -/** - * @author create by TcSnZh on 2021/5/17-上午11:34 - */ -public class FileStringReader { - public static String readFile(String resourceClasspath) { - InputStream is = null; - try { - BufferedReader reader = new BufferedReader(new InputStreamReader(Objects.requireNonNull( - is = Objects.requireNonNull( - Thread.currentThread().getContextClassLoader(), "get classLoader is null" - ).getResourceAsStream(resourceClasspath), () -> "get resource " + resourceClasspath + " is null" - ))); - StringWriter stringWriter = new StringWriter(); - char[] buf = new char[4096]; - int len; - while ((len = reader.read(buf)) > 0) { - stringWriter.write(buf, 0, len); - } - return stringWriter.toString(); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - if (is != null) { - try { - is.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } -} diff --git a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/Case1.java b/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/Case1.java deleted file mode 100644 index 8f40ae1..0000000 --- a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/Case1.java +++ /dev/null @@ -1,15 +0,0 @@ -package v15.schedulingtest.cases.case1; - -import com.jd.platform.async.scheduling.exception.IllegalSchedulingException; -import com.jd.platform.async.scheduling.model.SchedulingJsonModelParser; -import v15.schedulingtest.FileStringReader; - -/** - * @author create by TcSnZh on 2021/5/17-上午11:49 - */ -class Case1 { - public static void main(String[] args) throws IllegalSchedulingException { - String json = FileStringReader.readFile("v15/case1_1.json"); - System.out.println(SchedulingJsonModelParser.getDefaultInstance().parseToModel(json)); - } -} diff --git a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/PayTaxes.java b/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/PayTaxes.java deleted file mode 100644 index 71f6e56..0000000 --- a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/PayTaxes.java +++ /dev/null @@ -1,34 +0,0 @@ -package v15.schedulingtest.cases.case1; - -import com.jd.platform.async.callback.ICallback; -import com.jd.platform.async.callback.IWorker; -import com.jd.platform.async.worker.WorkResult; -import com.jd.platform.async.wrapper.WorkerWrapper; - -import java.util.Map; - -/** - * @author create by TcSnZh on 2021/5/17-上午2:04 - */ -class PayTaxes implements IWorker, ICallback { - /** - * 富人(资产>1亿)收30%的税,穷人发1000块。 - */ - @Override - public User action(String selectUserWrapperId, Map> allWrappers) { - User u = (User) allWrappers.get(selectUserWrapperId).getWorkResult().getResult(); - double money; - if ((money = u.getMoney()) > 1.00 * SelectUserByName.HundredMillion) { - u.setMoney(money * 0.7); - } else { - u.setMoney(money + 1000); - } - return u; - } - - @Override - public void result(boolean success, String ignore, WorkResult workResult) { - User u = workResult.getResult(); - (success ? System.out : System.err).printf("User %s has %.2f yuan .\n", u.getName(), u.getMoney()); - } -} diff --git a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/PrintParam.java b/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/PrintParam.java deleted file mode 100644 index c46166c..0000000 --- a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/PrintParam.java +++ /dev/null @@ -1,17 +0,0 @@ -package v15.schedulingtest.cases.case1; - -import com.jd.platform.async.callback.IWorker; -import com.jd.platform.async.wrapper.WorkerWrapper; - -import java.util.Map; - -/** - * @author create by TcSnZh on 2021/5/17-上午2:31 - */ -class PrintParam implements IWorker { - @Override - public Object action(Object object, Map> allWrappers) { - System.out.println("print param : " + object); - return object; - } -} diff --git a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/SelectUserByName.java b/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/SelectUserByName.java deleted file mode 100644 index b796933..0000000 --- a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/SelectUserByName.java +++ /dev/null @@ -1,48 +0,0 @@ -package v15.schedulingtest.cases.case1; - -import com.jd.platform.async.callback.ICallback; -import com.jd.platform.async.callback.IWorker; -import com.jd.platform.async.worker.WorkResult; -import com.jd.platform.async.wrapper.WorkerWrapper; -import v15.schedulingtest.cases.case1.User; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author create by TcSnZh on 2021/5/17-上午1:33 - */ -class SelectUserByName implements IWorker, ICallback { - public static final long HundredMillion = 100000000; - private static final Map datasource; - - static { - datasource = new ConcurrentHashMap<>(); - User pony = new User("Pony", 50, 612.0 * HundredMillion); - User jackMa = new User("JackMa", 57, 475.0 * HundredMillion); - User me = new User("tcsnzh", 20, 0.01); - datasource.put(pony.getName(), pony); - datasource.put(jackMa.getName(), jackMa); - datasource.put(me.getName(), me); - } - - private static final AtomicInteger queryCount = new AtomicInteger(); - - /** - * 查询用户 - */ - @Override - public User action(String param, Map> allWrappers) { - return datasource.get(param); - } - - @Override - public void result(boolean success, String param, WorkResult workResult) { - (success ? System.out : System.err).println( - "query[" + queryCount.getAndIncrement() + "] " + - "with parameter " + param + - (success ? " succeeded" : " failed") + - ". workResult is " + workResult); - } -} diff --git a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/User.java b/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/User.java deleted file mode 100644 index 9a192ee..0000000 --- a/asyncTool-scheduling/src/test/java/v15/schedulingtest/cases/case1/User.java +++ /dev/null @@ -1,52 +0,0 @@ -package v15.schedulingtest.cases.case1; - -/** - * @author create by TcSnZh on 2021/5/17-上午1:33 - */ -class User { - private String name; - private int age; - private double money; - - public User() { - } - - public User(String name, int age, double money) { - this.name = name; - this.age = age; - this.money = money; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public int getAge() { - return age; - } - - public void setAge(int age) { - this.age = age; - } - - public double getMoney() { - return money; - } - - public void setMoney(double money) { - this.money = money; - } - - @Override - public String toString() { - return "User{" + - "name='" + name + '\'' + - ", age=" + age + - ", money=" + money + - '}'; - } -} diff --git a/asyncTool-scheduling/src/test/resources/v15/case1_1.json b/asyncTool-scheduling/src/test/resources/v15/case1_1.json deleted file mode 100644 index cedf0fb..0000000 --- a/asyncTool-scheduling/src/test/resources/v15/case1_1.json +++ /dev/null @@ -1,80 +0,0 @@ -{ - "drawingsName": "case1_1", - "wrappers": [ - { - "id": "first", - "param": { - "useObjectModel": false, - "value": "JackMa" - }, - "worker": { - "sameObjectId": 1, - "className:": "schedulingtest.impl.SelectUserByName" - }, - "callback": { - "sameObjectId": 1 - }, - "wrapperStrategy": { - "dependOnUpWrapperStrategyMapper": null, - "dependenceStrategy": { - "constObjectName": "ALL_DEPENDENCIES_ALL_SUCCESS" - }, - "skipStrategy": { - "constObjectName": "CHECK_ONE_LEVEL" - } - }, - "allowInterrupt": true, - "enableTimeout": true, - "timeoutLength": 50, - "timeoutUnit": "TimeUnit.MILLISECONDS" - }, - { - "id": "second", - "worker": { - "sameObjectId": 2, - "className:": "schedulingtest.impl.SelectUserByName" - }, - "callback": { - "sameObjectId": 2 - } - }, - { - "id": "third", - "__extend": "second" - }, - { - "id": "fourth", - "worker": { - "sameObjectId": 2, - "className:": "schedulingtest.impl.PrintParam" - }, - "wrapperStrategy": { - "dependenceStrategy": { - "constObjectName": "ALL_DEPENDENCIES_ALL_SUCCESS" - } - } - } - ], - "relations": [ - { - "from": "first", - "to": "second" - }, - { - "from": "second", - "to": "fourth" - }, - { - "from": "third", - "to": "fourth" - } - ], - "beginWork": { - "timeoutLength": 100, - "timeoutUnit": "MILLISECONDS", - "wrappers": [ - "first" - ], - "executor": "COMMON_POOL" - } -} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7d4ab74..cdb66dc 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.jd.platform asyncTool pom - 1.5.0-SNAPSHOT + 1.5.1-SNAPSHOT @@ -25,6 +25,5 @@ asyncTool-openutil asyncTool-core - asyncTool-scheduling \ No newline at end of file