This commit is contained in:
wuweifeng10 2019-12-25 13:00:58 +08:00
commit 471e4fc0d6
31 changed files with 2083 additions and 0 deletions

31
.gitignore vendored Normal file
View File

@ -0,0 +1,31 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/

12
pom.xml Normal file
View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tianyalei</groupId>
<artifactId>asyncTool</artifactId>
<version>1.0-SNAPSHOT</version>
</project>

View File

@ -0,0 +1,20 @@
package com.tianyalei.async.callback;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-19.
*/
public class DefaultCallback<T, V> implements ICallback<T, V> {
@Override
public void begin() {
}
@Override
public void result(boolean success, T param, WorkResult<V> workResult) {
}
}

View File

@ -0,0 +1,20 @@
package com.tianyalei.async.callback;
import com.tianyalei.async.worker.WorkResult;
/**
* 每个执行单元执行完毕后会回调该接口</p>
* 需要监听执行结果的实现该接口即可
* @author wuweifeng wrote on 2019-11-19.
*/
public interface ICallback<T, V> {
void begin();
/**
* 耗时操作执行完毕后就给value注入值
*
*/
void result(boolean success, T param, WorkResult<V> workResult);
}

View File

@ -0,0 +1,12 @@
package com.tianyalei.async.callback;
import java.util.List;
/**
* @author wuweifeng wrote on 2019-11-19.
*/
public interface IGroupCallback {
void success(List<?> result);
void failure(Exception e);
}

View File

@ -0,0 +1,20 @@
package com.tianyalei.async.callback;
/**
* @author wuweifeng wrote on 2019-12-20
* @version 1.0
*/
public interface ITimeoutWorker<T, V> extends IWorker<T, V> {
/**
* 每个worker都可以设置超时时间
* @return 毫秒超时时间
*/
long timeOut();
/**
* 是否开启单个执行单元的超时功能有时是一个group设置个超时而不具备关心单个worker的超时
* <p>注意如果开启了单个执行单元的超时检测将使线程池数量多出一倍</p>
* @return 是否开启
*/
boolean enableTimeOut();
}

View File

@ -0,0 +1,21 @@
package com.tianyalei.async.callback;
/**
* 每个最小执行单元需要实现该接口
* @author wuweifeng wrote on 2019-11-19.
*/
public interface IWorker<T, V> {
/**
* 在这里做耗时操作如rpc请求IO等
*
* @param object
* object
*/
V action(T object);
/**
* 超时异常时返回的默认值
* @return 默认值
*/
V defaultValue();
}

View File

@ -0,0 +1,78 @@
package com.tianyalei.async.executor;
import com.tianyalei.async.group.WorkerWrapper;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @author wuweifeng wrote on 2019-12-18
* @version 1.0
*/
public class Async {
public static final ThreadPoolExecutor COMMON_POOL =
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024,
15L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
(ThreadFactory) Thread::new);
public static void beginWork(long timeout, ThreadPoolExecutor pool, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
if(workerWrapper == null || workerWrapper.length == 0) {
return;
}
List<WorkerWrapper> workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList());
CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
for (int i = 0; i < workerWrappers.size(); i++) {
WorkerWrapper wrapper = workerWrappers.get(i);
futures[i] = CompletableFuture.runAsync(() -> wrapper.work(COMMON_POOL, timeout), COMMON_POOL);
}
try {
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
Set<WorkerWrapper> set = new HashSet<>();
totalWorkers(workerWrappers, set);
for (WorkerWrapper wrapper : set) {
wrapper.stopNow();
}
}
}
/**
* 同步阻塞,直到所有都完成,或失败
*/
public static void beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
beginWork(timeout, COMMON_POOL, workerWrapper);
}
/**
* 总共多少个执行单元
*/
private static void totalWorkers(List<WorkerWrapper> workerWrappers, Set<WorkerWrapper> set) {
set.addAll(workerWrappers);
for (WorkerWrapper wrapper : workerWrappers) {
if (wrapper.getNextWrappers() == null) {
continue;
}
List<WorkerWrapper> wrappers = wrapper.getNextWrappers();
totalWorkers(wrappers, set);
}
}
public static void shutDown() {
COMMON_POOL.shutdown();
}
public static String getThreadCount() {
return "activeCount=" + COMMON_POOL.getActiveCount() +
" completedCount " + COMMON_POOL.getCompletedTaskCount() +
" largestCount " + COMMON_POOL.getLargestPoolSize();
}
}

