!21 发现当超时后任务无法结束

Merge pull request !21 from klaokai/dev-kyle
This commit is contained in:
tianyaleixiaowu 2022-06-30 10:21:15 +00:00 committed by Gitee
commit 5ba46f8cfd
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 14 additions and 10 deletions

View File

@ -20,6 +20,7 @@ import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@ -264,16 +265,16 @@ public abstract class WorkerWrapper<T, V> {
}
}
};
final Runnable __function__callbackResultOfFalse_beginNext =
() -> {
__function__callbackResult.accept(false);
final Consumer<Boolean> __function__callbackResultOfFalse_beginNext =
(success) -> {
__function__callbackResult.accept(success);
beginNext(executorService, now, remainTime, group);
};
final BiConsumer<Boolean, Exception> __function__fastFail_callbackResult$false_beginNext =
(fastFail_isTimeout, fastFail_exception) -> {
boolean isEndsNormally = fastFail_exception instanceof EndsNormallyException;
fastFail(fastFail_isTimeout && !isEndsNormally, fastFail_exception, isEndsNormally);
__function__callbackResultOfFalse_beginNext.run();
__function__callbackResultOfFalse_beginNext.accept(false);
};
final Runnable __function__doWork =
() -> {
@ -281,8 +282,12 @@ public abstract class WorkerWrapper<T, V> {
try {
if (fire(group)) {
if (setState(state, WORKING, AFTER_WORK)) {
__function__callbackResult.accept(true);
beginNext(executorService, now, remainTime, group);
__function__callbackResultOfFalse_beginNext.accept(true);
}
}else {
//如果任务超时需要将最后那个超时任务设置为超时异常结束的
if (setState(state, WORKING, ERROR)) {
__function__fastFail_callbackResult$false_beginNext.accept(true, new TimeoutException());
}
}
} catch (Exception e) {
@ -352,7 +357,6 @@ public abstract class WorkerWrapper<T, V> {
case TAKE_REST:
//FIXME 等待200毫秒重新投入线程池主要为了调起最后一个任务
Thread.sleep(200L);
System.out.println(id+"进入休息");
executorService.submit(() -> this.work(executorService, fromWrapper,
remainTime - (SystemClock.now() - now), group));
return;

View File

@ -21,7 +21,7 @@ class Case1 {
try {
if ("F".equals(id)) {
System.out.println("wrapper(id=" + id + ") is working");
Thread.sleep(100);
Thread.sleep(2000);
} else {
System.out.println("wrapper(id=" + id + ") is worki444ng");
}
@ -63,7 +63,7 @@ class Case1 {
)
.build();
try {
Async.work(5000, a, d).awaitFinish();
Async.work(1000, a, d).awaitFinish();
} catch (InterruptedException e) {
e.printStackTrace();
}

View File

@ -315,7 +315,7 @@ public class HashedWheelTimer extends AbstractWheelTimer {
startTimeInitialized.countDown();
do {
//TODO 时间轮这里结束不了任务
//TODO 时间轮这里一直执行结束不了任务
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);