使用CompletableFuture+ExecutorService+Logback的多線程測試

1. 環境

Java: jdk1.8.0_144多線程

2. 背景

Java多線程執行任務時,Logback輸出的主線程和各個子線程的業務日誌須要區分時,能夠根據線程池和執行的線程來區分,但若要把它們聯繫起來只能根據時間線,既麻煩又沒法保證準確性。app

2018-10-27 23:09:22 [INFO][com.lxp.tool.log.LogAndCatchExceptionRunnableTest][main][testRun][38] -> test start
2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] -> This is runnable.
2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-2][lambda$getRunnable$0][16] -> This is runnable.
2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] -> This is runnable.
2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-2][lambda$getRunnable$0][16] -> This is runnable.
2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] -> This is runnable.
2018-10-27 23:09:22 [INFO][com.lxp.tool.log.LogAndCatchExceptionRunnableTest][main][testRun][48] -> test finish

org.slf4j.MDC類提供了一個極好的解決方案,它能夠爲各個線程設置獨有的上下文,當有必要時也能夠把主線程的上下文複製給子線程,此時子線程能夠擁有主線程+子線程的信息,在子線程退出前恢復到主線程上下文,如此一來,日誌信息能夠極大地便利定位問題,org.slf4j.MDC類在線程上下文切換上的應用記錄本文的目的之一。
另外一個則是過去一直被本身忽略的多線程時退出的問題,任務須要多線程執行有兩種可能場景ide

  • 多個任務互相獨立,某個任務失敗並不該該影響其它的任務繼續執行
  • 多個子任務組成一個完整的主任務,若某個子任務失敗它應該直接退出,不須要等全部子任務完成

3. org.slf4j.MDC類在線程上下文切換時的應用

3.1 實現包裝線程

  • AbstractLogWrapper
public class AbstractLogWrapper<T> {
    private final T job;
    private final Map<?, ?> context;

    public AbstractLogWrapper(T t) {
        this.job = t;
        this.context = MDC.getCopyOfContextMap();
    }

    public void setLogContext() {
        if (this.context != null) {
            MDC.setContextMap(this.context);
        }
    }

    public void clearLogContext() {
        MDC.clear();
    }

    public T getJob() {
        return this.job;
    }
}
  • LogRunnable
public class LogRunnable extends AbstractLogWrapper<Runnable> implements Runnable {
    public LogRunnable(Runnable runnable) {
        super(runnable);
    }

    @Override
    public void run() {
        // 把主線程上下文復到子線程
        this.setLogContext();
        try {
            getJob().run();
        } finally {
            // 恢復主線程上下文
            this.clearLogContext();
        }
    }
}
  • LogAndCatchExceptionRunnable
public class LogAndCatchExceptionRunnable extends AbstractLogWrapper<Runnable> implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogAndCatchExceptionRunnable.class);

    public LogAndCatchExceptionRunnable(Runnable runnable) {
        super(runnable);
    }

    @Override
    public void run() {
        // 把主線程上下文復到子線程
        this.setLogContext();
        try {
            getJob().run();
        } catch (Exception e) { // Catch全部異常阻止其繼續傳播
            LOGGER.error(e.getMessage(), e);
        } finally {
            // 恢復主線程上下文
            this.clearLogContext();
        }
    }
}

3.2 配置%X輸出當前線程相關聯的NDC

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="stdot" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%d{yyyy-MM-dd HH:mm:ss} [%p][%c][%t][%M][%L] %replace(Test_Method=%X{method} runn-able=%X{runn_able}){'.+=( |$)', ''} -> %m%n</pattern>
        </layout>
    </appender>
    <root level="debug">
        <appender-ref ref="stdot"/>
    </root>
</configuration>

3.3 配置線程相關信息並測試

class RunnabeTestHelper {
    private static final Logger LOGGER = LoggerFactory.getLogger(RunnabeTestHelper.class);
    private static final String RUNNABLE = "runn_able";

    static Runnable getRunnable() {
        return () -> {
            MDC.put(RUNNABLE, String.valueOf(System.currentTimeMillis()));
            LOGGER.info("This is runnable.");
        };
    }
}
  • 測試方法
