!15 修复缺陷I4XC79

Merge pull request !15 from FIX!_I4XC79
This commit is contained in:
tianyaleixiaowu
2022-03-30 01:45:41 +00:00
committed by Gitee
8 changed files with 190 additions and 8 deletions

View File

@@ -1,4 +1,4 @@
如果只是需要用这个框架请往下看即可。如果需要深入了解这个框架是如何一步一步实现的从接到需求到每一步的思考每个类为什么这么设计为什么有这些方法也就是如何从0到1开发出这个框架作者在[csdn开了专栏](https://blog.csdn.net/tianyaleixiaowu/category_9637010.html)专门讲中间件如何从0开发包括并不限于这个小框架。京东内部同事可在cf上搜索erp也能看到。
如果只是需要用这个框架请往下看即可。如果需要深入了解这个框架是如何一步一步实现的从接到需求到每一步的思考每个类为什么这么设计为什么有这些方法也就是如何从0到1开发出这个框架作者在[csdn开了专栏](https://blog.csdn.net/tianyaleixiaowu/category_9637010.html) 专门讲中间件如何从0开发包括并不限于这个小框架。京东内部同事可在cf上搜索erp也能看到。
# 安装教程

View File

@@ -362,6 +362,9 @@ public abstract class WorkerWrapper<T, V> {
wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper);
switch (judge.getDependenceAction()) {
case TAKE_REST:
//FIXME 等待200毫秒重新投入线程池主要为了调起最后一个任务
Thread.sleep(200L);
executorService.submit(() -> this.work(executorService, fromWrapper, remainTime-(SystemClock.now()-now), group));
return;
case FAST_FAIL:
if (setState(state, STARTED, ERROR)) {

View File

@@ -1,6 +1,5 @@
package com.jd.platform.async.wrapper.strategy.depend;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
@@ -80,6 +79,14 @@ public interface DependenceStrategy {
* 被依赖的所有Wrapper都必须成功才能开始工作。
* 如果其中任一Wrapper还没有执行且不存在失败则休息。
* 如果其中任一Wrapper失败则立即失败。
*
* FIXME
* 这里有个问题,
* 假设任务A依赖B、C
*
* B执行时间比较长A-B的线程和A-C的线程都检测到B的res==nullDEFAULT
* 那么线程A就真的去休眠TAKE_REST而没有发起
* 导致整个任务长时间无法结束
*/
DependenceStrategy ALL_DEPENDENCIES_ALL_SUCCESS = new DependenceStrategy() {
@Override

View File

@@ -0,0 +1,54 @@
package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker5 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object, Map<String, WorkerWrapper<?, ?>> allWrappers) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 3";
}
@Override
public String defaultValue() {
return "worker3--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" + Thread.currentThread().getName());
} else {
System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" + Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,55 @@
package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker6 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object, Map<String, WorkerWrapper<?, ?>> allWrappers) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 3";
}
@Override
public String defaultValue() {
return "worker3--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" + Thread.currentThread().getName());
} else {
System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" + Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,55 @@
package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker7 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object, Map<String, WorkerWrapper<?, ?>> allWrappers) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 3";
}
@Override
public String defaultValue() {
return "worker3--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" + Thread.currentThread().getName());
} else {
System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" + Thread.currentThread().getName());
}
}
}

View File

@@ -1,32 +1,37 @@
package v15.cases;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
import java.util.concurrent.ExecutionException;
/**
* 示例:简单示例--复杂点的
*
* @author create by TcSnZh on 2021/5/8-下午10:29
*/
class Case1 {
private static WorkerWrapperBuilder<?, ?> builder(String id) {
return WorkerWrapper.<String, String>builder()
.id(id)
.worker((param, allWrappers) -> {
System.out.println("wrapper(id=" + id + ") is working");
try {
Thread.sleep(50);
if ("F".equals(id)) {
System.out.println("wrapper(id=" + id + ") is working");
Thread.sleep(12000);
} else {
System.out.println("wrapper(id=" + id + ") is worki444ng");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
return id;
});
}
public static void main(String[] args) {
long now = SystemClock.now();
WorkerWrapper<?, ?> a = builder("A").build();
WorkerWrapper<?, ?> d;
builder("H")
@@ -43,10 +48,11 @@ class Case1 {
)
.build();
try {
Async.work(1000, a, d).awaitFinish();
Async.work(10000, a, d).awaitFinish();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("now:" + (SystemClock.now() - now));
/* 输出:
wrapper(id=D) is working
wrapper(id=A) is working
@@ -58,5 +64,6 @@ class Case1 {
wrapper(id=H) is working
*/
}
}

View File

@@ -315,6 +315,7 @@ public class HashedWheelTimer extends AbstractWheelTimer {
startTimeInitialized.countDown();
do {
//TODO 时间轮这里结束不了任务
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);