diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index d737d4e..fdac5b7 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -7,6 +7,9 @@ import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperBuilder; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * 示例:简单示例--复杂点的 * @@ -27,6 +30,7 @@ class Case1 { } } catch (InterruptedException e) { e.printStackTrace(); + throw new RuntimeException("被中断了"); } return id; }).callback((new ICallback() { @@ -37,6 +41,9 @@ class Case1 { @Override public void result(boolean success, String param, WorkResult workResult) { + // if ("H".equals(id)) { + // int a=1/0; + // } System.out.println("\t\twrapper(id=" + id + ") callback " + (success ? "success " : "fail ") + ", workResult is " + workResult); @@ -45,7 +52,8 @@ class Case1 { .allowInterrupt(true); } - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); long now = SystemClock.now(); WorkerWrapper a = builder("A").build(); WorkerWrapper d = builder("D").build(); @@ -63,10 +71,11 @@ class Case1 { ) .build(); try { - Async.work(1000, a, d).awaitFinish(); + Async.work(1000, executorService, a, d).awaitFinish(); } catch (InterruptedException e) { e.printStackTrace(); } + executorService.shutdown(); System.out.println("now:" + (SystemClock.now() - now)); /* 输出: wrapper(id=D) is working diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/AbstractWheelTimer.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/AbstractWheelTimer.java index d26fa1e..9cf4cfb 100644 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/AbstractWheelTimer.java +++ b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/AbstractWheelTimer.java @@ -1,5 +1,7 @@ package com.jd.platform.async.openutil.timer; +import java.util.concurrent.atomic.AtomicInteger; + /** * @author create by TcSnZh on 2021/5/12-下午6:36 */ @@ -8,6 +10,8 @@ public abstract class AbstractWheelTimer implements Timer, AutoCloseable { public static final int WORKER_STATE_STARTED = 1; public static final int WORKER_STATE_SHUTDOWN = 2; + protected final AtomicInteger workerState = new AtomicInteger(WORKER_STATE_INIT); // 0 - init, 1 - started, 2 - shut down + public abstract void start(); @SuppressWarnings("RedundantThrows") @@ -15,4 +19,9 @@ public abstract class AbstractWheelTimer implements Timer, AutoCloseable { public void close() throws Exception { stop(); } + + protected boolean changeState(int workerStateStarted, int workerStateShutdown) { + return workerState.compareAndSet(workerStateStarted, workerStateShutdown); + } + } diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java index fbb6d1d..0e472a4 100644 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java +++ b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java @@ -2,7 +2,6 @@ package com.jd.platform.async.openutil.timer; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; @@ -22,8 +21,6 @@ public class HashedWheelTimer extends AbstractWheelTimer { private final Worker worker = new Worker(); private final Thread workerThread; - @SuppressWarnings({"unused", "FieldMayBeFinal"}) - private final AtomicInteger workerState = new AtomicInteger(WORKER_STATE_INIT); // 0 - init, 1 - started, 2 - shut down private final long tickDuration; private final HashedWheelBucket[] wheel; @@ -207,7 +204,7 @@ public class HashedWheelTimer extends AbstractWheelTimer { public void start() { switch (workerState.get()) { case WORKER_STATE_INIT: - if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) { + if (changeState(WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; @@ -238,7 +235,7 @@ public class HashedWheelTimer extends AbstractWheelTimer { TimerTask.class.getSimpleName()); } - if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { + if (!changeState(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // state is init or shutdown . return Collections.emptySet(); } @@ -315,7 +312,6 @@ public class HashedWheelTimer extends AbstractWheelTimer { startTimeInitialized.countDown(); do { - //TODO 时间轮这里一直执行,结束不了任务 final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask);