mirror of
https://gitee.com/jd-platform-opensource/asyncTool.git
synced 2026-03-22 04:27:15 +08:00
@@ -1,6 +1,5 @@
|
||||
package com.jd.platform.async.callback;
|
||||
|
||||
import com.jd.platform.async.exception.SkippedException;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
|
||||
/**
|
||||
@@ -9,9 +8,20 @@ import com.jd.platform.async.worker.WorkResult;
|
||||
* @author wuweifeng wrote on 2019-11-19.
|
||||
*/
|
||||
public class DefaultCallback<T, V> implements ICallback<T, V> {
|
||||
private static final DefaultCallback<Object, Object> instance = new DefaultCallback<Object, Object>() {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "(DefaultCallback instance)";
|
||||
}
|
||||
};
|
||||
|
||||
public static DefaultCallback<Object, Object> getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void begin() {
|
||||
|
||||
// do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -7,15 +7,18 @@ import java.util.List;
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-12-27
|
||||
* @version 1.0
|
||||
* @deprecated deprecate at version 1.5.1 , see {@link IGroupCallback} .
|
||||
*/
|
||||
@SuppressWarnings("DeprecatedIsStillUsed")
|
||||
@Deprecated
|
||||
public class DefaultGroupCallback implements IGroupCallback {
|
||||
@Override
|
||||
public void success(List<WorkerWrapper> workerWrappers) {
|
||||
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failure(List<WorkerWrapper> workerWrappers, Exception e) {
|
||||
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.jd.platform.async.callback;
|
||||
|
||||
|
||||
import com.jd.platform.async.exception.EndsNormallyException;
|
||||
import com.jd.platform.async.exception.SkippedException;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
|
||||
@@ -27,13 +28,16 @@ public interface ICallback<T, V> {
|
||||
void result(boolean success, T param, WorkResult<V> workResult);
|
||||
|
||||
/**
|
||||
* 提供常量选项:打印异常信息,跳过时的异常{@link SkippedException}不会打印。
|
||||
* 提供常量选项:
|
||||
* <p/>
|
||||
* 如果发生了异常,则打印异常信息。
|
||||
* 正常结束(例如取消、跳过)的异常{@link com.jd.platform.async.exception.EndsNormallyException}不会打印。
|
||||
*/
|
||||
ICallback PRINT_EXCEPTION_STACK_TRACE = new ICallback<Object, Object>() {
|
||||
@Override
|
||||
public void result(boolean success, Object param, WorkResult<Object> workResult) {
|
||||
Exception ex = workResult.getEx();
|
||||
if (ex != null && !(ex instanceof SkippedException)) {
|
||||
if (ex != null && !(ex instanceof EndsNormallyException)) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,18 +1,35 @@
|
||||
package com.jd.platform.async.callback;
|
||||
|
||||
import com.jd.platform.async.worker.OnceWork;
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
* 如果是异步执行整组的话,可以用这个组回调。不推荐使用
|
||||
* 如果是异步执行整组的话,可以用这个组回调。<b>已经废弃</b>
|
||||
*
|
||||
* @author wuweifeng wrote on 2019-11-19.
|
||||
* @deprecated deprecate at version 1.5.1
|
||||
* <p>
|
||||
* please use {@link com.jd.platform.async.executor.Async#work(long, ExecutorService, Collection, String)}.
|
||||
* <p>
|
||||
* 该方法返回的{@link OnceWork}句柄,默认不会同步等待结束,
|
||||
* 这便替代了原先的
|
||||
* {@link com.jd.platform.async.executor.Async#beginWorkAsync(long, ExecutorService, IGroupCallback, WorkerWrapper[])}
|
||||
* <p>
|
||||
* 需要同步等待的话调用{@link OnceWork#awaitFinish()}即可。
|
||||
* </p>
|
||||
*/
|
||||
@SuppressWarnings("DeprecatedIsStillUsed")
|
||||
@Deprecated
|
||||
public interface IGroupCallback {
|
||||
/**
|
||||
* 成功后,可以从wrapper里去getWorkResult
|
||||
*/
|
||||
void success(List<WorkerWrapper> workerWrappers);
|
||||
|
||||
/**
|
||||
* 失败了,也可以从wrapper里去getWorkResult
|
||||
*/
|
||||
|
||||
@@ -2,11 +2,18 @@ package com.jd.platform.async.callback;
|
||||
|
||||
/**
|
||||
* @author wuweifeng wrote on 2019-12-20
|
||||
* @author tcsnzh
|
||||
* 远古时期的代码,估计也没人会使用。但我也不确定,因此标注废弃。
|
||||
* <p/>
|
||||
* 难受的一比,为了屎山的兼容性要在代码里保留这么多屎盆。
|
||||
* @version 1.0
|
||||
* @deprecated deprecated by version 1.5.1--SNAPSHOT
|
||||
*/
|
||||
@Deprecated
|
||||
public interface ITimeoutWorker<T, V> extends IWorker<T, V> {
|
||||
/**
|
||||
* 每个worker都可以设置超时时间
|
||||
*
|
||||
* @return 毫秒超时时间
|
||||
*/
|
||||
long timeOut();
|
||||
@@ -14,6 +21,7 @@ public interface ITimeoutWorker<T, V> extends IWorker<T, V> {
|
||||
/**
|
||||
* 是否开启单个执行单元的超时功能(有时是一个group设置个超时,而不具备关心单个worker的超时)
|
||||
* <p>注意,如果开启了单个执行单元的超时检测,将使线程池数量多出一倍</p>
|
||||
*
|
||||
* @return 是否开启
|
||||
*/
|
||||
boolean enableTimeOut();
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.jd.platform.async.exception;
|
||||
|
||||
/**
|
||||
* 整组取消,设置该异常。
|
||||
*
|
||||
* @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午6:12
|
||||
*/
|
||||
public class CancelException extends EndsNormallyException {
|
||||
public CancelException() {
|
||||
}
|
||||
|
||||
public CancelException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
package com.jd.platform.async.exception;
|
||||
|
||||
/**
|
||||
* 整组取消,设置该异常。
|
||||
*
|
||||
* @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午6:12
|
||||
*/
|
||||
public class CancelSkippedException extends SkippedException {
|
||||
public CancelSkippedException() {
|
||||
}
|
||||
|
||||
public CancelSkippedException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public CancelSkippedException(String message, long skipAt) {
|
||||
super(message, skipAt);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.jd.platform.async.exception;
|
||||
|
||||
/**
|
||||
* 该异常表示此任务单元是正常结束的。
|
||||
*
|
||||
* @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/6/5-下午11:57
|
||||
*/
|
||||
public abstract class EndsNormallyException extends RuntimeException {
|
||||
public EndsNormallyException() {
|
||||
}
|
||||
|
||||
public EndsNormallyException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public EndsNormallyException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public EndsNormallyException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public EndsNormallyException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
}
|
||||
@@ -1,30 +1,17 @@
|
||||
package com.jd.platform.async.exception;
|
||||
|
||||
import com.jd.platform.async.executor.timer.SystemClock;
|
||||
|
||||
/**
|
||||
* 如果任务在执行之前,自己后面的任务已经执行完或正在被执行,则抛该exception
|
||||
*
|
||||
* @author wuweifeng wrote on 2020-02-18
|
||||
* @version 1.0
|
||||
*/
|
||||
public class SkippedException extends RuntimeException {
|
||||
private final long skipAt;
|
||||
|
||||
public class SkippedException extends EndsNormallyException {
|
||||
public SkippedException() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
public SkippedException(String message) {
|
||||
this(message, SystemClock.now());
|
||||
}
|
||||
|
||||
public SkippedException(String message, long skipAt) {
|
||||
super(message);
|
||||
this.skipAt = skipAt;
|
||||
}
|
||||
|
||||
public long getSkipAt() {
|
||||
return skipAt;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,14 +65,13 @@ public class Async {
|
||||
}
|
||||
|
||||
/**
|
||||
* 核心方法。
|
||||
* 该方法不是同步阻塞执行的。如果想要同步阻塞执行,则调用返回值的{@link OnceWork#awaitFinish()}即可。
|
||||
* <b>核心方法。该方法不是同步阻塞执行的。</b>如果想要同步阻塞执行,则调用返回值的{@link OnceWork#awaitFinish()}即可。
|
||||
*
|
||||
* @param timeout 全组超时时间
|
||||
* @param executorService 执行线程池
|
||||
* @param workerWrappers 任务容器集合
|
||||
* @param workId 本次工作id
|
||||
* @return 返回 {@link OnceWork}封装对象。
|
||||
* @return 返回 {@link OnceWork}任务句柄对象。
|
||||
*/
|
||||
public static OnceWork work(long timeout,
|
||||
ExecutorService executorService,
|
||||
@@ -95,59 +94,6 @@ public class Async {
|
||||
return onceWork;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
|
||||
*/
|
||||
@SuppressWarnings("unused")
|
||||
@Deprecated
|
||||
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
|
||||
beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步执行,直到所有都完成,或失败后,发起回调
|
||||
*
|
||||
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
|
||||
*/
|
||||
@Deprecated
|
||||
public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
|
||||
if (groupCallback == null) {
|
||||
groupCallback = new DefaultGroupCallback();
|
||||
}
|
||||
IGroupCallback finalGroupCallback = groupCallback;
|
||||
if (executorService != null) {
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
boolean success = beginWork(timeout, executorService, workerWrapper);
|
||||
if (success) {
|
||||
finalGroupCallback.success(Arrays.asList(workerWrapper));
|
||||
} else {
|
||||
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
|
||||
}
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
final ExecutorService commonPool = getCommonPool();
|
||||
commonPool.submit(() -> {
|
||||
try {
|
||||
boolean success = beginWork(timeout, commonPool, workerWrapper);
|
||||
if (success) {
|
||||
finalGroupCallback.success(Arrays.asList(workerWrapper));
|
||||
} else {
|
||||
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
|
||||
}
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ========================= 设置/属性选项 =========================
|
||||
|
||||
/**
|
||||
@@ -156,7 +102,11 @@ public class Async {
|
||||
* 在v1.4及之前,该COMMON_POLL是被写死的。
|
||||
* <p>
|
||||
* 自v1.5后:
|
||||
* 该线程池会被懒加载。
|
||||
* 该线程池将会在第一次调用{@link #getCommonPool()}时懒加载。
|
||||
* tip:
|
||||
* 要注意,{@link #work(long, WorkerWrapper[])}、{@link #work(long, Collection)}这些方法,
|
||||
* 不传入线程池就会默认调用{@link #getCommonPool()},就会初始化线程池。
|
||||
* <p>
|
||||
* 该线程池将会给线程取名为asyncTool-commonPool-thread-0(数字不重复)。
|
||||
* </p>
|
||||
*/
|
||||
@@ -167,7 +117,11 @@ public class Async {
|
||||
* 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时,ExecutorService将会被记录下来。
|
||||
* <p/>
|
||||
* 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个
|
||||
*
|
||||
* @deprecated 不明意义、毫无用处的字段。记录之前使用的线程池没啥意义。
|
||||
*/
|
||||
@SuppressWarnings("DeprecatedIsStillUsed")
|
||||
@Deprecated
|
||||
private static final AtomicReference<ExecutorService> lastExecutorService = new AtomicReference<>(null);
|
||||
|
||||
/**
|
||||
@@ -186,7 +140,6 @@ public class Async {
|
||||
new ThreadFactory() {
|
||||
private final AtomicLong threadCount = new AtomicLong(0);
|
||||
|
||||
@SuppressWarnings("NullableProblems")
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r,
|
||||
@@ -212,6 +165,10 @@ public class Async {
|
||||
return COMMON_POOL;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated 不明意义的输出信息的方法
|
||||
*/
|
||||
@Deprecated
|
||||
public static String getThreadCount() {
|
||||
return "activeCount=" + COMMON_POOL.getActiveCount() +
|
||||
",completedCount=" + COMMON_POOL.getCompletedTaskCount() +
|
||||
@@ -282,6 +239,58 @@ public class Async {
|
||||
return beginWork(timeout, getCommonPool(), workerWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
|
||||
*/
|
||||
@Deprecated
|
||||
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
|
||||
beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步执行,直到所有都完成,或失败后,发起回调
|
||||
*
|
||||
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
|
||||
*/
|
||||
@Deprecated
|
||||
public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
|
||||
if (groupCallback == null) {
|
||||
groupCallback = new DefaultGroupCallback();
|
||||
}
|
||||
IGroupCallback finalGroupCallback = groupCallback;
|
||||
if (executorService != null) {
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
boolean success = beginWork(timeout, executorService, workerWrapper);
|
||||
if (success) {
|
||||
finalGroupCallback.success(Arrays.asList(workerWrapper));
|
||||
} else {
|
||||
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
|
||||
}
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
final ExecutorService commonPool = getCommonPool();
|
||||
commonPool.submit(() -> {
|
||||
try {
|
||||
boolean success = beginWork(timeout, commonPool, workerWrapper);
|
||||
if (success) {
|
||||
finalGroupCallback.success(Arrays.asList(workerWrapper));
|
||||
} else {
|
||||
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
|
||||
}
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭上次使用的线程池
|
||||
*
|
||||
|
||||
@@ -72,6 +72,9 @@ public interface OnceWork {
|
||||
*/
|
||||
default Map<String, WorkerWrapper<?, ?>> getWrappersOfState(ResultState... ofState) {
|
||||
final HashSet<ResultState> states = new HashSet<>(Arrays.asList(ofState));
|
||||
if (states.isEmpty()) {
|
||||
return new HashMap<>(1);
|
||||
}
|
||||
return getWrappers().entrySet().stream()
|
||||
.filter(entry -> states.contains(entry.getValue().getWorkResult().getResultState()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
@@ -123,6 +126,8 @@ public interface OnceWork {
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回{@link Future}视图。
|
||||
*
|
||||
* @param sleepCheckInterval 为防止线程爆炸,在{@link Future#get(long, TimeUnit)}方法时使用隔一段时间检查一次。
|
||||
* 该Function的参数为总超时毫秒值,返回值为检查时间间隔。
|
||||
* @return 返回 {@link AsFuture}封装对象。
|
||||
@@ -152,16 +157,19 @@ public interface OnceWork {
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步等待取消
|
||||
* 同步等待取消。
|
||||
*
|
||||
* @param ignore__mayInterruptIfRunning 该参数将被无视。因为暂未实现“修改允许打断属性”功能。 // todo 等待实现
|
||||
* @param ignore 该参数将被无视。因为暂未实现“修改允许打断属性”功能。todo : await implement
|
||||
*/
|
||||
@Override
|
||||
public boolean cancel(boolean ignore__mayInterruptIfRunning) {
|
||||
public boolean cancel(boolean ignore) {
|
||||
try {
|
||||
if (onceWork.isFinish()) {
|
||||
return false;
|
||||
}
|
||||
onceWork.pleaseCancelAndAwaitFinish();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("", e);
|
||||
throw new RuntimeException("interrupted when await finish in : " + this, e);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -189,7 +197,7 @@ public interface OnceWork {
|
||||
*/
|
||||
@Override
|
||||
public Map<String, WorkerWrapper<?, ?>> get(long timeout,
|
||||
@SuppressWarnings("NullableProblems") TimeUnit unit)
|
||||
TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
final long millis = Objects.requireNonNull(unit).toMillis(timeout);
|
||||
final long interval = Math.max(1, Math.min(millis, sleepCheckInterval.apply(millis)));
|
||||
@@ -206,7 +214,7 @@ public interface OnceWork {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "(asFuture from " + this + ")";
|
||||
return "(asFuture from " + onceWork + ")@" + Integer.toHexString(this.hashCode());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ public class WorkResult<V> {
|
||||
* 返回不可修改的DEFAULT单例。
|
||||
*/
|
||||
public static <V> WorkResult<V> defaultResult() {
|
||||
//noinspection unchecked
|
||||
return (WorkResult<V>) DEFAULT;
|
||||
}
|
||||
|
||||
|
||||
@@ -348,6 +348,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
|
||||
wrapper.getWrapperStrategy().setDependWrapperStrategyMapper(mapper);
|
||||
}
|
||||
if (selfIsMustSet != null && selfIsMustSet.size() > 0) {
|
||||
//noinspection deprecation
|
||||
selfIsMustSet.forEach(next -> Optional.ofNullable(next.getWrapperStrategy().getDependMustStrategyMapper())
|
||||
.ifPresent(mustMapper -> mustMapper.addDependMust(wrapper)));
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.jd.platform.async.wrapper;
|
||||
|
||||
import com.jd.platform.async.exception.CancelSkippedException;
|
||||
import com.jd.platform.async.exception.CancelException;
|
||||
import com.jd.platform.async.exception.EndsNormallyException;
|
||||
import com.jd.platform.async.worker.ResultState;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
import com.jd.platform.async.callback.DefaultCallback;
|
||||
@@ -113,7 +114,8 @@ public abstract class WorkerWrapper<T, V> {
|
||||
this.id = id;
|
||||
//允许不设置回调
|
||||
if (callback == null) {
|
||||
callback = new DefaultCallback<>();
|
||||
//noinspection unchecked
|
||||
callback = (ICallback<T, V>) DefaultCallback.getInstance();
|
||||
}
|
||||
this.callback = callback;
|
||||
this.allowInterrupt = allowInterrupt;
|
||||
@@ -237,7 +239,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
*/
|
||||
public void cancel() {
|
||||
if (State.setState(state, states_of_beforeWorkingEnd, SKIP, null)) {
|
||||
fastFail(false, new CancelSkippedException(), true);
|
||||
fastFail(false, new CancelException(), true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,7 +272,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
callback.result(success, param, _workResult);
|
||||
} catch (Exception e) {
|
||||
if (setState(state, states_of_skipOrAfterWork, ERROR, null)) {
|
||||
fastFail(false, e, _workResult.getEx() instanceof SkippedException);
|
||||
fastFail(false, e, _workResult.getEx() instanceof EndsNormallyException);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -281,8 +283,8 @@ public abstract class WorkerWrapper<T, V> {
|
||||
};
|
||||
final BiConsumer<Boolean, Exception> __function__fastFail_callbackResult$false_beginNext =
|
||||
(fastFail_isTimeout, fastFail_exception) -> {
|
||||
boolean isSkip = fastFail_exception instanceof SkippedException;
|
||||
fastFail(fastFail_isTimeout && !isSkip, fastFail_exception, isSkip);
|
||||
boolean isEndsNormally = fastFail_exception instanceof EndsNormallyException;
|
||||
fastFail(fastFail_isTimeout && !isEndsNormally, fastFail_exception, isEndsNormally);
|
||||
__function__callbackResultOfFalse_beginNext.run();
|
||||
};
|
||||
final Runnable __function__doWork =
|
||||
@@ -413,13 +415,14 @@ public abstract class WorkerWrapper<T, V> {
|
||||
* 快速失败。
|
||||
* 该方法不负责检查状态,请自行控制。
|
||||
*
|
||||
* @param isTimeout 是否是因为超时而快速失败
|
||||
* @param e 设置异常信息到{@link WorkResult#getEx()}
|
||||
* @param isTimeout 是否是因为超时而快速失败
|
||||
* @param e 设置异常信息到{@link WorkResult#getEx()}
|
||||
* @param isEndsNormally 是否是因正常情况正常而结束,例如跳过{@link SkippedException}、取消{@link CancelException}。
|
||||
*/
|
||||
protected void fastFail(boolean isTimeout, Exception e, boolean isSkip) {
|
||||
protected void fastFail(boolean isTimeout, Exception e, boolean isEndsNormally) {
|
||||
// 试图打断正在执行{@link IWorker#action(Object, Map)}的线程
|
||||
Thread _doWorkingThread;
|
||||
if ((_doWorkingThread = doWorkingThread.get()) != null
|
||||
if ((_doWorkingThread = this.doWorkingThread.get()) != null
|
||||
// 不会打断自己
|
||||
&& !Objects.equals(Thread.currentThread(), _doWorkingThread)) {
|
||||
_doWorkingThread.interrupt();
|
||||
@@ -427,7 +430,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
// 尚未处理过结果则设置
|
||||
workResult.compareAndSet(null, new WorkResult<>(
|
||||
worker.defaultValue(),
|
||||
isTimeout ? ResultState.TIMEOUT : (isSkip ? ResultState.DEFAULT : ResultState.EXCEPTION),
|
||||
isTimeout ? ResultState.TIMEOUT : (isEndsNormally ? ResultState.DEFAULT : ResultState.EXCEPTION),
|
||||
e
|
||||
));
|
||||
}
|
||||
@@ -520,8 +523,9 @@ public abstract class WorkerWrapper<T, V> {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(400)
|
||||
.append("WorkerWrapper{id=").append(id)
|
||||
final StringBuilder sb = new StringBuilder(256)
|
||||
.append(this.getClass().getSimpleName())
|
||||
.append("{id=").append(id)
|
||||
.append(", state=").append(of(state.get()))
|
||||
.append(", param=").append(param)
|
||||
.append(", workResult=").append(workResult)
|
||||
@@ -571,6 +575,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public DependMustStrategyMapper getDependMustStrategyMapper() {
|
||||
return dependMustStrategyMapper;
|
||||
|
||||
@@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit;
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/4-下午1:26
|
||||
*/
|
||||
@SuppressWarnings("unused")
|
||||
public interface WorkerWrapperBuilder<T, V> {
|
||||
/**
|
||||
* 设置唯一id。
|
||||
@@ -56,6 +57,7 @@ public interface WorkerWrapperBuilder<T, V> {
|
||||
*/
|
||||
SetDepend<T, V> setDepend();
|
||||
|
||||
@SuppressWarnings({"UnusedReturnValue"})
|
||||
interface SetDepend<T, V> {
|
||||
/**
|
||||
* 设置在本Wrapper之前的上游Wrapper。
|
||||
@@ -112,10 +114,8 @@ public interface WorkerWrapperBuilder<T, V> {
|
||||
* @param wrapper 需要设置特殊策略的Wrapper。
|
||||
* @param strategy 特殊策略。
|
||||
*/
|
||||
@SuppressWarnings("UnusedReturnValue")
|
||||
SetDepend<T, V> specialDependWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper<?, ?> wrapper);
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
default SetDepend<T, V> specialDependWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper... wrappers) {
|
||||
if (strategy == null || wrappers == null) {
|
||||
return this;
|
||||
@@ -158,7 +158,6 @@ public interface WorkerWrapperBuilder<T, V> {
|
||||
return setDepend().wrapper(wrappers).end();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
default WorkerWrapperBuilder<T, V> depends(Collection<WorkerWrapper> wrappers) {
|
||||
return setDepend().wrapper(wrappers).end();
|
||||
}
|
||||
@@ -167,7 +166,6 @@ public interface WorkerWrapperBuilder<T, V> {
|
||||
return setDepend().wrapper(wrappers).strategy(strategy).end();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
default WorkerWrapperBuilder<T, V> depends(DependenceStrategy strategy, Collection<WorkerWrapper> wrappers) {
|
||||
return setDepend().wrapper(wrappers).strategy(strategy).end();
|
||||
}
|
||||
@@ -208,7 +206,6 @@ public interface WorkerWrapperBuilder<T, V> {
|
||||
*/
|
||||
SetNext<T, V> mustToNextWrapper(WorkerWrapper<?, ?> wrapper);
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
default SetNext<T, V> requireToNextWrapper(WorkerWrapper<?, ?> wrapper, boolean must) {
|
||||
return must ? mustToNextWrapper(wrapper) : wrapper(wrapper);
|
||||
}
|
||||
@@ -234,7 +231,6 @@ public interface WorkerWrapperBuilder<T, V> {
|
||||
return setNext().wrapper(wrappers).end();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
default WorkerWrapperBuilder<T, V> nextOf(Collection<WorkerWrapper> wrappers) {
|
||||
return setNext().wrapper(wrappers).end();
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.jd.platform.async.wrapper;
|
||||
|
||||
import com.jd.platform.async.exception.CancelSkippedException;
|
||||
import com.jd.platform.async.executor.PollingCenter;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
@@ -8,6 +8,7 @@ import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy;
|
||||
import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* @author create by TcSnZh on 2021/5/17-下午6:23
|
||||
@@ -99,12 +100,33 @@ public interface WrapperStrategy extends DependenceStrategy, SkipStrategy {
|
||||
abstract class AbstractWrapperStrategy implements WrapperStrategy {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "WrapperStrategy{" +
|
||||
"dependWrapperStrategyMapper=" + getDependWrapperStrategyMapper() +
|
||||
", dependMustStrategyMapper=" + getDependMustStrategyMapper() +
|
||||
", dependenceStrategy=" + getDependenceStrategy() +
|
||||
", skipStrategy=" + getSkipStrategy() +
|
||||
'}';
|
||||
final StringBuilder sb = new StringBuilder(128)
|
||||
.append(this.getClass().getSimpleName()).append('{');
|
||||
final AtomicBoolean needAppendSplit = new AtomicBoolean();
|
||||
appendNotNullProperty(sb, "dependWrapperStrategyMapper=",
|
||||
getDependWrapperStrategyMapper(), needAppendSplit, ", ");
|
||||
appendNotNullProperty(sb, "dependMustStrategyMapper=",
|
||||
getDependMustStrategyMapper(), needAppendSplit, ", ");
|
||||
appendNotNullProperty(sb, "dependenceStrategy=",
|
||||
getDependenceStrategy(), needAppendSplit, ", ");
|
||||
appendNotNullProperty(sb, "skipStrategy=",
|
||||
getSkipStrategy(), needAppendSplit, ", ");
|
||||
return sb.append('}').toString();
|
||||
}
|
||||
|
||||
private static void appendNotNullProperty(StringBuilder sb,
|
||||
String propPrefix,
|
||||
Object prop,
|
||||
AtomicBoolean needAppendSplit,
|
||||
@SuppressWarnings("SameParameterValue") String split) {
|
||||
if (prop == null) {
|
||||
return;
|
||||
}
|
||||
if (needAppendSplit.get()) {
|
||||
sb.append(split);
|
||||
}
|
||||
sb.append(propPrefix).append(prop);
|
||||
needAppendSplit.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +139,7 @@ public interface WrapperStrategy extends DependenceStrategy, SkipStrategy {
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public DependMustStrategyMapper getDependMustStrategyMapper() {
|
||||
return null;
|
||||
|
||||
@@ -14,6 +14,7 @@ import java.util.stream.Collectors;
|
||||
*
|
||||
* @author create by TcSnZh on 2021/5/4-下午1:24
|
||||
*/
|
||||
@SuppressWarnings("UnusedReturnValue")
|
||||
public class DependMustStrategyMapper implements DependenceStrategy {
|
||||
|
||||
private final Set<WorkerWrapper<?, ?>> mustDependSet = new LinkedHashSet<>();
|
||||
|
||||
@@ -6,6 +6,7 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略。
|
||||
@@ -27,7 +28,6 @@ public class DependOnUpWrapperStrategyMapper implements DependenceStrategy {
|
||||
@SuppressWarnings("UnusedReturnValue")
|
||||
public DependOnUpWrapperStrategyMapper putMapping(WorkerWrapper<?, ?> targetWrapper, DependOnUpWrapperStrategy strategy) {
|
||||
mapper.put(targetWrapper, strategy);
|
||||
toStringCache = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ public class DependOnUpWrapperStrategyMapper implements DependenceStrategy {
|
||||
* @return 如果在mapper中有对fromWrapper的处理策略,则使用其进行判断。否则返回JUDGE_BY_AFTER交给下一个进行判断。
|
||||
*/
|
||||
@Override
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
|
||||
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
|
||||
WorkerWrapper<?, ?> thisWrapper,
|
||||
WorkerWrapper<?, ?> fromWrapper) {
|
||||
DependOnUpWrapperStrategy strategy = mapper.get(fromWrapper);
|
||||
@@ -52,19 +52,18 @@ public class DependOnUpWrapperStrategyMapper implements DependenceStrategy {
|
||||
return strategy.judge(fromWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 缓存toString
|
||||
*/
|
||||
private String toStringCache;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (toStringCache == null) {
|
||||
toStringCache = "DependWrapperStrategyMapper{mapper=" + mapper.entrySet().stream()
|
||||
.map(entry -> "{" + entry.getKey().getId() + ":" + entry.getValue() + "}")
|
||||
.collect(Collectors.toList())
|
||||
+ "}";
|
||||
final StringBuilder sb = new StringBuilder(64)
|
||||
.append(this.getClass().getSimpleName()).append("{mapper=");
|
||||
final Set<Map.Entry<WorkerWrapper<?, ?>, DependOnUpWrapperStrategy>> entrySet = mapper.entrySet();
|
||||
entrySet.forEach(entry -> {
|
||||
sb.append(entry.getKey().getId()).append(':').append(entry.getValue()).append(", ");
|
||||
});
|
||||
if (entrySet.size() > 0) {
|
||||
final int length = sb.length();
|
||||
sb.delete(length - 2, length);
|
||||
}
|
||||
return toStringCache;
|
||||
return sb.append('}').toString();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ class Case9 {
|
||||
System.out.println("I am IWorker 1");
|
||||
return null;
|
||||
},
|
||||
new DefaultCallback<>(),
|
||||
DefaultCallback.getInstance(),
|
||||
false,
|
||||
true,
|
||||
100,
|
||||
@@ -38,7 +38,7 @@ class Case9 {
|
||||
System.out.println("I am IWorker 2");
|
||||
return null;
|
||||
},
|
||||
new DefaultCallback<>(),
|
||||
DefaultCallback.getInstance(),
|
||||
false,
|
||||
true,
|
||||
100,
|
||||
|
||||
Reference in New Issue
Block a user