diff --git a/src/main/java/com/tianyalei/async/executor/Async.java b/src/main/java/com/tianyalei/async/executor/Async.java index c07d417..a6f7f8b 100644 --- a/src/main/java/com/tianyalei/async/executor/Async.java +++ b/src/main/java/com/tianyalei/async/executor/Async.java @@ -84,6 +84,7 @@ public class Async { /** * 总共多少个执行单元 */ + @SuppressWarnings("unchecked") private static void totalWorkers(List workerWrappers, Set set) { set.addAll(workerWrappers); for (WorkerWrapper wrapper : workerWrappers) { diff --git a/src/main/java/com/tianyalei/async/group/WorkerWrapper.java b/src/main/java/com/tianyalei/async/group/WorkerWrapper.java index d68415d..f40a267 100755 --- a/src/main/java/com/tianyalei/async/group/WorkerWrapper.java +++ b/src/main/java/com/tianyalei/async/group/WorkerWrapper.java @@ -96,7 +96,7 @@ public class WorkerWrapper { //如果没有任何依赖,说明自己就是第一批要执行的 if (dependWrappers == null || dependWrappers.size() == 0) { - fire(poolExecutor, remainTime); + fire(); beginNext(poolExecutor, now, remainTime); return; } @@ -106,7 +106,7 @@ public class WorkerWrapper { //一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。 //所以需要B来做判断,必须A、C、D都完成,自己才能执行 if (dependWrappers.size() == 1) { - doDependsOneJob(poolExecutor, fromWrapper, remainTime); + doDependsOneJob(fromWrapper); beginNext(poolExecutor, now, remainTime); return; } @@ -155,7 +155,7 @@ public class WorkerWrapper { } } - private void doDependsOneJob(ThreadPoolExecutor poolExecutor, WorkerWrapper dependWrapper, long remainTime) { + private void doDependsOneJob(WorkerWrapper dependWrapper) { if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) { workResult = defaultResult(); fastFail(INIT, null); @@ -164,7 +164,7 @@ public class WorkerWrapper { fastFail(INIT, null); } else { //前面任务正常完毕了,该自己了 - fire(poolExecutor, remainTime); + fire(); } } @@ -186,7 +186,7 @@ public class WorkerWrapper { if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) { fastFail(INIT, null); } else { - fire(poolExecutor, remainTime); + fire(); } beginNext(poolExecutor, now, remainTime); return; @@ -232,7 +232,7 @@ public class WorkerWrapper { //都finish的话 if (!existNoFinish) { //上游都finish了,进行自己 - fire(poolExecutor, remainTime); + fire(); beginNext(poolExecutor, now, remainTime); return; } @@ -241,35 +241,15 @@ public class WorkerWrapper { /** * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程 */ - private void fire(ThreadPoolExecutor poolExecutor, long workerTimeOut) { + private void fire() { //阻塞取结果 workResult = workerDoJob(); - -// completableFuture = CompletableFuture.supplyAsync(this::workerDoJob, -// poolExecutor); - -// try { -// //阻塞取结果 -// workResult = completableFuture.get(workerTimeOut, TimeUnit.MILLISECONDS); -// } catch (InterruptedException | ExecutionException | TimeoutException e) { -//// e.printStackTrace(); -// System.out.println("exception " + Thread.currentThread().getName()); -// //超时了.如果已经处理过了 -// if (getState() == FINISH || getState() == ERROR) { -// return; -// } -// if (fastFail(WORKING, null)) { -// completableFuture.complete(workResult); -// } -// } } /** * 快速失败 */ private boolean fastFail(int expect, Exception e) { - System.out.println("fastFail:" + Thread.currentThread().getName() + " time " + System.currentTimeMillis()); - //试图将它从expect状态,改成Error if (!compareAndSetState(expect, ERROR)) { System.out.println("compareAndSetState----------fail"); diff --git a/src/main/java/com/tianyalei/async/worker/WorkResult.java b/src/main/java/com/tianyalei/async/worker/WorkResult.java index 40d04f5..0fe36ee 100755 --- a/src/main/java/com/tianyalei/async/worker/WorkResult.java +++ b/src/main/java/com/tianyalei/async/worker/WorkResult.java @@ -24,7 +24,7 @@ public class WorkResult { this.ex = ex; } - public static WorkResult defaultResult() { + public static WorkResult defaultResult() { return new WorkResult<>(null, ResultState.DEFAULT); }