本文主要研究一下Elasticsearch的RunOncejava
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.javagit
public class RunOnce implements Runnable { private final Runnable delegate; private final AtomicBoolean hasRun; public RunOnce(final Runnable delegate) { this.delegate = Objects.requireNonNull(delegate); this.hasRun = new AtomicBoolean(false); } @Override public void run() { if (hasRun.compareAndSet(false, true)) { delegate.run(); } } /** * {@code true} if the {@link RunOnce} has been executed once. */ public boolean hasRun() { return hasRun.get(); } }
elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.javagithub
public class RunOnceTests extends ESTestCase { public void testRunOnce() { final AtomicInteger counter = new AtomicInteger(0); final RunOnce runOnce = new RunOnce(counter::incrementAndGet); assertFalse(runOnce.hasRun()); runOnce.run(); assertTrue(runOnce.hasRun()); assertEquals(1, counter.get()); runOnce.run(); assertTrue(runOnce.hasRun()); assertEquals(1, counter.get()); } public void testRunOnceConcurrently() throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); final RunOnce runOnce = new RunOnce(counter::incrementAndGet); final Thread[] threads = new Thread[between(3, 10)]; final CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(() -> { try { latch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } runOnce.run(); }); threads[i].start(); } latch.countDown(); for (Thread thread : threads) { thread.join(); } assertTrue(runOnce.hasRun()); assertEquals(1, counter.get()); } public void testRunOnceWithAbstractRunnable() { final AtomicInteger onRun = new AtomicInteger(0); final AtomicInteger onFailure = new AtomicInteger(0); final AtomicInteger onAfter = new AtomicInteger(0); final RunOnce runOnce = new RunOnce(new AbstractRunnable() { @Override protected void doRun() throws Exception { onRun.incrementAndGet(); throw new RuntimeException("failure"); } @Override public void onFailure(Exception e) { onFailure.incrementAndGet(); } @Override public void onAfter() { onAfter.incrementAndGet(); } }); final int iterations = randomIntBetween(1, 10); for (int i = 0; i < iterations; i++) { runOnce.run(); assertEquals(1, onRun.get()); assertEquals(1, onFailure.get()); assertEquals(1, onAfter.get()); assertTrue(runOnce.hasRun()); } } }
RunOnce實現了Runnable接口,它的構造器要求輸入Runnable,同時構造了hasRun變量;run方法會先使用compareAndSet將hasRun由false設置爲true,若是成功則執行代理的Runnable的run方法;hasRun方法則返回hasRun值併發