View File

@ -0,0 +1,55 @@
package com.tianyalei.async.executor.timer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 用于解决高并发下System.currentTimeMillis卡顿
* @author lry
*/
public class SystemClock {
private final int period;
private final AtomicLong now;
private static class InstanceHolder {
private static final SystemClock INSTANCE = new SystemClock(1);
}
private SystemClock(int period) {
this.period = period;
this.now = new AtomicLong(System.currentTimeMillis());
scheduleClockUpdating();
}
private static SystemClock instance() {
return InstanceHolder.INSTANCE;
}
private void scheduleClockUpdating() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "System Clock");
thread.setDaemon(true);
return thread;
}
});
scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
}
private long currentTimeMillis() {
return now.get();
}
/**
* 用来替换原来的System.currentTimeMillis()
*/
public static long now() {
return instance().currentTimeMillis();
}
}

View File

@ -0,0 +1,129 @@
package com.tianyalei.async.group;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* 最终要执行时都要放到一个group里group集合会并发所有的Wrapper
* @author wuweifeng wrote on 2019-11-19.
*/
public class WorkerGroup {
private List<WorkerWrapper<?, ?>> workerWrapperList;
// +-----+ +-----+
// +------> | EH2 |-----> | EH3 |-------+
// | +-----+ +-----+ |
// | v
//+-----+ +-----+
//| EH1 | | EH6 |
//+-----+ +-----+
// | ^
// | +-----+ +-----+ |
// +------> | EH4 |-----> | EH5 |-------+
// +-----+ +-----+
/**
* 如图这种的begin就是EH1begin代表起始状态下要并发执行的wrapper集合
*/
private List<WorkerWrapper<?, ?>> beginList;
public WorkerGroup() {
workerWrapperList = new ArrayList<>();
}
/**
* 起始任务
*/
public WorkerGroup begin(WorkerWrapper<?, ?>... workerWrappers) {
if (workerWrappers == null) {
throw new NullPointerException("workerWrapper cannot be null");
}
beginList = Arrays.asList(workerWrappers);
return this;
}
public WorkerGroup then(WorkerWrapper<?, ?>... workerWrappers) {
if (workerWrappers == null) {
throw new NullPointerException("workerWrapper cannot be null");
}
beginList = Arrays.asList(workerWrappers);
return this;
}
/**
* 添加需要串行执行的worker集合一个wrapper可能只有一个worker也可能是个要串行的worker集合
*
* @param workerWrapper workerWrapper
*/
public WorkerGroup addWrapper(WorkerWrapper workerWrapper) {
if (workerWrapper == null) {
throw new NullPointerException("workerWrapper cannot be null");
}
workerWrapperList.add(workerWrapper);
return this;
}
/**
* 添加一个需要并行执行的worker
*
* @param iWorker iWorker
*/
public <T, V> WorkerGroup addWrapper(IWorker<T, V> iWorker, T param, ICallback<T, V> iCallback) {
synchronized (this) {
WorkerWrapper<?, ?> workerWrapper = new WorkerWrapper<>(iWorker, param, iCallback);
workerWrapperList.add(workerWrapper);
}
return this;
}
public WorkerGroup addWrappers(List<WorkerWrapper<?, ?>> workerWrappers) {
if (workerWrappers == null) {
throw new NullPointerException("workers cannot be null");
}
this.workerWrapperList.addAll(workerWrappers);
return this;
}
public WorkerGroup addWrappers(WorkerWrapper<?, ?>... workerWrappers) {
if (workerWrappers == null) {
throw new NullPointerException("workers cannot be null");
}
return addWrappers(Arrays.asList(workerWrappers));
}
/**
* 添加一个不需要回调的worker
*
* @param iWorker async.worker
*/
public <T, V> WorkerGroup addWrapper(IWorker<T, V> iWorker, T param) {
return this.addWrapper(iWorker, param, null);
}
/**
* 添加一个不需要回调的worker
*
* @param iWorker async.worker
*/
public <T, V> WorkerGroup addWrapper(IWorker<T, V> iWorker) {
return this.addWrapper(iWorker, null);
}
/**
* 返回当前worker的数量用于决定启用的线程数量
*
* @return size
*/
public int size() {
synchronized (this) {
return workerWrapperList.size();
}
}
public List<WorkerWrapper<?, ?>> getWorkerWrapperList() {
return workerWrapperList;
}
}

