From 52ab114940a6a68ab5719d5360c62d094f11b033 Mon Sep 17 00:00:00 2001 From: wuweifeng10 Date: Mon, 11 May 2020 21:35:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=A5=E5=8F=82=E5=A2=9E=E5=8A=A0=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E5=8F=82=E6=95=B0=EF=BC=8C=E5=8F=AF=E4=BB=A5=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E4=BB=BB=E6=84=8F=E4=B8=80=E4=B8=AA=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=8D=95=E5=85=83=E7=9A=84=E6=89=A7=E8=A1=8C=E7=BB=93=E6=9E=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jd/platform/async/callback/IWorker.java | 6 ++- .../com/jd/platform/async/executor/Async.java | 9 ++-- .../platform/async/wrapper/WorkerWrapper.java | 45 +++++++++++++---- src/test/java/depend/DeWorker.java | 5 +- src/test/java/depend/DeWorker1.java | 5 +- src/test/java/depend/DeWorker2.java | 5 +- src/test/java/depend/Test.java | 11 ++++- src/test/java/dependnew/DeWorker.java | 42 ++++++++++++++++ src/test/java/dependnew/DeWorker1.java | 45 +++++++++++++++++ src/test/java/dependnew/DeWorker2.java | 45 +++++++++++++++++ src/test/java/dependnew/Test.java | 49 +++++++++++++++++++ src/test/java/dependnew/User.java | 29 +++++++++++ src/test/java/parallel/ParTimeoutWorker.java | 5 +- src/test/java/parallel/ParWorker.java | 5 +- src/test/java/parallel/ParWorker1.java | 5 +- src/test/java/parallel/ParWorker2.java | 5 +- src/test/java/parallel/ParWorker3.java | 5 +- src/test/java/parallel/ParWorker4.java | 5 +- src/test/java/seq/SeqTimeoutWorker.java | 5 +- src/test/java/seq/SeqWorker.java | 5 +- src/test/java/seq/SeqWorker1.java | 5 +- src/test/java/seq/SeqWorker2.java | 5 +- 22 files changed, 316 insertions(+), 30 deletions(-) create mode 100755 src/test/java/dependnew/DeWorker.java create mode 100755 src/test/java/dependnew/DeWorker1.java create mode 100755 src/test/java/dependnew/DeWorker2.java create mode 100644 src/test/java/dependnew/Test.java create mode 100644 src/test/java/dependnew/User.java diff --git a/src/main/java/com/jd/platform/async/callback/IWorker.java b/src/main/java/com/jd/platform/async/callback/IWorker.java index 589f591..d7a16dd 100755 --- a/src/main/java/com/jd/platform/async/callback/IWorker.java +++ b/src/main/java/com/jd/platform/async/callback/IWorker.java @@ -1,5 +1,9 @@ package com.jd.platform.async.callback; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + /** * 每个最小执行单元需要实现该接口 * @author wuweifeng wrote on 2019-11-19. @@ -11,7 +15,7 @@ public interface IWorker { * @param object * object */ - V action(T object); + V action(T object, Map allWrappers); /** * 超时、异常时,返回的默认值 diff --git a/src/main/java/com/jd/platform/async/executor/Async.java b/src/main/java/com/jd/platform/async/executor/Async.java index fbb45a4..4fc1766 100644 --- a/src/main/java/com/jd/platform/async/executor/Async.java +++ b/src/main/java/com/jd/platform/async/executor/Async.java @@ -5,10 +5,7 @@ import com.jd.platform.async.callback.DefaultGroupCallback; import com.jd.platform.async.callback.IGroupCallback; import com.jd.platform.async.wrapper.WorkerWrapper; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; @@ -28,10 +25,12 @@ public class Async { if(workerWrappers == null || workerWrappers.size() == 0) { return false; } + //定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result + Map forParamUseWrappers = new ConcurrentHashMap<>(); CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()]; for (int i = 0; i < workerWrappers.size(); i++) { WorkerWrapper wrapper = workerWrappers.get(i); - futures[i] = CompletableFuture.runAsync(() -> wrapper.work(pool, timeout), pool); + futures[i] = CompletableFuture.runAsync(() -> wrapper.work(pool, timeout, forParamUseWrappers), pool); } try { CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS); diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index 25896a0..50f1c2b 100755 --- a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -3,10 +3,10 @@ package com.jd.platform.async.wrapper; import com.jd.platform.async.callback.DefaultCallback; import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.DependWrapper; import com.jd.platform.async.worker.ResultState; -import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.worker.WorkResult; import java.util.*; @@ -21,6 +21,10 @@ import java.util.concurrent.atomic.AtomicInteger; * @author wuweifeng wrote on 2019-11-19. */ public class WorkerWrapper { + /** + * 该wrapper的唯一标识 + */ + private String id; /** * worker将来要处理的param */ @@ -51,6 +55,10 @@ public class WorkerWrapper { * 1-finish, 2-error, 3-working */ private AtomicInteger state = new AtomicInteger(0); + /** + * 该map存放所有wrapper的id和wrapper映射 + */ + private Map forParamUseWrappers; /** * 也是个钩子变量,用来存临时的结果 */ @@ -70,12 +78,13 @@ public class WorkerWrapper { private static final int WORKING = 3; private static final int INIT = 0; - private WorkerWrapper(IWorker worker, T param, ICallback callback) { + private WorkerWrapper(String id, IWorker worker, T param, ICallback callback) { if (worker == null) { throw new NullPointerException("async.worker is null"); } this.worker = worker; this.param = param; + this.id = id; //允许不设置回调 if (callback == null) { callback = new DefaultCallback<>(); @@ -87,7 +96,10 @@ public class WorkerWrapper { * 开始工作 * fromWrapper代表这次work是由哪个上游wrapper发起的 */ - private void work(ThreadPoolExecutor poolExecutor, WorkerWrapper fromWrapper, long remainTime) { + private void work(ThreadPoolExecutor poolExecutor, WorkerWrapper fromWrapper, long remainTime, Map forParamUseWrappers) { + this.forParamUseWrappers = forParamUseWrappers; + //将自己放到所有wrapper的集合里去 + forParamUseWrappers.put(id, this); long now = SystemClock.now(); //总的已经超时了,就快速失败,进行下一个 if (remainTime <= 0) { @@ -136,8 +148,8 @@ public class WorkerWrapper { } - public void work(ThreadPoolExecutor poolExecutor, long remainTime) { - work(poolExecutor, null, remainTime); + public void work(ThreadPoolExecutor poolExecutor, long remainTime, Map forParamUseWrappers) { + work(poolExecutor, null, remainTime, forParamUseWrappers); } /** @@ -174,14 +186,14 @@ public class WorkerWrapper { return; } if (nextWrappers.size() == 1) { - nextWrappers.get(0).work(poolExecutor, WorkerWrapper.this, remainTime - costTime); + nextWrappers.get(0).work(poolExecutor, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers); return; } CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()]; for (int i = 0; i < nextWrappers.size(); i++) { int finalI = i; futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI) - .work(poolExecutor, WorkerWrapper.this, remainTime - costTime), poolExecutor); + .work(poolExecutor, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), poolExecutor); } try { CompletableFuture.allOf(futures).get(); @@ -320,7 +332,7 @@ public class WorkerWrapper { callback.begin(); //执行耗时操作 - V resultValue = worker.action(param); + V resultValue = worker.action(param, forParamUseWrappers); //如果状态不是在working,说明别的地方已经修改了 if (!compareAndSetState(WORKING, FINISH)) { @@ -425,6 +437,10 @@ public class WorkerWrapper { return state.get(); } + public String getId() { + return id; + } + private boolean compareAndSetState(int expect, int update) { return this.state.compareAndSet(expect, update); } @@ -458,6 +474,10 @@ public class WorkerWrapper { } public static class Builder { + /** + * 该wrapper的唯一标识 + */ + private String id = UUID.randomUUID().toString(); /** * worker将来要处理的param */ @@ -489,6 +509,13 @@ public class WorkerWrapper { return this; } + public Builder id(String id) { + if (id != null) { + this.id = id; + } + return this; + } + public Builder needCheckNextWrapperResult(boolean needCheckNextWrapperResult) { this.needCheckNextWrapperResult = needCheckNextWrapperResult; return this; @@ -556,7 +583,7 @@ public class WorkerWrapper { } public WorkerWrapper build() { - WorkerWrapper wrapper = new WorkerWrapper<>(worker, param, callback); + WorkerWrapper wrapper = new WorkerWrapper<>(id, worker, param, callback); wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult); if (dependWrappers != null) { for (DependWrapper workerWrapper : dependWrappers) { diff --git a/src/test/java/depend/DeWorker.java b/src/test/java/depend/DeWorker.java index b50bfaa..e963816 100755 --- a/src/test/java/depend/DeWorker.java +++ b/src/test/java/depend/DeWorker.java @@ -4,6 +4,9 @@ package depend; import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -11,7 +14,7 @@ import com.jd.platform.async.worker.WorkResult; public class DeWorker implements IWorker, ICallback { @Override - public User action(String object) { + public User action(String object, Map allWrappers) { try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/test/java/depend/DeWorker1.java b/src/test/java/depend/DeWorker1.java index f3f4a46..6cafc30 100755 --- a/src/test/java/depend/DeWorker1.java +++ b/src/test/java/depend/DeWorker1.java @@ -4,6 +4,9 @@ package depend; import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -11,7 +14,7 @@ import com.jd.platform.async.worker.WorkResult; public class DeWorker1 implements IWorker, User>, ICallback, User> { @Override - public User action(WorkResult result) { + public User action(WorkResult result, Map allWrappers) { System.out.println("par1的入参来自于par0: " + result.getResult()); try { Thread.sleep(1000); diff --git a/src/test/java/depend/DeWorker2.java b/src/test/java/depend/DeWorker2.java index 1d75375..3dd73e7 100755 --- a/src/test/java/depend/DeWorker2.java +++ b/src/test/java/depend/DeWorker2.java @@ -4,6 +4,9 @@ package depend; import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -11,7 +14,7 @@ import com.jd.platform.async.worker.WorkResult; public class DeWorker2 implements IWorker, String>, ICallback, String> { @Override - public String action(WorkResult result) { + public String action(WorkResult result, Map allWrappers) { System.out.println("par2的入参来自于par1: " + result.getResult()); try { Thread.sleep(1000); diff --git a/src/test/java/depend/Test.java b/src/test/java/depend/Test.java index ec153ee..6d0baf3 100644 --- a/src/test/java/depend/Test.java +++ b/src/test/java/depend/Test.java @@ -22,27 +22,34 @@ public class Test { WorkerWrapper, String> workerWrapper2 = new WorkerWrapper.Builder, String>() .worker(w2) .callback(w2) + .id("third") .build(); WorkerWrapper, User> workerWrapper1 = new WorkerWrapper.Builder, User>() .worker(w1) .callback(w1) + .id("second") .next(workerWrapper2) .build(); WorkerWrapper workerWrapper = new WorkerWrapper.Builder() .worker(w) .param("0") + .id("first") .next(workerWrapper1) .callback(w) .build(); - //虽然尚未执行,但是也可以先取得结果的引用,作为下一个任务的入参 + + //虽然尚未执行,但是也可以先取得结果的引用,作为下一个任务的入参。V1.2前写法,需要手工给 + //V1.3后,不用给wrapper setParam了,直接在worker的action里自行根据id获取即可.参考dependnew包下代码 WorkResult result = workerWrapper.getWorkResult(); WorkResult result1 = workerWrapper1.getWorkResult(); - workerWrapper1.setParam(result); workerWrapper2.setParam(result1); + + + Async.beginWork(3500, workerWrapper); System.out.println(workerWrapper2.getWorkResult()); diff --git a/src/test/java/dependnew/DeWorker.java b/src/test/java/dependnew/DeWorker.java new file mode 100755 index 0000000..6ae011f --- /dev/null +++ b/src/test/java/dependnew/DeWorker.java @@ -0,0 +1,42 @@ +package dependnew; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class DeWorker implements IWorker, ICallback { + + @Override + public User action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user0"); + } + + + @Override + public User defaultValue() { + return new User("default User"); + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("worker0 的结果是:" + workResult.getResult()); + } + +} diff --git a/src/test/java/dependnew/DeWorker1.java b/src/test/java/dependnew/DeWorker1.java new file mode 100755 index 0000000..0a56fdf --- /dev/null +++ b/src/test/java/dependnew/DeWorker1.java @@ -0,0 +1,45 @@ +package dependnew; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class DeWorker1 implements IWorker, ICallback { + + @Override + public User action(String object, Map allWrappers) { + System.out.println("-----------------"); + System.out.println("获取par0的执行结果: " + allWrappers.get("first").getWorkResult()); + System.out.println("取par0的结果作为自己的入参,并将par0的结果加上一些东西"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + User user0 = (User) allWrappers.get("first").getWorkResult().getResult(); + return new User(user0.getName() + " worker1 add"); + } + + @Override + public User defaultValue() { + return new User("default User"); + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("worker1 的结果是:" + workResult.getResult()); + } + +} diff --git a/src/test/java/dependnew/DeWorker2.java b/src/test/java/dependnew/DeWorker2.java new file mode 100755 index 0000000..c4f61bc --- /dev/null +++ b/src/test/java/dependnew/DeWorker2.java @@ -0,0 +1,45 @@ +package dependnew; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class DeWorker2 implements IWorker, ICallback { + + @Override + public String action(User object, Map allWrappers) { + System.out.println("-----------------"); + System.out.println("par1的执行结果是: " + allWrappers.get("second").getWorkResult()); + System.out.println("取par1的结果作为自己的入参,并将par1的结果加上一些东西"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + User user1 = (User) allWrappers.get("second").getWorkResult().getResult(); + return user1.getName() + " worker2 add"; + } + + @Override + public String defaultValue() { + return "default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, User param, WorkResult workResult) { + System.out.println("worker2 的结果是:" + workResult.getResult()); + } + +} diff --git a/src/test/java/dependnew/Test.java b/src/test/java/dependnew/Test.java new file mode 100644 index 0000000..731e42b --- /dev/null +++ b/src/test/java/dependnew/Test.java @@ -0,0 +1,49 @@ +package dependnew; + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.concurrent.ExecutionException; + + +/** + * 后面请求依赖于前面请求的执行结果 + * @author wuweifeng wrote on 2019-12-26 + * @version 1.0 + */ +public class Test { + + public static void main(String[] args) throws ExecutionException, InterruptedException { + DeWorker w = new DeWorker(); + DeWorker1 w1 = new DeWorker1(); + DeWorker2 w2 = new DeWorker2(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .id("third") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .id("second") + .next(workerWrapper2) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .param("0") + .id("first") + .next(workerWrapper1) + .callback(w) + .build(); + + //V1.3后,不用给wrapper setParam了,直接在worker的action里自行根据id获取即可 + + Async.beginWork(3500, workerWrapper); + + System.out.println(workerWrapper2.getWorkResult()); + Async.shutDown(); + } +} diff --git a/src/test/java/dependnew/User.java b/src/test/java/dependnew/User.java new file mode 100644 index 0000000..bbef801 --- /dev/null +++ b/src/test/java/dependnew/User.java @@ -0,0 +1,29 @@ +package dependnew; + +/** + * 一个包装类 + * @author wuweifeng wrote on 2019-12-26 + * @version 1.0 + */ +public class User { + private String name; + + public User(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return "User{" + + "name='" + name + '\'' + + '}'; + } +} diff --git a/src/test/java/parallel/ParTimeoutWorker.java b/src/test/java/parallel/ParTimeoutWorker.java index 3cbf19e..7f7b9aa 100755 --- a/src/test/java/parallel/ParTimeoutWorker.java +++ b/src/test/java/parallel/ParTimeoutWorker.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -12,7 +15,7 @@ import com.jd.platform.async.worker.WorkResult; public class ParTimeoutWorker implements IWorker, ICallback { @Override - public String action(String object) { + public String action(String object, Map allWrappers) { try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/test/java/parallel/ParWorker.java b/src/test/java/parallel/ParWorker.java index 59a19b1..b174c51 100755 --- a/src/test/java/parallel/ParWorker.java +++ b/src/test/java/parallel/ParWorker.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -12,7 +15,7 @@ import com.jd.platform.async.worker.WorkResult; public class ParWorker implements IWorker, ICallback { @Override - public String action(String object) { + public String action(String object, Map allWrappers) { try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/test/java/parallel/ParWorker1.java b/src/test/java/parallel/ParWorker1.java index 880ad35..7f13081 100755 --- a/src/test/java/parallel/ParWorker1.java +++ b/src/test/java/parallel/ParWorker1.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -17,7 +20,7 @@ public class ParWorker1 implements IWorker, ICallback allWrappers) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { diff --git a/src/test/java/parallel/ParWorker2.java b/src/test/java/parallel/ParWorker2.java index 80a18e5..0e89e45 100755 --- a/src/test/java/parallel/ParWorker2.java +++ b/src/test/java/parallel/ParWorker2.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -17,7 +20,7 @@ public class ParWorker2 implements IWorker, ICallback allWrappers) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { diff --git a/src/test/java/parallel/ParWorker3.java b/src/test/java/parallel/ParWorker3.java index d3b51f3..4284b0f 100755 --- a/src/test/java/parallel/ParWorker3.java +++ b/src/test/java/parallel/ParWorker3.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -16,7 +19,7 @@ public class ParWorker3 implements IWorker, ICallback allWrappers) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { diff --git a/src/test/java/parallel/ParWorker4.java b/src/test/java/parallel/ParWorker4.java index 9c8e8b4..723c5f2 100755 --- a/src/test/java/parallel/ParWorker4.java +++ b/src/test/java/parallel/ParWorker4.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -12,7 +15,7 @@ import com.jd.platform.async.worker.WorkResult; public class ParWorker4 implements IWorker, ICallback { @Override - public String action(String object) { + public String action(String object, Map allWrappers) { try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/test/java/seq/SeqTimeoutWorker.java b/src/test/java/seq/SeqTimeoutWorker.java index 256bd1c..0de5e0a 100755 --- a/src/test/java/seq/SeqTimeoutWorker.java +++ b/src/test/java/seq/SeqTimeoutWorker.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -12,7 +15,7 @@ import com.jd.platform.async.worker.WorkResult; public class SeqTimeoutWorker implements IWorker, ICallback { @Override - public String action(String object) { + public String action(String object, Map allWrappers) { try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/test/java/seq/SeqWorker.java b/src/test/java/seq/SeqWorker.java index 86b6d98..18c3457 100755 --- a/src/test/java/seq/SeqWorker.java +++ b/src/test/java/seq/SeqWorker.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -12,7 +15,7 @@ import com.jd.platform.async.worker.WorkResult; public class SeqWorker implements IWorker, ICallback { @Override - public String action(String object) { + public String action(String object, Map allWrappers) { try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/test/java/seq/SeqWorker1.java b/src/test/java/seq/SeqWorker1.java index df680a3..ae445c6 100755 --- a/src/test/java/seq/SeqWorker1.java +++ b/src/test/java/seq/SeqWorker1.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -12,7 +15,7 @@ import com.jd.platform.async.worker.WorkResult; public class SeqWorker1 implements IWorker, ICallback { @Override - public String action(String object) { + public String action(String object, Map allWrappers) { try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/test/java/seq/SeqWorker2.java b/src/test/java/seq/SeqWorker2.java index 652cd89..34853ee 100755 --- a/src/test/java/seq/SeqWorker2.java +++ b/src/test/java/seq/SeqWorker2.java @@ -5,6 +5,9 @@ import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; /** * @author wuweifeng wrote on 2019-11-20. @@ -12,7 +15,7 @@ import com.jd.platform.async.worker.WorkResult; public class SeqWorker2 implements IWorker, ICallback { @Override - public String action(String object) { + public String action(String object, Map allWrappers) { try { Thread.sleep(1000); } catch (InterruptedException e) {