mirror of
https://gitee.com/jd-platform-opensource/asyncTool.git
synced 2026-03-22 04:27:15 +08:00
v1.5 优化并确定了使用单线程轮询的算法策略
This commit is contained in:
@@ -231,18 +231,20 @@ public class WrapperEndingInspector implements Comparable<WrapperEndingInspector
|
||||
* 请求轮询。
|
||||
*/
|
||||
private void tryPolling() {
|
||||
if (inspectionSet.size() < POLLING_POOL.getActiveCount()) {
|
||||
if (inspectionSet.size() < SINGLETON_POLLING_POOL.getActiveCount()) {
|
||||
// 线程数 > inspector数,理论上已经各个线程都在忙活了,不去新开线程。
|
||||
return;
|
||||
}
|
||||
POLLING_POOL.submit(() -> {
|
||||
if (!inspectionSet.isEmpty()) {
|
||||
SINGLETON_POLLING_POOL.submit(() -> {
|
||||
int expectCount;
|
||||
while (!inspectionSet.isEmpty()) {
|
||||
// expectCount是本线程用来记录本次循环开始时inspectionSet的个数。
|
||||
// 每当移出一个inspector时,该值-1。
|
||||
expectCount = inspectionSet.size();
|
||||
// 开始检查
|
||||
for (WrapperEndingInspector inspector : inspectionSet) {
|
||||
// 这个inspector的写锁被占用,说明其他的轮询线程正在扫描这个inspector
|
||||
// 那就让其他的轮询线程自己忙活去,咱们找下一个。
|
||||
if (!inspector.writePollingLock.writeLock().tryLock()) {
|
||||
continue;
|
||||
}
|
||||
// 直接抢锁,轮询期间禁止修改inspector
|
||||
inspector.writePollingLock.writeLock().lock();
|
||||
try {
|
||||
if (PollingCenter.this.inspectorIsEnd(inspector)) {
|
||||
// inspector中的wrapper调用结束了
|
||||
@@ -251,6 +253,7 @@ public class WrapperEndingInspector implements Comparable<WrapperEndingInspector
|
||||
synchronized (inspector.endCDL) {
|
||||
if (inspector.endCDL.getCount() > 0) {
|
||||
inspectionSet.remove(inspector);
|
||||
expectCount--;
|
||||
inspector.endCDL.countDown();
|
||||
}
|
||||
}
|
||||
@@ -260,6 +263,15 @@ public class WrapperEndingInspector implements Comparable<WrapperEndingInspector
|
||||
inspector.writePollingLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
/*
|
||||
* 根据 expectCount == inspectionSet.size() 的值,由于本线程1个线程在轮询:
|
||||
* 1. 若值为true,表示轮询过程中没有新的inspector被添加进set中。此时就可以break了。
|
||||
* . 之所以可以break,是因为这个inspection还没有调用结束,在其结束前还会来催促轮询的。
|
||||
* 2. 若值为false,表示有新的inspector在本线程轮询时,被加入到了set中,且没有被我们迭代到。此时还要重新轮询一次。
|
||||
*/
|
||||
if (expectCount == inspectionSet.size()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -295,12 +307,16 @@ public class WrapperEndingInspector implements Comparable<WrapperEndingInspector
|
||||
return instance;
|
||||
}
|
||||
|
||||
private static final ThreadPoolExecutor POLLING_POOL = new ThreadPoolExecutor(
|
||||
/**
|
||||
* 单线程的轮询线程池
|
||||
*/
|
||||
private static final ThreadPoolExecutor SINGLETON_POLLING_POOL = new ThreadPoolExecutor(
|
||||
0,
|
||||
// 轮询线程数量尽可能少
|
||||
Math.max(Runtime.getRuntime().availableProcessors() / 16, 1),
|
||||
// 轮询线程数必须为1
|
||||
1,
|
||||
15L,
|
||||
TimeUnit.SECONDS,
|
||||
// 必须保存至少一个轮询请求,以便在本线程轮询结束时,获取到已轮询过的线程提交的轮询请求
|
||||
new ArrayBlockingQueue<>(1),
|
||||
new ThreadFactory() {
|
||||
private final AtomicInteger threadCount = new AtomicInteger(0);
|
||||
@@ -319,7 +335,7 @@ public class WrapperEndingInspector implements Comparable<WrapperEndingInspector
|
||||
return "asyncTool-wrapperEndingInspectorPollingCenterPool-threadFactory";
|
||||
}
|
||||
},
|
||||
// 多的轮询请求就丢了
|
||||
// 多的就丢了,反正都是催这一个线程去轮询
|
||||
new ThreadPoolExecutor.DiscardPolicy()
|
||||
) {
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user