Jedis的JedisSentinelPool源代碼分析-監聽切換事件

https://segmentfault.com/a/1190000002690506java

 

本專欄與Redis相關的文章redis

Redis Sentinel機制與用法(一)
Redis Sentinel機制與用法(二)
Jedis的JedisSentinelPool源代碼分析
Jedis的Sharded源代碼分析
Redis 主從 Replication 的配置
詳解Redis SORT命令 
JedisCommand接口說明數據庫

概述

Jedis是Redis官方推薦的Java客戶端,更多Redis的客戶端能夠參考Redis官網客戶端列表。Redis-Sentinel做爲官方推薦的HA解決方案,Jedis也在客戶端角度實現了對Sentinel的支持,主要實如今JedisSentinelPool.java這個類中,下文會分析這個類的實現。apache

屬性

JedisSentinelPool類裏有如下的屬性:segmentfault

//基於apache的commom-pool2的對象池配置
    protected GenericObjectPoolConfig poolConfig;
    
    //超時時間,默認是2000
    protected int timeout = Protocol.DEFAULT_TIMEOUT;
    
    //sentinel的密碼
    protected String password;

    //redis數據庫的數目
    protected int database = Protocol.DEFAULT_DATABASE;

    //master監聽器,當master的地址發生改變時,會觸發這些監聽者
    protected Set<MasterListener> masterListeners = new HashSet<MasterListener>();

    protected Logger log = Logger.getLogger(getClass().getName());
    
    //Jedis實例建立工廠
    private volatile JedisFactory factory;
    
    //當前的master,HostAndPort是一個簡單的包裝了ip和port的模型類
    private volatile HostAndPort currentHostMaster;

構造器

構造器的代碼以下:多線程

public JedisSentinelPool(String masterName, Set<String> sentinels, final GenericObjectPoolConfig poolConfig, int timeout, final String password, final int database) {

        this.poolConfig = poolConfig;
        this.timeout = timeout;
        this.password = password;
        this.database = database;

        HostAndPort master = initSentinels(sentinels, masterName);
        initPool(master);
}

構造器一開始對實例變量進行賦值,參數sentinels是客戶端所須要打交道的Redis-Sentinel,容許有多個,用一個集合來盛裝。ide

而後經過initSentinels方法與sentinel溝通後,肯定當前sentinel所監視的master是哪個。而後經過master來建立好對象池,以便後續從對象池中取出一個Jedis實例,來對master進行操做。memcached

initSentinels方法

initSentinels方法的代碼以下所示,我加了一些註釋:函數

private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

        HostAndPort master = null;
        boolean sentinelAvailable = false;

        log.info("Trying to find master from available Sentinels...");

        // 有多個sentinels,遍歷這些個sentinels
        for (String sentinel : sentinels) {
            // host:port表示的sentinel地址轉化爲一個HostAndPort對象。
            final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));

            log.fine("Connecting to Sentinel " + hap);

            Jedis jedis = null;
            try {
                // 鏈接到sentinel
                jedis = new Jedis(hap.getHost(), hap.getPort());

                // 根據masterName獲得master的地址,返回一個list,host= list[0], port =
                // list[1]
                List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

                // connected to sentinel...
                sentinelAvailable = true;

                if (masterAddr == null || masterAddr.size() != 2) {
                    log.warning("Can not get master addr, master name: " + masterName
                            + ". Sentinel: " + hap + ".");
                    continue;
                }

                master = toHostAndPort(masterAddr);
                log.fine("Found Redis master at " + master);
                // 若是在任何一個sentinel中找到了master,再也不遍歷sentinels
                break;
            } catch (JedisConnectionException e) {
                log.warning("Cannot connect to sentinel running @ " + hap
                        + ". Trying next one.");
            } finally {
                // 關閉與sentinel的鏈接
                if (jedis != null) {
                    jedis.close();
                }
            }
        }

        // 到這裏,若是master爲null,則說明有兩種狀況,一種是全部的sentinels節點都down掉了,一種是master節點沒有被存活的sentinels監控到
        if (master == null) {
            if (sentinelAvailable) {
                // can connect to sentinel, but master name seems to not
                // monitored
                throw new JedisException("Can connect to sentinel, but " + masterName
                        + " seems to be not monitored...");
            } else {
                throw new JedisConnectionException(
                        "All sentinels down, cannot determine where is " + masterName
                                + " master is running...");
            }
        }

        //若是走到這裏,說明找到了master的地址
        log.info("Redis master running at " + master + ", starting Sentinel listeners...");

        //啓動對每一個sentinels的監聽
        for (String sentinel : sentinels) {
            final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
            MasterListener masterListener = new MasterListener(masterName, hap.getHost(),
                    hap.getPort());
            masterListeners.add(masterListener);
            masterListener.start();
        }

        return master;
    }

能夠看到initSentinels方法的參數有一個masterName,就是咱們所須要查找的master的名字。
一開始,遍歷多個sentinels,一個一個鏈接到sentinel,去詢問關於masterName的消息,能夠看到是經過jedis.sentinelGetMasterAddrByName()方法去鏈接sentinel,並詢問當前的master的地址。點進這個方法去看看,源代碼是這樣寫的:ui