View File

@ -0,0 +1,446 @@
package com.tianyalei.async.group;
import com.tianyalei.async.callback.DefaultCallback;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.DependWrapper;
import com.tianyalei.async.worker.ResultState;
import com.tianyalei.async.worker.WorkResult;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 对每个worker及callback进行包装一对一
*
* @author wuweifeng wrote on 2019-11-19.
*/
public class WorkerWrapper<T, V> {
/**
* worker将来要处理的param
*/
private T param;
private IWorker<T, V> worker;
private ICallback<T, V> callback;
/**
* 在自己后面的wrapper如果没有自己就是末尾如果有一个就是串行如果有多个有几个就需要开几个线程</p>
* -------2
* 1
* -------3
* 如1后面有23
*/
private List<WorkerWrapper<?, ?>> nextWrappers;
/**
* 依赖的wrappers必须依赖的全部完成后才能执行自己
* 1
* -------3
* 2
* 12执行完毕后才能执行3
*/
private List<DependWrapper> dependWrappers;
/**
* 标记该事件是否已经被处理过了譬如已经超时返回false了后续rpc又收到返回值了则不再二次回调
* 经试验,volatile并不能保证"同一毫秒",多线程对该值的修改和拉取
* <p>
* 1-finish, 2-error, 3-working
*/
private AtomicInteger state = new AtomicInteger(0);
/**
* 也是个钩子变量用来存临时的结果
*/
private volatile WorkResult<V> workResult;
private static final int FINISH = 1;
private static final int ERROR = 2;
private static final int WORKING = 3;
private static final int INIT = 0;
public WorkerWrapper(IWorker<T, V> worker, T param, ICallback<T, V> callback) {
if (worker == null) {
throw new NullPointerException("async.worker is null");
}
this.worker = worker;
this.param = param;
//允许不设置回调
if (callback == null) {
callback = new DefaultCallback<>();
}
this.callback = callback;
}
/**
* 开始工作
* fatherWrapper代表这次work是由哪个上游wrapper发起的
*/
private void work(ThreadPoolExecutor poolExecutor, WorkerWrapper fromWrapper, long remainTime) {
long now = SystemClock.now();
//总的已经超时了就快速失败进行下一个
if (remainTime <= 0) {
fastFail(INIT, null);
beginNext(poolExecutor, now, remainTime);
return;
}
//如果自己已经执行过了可能有多个依赖其中的一个依赖已经执行完了并且自己也执行完了当另一个依赖过来时就不重复处理了
if (getState() != INIT) {
beginNext(poolExecutor, now, remainTime);
return;
}
//如果没有任何依赖说明自己就是第一批要执行的
if (dependWrappers == null || dependWrappers.size() == 0) {
fire(poolExecutor, remainTime);
beginNext(poolExecutor, now, remainTime);
return;
}
//如果有前方依赖存在两种情况
// 一种是前面只有一个wrapper A -> B
//一种是前面有多个wrapperA C D -> B需要ACD都完成了才能轮到B但是无论是A执行完还是C执行完都会去唤醒B
//所以需要B来做判断必须ACD都完成自己才能执行
if (dependWrappers.size() == 1) {
doDependsOneJob(poolExecutor, fromWrapper, remainTime);
beginNext(poolExecutor, now, remainTime);
return;
}
doDependsJobs(poolExecutor, dependWrappers, fromWrapper, now, remainTime);
}
public void work(ThreadPoolExecutor poolExecutor, long remainTime) {
work(poolExecutor, null, remainTime);
}
/**
* 总控制台超时停止所有任务
*/
public void stopNow() {
if (getState() == INIT || getState() == WORKING) {
fastFail(getState(), null);
}
}
/**
* 进行下一个任务
*/
private void beginNext(ThreadPoolExecutor poolExecutor, long now, long remainTime) {
// System.out.println("now is " + SystemClock.now() + " and thread count : " + getThreadCount());
//花费的时间
long costTime = SystemClock.now() - now;
if (nextWrappers == null) {
return;
}
if (nextWrappers.size() == 1) {
nextWrappers.get(0).work(poolExecutor, WorkerWrapper.this, remainTime - costTime);
return;
}
CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
for (int i = 0; i < nextWrappers.size(); i++) {
int finalI = i;
futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
.work(poolExecutor, WorkerWrapper.this, remainTime - costTime), poolExecutor);
}
try {
CompletableFuture.allOf(futures).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
private void doDependsOneJob(ThreadPoolExecutor poolExecutor, WorkerWrapper dependWrapper, long remainTime) {
if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultResult();
fastFail(INIT, null);
} else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
fastFail(INIT, null);
} else {
//前面任务正常完毕了该自己了
fire(poolExecutor, remainTime);
}
}
private void doDependsJobs(ThreadPoolExecutor poolExecutor, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
boolean nowDependIsMust = false;
//创建必须完成的上游wrapper集合
Set<DependWrapper> mustWrapper = new HashSet<>();
for (DependWrapper dependWrapper : dependWrappers) {
if (dependWrapper.isMust()) {
mustWrapper.add(dependWrapper);
}
if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
nowDependIsMust = dependWrapper.isMust();
}
}
//如果全部是不必须的条件那么只要到了这里就执行自己
if (mustWrapper.size() == 0) {
if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
fastFail(INIT, null);
} else {
fire(poolExecutor, remainTime);
}
beginNext(poolExecutor, now, remainTime);
return;
}
//如果存在需要必须完成的且fromWrapper不是必须的就什么也不干
if (!nowDependIsMust) {
return;
}
//如果fromWrapper是必须的
boolean existNoFinish = false;
boolean hasError = false;
//先判断前面必须要执行的依赖任务的执行结果如果有任何一个失败那就不用走action了直接给自己设置为失败进行下一步就是了
for (DependWrapper dependWrapper : mustWrapper) {
WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
WorkResult tempWorkResult = workerWrapper.getWorkResult();
//为null或者isWorking说明它依赖的某个任务还没执行到或没执行完
if (tempWorkResult == null || workerWrapper.getState() == WORKING) {
existNoFinish = true;
break;
}
if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
workResult = defaultResult();
hasError = true;
break;
}
if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
hasError = true;
break;
}
}
//只要有失败的
if (hasError) {
fastFail(INIT, null);
beginNext(poolExecutor, now, remainTime);
return;
}
//如果上游都没有失败分为两种情况一种是都finish了一种是有的在working
//都finish的话
if (!existNoFinish) {
//上游都finish了进行自己
fire(poolExecutor, remainTime);
beginNext(poolExecutor, now, remainTime);
return;
}
}
/**
* 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
*/
private void fire(ThreadPoolExecutor poolExecutor, long workerTimeOut) {
//阻塞取结果
workResult = workerDoJob();
// completableFuture = CompletableFuture.supplyAsync(this::workerDoJob,
// poolExecutor);
// try {
// //阻塞取结果
// workResult = completableFuture.get(workerTimeOut, TimeUnit.MILLISECONDS);
// } catch (InterruptedException | ExecutionException | TimeoutException e) {
//// e.printStackTrace();
// System.out.println("exception " + Thread.currentThread().getName());
// //超时了.如果已经处理过了
// if (getState() == FINISH || getState() == ERROR) {
// return;
// }
// if (fastFail(WORKING, null)) {
// completableFuture.complete(workResult);
// }
// }
}
/**
* 快速失败
*/
private boolean fastFail(int expect, Exception e) {
System.out.println("fastFail:" + Thread.currentThread().getName() + " time " + System.currentTimeMillis());
//试图将它从expect状态,改成Error
if (!compareAndSetState(expect, ERROR)) {
System.out.println("compareAndSetState----------fail");
return false;
}
if (workResult == null) {
if (e == null) {
workResult = defaultResult();
} else {
workResult = defaultExResult(e);
}
}
callback.result(false, getParam(), workResult);
return true;
}
/**
* 具体的单个worker执行任务
*/
private WorkResult<V> workerDoJob() {
//避免重复执行
if (workResult != null) {
return workResult;
}
try {
//如果已经不是init状态了
if (!compareAndSetState(INIT, WORKING)) {
return workResult;
}
callback.begin();
//执行耗时操作
V resultValue = worker.action(getParam());
WorkResult<V> tempResult = new WorkResult<>(resultValue, ResultState.SUCCESS);
//如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) {
return workResult;
}
//回调成功
callback.result(true, getParam(), tempResult);
workResult = tempResult;
return workResult;
} catch (Exception e) {
//避免重复回调
if (workResult != null) {
return workResult;
}
fastFail(WORKING, e);
return workResult;
}
}
public WorkerWrapper addNext(WorkerWrapper<?, ?>... nextWrappers) {
if (nextWrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> workerWrapper : nextWrappers) {
addNext(workerWrapper);
}
return this;
}
public WorkerWrapper addNext(IWorker<T, V> worker, T param, ICallback<T, V> callback) {
WorkerWrapper<T, V> workerWrapper = new WorkerWrapper<>(worker, param, callback);
return addNext(workerWrapper);
}
public WorkerWrapper addNext(WorkerWrapper<?, ?> workerWrapper) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
nextWrappers.add(workerWrapper);
workerWrapper.addDepend(this);
return this;
}
/**
* 设置这几个依赖的wrapper不是must执行完毕才能执行自己
*/
public void setDependNotMust(WorkerWrapper<?, ?>... workerWrapper) {
if (dependWrappers == null) {
return;
}
if (workerWrapper == null) {
return;
}
for (DependWrapper dependWrapper : dependWrappers) {
for (WorkerWrapper wrapper : workerWrapper) {
if (dependWrapper.getDependWrapper().equals(wrapper)) {
dependWrapper.setMust(false);
}
}
}
}
private void addDepend(WorkerWrapper<?, ?> workerWrapper) {
this.addDepend(workerWrapper, true);
}
private void addDepend(WorkerWrapper<?, ?> workerWrapper, boolean must) {
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
dependWrappers.add(new DependWrapper(workerWrapper, must));
}
private WorkResult<V> defaultResult() {
return new WorkResult<>(getWorker().defaultValue(), ResultState.TIMEOUT);
}
private WorkResult<V> defaultExResult(Exception ex) {
return new WorkResult<>(getWorker().defaultValue(), ResultState.EXCEPTION, ex);
}
private WorkResult<V> getNoneNullWorkResult() {
if (workResult == null) {
return defaultResult();
}
return workResult;
}
public T getParam() {
return param;
}
public IWorker<T, V> getWorker() {
return worker;
}
public ICallback<T, V> getCallback() {
return callback;
}
public List<WorkerWrapper<?, ?>> getNextWrappers() {
return nextWrappers;
}
public void setNextWrappers(List<WorkerWrapper<?, ?>> nextWrappers) {
this.nextWrappers = nextWrappers;
}
public List<DependWrapper> getDependWrappers() {
return dependWrappers;
}
public void setDependWrappers(List<DependWrapper> dependWrappers) {
this.dependWrappers = dependWrappers;
}
public int getState() {
return state.get();
}
public boolean compareAndSetState(int expect, int update) {
return this.state.compareAndSet(expect, update);
}
public WorkResult<V> getWorkResult() {
return workResult;
}
public void setWorkResult(WorkResult<V> workResult) {
this.workResult = workResult;
}
}

