記一次Nacos的issue修復之併發致使的NPE異常

ISSUE

Spring boot 應用啓動被終止 #21java

錯誤分析

DeferredApplicationEventPublisher的繼承關係git

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.event.ContextRefreshedEvent;

public class DeferredApplicationEventPublisher implements ApplicationEventPublisher, ApplicationListener<ContextRefreshedEvent> {
  ...
}
複製代碼

DeferredApplicationEventPublisher的依賴圖github

DeferredApplicationEventPublisher的依賴圖

如今來分析具體出現NPE錯誤的緣由web

先看EventPublishingConfigService中的addListenerspring

@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
  Listener listenerAdapter = new DelegatingEventPublishingListener(configService, dataId, group, applicationEventPublisher, executor, listener);
  configService.addListener(dataId, group, listenerAdapter);
  publishEvent(new NacosConfigListenerRegisteredEvent(configService, dataId, group, listener, true));
}
複製代碼

而後看DelegatingEventPublishingListener代碼的繼承關係api

import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.springframework.context.ApplicationEventPublisher;

import java.util.concurrent.Executor;

final class DelegatingEventPublishingListener implements Listener {
  DelegatingEventPublishingListener(ConfigService configService, String dataId, String groupId, ApplicationEventPublisher applicationEventPublisher, Executor executor, Listener delegate) {
    this.configService = configService;
    this.dataId = dataId;
    this.groupId = groupId;
    this.applicationEventPublisher = applicationEventPublisher;
    this.executor = executor;
    this.delegate = delegate;
  }
}
複製代碼

能夠看到,在建立DelegatingEventPublishingListener對象的時候,會傳入一個線程池Executor,以及一個ApplicationEventPublisher(其實就是DeferredApplicationEventPublisher安全

而後再看看CacheData.safeNotifyListener()方法作了什麼操做併發

private void safeNotifyListener(final String dataId, final String group, final String content, final String md5, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;
        Runnable job = new Runnable() {
            public void run() {
                ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
                ClassLoader appClassLoader = listener.getClass().getClassLoader();
                try {
                    if (listener instanceof AbstractSharedListener) {
                        AbstractSharedListener adapter = (AbstractSharedListener)listener;
                        adapter.fillContext(dataId, group);
                        LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                    }
                    // 執行回調以前先將線程classloader設置爲具體webapp的classloader,以避免回調方法中調用spi接口是出現異常或錯用(多應用部署纔會有該問題)。
                    Thread.currentThread().setContextClassLoader(appClassLoader);

                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
                    configFilterChainManager.doFilter(null, cr);
                    String contentTmp = cr.getContent();
                    listener.receiveConfigInfo(contentTmp);
                    listenerWrap.lastCallMd5 = md5;
                    LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                        listener);
                } catch (NacosException de) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
                        dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
                } catch (Throwable t) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
                        md5, listener, t.getCause());
                } finally {
                    Thread.currentThread().setContextClassLoader(myClassLoader);
                }
            }
        };

        final long startNotify = System.currentTimeMillis();
        try {
            if (null != listener.getExecutor()) {
                listener.getExecutor().execute(job);
            } else {
                job.run();
            }
        }
  ...
}
複製代碼

這裏看到,safeNotifyListener是將事件廣播給全部的Listener,而後有一段及其重要的代碼段,它就是致使LinkedList出現併發使用的緣由app

listener.getExecutor().execute(job);
複製代碼

這裏還記得剛剛說過的DelegatingEventPublishingListener對象在建立之初有傳入Executor參數嗎?這裏Listener調用Executor將上述的任務調入線程池中進行調度,所以,致使了DeferredApplicationEventPublisher可能存在併發的使用webapp

錯誤復現

public class DeferrNPE {

    private static LinkedList<String> list = new LinkedList<>();

    private static CountDownLatch latch = new CountDownLatch(3);
    private static CountDownLatch start = new CountDownLatch(3);

    private static class MyListener implements Runnable {

        @Override
        public void run() {
            start.countDown();
            try {
                start.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            list.add(String.valueOf(System.currentTimeMillis()));
            latch.countDown();
        }
    }

    public static void main(String[] args) {
        MyListener l1 = new MyListener();
        MyListener l2 = new MyListener();
        MyListener l3 = new MyListener();
        new Thread(l1).start();
        new Thread(l2).start();
        new Thread(l3).start();
        try {
            latch.await();
            Iterator<String> iterator = list.iterator();
            while (iterator.hasNext()) {
                System.out.println(iterator.next());
                iterator.remove();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
複製代碼

最終修正

因爲是非線程安全使用在併發的場景下,所以只能更改上層nacos-spring-context的容器使用,將原先的非線程安全的LinkedList轉爲線程安全的ConcurrentLinkedQueue

相關文章
相關標籤/搜索