/**
   * <pre>
   * redis 127.0.0.1:26381> sentinel get-master-addr-by-name mymaster
   * 1) "127.0.0.1"
   * 2) "6379"
   * </pre>
   * @param masterName
   * @return two elements list of strings : host and port.
   */
  public List<String> sentinelGetMasterAddrByName(String masterName) {
    client.sentinel(Protocol.SENTINEL_GET_MASTER_ADDR_BY_NAME, masterName);
    final List<Object> reply = client.getObjectMultiBulkReply();
    return BuilderFactory.STRING_LIST.build(reply);
  }

調用的是與Jedis綁定的client去發送一個"get-master-addr-by-name"命令。

回到initSentinels方法中,若是沒有詢問到master的地址,那就詢問下一個sentinel。若是詢問到了master的地址,那麼將再也不遍歷sentinel集合,直接break退出循環遍歷。

若是循環結束後,master的值爲null,那麼有兩種可能:

  • 一種是全部的sentinel實例都不可用了

  • 另一種是,sentinel實例有可用的,可是沒有監控名字爲masterName的Redis。

若是master爲null,程序會拋出異常,再也不往下走了。若是master不爲null呢,繼續往下走。

能夠從代碼中看到,爲每一個sentinel都啓動了一個監聽者MasterListener。MasterListener自己是一個線程,它會去訂閱sentinel上關於master節點地址改變的消息。

接下來先分析構造方法中的另一個方法:initPool。以後再看MasterListener的實現。

initPool方法

initPool的實現源代碼以下所示:

private void initPool(HostAndPort master) {
        if (!master.equals(currentHostMaster)) {
            currentHostMaster = master;
            if (factory == null) {
                factory = new JedisFactory(master.getHost(), master.getPort(), timeout,
                        password, database);
                initPool(poolConfig, factory);
            } else {
                factory.setHostAndPort(currentHostMaster);
                // although we clear the pool, we still have to check the
                // returned object
                // in getResource, this call only clears idle instances, not
                // borrowed instances
                internalPool.clear();
            }

            log.info("Created JedisPool to master at " + master);
        }
    }

能夠看到,做爲參數傳進來的master會與實例變量currentHostMaster做比較,看看是不是相同的,爲何要做這個比較呢,由於前文中提到的MasterListener會在發現master地址改變之後,去調用initPool方法。
若是是第一次調用initPool方法(構造函數中調用),那麼會初始化Jedis實例建立工廠,若是不是第一次調用(MasterListener中調用),那麼只對已經初始化的工廠進行從新設置。
從以上也能夠看出爲何currentHostMasterfactory這兩個變量爲何要聲明爲volatile,它們會在多線程環境下被訪問和修改,所以必須保證可見性
第一次調用時,會調用initPool(poolConfig, factory)方法。
看看這個方法的源代碼:

public void initPool(final GenericObjectPoolConfig poolConfig,
            PooledObjectFactory<T> factory) {

        if (this.internalPool != null) {
            try {
                closeInternalPool();
            } catch (Exception e) {
            }
        }

        this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
    }

基本上只幹了一件事:初始化內部對象池。

MasterListener監聽者線程

直接看它的run方法實現吧:

public void run() {

            running.set(true);

            while (running.get()) {

                j = new Jedis(host, port);

                try {
                    //訂閱sentinel上關於master地址改變的消息
                    j.subscribe(new JedisPubSub() {
                        @Override
                        public void onMessage(String channel, String message) {
                            log.fine("Sentinel " + host + ":" + port + " published: "
                                    + message + ".");

                            String[] switchMasterMsg = message.split(" ");

                            if (switchMasterMsg.length > 3) {

                                if (masterName.equals(switchMasterMsg[0])) {
                                    initPool(toHostAndPort(Arrays.asList(
                                            switchMasterMsg[3], switchMasterMsg[4])));
                                } else {
                                    log.fine("Ignoring message on +switch-master for master name "
                                            + switchMasterMsg[0]
                                            + ", our master name is " + masterName);
                                }

                            } else {
                                log.severe("Invalid message received on Sentinel " + host
                                        + ":" + port + " on channel +switch-master: "
                                        + message);
                            }
                        }
                    }, "+switch-master");

                } catch (JedisConnectionException e) {

                    if (running.get()) {
                        log.severe("Lost connection to Sentinel at " + host + ":" + port
                                + ". Sleeping 5000ms and retrying.");
                        try {
                            Thread.sleep(subscribeRetryWaitTimeMillis);
                        } catch (InterruptedException e1) {
                            e1.printStackTrace();
                        }
                    } else {
                        log.fine("Unsubscribing from Sentinel at " + host + ":" + port);
                    }
                }
            }
        }

能夠看到它依然委託了Jedis去與sentinel打交道,訂閱了關於master地址變換的消息,當master地址變換時,就會再調用一次initPool方法,從新設置對象池相關的設置。

尾聲

Jedis的JedisSentinelPool的實現僅僅適用於單個master-slave。 如今有了更多的需求,既須要sentinel提供的自動主備切換機制,又須要客戶端可以作數據分片(Sharding),相似於memcached用一致性哈希進行數據分片。 接下來可能會本身在現有Jedis上實現一個支持一致性哈希分片的ShardedJedisSentinelPool。

相關文章
相關標籤/搜索