View File

@ -0,0 +1,59 @@
package com.tianyalei.async.worker;
import com.tianyalei.async.group.WorkerWrapper;
/**
* 对依赖的wrapper的封装
* @author wuweifeng wrote on 2019-12-20
* @version 1.0
*/
public class DependWrapper {
private WorkerWrapper<?, ?> dependWrapper;
/**
* 是否该依赖必须完成后才能执行自己.<p>
* 因为存在一个任务依赖于多个任务是让这多个任务全部完成后才执行自己还是某几个执行完毕就可以执行自己
*
* 1
* ---3
* 2
*
* 1---3
* 2---3
* 这两种就不一样上面的就是必须12都完毕才能3
* 下面的就是1完毕就可以3
*/
private boolean must = true;
public DependWrapper(WorkerWrapper<?, ?> dependWrapper, boolean must) {
this.dependWrapper = dependWrapper;
this.must = must;
}
public DependWrapper() {
}
public WorkerWrapper<?, ?> getDependWrapper() {
return dependWrapper;
}
public void setDependWrapper(WorkerWrapper<?, ?> dependWrapper) {
this.dependWrapper = dependWrapper;
}
public boolean isMust() {
return must;
}
public void setMust(boolean must) {
this.must = must;
}
@Override
public String toString() {
return "DependWrapper{" +
"dependWrapper=" + dependWrapper +
", must=" + must +
'}';
}
}