@Test
    public void testRun() {
        try {
            MDC.put("method", "testRun");
            LOGGER.info("test start");
            LogAndCatchExceptionRunnable logRunnable = spy(new LogAndCatchExceptionRunnable(RunnabeTestHelper.getRunnable()));
            Set<String> set = new HashSet<>();
            doAnswer(invocation -> set.add(invocation.getMethod().getName())).when(logRunnable).setLogContext();
            doAnswer(invocation -> set.add(invocation.getMethod().getName())).when(logRunnable).clearLogContext();

            List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(logRunnable, executorService)).collect(Collectors.toList());
            futures.forEach(CompletableFuture::join);
            assertEquals("[setLogContext, clearLogContext]", set.toString());
            LOGGER.info("test finish");
        } finally {
            MDC.clear();
        }
    }
  • 測試結果
2018-11-01 01:08:04 [INFO][com.lxp.tool.log.LogRunnableTest][main][testRun][41]  -> test start
2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685003 -> This is runnable.
2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685004 -> This is runnable.
2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685004 -> This is runnable.
2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-2][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685003 -> This is runnable.
2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-2][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685005 -> This is runnable.
2018-11-01 01:08:05 [INFO][com.lxp.tool.log.LogRunnableTest][main][testRun][50]  -> test finish

4. 多線程執行子線程出現異常時的處理

class RunnabeTestHelper {
    private static final Logger LOGGER = LoggerFactory.getLogger(RunnabeTestHelper.class);

    static Runnable getRunnable(AtomicInteger counter) {
        return () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
            }
            if (counter.incrementAndGet() == 2) {
                throw new NullPointerException();
            }
            LOGGER.info("This is {} runnable.", counter.get());
        };
    }

    static Runnable getRunnableWithCatchException(AtomicInteger counter) {
        return () -> {
            try {
                Thread.sleep(1000);
                if (counter.incrementAndGet() == 2) {
                    throw new NullPointerException();
                }
                LOGGER.info("This is {} runnable.", counter.get());
            } catch (Exception e) {
                LOGGER.error("error", e);
            }
        };
    }
}

4.1 選擇一:放充執行未執行的其它子線程

  • 調用LogRunnable,容許子線程的異常繼續傳播
@Test
    public void testRunnableWithoutCatchException() {
        Logger logger = Mockito.mock(Logger.class);
        AtomicInteger counter = new AtomicInteger(0);
        List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(new LogRunnable(RunnabeTestHelper.getRunnable(counter)), executorService)).collect(Collectors.toList());
        try {
            futures.forEach(CompletableFuture::join);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        // 因爲子線程的異常致使主線程退出,並非全部任務都獲得執行機會
        assertEquals(2, counter.get());
        verify(logger, Mockito.times(1)).error(anyString(), any(Throwable.class));
    }

4.2 選擇二:執行完全部無異常的子線程

  • 調用LogRunnable,在線程內部阻止異常擴散
@Test
    public void testRunnableWithCatchException() {
        AtomicInteger counter = new AtomicInteger(0);
        List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(new LogRunnable(RunnabeTestHelper.getRunnableWithCatchException(counter)), executorService)).collect(Collectors.toList());
        futures.forEach(CompletableFuture::join);
        // 因爲子線程的異常被阻止,全部線程都獲得執行機會
        assertEquals(5, counter.get());
    }
  • 調用LogAndCatchExceptionRunnable,在包裝類阻止異常擴散
@Test
    public void testRunnableWithoutCatchException() {
        AtomicInteger counter = new AtomicInteger(0);
        List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(new LogAndCatchExceptionRunnable(RunnabeTestHelper.getRunnable(counter)), executorService)).collect(Collectors.toList());
        futures.forEach(CompletableFuture::join);
        // 因爲子線程的異常被阻止,全部線程都獲得執行機會
        assertEquals(5, counter.get());
    }

    @Test
    public void testRunnableWithCatchException() {
        AtomicInteger counter = new AtomicInteger(0);
        List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(new LogAndCatchExceptionRunnable(RunnabeTestHelper.getRunnableWithCatchException(counter)), executorService)).collect(Collectors.toList());
        futures.forEach(CompletableFuture::join);
        // 因爲子線程的異常被阻止,全部線程都獲得執行機會
        assertEquals(5, counter.get());
    }
相關文章
相關標籤/搜索