1.修复了OnceWork.AsFuture.toString()方法循环调用自身问题。

2.更新了文档注释、为一些废弃的类、字段、方法添加了@Deprecated注解
3.将正常结束(取消、跳过)的异常抽取为抽象类,并修改了之前异常包的不合理的继承逻辑。
4.优化了其他的一些小细节。例如:策略类的toString()方法、默认回调修改为单例模式。
This commit is contained in:
TcSnZh
2021-06-06 01:33:22 +08:00
parent 5ce6488dbe
commit 105aebb7e0
20 changed files with 239 additions and 145 deletions

View File

@@ -1,6 +1,5 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.worker.WorkResult;
/**
@@ -9,9 +8,20 @@ import com.jd.platform.async.worker.WorkResult;
* @author wuweifeng wrote on 2019-11-19.
*/
public class DefaultCallback<T, V> implements ICallback<T, V> {
private static final DefaultCallback<Object, Object> instance = new DefaultCallback<Object, Object>() {
@Override
public String toString() {
return "(DefaultCallback instance)";
}
};
public static DefaultCallback<Object, Object> getInstance() {
return instance;
}
@Override
public void begin() {
// do nothing
}
/**

View File

@@ -7,15 +7,18 @@ import java.util.List;
/**
* @author wuweifeng wrote on 2019-12-27
* @version 1.0
* @deprecated deprecate at version 1.5.1 , see {@link IGroupCallback} .
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
public class DefaultGroupCallback implements IGroupCallback {
@Override
public void success(List<WorkerWrapper> workerWrappers) {
// do nothing
}
@Override
public void failure(List<WorkerWrapper> workerWrappers, Exception e) {
// do nothing
}
}

View File

@@ -1,6 +1,7 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.exception.EndsNormallyException;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.worker.WorkResult;
@@ -27,13 +28,16 @@ public interface ICallback<T, V> {
void result(boolean success, T param, WorkResult<V> workResult);
/**
* 提供常量选项:打印异常信息,跳过时的异常{@link SkippedException}不会打印。
* 提供常量选项:
* <p/>
* 如果发生了异常,则打印异常信息。
* 正常结束(例如取消、跳过)的异常{@link com.jd.platform.async.exception.EndsNormallyException}不会打印。
*/
ICallback PRINT_EXCEPTION_STACK_TRACE = new ICallback<Object, Object>() {
@Override
public void result(boolean success, Object param, WorkResult<Object> workResult) {
Exception ex = workResult.getEx();
if (ex != null && !(ex instanceof SkippedException)) {
if (ex != null && !(ex instanceof EndsNormallyException)) {
ex.printStackTrace();
}
}

View File

@@ -1,18 +1,35 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.worker.OnceWork;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* 如果是异步执行整组的话,可以用这个组回调。不推荐使用
* 如果是异步执行整组的话,可以用这个组回调。<b>已经废弃</b>
*
* @author wuweifeng wrote on 2019-11-19.
* @deprecated deprecate at version 1.5.1
* <p>
* please use {@link com.jd.platform.async.executor.Async#work(long, ExecutorService, Collection, String)}.
* <p>
* 该方法返回的{@link OnceWork}句柄,默认不会同步等待结束,
* 这便替代了原先的
* {@link com.jd.platform.async.executor.Async#beginWorkAsync(long, ExecutorService, IGroupCallback, WorkerWrapper[])}
* <p>
* 需要同步等待的话调用{@link OnceWork#awaitFinish()}即可。
* </p>
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
public interface IGroupCallback {
/**
* 成功后可以从wrapper里去getWorkResult
*/
void success(List<WorkerWrapper> workerWrappers);
/**
* 失败了也可以从wrapper里去getWorkResult
*/

View File

@@ -2,11 +2,18 @@ package com.jd.platform.async.callback;
/**
* @author wuweifeng wrote on 2019-12-20
* @author tcsnzh
* 远古时期的代码,估计也没人会使用。但我也不确定,因此标注废弃。
* <p/>
* 难受的一比,为了屎山的兼容性要在代码里保留这么多屎盆。
* @version 1.0
* @deprecated deprecated by version 1.5.1--SNAPSHOT
*/
@Deprecated
public interface ITimeoutWorker<T, V> extends IWorker<T, V> {
/**
* 每个worker都可以设置超时时间
*
* @return 毫秒超时时间
*/
long timeOut();
@@ -14,6 +21,7 @@ public interface ITimeoutWorker<T, V> extends IWorker<T, V> {
/**
* 是否开启单个执行单元的超时功能有时是一个group设置个超时而不具备关心单个worker的超时
* <p>注意,如果开启了单个执行单元的超时检测,将使线程池数量多出一倍</p>
*
* @return 是否开启
*/
boolean enableTimeOut();

View File

@@ -0,0 +1,15 @@
package com.jd.platform.async.exception;
/**
* 整组取消,设置该异常。
*
* @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午6:12
*/
public class CancelException extends EndsNormallyException {
public CancelException() {
}
public CancelException(String message) {
super(message);
}
}

View File

@@ -1,19 +0,0 @@
package com.jd.platform.async.exception;
/**
* 整组取消,设置该异常。
*
* @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午6:12
*/
public class CancelSkippedException extends SkippedException {
public CancelSkippedException() {
}
public CancelSkippedException(String message) {
super(message);
}
public CancelSkippedException(String message, long skipAt) {
super(message, skipAt);
}
}

View File

@@ -0,0 +1,27 @@
package com.jd.platform.async.exception;
/**
* 该异常表示此任务单元是正常结束的。
*
* @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/6/5-下午11:57
*/
public abstract class EndsNormallyException extends RuntimeException {
public EndsNormallyException() {
}
public EndsNormallyException(String message) {
super(message);
}
public EndsNormallyException(String message, Throwable cause) {
super(message, cause);
}
public EndsNormallyException(Throwable cause) {
super(cause);
}
public EndsNormallyException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@@ -1,30 +1,17 @@
package com.jd.platform.async.exception;
import com.jd.platform.async.executor.timer.SystemClock;
/**
* 如果任务在执行之前自己后面的任务已经执行完或正在被执行则抛该exception
*
* @author wuweifeng wrote on 2020-02-18
* @version 1.0
*/
public class SkippedException extends RuntimeException {
private final long skipAt;
public class SkippedException extends EndsNormallyException {
public SkippedException() {
this(null);
}
public SkippedException(String message) {
this(message, SystemClock.now());
}
public SkippedException(String message, long skipAt) {
super(message);
this.skipAt = skipAt;
}
public long getSkipAt() {
return skipAt;
}
}

View File

@@ -65,14 +65,13 @@ public class Async {
}
/**
* 核心方法。
* 该方法不是同步阻塞执行的。如果想要同步阻塞执行,则调用返回值的{@link OnceWork#awaitFinish()}即可。
* <b>核心方法。该方法不是同步阻塞执行的。</b>如果想要同步阻塞执行,则调用返回值的{@link OnceWork#awaitFinish()}即可。
*
* @param timeout 全组超时时间
* @param executorService 执行线程池
* @param workerWrappers 任务容器集合
* @param workId 本次工作id
* @return 返回 {@link OnceWork}封装对象。
* @return 返回 {@link OnceWork}任务句柄对象。
*/
public static OnceWork work(long timeout,
ExecutorService executorService,
@@ -95,59 +94,6 @@ public class Async {
return onceWork;
}
/**
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
*/
@SuppressWarnings("unused")
@Deprecated
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper);
}
/**
* 异步执行,直到所有都完成,或失败后,发起回调
*
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
*/
@Deprecated
public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
if (groupCallback == null) {
groupCallback = new DefaultGroupCallback();
}
IGroupCallback finalGroupCallback = groupCallback;
if (executorService != null) {
executorService.submit(() -> {
try {
boolean success = beginWork(timeout, executorService, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
} else {
final ExecutorService commonPool = getCommonPool();
commonPool.submit(() -> {
try {
boolean success = beginWork(timeout, commonPool, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
}
}
// ========================= 设置/属性选项 =========================
/**
@@ -156,7 +102,11 @@ public class Async {
* 在v1.4及之前该COMMON_POLL是被写死的。
* <p>
* 自v1.5后:
* 该线程池会被懒加载。
* 该线程池将会在第一次调用{@link #getCommonPool()}时懒加载。
* tip:
* 要注意,{@link #work(long, WorkerWrapper[])}、{@link #work(long, Collection)}这些方法,
* 不传入线程池就会默认调用{@link #getCommonPool()},就会初始化线程池。
* <p>
* 该线程池将会给线程取名为asyncTool-commonPool-thread-0数字不重复
* </p>
*/
@@ -167,7 +117,11 @@ public class Async {
* 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时ExecutorService将会被记录下来。
* <p/>
* 注意这里是个static也就是只能有一个线程池。用户自定义线程池时也只能定义一个
*
* @deprecated 不明意义、毫无用处的字段。记录之前使用的线程池没啥意义。
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
private static final AtomicReference<ExecutorService> lastExecutorService = new AtomicReference<>(null);
/**
@@ -186,7 +140,6 @@ public class Async {
new ThreadFactory() {
private final AtomicLong threadCount = new AtomicLong(0);
@SuppressWarnings("NullableProblems")
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r,
@@ -212,6 +165,10 @@ public class Async {
return COMMON_POOL;
}
/**
* @deprecated 不明意义的输出信息的方法
*/
@Deprecated
public static String getThreadCount() {
return "activeCount=" + COMMON_POOL.getActiveCount() +
",completedCount=" + COMMON_POOL.getCompletedTaskCount() +
@@ -282,6 +239,58 @@ public class Async {
return beginWork(timeout, getCommonPool(), workerWrapper);
}
/**
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
*/
@Deprecated
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper);
}
/**
* 异步执行,直到所有都完成,或失败后,发起回调
*
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
*/
@Deprecated
public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
if (groupCallback == null) {
groupCallback = new DefaultGroupCallback();
}
IGroupCallback finalGroupCallback = groupCallback;
if (executorService != null) {
executorService.submit(() -> {
try {
boolean success = beginWork(timeout, executorService, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
} else {
final ExecutorService commonPool = getCommonPool();
commonPool.submit(() -> {
try {
boolean success = beginWork(timeout, commonPool, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
}
}
/**
* 关闭上次使用的线程池
*

View File

@@ -72,6 +72,9 @@ public interface OnceWork {
*/
default Map<String, WorkerWrapper<?, ?>> getWrappersOfState(ResultState... ofState) {
final HashSet<ResultState> states = new HashSet<>(Arrays.asList(ofState));
if (states.isEmpty()) {
return new HashMap<>(1);
}
return getWrappers().entrySet().stream()
.filter(entry -> states.contains(entry.getValue().getWorkResult().getResultState()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
@@ -123,6 +126,8 @@ public interface OnceWork {
}
/**
* 返回{@link Future}视图。
*
* @param sleepCheckInterval 为防止线程爆炸,在{@link Future#get(long, TimeUnit)}方法时使用隔一段时间检查一次。
* 该Function的参数为总超时毫秒值返回值为检查时间间隔。
* @return 返回 {@link AsFuture}封装对象。
@@ -152,16 +157,19 @@ public interface OnceWork {
}
/**
* 同步等待取消
* 同步等待取消
*
* @param ignore__mayInterruptIfRunning 该参数将被无视。因为暂未实现“修改允许打断属性”功能。 // todo 等待实现
* @param ignore 该参数将被无视。因为暂未实现“修改允许打断属性”功能。todo : await implement
*/
@Override
public boolean cancel(boolean ignore__mayInterruptIfRunning) {
public boolean cancel(boolean ignore) {
try {
if (onceWork.isFinish()) {
return false;
}
onceWork.pleaseCancelAndAwaitFinish();
} catch (InterruptedException e) {
throw new RuntimeException("", e);
throw new RuntimeException("interrupted when await finish in : " + this, e);
}
return true;
}
@@ -189,7 +197,7 @@ public interface OnceWork {
*/
@Override
public Map<String, WorkerWrapper<?, ?>> get(long timeout,
@SuppressWarnings("NullableProblems") TimeUnit unit)
TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
final long millis = Objects.requireNonNull(unit).toMillis(timeout);
final long interval = Math.max(1, Math.min(millis, sleepCheckInterval.apply(millis)));
@@ -206,7 +214,7 @@ public interface OnceWork {
@Override
public String toString() {
return "(asFuture from " + this + ")";
return "(asFuture from " + onceWork + ")@" + Integer.toHexString(this.hashCode());
}
}

View File

@@ -28,6 +28,7 @@ public class WorkResult<V> {
* 返回不可修改的DEFAULT单例。
*/
public static <V> WorkResult<V> defaultResult() {
//noinspection unchecked
return (WorkResult<V>) DEFAULT;
}

View File

@@ -348,6 +348,7 @@ class StableWorkerWrapperBuilder<T, V, BUILDER_SUB_CLASS extends StableWorkerWra
wrapper.getWrapperStrategy().setDependWrapperStrategyMapper(mapper);
}
if (selfIsMustSet != null && selfIsMustSet.size() > 0) {
//noinspection deprecation
selfIsMustSet.forEach(next -> Optional.ofNullable(next.getWrapperStrategy().getDependMustStrategyMapper())
.ifPresent(mustMapper -> mustMapper.addDependMust(wrapper)));
}

View File

@@ -1,6 +1,7 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.exception.CancelSkippedException;
import com.jd.platform.async.exception.CancelException;
import com.jd.platform.async.exception.EndsNormallyException;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.callback.DefaultCallback;
@@ -113,7 +114,8 @@ public abstract class WorkerWrapper<T, V> {
this.id = id;
//允许不设置回调
if (callback == null) {
callback = new DefaultCallback<>();
//noinspection unchecked
callback = (ICallback<T, V>) DefaultCallback.getInstance();
}
this.callback = callback;
this.allowInterrupt = allowInterrupt;
@@ -237,7 +239,7 @@ public abstract class WorkerWrapper<T, V> {
*/
public void cancel() {
if (State.setState(state, states_of_beforeWorkingEnd, SKIP, null)) {
fastFail(false, new CancelSkippedException(), true);
fastFail(false, new CancelException(), true);
}
}
@@ -270,7 +272,7 @@ public abstract class WorkerWrapper<T, V> {
callback.result(success, param, _workResult);
} catch (Exception e) {
if (setState(state, states_of_skipOrAfterWork, ERROR, null)) {
fastFail(false, e, _workResult.getEx() instanceof SkippedException);
fastFail(false, e, _workResult.getEx() instanceof EndsNormallyException);
}
}
};
@@ -281,8 +283,8 @@ public abstract class WorkerWrapper<T, V> {
};
final BiConsumer<Boolean, Exception> __function__fastFail_callbackResult$false_beginNext =
(fastFail_isTimeout, fastFail_exception) -> {
boolean isSkip = fastFail_exception instanceof SkippedException;
fastFail(fastFail_isTimeout && !isSkip, fastFail_exception, isSkip);
boolean isEndsNormally = fastFail_exception instanceof EndsNormallyException;
fastFail(fastFail_isTimeout && !isEndsNormally, fastFail_exception, isEndsNormally);
__function__callbackResultOfFalse_beginNext.run();
};
final Runnable __function__doWork =
@@ -413,13 +415,14 @@ public abstract class WorkerWrapper<T, V> {
* 快速失败。
* 该方法不负责检查状态,请自行控制。
*
* @param isTimeout 是否是因为超时而快速失败
* @param e 设置异常信息到{@link WorkResult#getEx()}
* @param isTimeout 是否是因为超时而快速失败
* @param e 设置异常信息到{@link WorkResult#getEx()}
* @param isEndsNormally 是否是因正常情况正常而结束,例如跳过{@link SkippedException}、取消{@link CancelException}。
*/
protected void fastFail(boolean isTimeout, Exception e, boolean isSkip) {
protected void fastFail(boolean isTimeout, Exception e, boolean isEndsNormally) {
// 试图打断正在执行{@link IWorker#action(Object, Map)}的线程
Thread _doWorkingThread;
if ((_doWorkingThread = doWorkingThread.get()) != null
if ((_doWorkingThread = this.doWorkingThread.get()) != null
// 不会打断自己
&& !Objects.equals(Thread.currentThread(), _doWorkingThread)) {
_doWorkingThread.interrupt();
@@ -427,7 +430,7 @@ public abstract class WorkerWrapper<T, V> {
// 尚未处理过结果则设置
workResult.compareAndSet(null, new WorkResult<>(
worker.defaultValue(),
isTimeout ? ResultState.TIMEOUT : (isSkip ? ResultState.DEFAULT : ResultState.EXCEPTION),
isTimeout ? ResultState.TIMEOUT : (isEndsNormally ? ResultState.DEFAULT : ResultState.EXCEPTION),
e
));
}
@@ -520,8 +523,9 @@ public abstract class WorkerWrapper<T, V> {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(400)
.append("WorkerWrapper{id=").append(id)
final StringBuilder sb = new StringBuilder(256)
.append(this.getClass().getSimpleName())
.append("{id=").append(id)
.append(", state=").append(of(state.get()))
.append(", param=").append(param)
.append(", workResult=").append(workResult)
@@ -571,6 +575,7 @@ public abstract class WorkerWrapper<T, V> {
this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper;
}
@SuppressWarnings("deprecation")
@Override
public DependMustStrategyMapper getDependMustStrategyMapper() {
return dependMustStrategyMapper;

View File

@@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit;
*
* @author create by TcSnZh on 2021/5/4-下午1:26
*/
@SuppressWarnings("unused")
public interface WorkerWrapperBuilder<T, V> {
/**
* 设置唯一id。
@@ -56,6 +57,7 @@ public interface WorkerWrapperBuilder<T, V> {
*/
SetDepend<T, V> setDepend();
@SuppressWarnings({"UnusedReturnValue"})
interface SetDepend<T, V> {
/**
* 设置在本Wrapper之前的上游Wrapper。
@@ -112,10 +114,8 @@ public interface WorkerWrapperBuilder<T, V> {
* @param wrapper 需要设置特殊策略的Wrapper。
* @param strategy 特殊策略。
*/
@SuppressWarnings("UnusedReturnValue")
SetDepend<T, V> specialDependWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper<?, ?> wrapper);
@SuppressWarnings("unused")
default SetDepend<T, V> specialDependWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper... wrappers) {
if (strategy == null || wrappers == null) {
return this;
@@ -158,7 +158,6 @@ public interface WorkerWrapperBuilder<T, V> {
return setDepend().wrapper(wrappers).end();
}
@SuppressWarnings("unused")
default WorkerWrapperBuilder<T, V> depends(Collection<WorkerWrapper> wrappers) {
return setDepend().wrapper(wrappers).end();
}
@@ -167,7 +166,6 @@ public interface WorkerWrapperBuilder<T, V> {
return setDepend().wrapper(wrappers).strategy(strategy).end();
}
@SuppressWarnings("unused")
default WorkerWrapperBuilder<T, V> depends(DependenceStrategy strategy, Collection<WorkerWrapper> wrappers) {
return setDepend().wrapper(wrappers).strategy(strategy).end();
}
@@ -208,7 +206,6 @@ public interface WorkerWrapperBuilder<T, V> {
*/
SetNext<T, V> mustToNextWrapper(WorkerWrapper<?, ?> wrapper);
@SuppressWarnings("unused")
default SetNext<T, V> requireToNextWrapper(WorkerWrapper<?, ?> wrapper, boolean must) {
return must ? mustToNextWrapper(wrapper) : wrapper(wrapper);
}
@@ -234,7 +231,6 @@ public interface WorkerWrapperBuilder<T, V> {
return setNext().wrapper(wrappers).end();
}
@SuppressWarnings("unused")
default WorkerWrapperBuilder<T, V> nextOf(Collection<WorkerWrapper> wrappers) {
return setNext().wrapper(wrappers).end();
}

View File

@@ -1,6 +1,5 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.exception.CancelSkippedException;
import com.jd.platform.async.executor.PollingCenter;
import java.util.Collection;

View File

@@ -8,6 +8,7 @@ import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy;
import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author create by TcSnZh on 2021/5/17-下午6:23
@@ -99,12 +100,33 @@ public interface WrapperStrategy extends DependenceStrategy, SkipStrategy {
abstract class AbstractWrapperStrategy implements WrapperStrategy {
@Override
public String toString() {
return "WrapperStrategy{" +
"dependWrapperStrategyMapper=" + getDependWrapperStrategyMapper() +
", dependMustStrategyMapper=" + getDependMustStrategyMapper() +
", dependenceStrategy=" + getDependenceStrategy() +
", skipStrategy=" + getSkipStrategy() +
'}';
final StringBuilder sb = new StringBuilder(128)
.append(this.getClass().getSimpleName()).append('{');
final AtomicBoolean needAppendSplit = new AtomicBoolean();
appendNotNullProperty(sb, "dependWrapperStrategyMapper=",
getDependWrapperStrategyMapper(), needAppendSplit, ", ");
appendNotNullProperty(sb, "dependMustStrategyMapper=",
getDependMustStrategyMapper(), needAppendSplit, ", ");
appendNotNullProperty(sb, "dependenceStrategy=",
getDependenceStrategy(), needAppendSplit, ", ");
appendNotNullProperty(sb, "skipStrategy=",
getSkipStrategy(), needAppendSplit, ", ");
return sb.append('}').toString();
}
private static void appendNotNullProperty(StringBuilder sb,
String propPrefix,
Object prop,
AtomicBoolean needAppendSplit,
@SuppressWarnings("SameParameterValue") String split) {
if (prop == null) {
return;
}
if (needAppendSplit.get()) {
sb.append(split);
}
sb.append(propPrefix).append(prop);
needAppendSplit.set(true);
}
}
@@ -117,6 +139,7 @@ public interface WrapperStrategy extends DependenceStrategy, SkipStrategy {
return null;
}
@SuppressWarnings("deprecation")
@Override
public DependMustStrategyMapper getDependMustStrategyMapper() {
return null;

View File

@@ -14,6 +14,7 @@ import java.util.stream.Collectors;
*
* @author create by TcSnZh on 2021/5/4-下午1:24
*/
@SuppressWarnings("UnusedReturnValue")
public class DependMustStrategyMapper implements DependenceStrategy {
private final Set<WorkerWrapper<?, ?>> mustDependSet = new LinkedHashSet<>();

View File

@@ -6,6 +6,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略。
@@ -27,7 +28,6 @@ public class DependOnUpWrapperStrategyMapper implements DependenceStrategy {
@SuppressWarnings("UnusedReturnValue")
public DependOnUpWrapperStrategyMapper putMapping(WorkerWrapper<?, ?> targetWrapper, DependOnUpWrapperStrategy strategy) {
mapper.put(targetWrapper, strategy);
toStringCache = null;
return this;
}
@@ -42,7 +42,7 @@ public class DependOnUpWrapperStrategyMapper implements DependenceStrategy {
* @return 如果在mapper中有对fromWrapper的处理策略则使用其进行判断。否则返回JUDGE_BY_AFTER交给下一个进行判断。
*/
@Override
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?,?>> dependWrappers,
public DependenceAction.WithProperty judgeAction(Set<WorkerWrapper<?, ?>> dependWrappers,
WorkerWrapper<?, ?> thisWrapper,
WorkerWrapper<?, ?> fromWrapper) {
DependOnUpWrapperStrategy strategy = mapper.get(fromWrapper);
@@ -52,19 +52,18 @@ public class DependOnUpWrapperStrategyMapper implements DependenceStrategy {
return strategy.judge(fromWrapper);
}
/**
* 缓存toString
*/
private String toStringCache;
@Override
public String toString() {
if (toStringCache == null) {
toStringCache = "DependWrapperStrategyMapper{mapper=" + mapper.entrySet().stream()
.map(entry -> "{" + entry.getKey().getId() + ":" + entry.getValue() + "}")
.collect(Collectors.toList())
+ "}";
final StringBuilder sb = new StringBuilder(64)
.append(this.getClass().getSimpleName()).append("{mapper=");
final Set<Map.Entry<WorkerWrapper<?, ?>, DependOnUpWrapperStrategy>> entrySet = mapper.entrySet();
entrySet.forEach(entry -> {
sb.append(entry.getKey().getId()).append(':').append(entry.getValue()).append(", ");
});
if (entrySet.size() > 0) {
final int length = sb.length();
sb.delete(length - 2, length);
}
return toStringCache;
return sb.append('}').toString();
}
}

View File

@@ -24,7 +24,7 @@ class Case9 {
System.out.println("I am IWorker 1");
return null;
},
new DefaultCallback<>(),
DefaultCallback.getInstance(),
false,
true,
100,
@@ -38,7 +38,7 @@ class Case9 {
System.out.println("I am IWorker 2");
return null;
},
new DefaultCallback<>(),
DefaultCallback.getInstance(),
false,
true,
100,