View File

@ -0,0 +1,11 @@
package com.tianyalei.async.worker;
/**
* 结果状态
* @author wuweifeng wrote on 2019-11-19.
*/
public enum ResultState {
SUCCESS,
TIMEOUT,
EXCEPTION
}

View File

@ -0,0 +1,9 @@
package com.tianyalei.async.worker;
/**
* @author wuweifeng wrote on 2019-11-19.
*/
public enum WorkMode {
SELF,
SHARE
}

View File

@ -0,0 +1,63 @@
package com.tianyalei.async.worker;
/**
* 执行结果
*/
public class WorkResult<V> {
/**
* 执行的结果
*/
private V result;
/**
* 结果状态
*/
private ResultState resultState;
private Exception ex;
public WorkResult(V result, ResultState resultState) {
this(result, resultState, null);
}
public WorkResult(V result, ResultState resultState, Exception ex) {
this.result = result;
this.resultState = resultState;
this.ex = ex;
}
public static WorkResult defaultResult() {
return new WorkResult<>(null, ResultState.TIMEOUT);
}
@Override
public String toString() {
return "WorkResult{" +
"result=" + result +
", resultState=" + resultState +
", ex=" + ex +
'}';
}
public Exception getEx() {
return ex;
}
public void setEx(Exception ex) {
this.ex = ex;
}
public V getResult() {
return result;
}
public void setResult(V result) {
this.result = result;
}
public ResultState getResultState() {
return resultState;
}
public void setResultState(ResultState resultState) {
this.resultState = resultState;
}
}

