春節前咱們智能停車平臺出現,用戶支付成功通知停車能力提供平臺延時較高,最大達到了5分鐘,形成車場車輛積壓嚴重,客訴很嚴重。java
出現的問題上面描述了,先總體說明一下智能停車的線上支付總體流程;tomcat
這個問題出如今2.3流程。當時消息隊列消費狀況以下圖:服務器
發現消費延時較大,徹底來不及消費裏面的支付消息。ide
通過排查代碼發現是單線程消費消息隊列,通知邏輯是串行的,效率可想而知,另外因爲停車能力提供方接口響應也較慢,平均響應時間將近300毫秒,所以單線程每秒鐘最多通知3筆訂單。測試
因爲臨近春節,不便修改代碼,一開始想到的解決方案是擴容,但分析下來因爲消費支付消息隊列使用的是kafka,這個topic只有4個partition,而每一個partition智能由1個線程進行消費,而支付系統與消息隊列耦合較大,沒法進行擴容,所以該方案不具操做性。優化
那麼只能第二條路,優化代碼邏輯。優化策略是使用線程池進行能力提供方的支付通知;線程
因爲消費partition的線程只能單一,所以消費線程暫時沒法優化,而通知線程進行線程池優化。代碼以下:code
//線程池隊列 private LinkedBlockingDeque<Runnable> linkedBlockingDeque; //通知線程池 private ThreadPoolExecutor notifyExecutor; //消息隊列消費狀態 private volatile boolean isRun; if(null == notifyExecutor){ linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(8); notifyExecutor = new ThreadPoolExecutor(8, 32, 20L, TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy()); isRun = true; }
因爲線上服務器cpu核數爲8,所以core線程數定義爲8,然而通知能力方接口延時較長,並無徹底消耗cpu資源,所以最大線程數定義爲32,當線程和隊列都滿狀況下,有新任務加入,使用CallerRunsPolicy策略,即調用線程自己執行該任務。blog
改造後需考慮服務重啓沒法處理已消費消息的狀況,而應用是基於tomcat的,所以須要註冊事件通知:接口
@Component public class MessageConsumerLauncher implements ApplicationListener<ApplicationEvent> { private static final Logger logger = LogManager.getLogger(MessageConsumerLauncher.class.getName()); @Autowired private OrderStatusMessageReceiver orderStatusMessageReceiver; @Override public void onApplicationEvent(ApplicationEvent event) { logger.info("MessageConsumerLauncher.onApplicationEvent ContextRefreshedEvent " + event.getSource().toString() + "; " + event.getClass().getName()); if(event instanceof ContextClosedEvent){ try { logger.info("MessageConsumerLauncher.onApplicationEvent.orderStatusMessageReceiver stopping..."); orderStatusMessageReceiver.stopReceiver(); logger.info("MessageConsumerLauncher.onApplicationEvent.orderStatusMessageReceiveron.ApplicationEvent() stopping..."); } catch (Exception e) { logger.error(e.getStackTrace()); } } } }
stopReceiver的邏輯爲:
public void stopReceiver(){ logger.info("stop begin OrderStatusMessageReceiver threadPool..........."); if(null != threadPool) { isRun = false; //狀態置爲partition消費關閉 threadPool.shutdown(); //關閉partition消費線程池(只有1個線程) } logger.info("stop end OrderStatusMessageReceiver threadPool..........."); logger.info("stop begin OrderStatusMessageReceiver threadPool..........."); if(null != notifyExecutor){ notifyExecutor.shutdown(); //關閉通知線程池 } logger.info("stop end OrderStatusMessageReceiver threadPool..........."); }
當notifyExecutor執行完剩餘任務,應用關閉。
通過上面改造進在測試環境進行壓測,因爲測試環境使用4cpu,4G內存,生產環境使用8cpu,8G內存。測試結果以下:
效率有將近12倍的提高,徹底達到系統要求。