SpringData-Redis發佈訂閱自動重連分析
RedisMessageListenerContainer
配置
@Bean @Autowired RedisMessageListenerContainer redisContainer(JedisConnectionFactory redisConnectionFactory, RedisMessageListener a) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); List<Topic> topics = Lists.newArrayList(new ChannelTopic( CHANNEL), new ChannelTopic(CHANNEL) ); container.addMessageListener(new MessageListenerAdapter(a), topics); return container; }
啓動分析
添加頻道監聽java
//RedisMessageListenerContainer.java public void addMessageListener(MessageListener listener, Collection<? extends Topic> topics) { addListener(listener, topics); lazyListen(); }
這個AddListener會 對Topic作一些記錄,patternMapping, channelMapping,去重等等,而後最關鍵的一步:redis
//RedisMessageListenerContainer.java //addListener // check the current listening state if (listening) { subscriptionTask.subscribeChannel(channels.toArray(new byte[channels.size()][])); subscriptionTask.subscribePattern(patterns.toArray(new byte[patterns.size()][])); }
//RedisMessageListenerContainer.java void subscribeChannel(byte[]... channels) { if (channels != null && channels.length > 0) { if (connection != null) { synchronized (localMonitor) { Subscription sub = connection.getSubscription(); if (sub != null) { sub.subscribe(channels); } } } } }
//JedisSubscription.java protected void doSubscribe(byte[]... channels) { jedisPubSub.subscribe(channels); }
可是啓動以前 這個listening=false。故該代碼不生效。再看lazyListen方法:服務器
//RedisMessageListenerContainer.java private void lazyListen() { boolean debug = logger.isDebugEnabled(); boolean started = false; if (isRunning()) { if (!listening) { synchronized (monitor) { if (!listening) { if (channelMapping.size() > 0 || patternMapping.size() > 0) { subscriptionExecutor.execute(subscriptionTask); listening = true; started = true; } } } if (debug) { if (started) { logger.debug("Started listening for Redis messages"); } else { logger.debug("Postpone listening for Redis messages until actual listeners are added"); } } } } }
調用addMessageListener的時候,isRunning()=false 也不生效。app
最後:當@Bean構造完成的時候 ,LifeCycle進入start的時候,該Container啓動。框架
//RedisMessageListenerContainer.java public void start() { if (!running) { running = true; // wait for the subscription to start before returning // technically speaking we can only be notified right before the subscription starts synchronized (monitor) { lazyListen(); if (listening) { try { // wait up to 5 seconds for Subscription thread monitor.wait(initWait); } catch (InterruptedException e) { // stop waiting } } } if (logger.isDebugEnabled()) { logger.debug("Started RedisMessageListenerContainer"); } } }
這個時候,running=true了。 而後調用 lazyListen(),確實比較Lazy。 這個時候,啓動子線程來執行訂閱和監聽。async
subscriptionExecutor.execute(subscriptionTask);
這個subscriptionTask的構造以下:this
//RedisMessageListenerContainer.java public void run() { synchronized (localMonitor) { subscriptionTaskRunning = true; } try { connection = connectionFactory.getConnection(); if (connection.isSubscribed()) { throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening"); } boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory); // NB: async drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription. if (!asyncConnection) { synchronized (monitor) { monitor.notify(); } } SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription(); if (asyncConnection) { SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime()); synchronized (monitor) { monitor.notify(); } } } catch (Throwable t) { handleSubscriptionException(t); } finally { // this block is executed once the subscription thread has ended, this may or may not mean // the connection has been unsubscribed, depending on driver synchronized (localMonitor) { subscriptionTaskRunning = false; localMonitor.notify(); } } }
這裏connection 確定不是subscribed。 而後他根據Redis的客戶端類型來判斷是不是阻塞的 若是是阻塞的類型,則喚醒一下被阻塞的Container線程。(???)spa
而後,最關鍵的是:eventuallyPerformSubscription(),最終發起訂閱的,並輪詢訂閱的是方法。線程
//RDMLC private SubscriptionPresentCondition eventuallyPerformSubscription() { SubscriptionPresentCondition condition = null; if (channelMapping.isEmpty()) { condition = new PatternSubscriptionPresentCondition(); connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet())); } else { if (patternMapping.isEmpty()) { condition = new SubscriptionPresentCondition(); } else { // schedule the rest of the subscription subscriptionExecutor.execute(new PatternSubscriptionTask()); condition = new PatternSubscriptionPresentCondition(); } connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet())); } return condition; }
以connection.subscribe()爲例:即將發起訂閱,注意這裏是利用DispatchMessageListener作的事件分發監聽器。debug
//JedisConnection.java public void subscribe(MessageListener listener, byte[]... channels) { if (isSubscribed()) { throw new RedisSubscribedConnectionException( "Connection already subscribed; use the connection Subscription to cancel or add new channels"); } if (isQueueing()) { throw new UnsupportedOperationException(); } if (isPipelined()) { throw new UnsupportedOperationException(); } try { BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener); subscription = new JedisSubscription(listener, jedisPubSub, channels, null); jedis.subscribe(jedisPubSub, channels); } catch (Exception ex) { throw convertJedisAccessException(ex); } }
//BinaryJedis.java public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) { client.setTimeoutInfinite(); try { jedisPubSub.proceed(client, channels); } finally { client.rollbackTimeout(); } }
這裏調用了BinaryJedisPubSub的proceed()。
這裏先提出兩個問題? 要訂閱是否是要發起subscribe命令給Redis?發起 subscribe channel命令,而後Listener怎麼辦?
這裏調用是jedis.subscribe(jedisPubSub, channels);而一開始 subscibeChannels的實現卻不太同樣?
下面看jedisPubSub:
public void proceed(Client client, byte[]... channels) { this.client = client; client.subscribe(channels); client.flush(); process(client); }
這裏subscribe是再次發起訂閱請求,而後process輪詢檢查消息。
異常處理
再看看JedisConnection類subscribe方法的異常的處理:
protected DataAccessException convertJedisAccessException(Exception ex) { if (ex instanceof NullPointerException) { // An NPE before flush will leave data in the OutputStream of a pooled connection broken = true; } DataAccessException exception = EXCEPTION_TRANSLATION.translate(ex); if (exception instanceof RedisConnectionFailureException) { broken = true; } return exception; }
EXCEPTION_TRANSLATION.translate(ex); 會調用:PassThroughExceptionTranslationStrategy的Convert。
public class JedisExceptionConverter implements Converter<Exception, DataAccessException> { public DataAccessException convert(Exception ex) { if (ex instanceof DataAccessException) { return (DataAccessException) ex; } if (ex instanceof JedisDataException) { return new InvalidDataAccessApiUsageException(ex.getMessage(), ex); } if (ex instanceof JedisConnectionException) { return new RedisConnectionFailureException(ex.getMessage(), ex); } if (ex instanceof JedisException) { return new InvalidDataAccessApiUsageException(ex.getMessage(), ex); } if (ex instanceof UnknownHostException) { return new RedisConnectionFailureException("Unknown host " + ex.getMessage(), ex); } if (ex instanceof IOException) { return new RedisConnectionFailureException("Could not connect to Redis server", ex); } return null; } }
那麼,當Jedis拋錯:JedisConnectionException 服務器彷佛斷開了鏈接 這個時候,subscribe 從而拋出RedisConnectionFailureException。
最後,再看RedisMessageListenerContainerd的run方法內的異常處理: 這個時候,
protected void handleSubscriptionException(Throwable ex) { listening = false; subscriptionTask.closeConnection(); if (ex instanceof RedisConnectionFailureException) { if (isRunning()) { logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms"); sleepBeforeRecoveryAttempt(); lazyListen(); } } else { logger.error("SubscriptionTask aborted with exception:", ex); } }
到這個時候,isRunning仍是true的(當且僅當LifeCycle進入close的時候,纔會變成false),結果就會在 recoveryInterval ms以後,重啓調用lazyListen(),再次啓動訂閱和監聽。
實際效果
實際上,我在服務器上的錯誤日誌中,我確實看到了
Connection failure occurred. Restarting subscription task after 5000 ms
總結
SpringData-Redis,可以解決手動處理Redis pub/sub的訂閱被意外斷開,致使監聽失敗的問題。 他能確保,服務持續監聽,出現異常時,可以從新訂閱並監聽給定的頻道。 因此,仍是用框架吧,比本身手寫的發佈訂閱更可靠。