View File

@ -0,0 +1,42 @@
package com.tianyalei.test;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class MyWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult());
}
}
}

View File

@ -0,0 +1,43 @@
package com.tianyalei.test;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class MyWorker1 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 1";
}
@Override
public String defaultValue() {
return "worker1--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker1 success--" + SystemClock.now() + "----" + workResult.getResult());
} else {
System.err.println("callback worker1 failure--" + SystemClock.now() + "----" + workResult.getResult());
}
}
}

View File

@ -0,0 +1,43 @@
package com.tianyalei.test;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class MyWorker2 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 2";
}
@Override
public String defaultValue() {
return Thread.currentThread().getName() + "--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker2 success--" + SystemClock.now() + "----" + workResult.getResult());
} else {
System.err.println("callback worker2 failure--" + SystemClock.now() + "----" + workResult.getResult());
}
}
}

View File

@ -0,0 +1,46 @@
package com.tianyalei.test.parallel;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,46 @@
package com.tianyalei.test.parallel;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,45 @@
package com.tianyalei.test.parallel;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker1 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 1";
}
@Override
public String defaultValue() {
return "worker1--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker1 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker1 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,51 @@
package com.tianyalei.test.parallel;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker2 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 2";
}
@Override
public String defaultValue() {
return "worker2--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker2 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker2 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,50 @@
package com.tianyalei.test.parallel;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker3 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 3";
}
@Override
public String defaultValue() {
return "worker3--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,46 @@
package com.tianyalei.test.parallel;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker4 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 4";
}
@Override
public String defaultValue() {
return "worker4--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker4 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker4 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,355 @@
package com.tianyalei.test.parallel;
import com.tianyalei.async.executor.Async;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.group.WorkerWrapper;
import java.util.concurrent.ExecutionException;
import static com.tianyalei.async.executor.Async.getThreadCount;
/**
* 串行测试
*
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("ALL")
public class TestPar {
public static void main(String[] args) throws Exception {
// testNormal();
// testMulti();
// testMultiError();
// testMultiError2();
// testMulti3();
// testMulti4();
// testMulti5();
// testMulti6();
testMulti7();
}
/**
* 3个并行测试不同时间的超时
*/
private static void testNormal() throws InterruptedException, ExecutionException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(1500, workerWrapper, workerWrapper1, workerWrapper2);
// Async.beginWork(800, workerWrapper, workerWrapper1, workerWrapper2);
// Async.beginWork(1000, workerWrapper, workerWrapper1, workerWrapper2);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(getThreadCount());
System.out.println(workerWrapper.getWorkResult());
// System.out.println(getThreadCount());
Async.shutDown();
}
/**
* 0,2同时开启,1在0后面
* 0---1
* 2
*/
private static void testMulti() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
workerWrapper.addNext(workerWrapper1);
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(2500, workerWrapper, workerWrapper2);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
/**
* 0,2同时开启,1在0后面. 1超时
* 0---1
* 2
*/
private static void testMultiError() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
workerWrapper.addNext(workerWrapper1);
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(1500, workerWrapper, workerWrapper2);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
/**
* 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败
* 0---1
* 2
*/
private static void testMultiError2() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
workerWrapper.addNext(workerWrapper1);
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(1500, workerWrapper, workerWrapper2);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2都完成后3
* 1
* 0 3
* 2
*/
private static void testMulti3() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3);
workerWrapper.addNext(workerWrapper1, workerWrapper2);
workerWrapper1.addNext(workerWrapper3);
workerWrapper2.addNext(workerWrapper3);
long now = SystemClock.now();
System.out.println("begin-" + now);
// Async.beginWork(3100, workerWrapper);
Async.beginWork(2100, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2都完成后32耗时2秒1耗时1秒3会等待2完成
* 1
* 0 3
* 2
*/
private static void testMulti4() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(2000);
ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3);
workerWrapper.addNext(workerWrapper1, workerWrapper2);
workerWrapper1.addNext(workerWrapper3);
workerWrapper2.addNext(workerWrapper3);
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper);
//3会超时
// Async.beginWork(3100, workerWrapper);
//2,3会超时
// Async.beginWork(2900, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2 任何一个执行完后都执行3
* 1
* 0 3
* 2
*
* 则结果是
* 0231
* 23分别是500400.3执行完毕后1才执行完
*/
private static void testMulti5() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(500);
ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(400);
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3);
workerWrapper.addNext(workerWrapper1, workerWrapper2);
workerWrapper1.addNext(workerWrapper3);
workerWrapper2.addNext(workerWrapper3);
workerWrapper3.setDependNotMust(workerWrapper1, workerWrapper2);
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 必须1执行完毕后才能执行3. 无论2是否领先1完毕都要等1
* 1
* 0 3
* 2
*
* 则结果是
* 0213
* 23分别是500400.2执行完了1没完那就等着1完毕才能3
*/
private static void testMulti6() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(500);
ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(400);
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3);
workerWrapper.addNext(workerWrapper1, workerWrapper2);
workerWrapper1.addNext(workerWrapper3);
workerWrapper2.addNext(workerWrapper3);
//设置2不是必须1是必须的
workerWrapper3.setDependNotMust(workerWrapper2);
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(getThreadCount());
Async.shutDown();
}
/**
* 两个0并行上面0执行完,同时1和2, 下面0执行完开始1上面的 必须12执行完毕后才能执行3. 最后必须23都完成才能4
* 1
* 0 3
* 2 4
* ---------
* 0 1 2
*
* 则结果是
* callback worker0 success--1577242870969----result = 1577242870968---param = 00 from 0-threadName:Thread-1
* callback worker0 success--1577242870969----result = 1577242870968---param = 0 from 0-threadName:Thread-0
* callback worker1 success--1577242871972----result = 1577242871972---param = 11 from 1-threadName:Thread-1
* callback worker1 success--1577242871972----result = 1577242871972---param = 1 from 1-threadName:Thread-2
* callback worker2 success--1577242871973----result = 1577242871973---param = 2 from 2-threadName:Thread-3
* callback worker2 success--1577242872975----result = 1577242872975---param = 22 from 2-threadName:Thread-1
* callback worker3 success--1577242872977----result = 1577242872977---param = 3 from 3-threadName:Thread-2
* callback worker4 success--1577242873980----result = 1577242873980---param = 4 from 3-threadName:Thread-2
*/
private static void testMulti7() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3();
ParWorker4 w4 = new ParWorker4();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper0 = new WorkerWrapper<>(w, "00", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper11 = new WorkerWrapper<>(w1, "11", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
WorkerWrapper<String, String> workerWrapper22 = new WorkerWrapper<>(w2, "22", w2);
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3);
WorkerWrapper<String, String> workerWrapper4 = new WorkerWrapper<>(w3, "4", w4);
workerWrapper.addNext(workerWrapper1, workerWrapper2);
workerWrapper1.addNext(workerWrapper3);
workerWrapper2.addNext(workerWrapper3);
workerWrapper3.addNext(workerWrapper4);
workerWrapper0.addNext(workerWrapper11);
workerWrapper11.addNext(workerWrapper22);
workerWrapper22.addNext(workerWrapper4);
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper, workerWrapper0);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(getThreadCount());
Async.shutDown();
}
}

