diff --git a/pom.xml b/pom.xml
index ebae0e3..a799dc9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,5 +8,17 @@
asyncTool
1.0-SNAPSHOT
-
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.3
+
+ 1.8
+ 1.8
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java
index c0bb419..43c101a 100755
--- a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java
+++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java
@@ -9,10 +9,7 @@ import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.worker.WorkResult;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
@@ -171,7 +168,6 @@ public class WorkerWrapper {
* 进行下一个任务
*/
private void beginNext(ThreadPoolExecutor poolExecutor, long now, long remainTime) {
-// System.out.println("now is " + SystemClock.now() + " and thread count : " + getThreadCount());
//花费的时间
long costTime = SystemClock.now() - now;
if (nextWrappers == null) {
@@ -291,7 +287,6 @@ public class WorkerWrapper {
private boolean fastFail(int expect, Exception e) {
//试图将它从expect状态,改成Error
if (!compareAndSetState(expect, ERROR)) {
-// System.out.println("compareAndSetState----------fail");
return false;
}
@@ -365,16 +360,51 @@ public class WorkerWrapper {
}
private void addDepend(WorkerWrapper, ?> workerWrapper, boolean must) {
+ addDepend(new DependWrapper(workerWrapper, must));
+ }
+
+ private void addDepend(DependWrapper dependWrapper) {
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
//如果依赖的是重复的同一个,就不重复添加了
- for (DependWrapper dependWrapper : dependWrappers) {
- if (workerWrapper.equals(dependWrapper.getDependWrapper())) {
+ for (DependWrapper wrapper : dependWrappers) {
+ if (wrapper.equals(dependWrapper)) {
return;
}
}
- dependWrappers.add(new DependWrapper(workerWrapper, must));
+ dependWrappers.add(dependWrapper);
+ }
+
+ private void addNext(WorkerWrapper, ?> workerWrapper) {
+ if (nextWrappers == null) {
+ nextWrappers = new ArrayList<>();
+ }
+ //避免添加重复
+ for (WorkerWrapper wrapper : nextWrappers) {
+ if (workerWrapper.equals(wrapper)) {
+ return;
+ }
+ }
+ nextWrappers.add(workerWrapper);
+ }
+
+ private void addNextWrappers(List> wrappers) {
+ if (wrappers == null) {
+ return;
+ }
+ for (WorkerWrapper, ?> wrapper : wrappers) {
+ addNext(wrapper);
+ }
+ }
+
+ private void addDependWrappers(List dependWrappers) {
+ if (dependWrappers == null) {
+ return;
+ }
+ for (DependWrapper wrapper : dependWrappers) {
+ addDepend(wrapper);
+ }
}
private WorkResult defaultResult() {
@@ -403,11 +433,6 @@ public class WorkerWrapper {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
}
- private void setNextWrappers(List> wrappers) {
- this.nextWrappers = wrappers;
- }
-
-
public static class Builder {
/**
* worker将来要处理的param
@@ -419,6 +444,10 @@ public class WorkerWrapper {
* 自己后面的所有
*/
private List> nextWrappers;
+ /**
+ * 自己依赖的所有
+ */
+ private List dependWrappers;
/**
* 存储强依赖于自己的wrapper集合
*/
@@ -446,6 +475,32 @@ public class WorkerWrapper {
return this;
}
+ public Builder depend(WorkerWrapper, ?>... wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ for (WorkerWrapper, ?> wrapper : wrappers) {
+ depend(wrapper);
+ }
+ return this;
+ }
+
+ public Builder depend(WorkerWrapper, ?> wrapper) {
+ return depend(wrapper, true);
+ }
+
+ public Builder depend(WorkerWrapper, ?> wrapper, boolean isMust) {
+ if (wrapper == null) {
+ return this;
+ }
+ DependWrapper dependWrapper = new DependWrapper(wrapper, isMust);
+ if (dependWrappers == null) {
+ dependWrappers = new ArrayList<>();
+ }
+ dependWrappers.add(dependWrapper);
+ return this;
+ }
+
public Builder next(WorkerWrapper, ?> wrapper) {
return next(wrapper, true);
}
@@ -471,24 +526,31 @@ public class WorkerWrapper {
return this;
}
for (WorkerWrapper, ?> wrapper : wrappers) {
- next(wrapper, true);
+ next(wrapper);
}
return this;
}
-
public WorkerWrapper build() {
WorkerWrapper wrapper = new WorkerWrapper<>(worker, param, callback);
wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
- wrapper.setNextWrappers(nextWrappers);
- if (nextWrappers != null && nextWrappers.size() > 0) {
+ if (dependWrappers != null) {
+ for (DependWrapper workerWrapper : dependWrappers) {
+ workerWrapper.getDependWrapper().addNext(wrapper);
+ wrapper.addDepend(workerWrapper);
+ }
+ }
+ if (nextWrappers != null) {
for (WorkerWrapper, ?> workerWrapper : nextWrappers) {
if (selfIsMustSet != null) {
workerWrapper.addDepend(wrapper, selfIsMustSet.contains(workerWrapper));
}
+ wrapper.addNext(workerWrapper);
}
}
+
return wrapper;
}
+
}
}
diff --git a/src/main/java/com/jd/platform/test/parallel/TestPar.java b/src/main/java/com/jd/platform/test/parallel/TestPar.java
index 1a36ac6..d2df0e2 100755
--- a/src/main/java/com/jd/platform/test/parallel/TestPar.java
+++ b/src/main/java/com/jd/platform/test/parallel/TestPar.java
@@ -18,10 +18,14 @@ public class TestPar {
// testNormal();
// testMulti();
+// testMultiReverse();
// testMultiError2();
// testMulti3();
- testMulti4();
+// testMulti3Reverse();
+// testMulti4();
+// testMulti4Reverse();
// testMulti5();
+ testMulti5Reverse();
// testMulti6();
// testMulti7();
// testMulti8();
@@ -109,6 +113,47 @@ public class TestPar {
Async.shutDown();
}
+ /**
+ * 0,2同时开启,1在0后面
+ * 0---1
+ * 2
+ */
+ private static void testMultiReverse() throws ExecutionException, InterruptedException {
+ ParWorker w = new ParWorker();
+ ParWorker1 w1 = new ParWorker1();
+ ParWorker2 w2 = new ParWorker2();
+
+ WorkerWrapper workerWrapper = new WorkerWrapper.Builder()
+ .worker(w)
+ .callback(w)
+ .param("0")
+ .build();
+
+ WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder()
+ .worker(w1)
+ .callback(w1)
+ .param("1")
+ .depend(workerWrapper)
+ .build();
+
+ WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder()
+ .worker(w2)
+ .callback(w2)
+ .param("2")
+ .build();
+
+
+ long now = SystemClock.now();
+ System.out.println("begin-" + now);
+
+ Async.beginWork(2500, workerWrapper, workerWrapper2);
+
+ System.out.println("end-" + SystemClock.now());
+ System.err.println("cost-" + (SystemClock.now() - now));
+
+ Async.shutDown();
+ }
+
/**
* 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败
@@ -203,6 +248,60 @@ public class TestPar {
Async.shutDown();
}
+ /**
+ * 0执行完,同时1和2, 1\2都完成后3
+ * 1
+ * 0 3
+ * 2
+ */
+ private static void testMulti3Reverse() throws ExecutionException, InterruptedException {
+ ParWorker w = new ParWorker();
+ ParWorker1 w1 = new ParWorker1();
+ ParWorker2 w2 = new ParWorker2();
+ ParWorker3 w3 = new ParWorker3();
+
+ WorkerWrapper workerWrapper = new WorkerWrapper.Builder()
+ .worker(w)
+ .callback(w)
+ .param("0")
+ .build();
+
+ WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder()
+ .worker(w2)
+ .callback(w2)
+ .param("2")
+ .depend(workerWrapper)
+ .build();
+
+ WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder()
+ .worker(w1)
+ .callback(w1)
+ .param("1")
+ .depend(workerWrapper)
+ .build();
+
+ WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder()
+ .worker(w3)
+ .callback(w3)
+ .param("3")
+ .depend(workerWrapper1, workerWrapper2)
+ .build();
+
+
+ long now = SystemClock.now();
+ System.out.println("begin-" + now);
+
+ Async.beginWork(3100, workerWrapper);
+// Async.beginWork(2100, workerWrapper);
+
+ System.out.println("end-" + SystemClock.now());
+ System.err.println("cost-" + (SystemClock.now() - now));
+
+ System.out.println(Async.getThreadCount());
+ Async.shutDown();
+ }
+
+
/**
* 0执行完,同时1和2, 1\2都完成后3,2耗时2秒,1耗时1秒。3会等待2完成
* 1
@@ -264,6 +363,68 @@ public class TestPar {
Async.shutDown();
}
+ /**
+ * 0执行完,同时1和2, 1\2都完成后3,2耗时2秒,1耗时1秒。3会等待2完成
+ * 1
+ * 0 3
+ * 2
+ *
+ * 执行结果0,1,2,3
+ */
+ private static void testMulti4Reverse() throws ExecutionException, InterruptedException {
+ ParWorker w = new ParWorker();
+ ParWorker1 w1 = new ParWorker1();
+
+ ParWorker2 w2 = new ParWorker2();
+ w2.setSleepTime(2000);
+
+ ParWorker3 w3 = new ParWorker3();
+
+ WorkerWrapper workerWrapper = new WorkerWrapper.Builder()
+ .worker(w)
+ .callback(w)
+ .param("0")
+ .build();
+
+ WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder()
+ .worker(w3)
+ .callback(w3)
+ .param("3")
+ .build();
+
+ WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder()
+ .worker(w2)
+ .callback(w2)
+ .param("2")
+ .depend(workerWrapper)
+ .next(workerWrapper3)
+ .build();
+
+ WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder()
+ .worker(w1)
+ .callback(w1)
+ .param("1")
+ .depend(workerWrapper)
+ .next(workerWrapper3)
+ .build();
+
+ long now = SystemClock.now();
+ System.out.println("begin-" + now);
+
+ //正常完毕
+ Async.beginWork(4100, workerWrapper);
+ //3会超时
+// Async.beginWork(3100, workerWrapper);
+ //2,3会超时
+// Async.beginWork(2900, workerWrapper);
+
+ System.out.println("end-" + SystemClock.now());
+ System.err.println("cost-" + (SystemClock.now() - now));
+
+ System.out.println(Async.getThreadCount());
+ Async.shutDown();
+ }
+
/**
* 0执行完,同时1和2, 1\2 任何一个执行完后,都执行3
* 1
@@ -324,6 +485,70 @@ public class TestPar {
Async.shutDown();
}
+
+ /**
+ * 0执行完,同时1和2, 1\2 任何一个执行完后,都执行3
+ * 1
+ * 0 3
+ * 2
+ *
+ * 则结果是:
+ * 0,2,3,1
+ * 2,3分别是500、400.3执行完毕后,1才执行完
+ */
+ private static void testMulti5Reverse() throws ExecutionException, InterruptedException {
+ ParWorker w = new ParWorker();
+ ParWorker1 w1 = new ParWorker1();
+
+ ParWorker2 w2 = new ParWorker2();
+ w2.setSleepTime(500);
+
+ ParWorker3 w3 = new ParWorker3();
+ w3.setSleepTime(400);
+
+ WorkerWrapper workerWrapper = new WorkerWrapper.Builder()
+ .worker(w)
+ .callback(w)
+ .param("0")
+ .build();
+
+ WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder()
+ .worker(w3)
+ .callback(w3)
+ .param("3")
+ .build();
+
+ WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder()
+ .worker(w2)
+ .callback(w2)
+ .param("2")
+ .depend(workerWrapper, true)
+ .next(workerWrapper3, false)
+ .build();
+
+ WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder()
+ .worker(w1)
+ .callback(w1)
+ .param("1")
+ .depend(workerWrapper, true)
+ .next(workerWrapper3, false)
+ .build();
+
+
+
+ long now = SystemClock.now();
+ System.out.println("begin-" + now);
+
+ //正常完毕
+ Async.beginWork(4100, workerWrapper);
+
+ System.out.println("end-" + SystemClock.now());
+ System.err.println("cost-" + (SystemClock.now() - now));
+
+ System.out.println(Async.getThreadCount());
+ Async.shutDown();
+ }
+
/**
* 0执行完,同时1和2, 必须1执行完毕后,才能执行3. 无论2是否领先1完毕,都要等1
* 1