From fe62735389a9b35ee31d9bca1bec35d0ccd0ee55 Mon Sep 17 00:00:00 2001 From: wuweifeng10 Date: Tue, 25 Feb 2020 16:15:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BC=96=E6=8E=92=E6=96=B9?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 14 +- .../platform/async/wrapper/WorkerWrapper.java | 98 ++++++-- .../jd/platform/test/parallel/TestPar.java | 227 +++++++++++++++++- 3 files changed, 319 insertions(+), 20 deletions(-) diff --git a/pom.xml b/pom.xml index ebae0e3..a799dc9 100644 --- a/pom.xml +++ b/pom.xml @@ -8,5 +8,17 @@ asyncTool 1.0-SNAPSHOT - + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + + + \ No newline at end of file diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index c0bb419..43c101a 100755 --- a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -9,10 +9,7 @@ import com.jd.platform.async.worker.ResultState; import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.worker.WorkResult; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; @@ -171,7 +168,6 @@ public class WorkerWrapper { * 进行下一个任务 */ private void beginNext(ThreadPoolExecutor poolExecutor, long now, long remainTime) { -// System.out.println("now is " + SystemClock.now() + " and thread count : " + getThreadCount()); //花费的时间 long costTime = SystemClock.now() - now; if (nextWrappers == null) { @@ -291,7 +287,6 @@ public class WorkerWrapper { private boolean fastFail(int expect, Exception e) { //试图将它从expect状态,改成Error if (!compareAndSetState(expect, ERROR)) { -// System.out.println("compareAndSetState----------fail"); return false; } @@ -365,16 +360,51 @@ public class WorkerWrapper { } private void addDepend(WorkerWrapper workerWrapper, boolean must) { + addDepend(new DependWrapper(workerWrapper, must)); + } + + private void addDepend(DependWrapper dependWrapper) { if (dependWrappers == null) { dependWrappers = new ArrayList<>(); } //如果依赖的是重复的同一个,就不重复添加了 - for (DependWrapper dependWrapper : dependWrappers) { - if (workerWrapper.equals(dependWrapper.getDependWrapper())) { + for (DependWrapper wrapper : dependWrappers) { + if (wrapper.equals(dependWrapper)) { return; } } - dependWrappers.add(new DependWrapper(workerWrapper, must)); + dependWrappers.add(dependWrapper); + } + + private void addNext(WorkerWrapper workerWrapper) { + if (nextWrappers == null) { + nextWrappers = new ArrayList<>(); + } + //避免添加重复 + for (WorkerWrapper wrapper : nextWrappers) { + if (workerWrapper.equals(wrapper)) { + return; + } + } + nextWrappers.add(workerWrapper); + } + + private void addNextWrappers(List> wrappers) { + if (wrappers == null) { + return; + } + for (WorkerWrapper wrapper : wrappers) { + addNext(wrapper); + } + } + + private void addDependWrappers(List dependWrappers) { + if (dependWrappers == null) { + return; + } + for (DependWrapper wrapper : dependWrappers) { + addDepend(wrapper); + } } private WorkResult defaultResult() { @@ -403,11 +433,6 @@ public class WorkerWrapper { this.needCheckNextWrapperResult = needCheckNextWrapperResult; } - private void setNextWrappers(List> wrappers) { - this.nextWrappers = wrappers; - } - - public static class Builder { /** * worker将来要处理的param @@ -419,6 +444,10 @@ public class WorkerWrapper { * 自己后面的所有 */ private List> nextWrappers; + /** + * 自己依赖的所有 + */ + private List dependWrappers; /** * 存储强依赖于自己的wrapper集合 */ @@ -446,6 +475,32 @@ public class WorkerWrapper { return this; } + public Builder depend(WorkerWrapper... wrappers) { + if (wrappers == null) { + return this; + } + for (WorkerWrapper wrapper : wrappers) { + depend(wrapper); + } + return this; + } + + public Builder depend(WorkerWrapper wrapper) { + return depend(wrapper, true); + } + + public Builder depend(WorkerWrapper wrapper, boolean isMust) { + if (wrapper == null) { + return this; + } + DependWrapper dependWrapper = new DependWrapper(wrapper, isMust); + if (dependWrappers == null) { + dependWrappers = new ArrayList<>(); + } + dependWrappers.add(dependWrapper); + return this; + } + public Builder next(WorkerWrapper wrapper) { return next(wrapper, true); } @@ -471,24 +526,31 @@ public class WorkerWrapper { return this; } for (WorkerWrapper wrapper : wrappers) { - next(wrapper, true); + next(wrapper); } return this; } - public WorkerWrapper build() { WorkerWrapper wrapper = new WorkerWrapper<>(worker, param, callback); wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult); - wrapper.setNextWrappers(nextWrappers); - if (nextWrappers != null && nextWrappers.size() > 0) { + if (dependWrappers != null) { + for (DependWrapper workerWrapper : dependWrappers) { + workerWrapper.getDependWrapper().addNext(wrapper); + wrapper.addDepend(workerWrapper); + } + } + if (nextWrappers != null) { for (WorkerWrapper workerWrapper : nextWrappers) { if (selfIsMustSet != null) { workerWrapper.addDepend(wrapper, selfIsMustSet.contains(workerWrapper)); } + wrapper.addNext(workerWrapper); } } + return wrapper; } + } } diff --git a/src/main/java/com/jd/platform/test/parallel/TestPar.java b/src/main/java/com/jd/platform/test/parallel/TestPar.java index 1a36ac6..d2df0e2 100755 --- a/src/main/java/com/jd/platform/test/parallel/TestPar.java +++ b/src/main/java/com/jd/platform/test/parallel/TestPar.java @@ -18,10 +18,14 @@ public class TestPar { // testNormal(); // testMulti(); +// testMultiReverse(); // testMultiError2(); // testMulti3(); - testMulti4(); +// testMulti3Reverse(); +// testMulti4(); +// testMulti4Reverse(); // testMulti5(); + testMulti5Reverse(); // testMulti6(); // testMulti7(); // testMulti8(); @@ -109,6 +113,47 @@ public class TestPar { Async.shutDown(); } + /** + * 0,2同时开启,1在0后面 + * 0---1 + * 2 + */ + private static void testMultiReverse() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .depend(workerWrapper) + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); + + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(2500, workerWrapper, workerWrapper2); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + Async.shutDown(); + } + /** * 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败 @@ -203,6 +248,60 @@ public class TestPar { Async.shutDown(); } + /** + * 0执行完,同时1和2, 1\2都完成后3 + * 1 + * 0 3 + * 2 + */ + private static void testMulti3Reverse() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + ParWorker3 w3 = new ParWorker3(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .depend(workerWrapper) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .depend(workerWrapper) + .build(); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .depend(workerWrapper1, workerWrapper2) + .build(); + + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(3100, workerWrapper); +// Async.beginWork(2100, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + + /** * 0执行完,同时1和2, 1\2都完成后3,2耗时2秒,1耗时1秒。3会等待2完成 * 1 @@ -264,6 +363,68 @@ public class TestPar { Async.shutDown(); } + /** + * 0执行完,同时1和2, 1\2都完成后3,2耗时2秒,1耗时1秒。3会等待2完成 + * 1 + * 0 3 + * 2 + * + * 执行结果0,1,2,3 + */ + private static void testMulti4Reverse() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + + ParWorker2 w2 = new ParWorker2(); + w2.setSleepTime(2000); + + ParWorker3 w3 = new ParWorker3(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .depend(workerWrapper) + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .depend(workerWrapper) + .next(workerWrapper3) + .build(); + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + //正常完毕 + Async.beginWork(4100, workerWrapper); + //3会超时 +// Async.beginWork(3100, workerWrapper); + //2,3会超时 +// Async.beginWork(2900, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + /** * 0执行完,同时1和2, 1\2 任何一个执行完后,都执行3 * 1 @@ -324,6 +485,70 @@ public class TestPar { Async.shutDown(); } + + /** + * 0执行完,同时1和2, 1\2 任何一个执行完后,都执行3 + * 1 + * 0 3 + * 2 + * + * 则结果是: + * 0,2,3,1 + * 2,3分别是500、400.3执行完毕后,1才执行完 + */ + private static void testMulti5Reverse() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + + ParWorker2 w2 = new ParWorker2(); + w2.setSleepTime(500); + + ParWorker3 w3 = new ParWorker3(); + w3.setSleepTime(400); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .depend(workerWrapper, true) + .next(workerWrapper3, false) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .depend(workerWrapper, true) + .next(workerWrapper3, false) + .build(); + + + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + //正常完毕 + Async.beginWork(4100, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + /** * 0执行完,同时1和2, 必须1执行完毕后,才能执行3. 无论2是否领先1完毕,都要等1 * 1