!20 调整逻辑,去除不合理的取值方式

Merge pull request !20 from klaokai/dev-kyle
This commit is contained in:
tianyaleixiaowu
2022-06-30 02:15:04 +00:00
committed by Gitee
2 changed files with 36 additions and 41 deletions

View File

@@ -1,15 +1,15 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.exception.CancelException;
import com.jd.platform.async.exception.EndsNormallyException;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.callback.DefaultCallback;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.exception.CancelException;
import com.jd.platform.async.exception.EndsNormallyException;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.executor.PollingCenter;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.strategy.WrapperStrategy;
import com.jd.platform.async.wrapper.strategy.depend.DependMustStrategyMapper;
import com.jd.platform.async.wrapper.strategy.depend.DependOnUpWrapperStrategyMapper;
@@ -25,18 +25,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.AFTER_WORK;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.BUILDING;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.ERROR;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.INIT;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.SKIP;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.STARTED;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.SUCCESS;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.WORKING;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_all;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_beforeWorkingEnd;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_notWorked;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_skipOrAfterWork;
import static com.jd.platform.async.wrapper.WorkerWrapper.State.*;
/**
@@ -268,17 +256,6 @@ public abstract class WorkerWrapper<T, V> {
final Consumer<Boolean> __function__callbackResult =
success -> {
WorkResult<V> _workResult = getWorkResult();
/*
如果不循环拿则很容易拿到空值用户有可能拿到值也有可能拿到null
但如果一定要空值的话那么尝试25次之后就允许
这是个魔法值,如果有更合适的设计请修改这里。
比如将getWorkResult()方法的调用交给用户,
但用户必须明确知道会有这种情况发生
*/
int count = 25;
while (_workResult.getResultState() == ResultState.DEFAULT && count-- > 0) {
_workResult = getWorkResult();
}
try {
callback.result(success, param, _workResult);
} catch (Exception e) {
@@ -302,18 +279,18 @@ public abstract class WorkerWrapper<T, V> {
() -> {
if (setState(state, STARTED, WORKING)) {
try {
fire(group);
if (fire(group)) {
if (setState(state, WORKING, AFTER_WORK)) {
__function__callbackResult.accept(true);
beginNext(executorService, now, remainTime, group);
}
}
} catch (Exception e) {
if (setState(state, WORKING, ERROR)) {
__function__fastFail_callbackResult$false_beginNext.accept(false, e);
}
return;
}
}
if (setState(state, WORKING, AFTER_WORK)) {
__function__callbackResult.accept(true);
beginNext(executorService, now, remainTime, group);
}
};
// ================================================
// 开始执行
@@ -375,7 +352,9 @@ public abstract class WorkerWrapper<T, V> {
case TAKE_REST:
//FIXME 等待200毫秒重新投入线程池主要为了调起最后一个任务
Thread.sleep(200L);
executorService.submit(() -> this.work(executorService, fromWrapper, remainTime-(SystemClock.now()-now), group));
System.out.println(id+"进入休息");
executorService.submit(() -> this.work(executorService, fromWrapper,
remainTime - (SystemClock.now() - now), group));
return;
case FAST_FAIL:
if (setState(state, STARTED, ERROR)) {
@@ -410,13 +389,14 @@ public abstract class WorkerWrapper<T, V> {
* 本工作线程执行自己的job.
* <p/>
* 本方法不负责校验状态。请在调用前自行检验
* @return
*/
protected void fire(WorkerWrapperGroup group) {
protected boolean fire(WorkerWrapperGroup group) {
try {
doWorkingThread.set(Thread.currentThread());
//执行耗时操作
V result = worker.action(param, group.getForParamUseWrappers());
workResult.compareAndSet(
return workResult.compareAndSet(
null,
new WorkResult<>(result, ResultState.SUCCESS)
);

View File

@@ -1,7 +1,9 @@
package v15.cases;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
@@ -19,7 +21,7 @@ class Case1 {
try {
if ("F".equals(id)) {
System.out.println("wrapper(id=" + id + ") is working");
Thread.sleep(12000);
Thread.sleep(100);
} else {
System.out.println("wrapper(id=" + id + ") is worki444ng");
}
@@ -27,13 +29,26 @@ class Case1 {
e.printStackTrace();
}
return id;
});
}).callback((new ICallback<String, String>() {
@Override
public void begin() {
System.out.println("wrapper(id=" + id + ") has begin . ");
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
System.out.println("\t\twrapper(id=" + id + ") callback "
+ (success ? "success " : "fail ")
+ ", workResult is " + workResult);
}
}))
.allowInterrupt(true);
}
public static void main(String[] args) {
long now = SystemClock.now();
WorkerWrapper<?, ?> a = builder("A").build();
WorkerWrapper<?, ?> d;
WorkerWrapper<?, ?> d = builder("D").build();
builder("H")
.depends(
builder("F")
@@ -42,13 +57,13 @@ class Case1 {
.build(),
builder("G")
.depends(builder("E")
.depends(d = builder("D").build())
.depends(d)
.build())
.build()
)
.build();
try {
Async.work(10000, a, d).awaitFinish();
Async.work(5000, a, d).awaitFinish();
} catch (InterruptedException e) {
e.printStackTrace();
}