SpringData-Redis發佈訂閱自動重連分析

SpringData-Redis發佈訂閱自動重連分析

RedisMessageListenerContainer

配置

/**
 * Builds the Redis pub/sub listener container and registers the given listener on {@code CHANNEL}.
 *
 * @param redisConnectionFactory connection factory handed to the container
 * @param a listener that is wrapped in a {@code MessageListenerAdapter} and registered for the topic
 * @return the configured container
 */
@Bean
@Autowired
RedisMessageListenerContainer redisContainer(JedisConnectionFactory redisConnectionFactory, RedisMessageListener a) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(redisConnectionFactory);
    // Each channel only needs to be listed once; repeating the same ChannelTopic adds nothing.
    List<Topic> topics = Lists.<Topic>newArrayList(new ChannelTopic(CHANNEL));
    container.addMessageListener(new MessageListenerAdapter(a), topics);
    return container;
}

啓動分析

添加頻道監聽

//RedisMessageListenerContainer.java

/**
 * Registers {@code listener} for every topic in {@code topics}.
 * <p>
 * The registration itself is delegated to {@code addListener}; afterwards {@code lazyListen()} is
 * invoked so the subscription can be started if the container is in a state that allows it.
 *
 * @param listener the listener to register
 * @param topics the topics the listener should receive messages for
 */
public void addMessageListener(MessageListener listener, Collection<? extends Topic> topics) {
	addListener(listener, topics);
	lazyListen();
}

這個 addListener 會對 Topic 做一些記錄(patternMapping、channelMapping、去重等等),而後是最關鍵的一步:

//RedisMessageListenerContainer.java
//addListener
// Excerpt from addListener: check the current listening state. Only when the container is
// already listening are the channels and patterns passed on to the subscription task;
// otherwise nothing is subscribed at this point.
	if (listening) {
		subscriptionTask.subscribeChannel(channels.toArray(new byte[channels.size()][]));
		subscriptionTask.subscribePattern(patterns.toArray(new byte[patterns.size()][]));
	}
//RedisMessageListenerContainer.java

/**
 * Adds the given channels to the subscription of the current connection.
 * <p>
 * Does nothing when no channels are supplied, when there is no connection, or when the
 * connection has no subscription.
 *
 * @param channels raw channel names to subscribe to; may be {@code null} or empty
 */
void subscribeChannel(byte[]... channels) {
	if (channels == null || channels.length == 0) {
		return;
	}
	if (connection == null) {
		return;
	}

	synchronized (localMonitor) {
		Subscription activeSubscription = connection.getSubscription();
		if (activeSubscription != null) {
			activeSubscription.subscribe(channels);
		}
	}
}
//JedisSubscription.java
	/**
	 * Subscribes to the given channels by delegating to {@code jedisPubSub.subscribe}.
	 *
	 * @param channels raw channel names to subscribe to
	 */
	protected void doSubscribe(byte[]... channels) {
		jedisPubSub.subscribe(channels);
	}

可是在啓動以前,listening=false,故該代碼不生效。再看 lazyListen 方法:

//RedisMessageListenerContainer.java
/**
 * Starts the subscription task when the container is running, is not yet listening, and at
 * least one channel or pattern mapping exists. Otherwise the call has no effect apart from
 * optional debug logging.
 */
private void lazyListen() {
	final boolean debugEnabled = logger.isDebugEnabled();

	// Nothing to do unless the container runs and no subscription is active yet.
	if (!isRunning() || listening) {
		return;
	}

	boolean subscriptionLaunched = false;
	synchronized (monitor) {
		// Re-check under the lock: another thread may have started listening meanwhile.
		if (!listening && (channelMapping.size() > 0 || patternMapping.size() > 0)) {
			subscriptionExecutor.execute(subscriptionTask);
			listening = true;
			subscriptionLaunched = true;
		}
	}

	if (debugEnabled) {
		logger.debug(subscriptionLaunched
				? "Started listening for Redis messages"
				: "Postpone listening for Redis messages until actual listeners are added");
	}
}

調用 addMessageListener 的時候,isRunning()=false,因此 lazyListen 也不生效。

最後:當 @Bean 構造完成、LifeCycle 進入 start 的時候,該 Container 才啓動。

//RedisMessageListenerContainer.java

    /**
     * Starts the container if it is not already running.
     * <p>
     * Marks the container as running, triggers {@code lazyListen()} and, when a subscription was
     * started, waits on the monitor for up to {@code initWait} so the subscription thread gets a
     * chance to signal before this method returns. If the waiting thread is interrupted, the wait
     * ends early and the interrupt status is restored for the caller.
     */
    public void start() {
		if (!running) {
			running = true;
			// wait for the subscription to start before returning
			// technically speaking we can only be notified right before the subscription starts
			synchronized (monitor) {
				lazyListen();
				if (listening) {
					try {
						// wait up to initWait for the subscription thread to notify the monitor
						monitor.wait(initWait);
					} catch (InterruptedException e) {
						// stop waiting, but do not swallow the interrupt: restore the flag so
						// callers further up the stack can still observe it
						Thread.currentThread().interrupt();
					}
				}
			}

			if (logger.isDebugEnabled()) {
				logger.debug("Started RedisMessageListenerContainer");
			}
		}
	}

這個時候,running=true 了。而後調用 lazyListen(),確實比較 Lazy。這個時候,啓動子線程來執行訂閱和監聽:

// Excerpt from lazyListen(): hands the subscription task to the subscription executor.
subscriptionExecutor.execute(subscriptionTask);

這個 subscriptionTask 的 run 方法以下:

//RedisMessageListenerContainer.java
/**
 * Body of the subscription task.
 * <p>
 * Obtains a connection from the connection factory, refuses to continue if that connection is
 * already subscribed, performs the subscription via {@code eventuallyPerformSubscription()} and
 * notifies {@code monitor} either before the subscription (non-async connections) or after the
 * subscription has been registered (async connections). Any {@code Throwable} is passed to
 * {@code handleSubscriptionException}. On exit the running flag is cleared and
 * {@code localMonitor} is notified.
 */
public void run() {
	// Flag the task as running under the task-local monitor.
	synchronized (localMonitor) {
		subscriptionTaskRunning = true;
	}
	try {
		connection = connectionFactory.getConnection();
		if (connection.isSubscribed()) {
			throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
		}

		boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);

		// NB: for non-async (blocking) drivers the Xsubscribe calls block, so we notify the RDMLC
		// before performing the actual subscription.
		if (!asyncConnection) {
			synchronized (monitor) {
				monitor.notify();
			}
		}

		SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();

		// For async connections, wait (bounded by the max registration waiting time) until the
		// subscription condition holds, and only then notify the monitor.
		if (asyncConnection) {
			SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());

			synchronized (monitor) {
				monitor.notify();
			}
		}
	} catch (Throwable t) {
		// Every failure, including connection failures, is routed through the common handler.
		handleSubscriptionException(t);
	} finally {
		// this block is executed once the subscription thread has ended, this may or may not mean
		// the connection has been unsubscribed, depending on driver
		synchronized (localMonitor) {
			subscriptionTaskRunning = false;
			localMonitor.notify();
		}
	}
}

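這裏 connection 確定還不是 subscribed 狀態。而後它根據 Redis 客戶端的類型判斷訂閱調用是不是阻塞的:若是是阻塞類型(非 async),就在發起訂閱以前先調用 monitor.notify(),喚醒在 start() 中 monitor.wait(initWait) 上等待的 Container 線程,由於阻塞式的訂閱調用一旦開始就不會返回;若是是 async 類型,則等訂閱註冊完成以後再喚醒。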

而後,最關鍵的是 eventuallyPerformSubscription():它是最終發起訂閱、並輪詢消息的方法。

//RDMLC

/**
 * Issues the actual subscription on the current connection.
 * <p>
 * With no channel mappings only a pattern subscription is performed. Otherwise the channels are
 * subscribed on this connection; if pattern mappings exist as well, a
 * {@code PatternSubscriptionTask} is submitted to the executor before the channel subscription.
 *
 * @return the condition matching the kind of subscription that was requested
 */
private SubscriptionPresentCondition eventuallyPerformSubscription() {

	// Pattern-only case: subscribe to the patterns directly on this connection.
	if (channelMapping.isEmpty()) {
		SubscriptionPresentCondition patternOnly = new PatternSubscriptionPresentCondition();
		connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
		return patternOnly;
	}

	final SubscriptionPresentCondition present;
	if (!patternMapping.isEmpty()) {
		// schedule the rest of the subscription
		subscriptionExecutor.execute(new PatternSubscriptionTask());
		present = new PatternSubscriptionPresentCondition();
	} else {
		present = new SubscriptionPresentCondition();
	}

	connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
	return present;
}

以 connection.subscribe() 爲例:即將發起訂閱,注意這裏是利用 DispatchMessageListener 做爲事件分發監聽器。

//JedisConnection.java

/**
 * Subscribes this connection to the given channels and dispatches messages to {@code listener}.
 *
 * @param listener receiver of the incoming messages
 * @param channels raw channel names to subscribe to
 * @throws RedisSubscribedConnectionException if the connection is already subscribed
 * @throws UnsupportedOperationException if the connection is queueing or pipelined
 */
public void subscribe(MessageListener listener, byte[]... channels) {
	if (isSubscribed()) {
		throw new RedisSubscribedConnectionException(
				"Connection already subscribed; use the connection Subscription to cancel or add new channels");
	}
	// Subscribing is not supported while queueing or pipelining.
	if (isQueueing() || isPipelined()) {
		throw new UnsupportedOperationException();
	}

	try {
		BinaryJedisPubSub pubSub = new JedisMessageListener(listener);
		subscription = new JedisSubscription(listener, pubSub, channels, null);
		jedis.subscribe(pubSub, channels);
	} catch (Exception ex) {
		// Translate driver exceptions into the data-access exception hierarchy.
		throw convertJedisAccessException(ex);
	}
}
//BinaryJedis.java

/**
 * Subscribes to the given channels using {@code jedisPubSub}.
 * <p>
 * The client timeout is set to infinite for the duration of {@code jedisPubSub.proceed} and is
 * rolled back afterwards, whether or not {@code proceed} completes normally.
 *
 * @param jedisPubSub the pub/sub handler that drives the subscription
 * @param channels raw channel names to subscribe to
 */
public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
    client.setTimeoutInfinite();
    try {
      jedisPubSub.proceed(client, channels);
    } finally {
      // Always restore the previous timeout once proceed() has returned or thrown.
      client.rollbackTimeout();
    }
}

這裏調用了BinaryJedisPubSub的proceed()。

這裏先提出兩個問題? 要訂閱是否是要發起subscribe命令給Redis?發起 subscribe channel命令,而後Listener怎麼辦?

這裏調用的是 jedis.subscribe(jedisPubSub, channels);而一開始 subscribeChannel 的實現(最終調用的是 jedisPubSub.subscribe(channels))卻不太同樣?

下面看jedisPubSub:

/**
 * Remembers the client, sends the subscribe request for the given channels, flushes it and then
 * hands control to {@code process(client)}.
 *
 * @param client the client the subscription runs on
 * @param channels raw channel names to subscribe to
 */
public void proceed(Client client, byte[]... channels) {
    this.client = client;
    client.subscribe(channels);
    client.flush();
    // NOTE(review): process() presumably loops reading replies/messages from the client — confirm.
    process(client);
  }

這裏subscribe是再次發起訂閱請求,而後process輪詢檢查消息。

異常處理

再看看JedisConnection類subscribe方法的異常的處理:

/**
 * Translates a driver exception via {@code EXCEPTION_TRANSLATION} and flags the connection as
 * broken when the failure is a {@code NullPointerException} or translates to a
 * {@code RedisConnectionFailureException}.
 *
 * @param ex the exception raised while talking to Redis
 * @return the translated exception as produced by {@code EXCEPTION_TRANSLATION.translate}
 */
protected DataAccessException convertJedisAccessException(Exception ex) {

	// An NPE before flush will leave data in the OutputStream of a pooled connection
	if (ex instanceof NullPointerException) {
		broken = true;
	}

	final DataAccessException translated = EXCEPTION_TRANSLATION.translate(ex);

	// A connection failure also marks the connection as broken.
	if (translated instanceof RedisConnectionFailureException) {
		broken = true;
	}
	return translated;
}

EXCEPTION_TRANSLATION.translate(ex); 會調用 PassThroughExceptionTranslationStrategy 的轉換邏輯,最終由 JedisExceptionConverter 的 convert 方法完成轉換:

/**
 * Converts exceptions raised by the Jedis driver into {@code DataAccessException}s.
 * <p>
 * The checks run in a fixed order and the first match wins; exceptions that match none of them
 * convert to {@code null}.
 */
public class JedisExceptionConverter implements Converter<Exception, DataAccessException> {

	/**
	 * @param ex the exception to convert
	 * @return the matching {@code DataAccessException}, or {@code null} if {@code ex} is not recognised
	 */
	public DataAccessException convert(Exception ex) {

		final DataAccessException converted;

		if (ex instanceof DataAccessException) {
			// Already part of the target hierarchy: pass through unchanged.
			converted = (DataAccessException) ex;
		} else if (ex instanceof JedisDataException) {
			converted = new InvalidDataAccessApiUsageException(ex.getMessage(), ex);
		} else if (ex instanceof JedisConnectionException) {
			converted = new RedisConnectionFailureException(ex.getMessage(), ex);
		} else if (ex instanceof JedisException) {
			converted = new InvalidDataAccessApiUsageException(ex.getMessage(), ex);
		} else if (ex instanceof UnknownHostException) {
			converted = new RedisConnectionFailureException("Unknown host " + ex.getMessage(), ex);
		} else if (ex instanceof IOException) {
			converted = new RedisConnectionFailureException("Could not connect to Redis server", ex);
		} else {
			converted = null;
		}

		return converted;
	}
}

那麼,當 Jedis 拋出 JedisConnectionException(例如服務器斷開了鏈接)的時候,subscribe 就會拋出 RedisConnectionFailureException。

最後,再看 RedisMessageListenerContainer 的 run 方法內的異常處理:

/**
 * Reacts to a failure of the subscription task.
 * <p>
 * Listening is switched off and the task's connection is closed in every case. For a
 * {@code RedisConnectionFailureException} while the container is still running, the method
 * logs the failure, sleeps for the recovery interval and calls {@code lazyListen()} again.
 * Any other exception is only logged.
 *
 * @param ex the failure raised by the subscription task
 */
protected void handleSubscriptionException(Throwable ex) {
	listening = false;
	subscriptionTask.closeConnection();

	// Anything other than a connection failure is logged and not retried.
	if (!(ex instanceof RedisConnectionFailureException)) {
		logger.error("SubscriptionTask aborted with exception:", ex);
		return;
	}

	// A stopped container must not resubscribe.
	if (!isRunning()) {
		return;
	}

	logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
	sleepBeforeRecoveryAttempt();
	lazyListen();
}

到這個時候,isRunning仍是true的(當且僅當LifeCycle進入close的時候,纔會變成false),結果就會在 recoveryInterval ms以後,重啓調用lazyListen(),再次啓動訂閱和監聽。

實際效果

實際上,我在服務器上的錯誤日誌中,我確實看到了

Connection failure occurred. Restarting subscription task after 5000 ms

總結

SpringData-Redis,可以解決手動處理Redis pub/sub的訂閱被意外斷開,致使監聽失敗的問題。 他能確保,服務持續監聽,出現異常時,可以從新訂閱並監聽給定的頻道。 因此,仍是用框架吧,比本身手寫的發佈訂閱更可靠。

相關文章
相關標籤/搜索