View File

@ -0,0 +1,45 @@
package com.tianyalei.test.seq;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,46 @@
package com.tianyalei.test.seq;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,45 @@
package com.tianyalei.test.seq;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqWorker1 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker1--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker1 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker1 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,45 @@
package com.tianyalei.test.seq;
import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.WorkResult;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqWorker2 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker2--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker2 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker2 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@ -0,0 +1,61 @@
package com.tianyalei.test.seq;
import com.tianyalei.async.executor.Async;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.group.WorkerWrapper;
import java.util.concurrent.ExecutionException;
/**
* 串行测试
* @author wuweifeng wrote on 2019-11-20.
*/
public class TestSequential {
public static void main(String[] args) throws InterruptedException {
SeqWorker w = new SeqWorker();
SeqWorker1 w1 = new SeqWorker1();
SeqWorker2 w2 = new SeqWorker2();
SeqTimeoutWorker t = new SeqTimeoutWorker();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
//1在0后面串行
workerWrapper.addNext(workerWrapper1);
//2在1后面串行
workerWrapper1.addNext(workerWrapper2);
// testNormal(workerWrapper);
// testGroupTimeout(workerWrapper);
}
private static void testNormal(WorkerWrapper<String, String> workerWrapper) throws ExecutionException, InterruptedException {
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(3500, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
private static void testGroupTimeout(WorkerWrapper<String, String> workerWrapper) throws ExecutionException, InterruptedException {
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(2500, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
}

View File

@ -0,0 +1,88 @@
package com.tianyalei.test.seq;
import com.tianyalei.async.executor.Async;
import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.group.WorkerWrapper;
import java.util.concurrent.ExecutionException;
/**
* 串行测试
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("Duplicates")
public class TestSequentialTimeout {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// testFirstTimeout();
testSecondTimeout();
}
/**
* begin-1576719450476
* callback worker0 failure--1576719451338----worker0--default-threadName:main
* callback worker1 failure--1576719451338----worker1--default-threadName:main
* callback worker2 failure--1576719451338----worker2--default-threadName:main
* end-1576719451338
* cost-862
*/
private static void testFirstTimeout() throws ExecutionException, InterruptedException {
SeqWorker1 w1 = new SeqWorker1();
SeqWorker2 w2 = new SeqWorker2();
SeqTimeoutWorker t = new SeqTimeoutWorker();
WorkerWrapper<String, String> workerWrapperT = new WorkerWrapper<>(t, "t", t);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
//2在1后面串行
workerWrapper1.addNext(workerWrapper2);
//T会超时
workerWrapperT.addNext(workerWrapper1);
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(5000, workerWrapperT);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
/**
* begin-1576719842504
* callback worker0 success--1576719843571----result = 1576719843570---param = t from 0-threadName:Thread-0
* callback worker1 failure--1576719844376----worker1--default-threadName:main
* callback worker2 failure--1576719844376----worker2--default-threadName:main
* end-1576719844376
* cost-1872
*/
private static void testSecondTimeout() throws ExecutionException, InterruptedException {
SeqTimeoutWorker t = new SeqTimeoutWorker();
//让1超时
SeqWorker1 w1 = new SeqWorker1();
SeqWorker2 w2 = new SeqWorker2();
WorkerWrapper<String, String> workerWrapperT = new WorkerWrapper<>(t, "t", t);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
//2在1后面串行
workerWrapper1.addNext(workerWrapper2);
//T会超时
workerWrapperT.addNext(workerWrapper1);
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(5000, workerWrapperT);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
}