擴展Redis的Jedis客戶端,哨兵模式讀請求走Slave集羣

擴展Redis的Jedis客戶端,哨兵模式讀請求走Slave集羣

版權聲明:本文爲博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處連接和本聲明。
本文連接: http://www.javashuo.com/article/p-kmezlufy-ek.html

Redis哨兵模式,由Sentinel節點和Redis節點組成,哨兵節點負責監控Redis的健康情況,負責協調Redis主從複製的關係。html

本文不詳細討論Redis哨兵模式,關於哨兵的詳細介紹能夠參考(http://www.javashuo.com/article/p-hcihbkbc-mh.htmljava

 

在使用哨兵模式之後,客戶端不能直接鏈接到Redis集羣,而是鏈接到哨兵集羣,經過哨兵節點獲取Redis主節點(Master)的信息,再進行鏈接,下面給出一小段代碼。redis

  1.  
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
  2.  
    jedisPoolConfig.setMaxTotal( 10);
  3.  
    jedisPoolConfig.setMaxIdle( 5);
  4.  
    jedisPoolConfig.setMinIdle( 5);
  5.  
     
  6.  
    Set<String> sentinels = new HashSet<>(Arrays.asList(
  7.  
    "192.168.80.112:26379",
  8.  
    "192.168.80.113:26379",
  9.  
    "192.168.80.114:26379"
  10.  
    ));
  11.  
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
  12.  
    poolConfig.setMaxTotal( 10);
  13.  
    poolConfig.setMaxIdle( 5);
  14.  
    poolConfig.setMinIdle( 5);
  15.  
    JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig);

能夠看到,客戶端只配置了哨兵集羣的IP地址,經過哨兵獲取redis主節點信息,再與其進行鏈接,下面給出關鍵代碼的源碼分析,下面代碼片斷講述瞭如何獲取主節點信息。數據庫

  1.  
    private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
  2.  
    //主節點ip與port對象
  3.  
    HostAndPort master = null;
  4.  
    //是否能夠鏈接到哨兵節點
  5.  
    boolean sentinelAvailable = false;
  6.  
     
  7.  
    log.info( "Trying to find master from available Sentinels...");
  8.  
     
  9.  
    //循環全部的哨兵節點,依次進行鏈接,邏輯以下:
  10.  
    //先鏈接第一個,若是鏈接成功,可以獲取主節點信息,方法返回,不然鏈接第二個,第三個,第N個。
  11.  
    //若是所有都失敗,則拋出異常,RedisPool初始化失敗
  12.  
    for (String sentinel : sentinels) {
  13.  
     
  14.  
    //哨兵的ip和port
  15.  
    final HostAndPort hap = HostAndPort.parseString(sentinel);
  16.  
     
  17.  
    log.info( "Connecting to Sentinel " + hap);
  18.  
     
  19.  
    Jedis jedis = null;
  20.  
    try {
  21.  
    //與哨兵節點進行鏈接,這裏可能會出錯,好比哨兵掛了。。
  22.  
    jedis = new Jedis(hap.getHost(), hap.getPort());
  23.  
     
  24.  
    //查詢主節點信息
  25.  
    List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
  26.  
     
  27.  
    //設置能夠鏈接到哨兵
  28.  
    sentinelAvailable = true;
  29.  
     
  30.  
    //若是主節點信息獲取不到,則繼續循環,換一下哨兵繼續上面邏輯
  31.  
    if (masterAddr == null || masterAddr.size() != 2) {
  32.  
    log.warn( "Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
  33.  
    + ".");
  34.  
    continue;
  35.  
    }
  36.  
     
  37.  
    //獲取到主節點信息
  38.  
    master = toHostAndPort(masterAddr);
  39.  
    log.info( "Found Redis master at " + master);
  40.  
    break;
  41.  
    } catch (JedisException e) {
  42.  
    // resolves #1036, it should handle JedisException there's another chance
  43.  
    // of raising JedisDataException
  44.  
    //出異常直接忽略,繼續循環
  45.  
    log.warn( "Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
  46.  
    + ". Trying next one.");
  47.  
    } finally {
  48.  
    if (jedis != null) {
  49.  
    jedis.close();
  50.  
    }
  51.  
    }
  52.  
    }
  53.  
     
  54.  
    //若是所有哨兵都獲取不到主節點信息則拋出異常
  55.  
    if (master == null) {
  56.  
    //能夠鏈接到哨兵,可是查詢不到主節點信息
  57.  
    if (sentinelAvailable) {
  58.  
    // can connect to sentinel, but master name seems to not
  59.  
    // monitored
  60.  
    throw new JedisException("Can connect to sentinel, but " + masterName
  61.  
    + " seems to be not monitored...");
  62.  
    } else {
  63.  
    //鏈接不到哨兵 有可能哨兵所有掛了
  64.  
    throw new JedisConnectionException("All sentinels down, cannot determine where is "
  65.  
    + masterName + " master is running...");
  66.  
    }
  67.  
    }
  68.  
     
  69.  
    log.info( "Redis master running at " + master + ", starting Sentinel listeners...");
  70.  
     
  71.  
    //下面的邏輯是上面能夠拿到主節點信息時纔會執行
  72.  
    //循環哨兵,創建訂閱消息,監聽節點切換消息,若是redis集羣節點發生變更,這裏會收到通知
  73.  
    for (String sentinel : sentinels) {
  74.  
    final HostAndPort hap = HostAndPort.parseString(sentinel);
  75.  
    JedisSentinelSlavePool.MasterListener masterListener = new JedisSentinelSlavePool.MasterListener(masterName, hap.getHost(), hap.getPort());
  76.  
    // whether MasterListener threads are alive or not, process can be stopped
  77.  
    masterListener.setDaemon( true);
  78.  
    masterListeners.add(masterListener);
  79.  
    masterListener.start();
  80.  
    }
  81.  
     
  82.  
    //返回主節點信息
  83.  
    return master;
  84.  
    }

下面的代碼判斷,分析了客戶端如何感知到redis主從節點關係發生變化,原理是經過訂閱哨兵的頻道獲取的,當又新的主節點出現,則清空原有鏈接池,根據新的主節點從新建立鏈接對象。apache

  1.  
     
  2.  
     
  3.  
    running. set(true);
  4.  
     
  5.  
    //死循環
  6.  
    while (running.get()) {
  7.  
     
  8.  
    //與哨兵進行鏈接
  9.  
    j = new Jedis(host, port);
  10.  
     
  11.  
    try {
  12.  
    // double check that it is not being shutdown
  13.  
    if (!running.get()) {
  14.  
    break;
  15.  
    }
  16.  
     
  17.  
    //訂閱頻道監聽節點切換消息
  18.  
    j.subscribe( new JedisPubSub() {
  19.  
    @ Override
  20.  
    public void onMessage(String channel, String message) {
  21.  
    log.info( "Sentinel " + host + ":" + port + " published: " + message + ".");
  22.  
     
  23.  
    String[] switchMasterMsg = message.split( " ");
  24.  
     
  25.  
    if (switchMasterMsg.length > 3) {
  26.  
     
  27.  
    if (masterName.equals(switchMasterMsg[0])) {
  28.  
    //toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))
  29.  
    //這裏獲取了新的主節點的ip與端口
  30.  
    //調用initPool清空原來的鏈接池,這樣一來,當須要獲取jedis時,池會根據新的主接線建立對象
  31.  
    initPool(toHostAndPort(Arrays.asList(switchMasterMsg[ 3], switchMasterMsg[4])));
  32.  
    } else {
  33.  
    log.info( "Ignoring message on +switch-master for master name "
  34.  
    + switchMasterMsg[ 0] + ", our master name is " + masterName);
  35.  
    }
  36.  
     
  37.  
    } else {
  38.  
    log.warn( "Invalid message received on Sentinel " + host + ":" + port
  39.  
    + " on channel +switch-master: " + message);
  40.  
    }
  41.  
    }
  42.  
    }, "+switch-master");
  43.  
     
  44.  
    } catch (JedisConnectionException e) {
  45.  
     
  46.  
    if (running.get()) {
  47.  
    log.info( "Lost connection to Sentinel at " + host + ":" + port
  48.  
    + ". Sleeping 5000ms and retrying.", e);
  49.  
    try {
  50.  
    //若是鏈接哨兵異常,則等待若干時間後無限重試
  51.  
    Thread.sleep(subscribeRetryWaitTimeMillis);
  52.  
    } catch (InterruptedException e1) {
  53.  
    log.info( "Sleep interrupted: ", e1);
  54.  
    }
  55.  
    } else {
  56.  
    log.info( "Unsubscribing from Sentinel at " + host + ":" + port);
  57.  
    }
  58.  
    } finally {
  59.  
    j.close();
  60.  
    }
  61.  
    }
  62.  
    }

由此咱們知道,Redis的客戶端,在哨兵模式下的實現,讀寫都是走Master,那麼缺點是顯而易見的,那就是若干個Slave徹底變成了熱備,沒有系統分擔壓力,接下來咱們擴展它,讓它支持能夠在Slave節點讀取數據,這樣咱們的程序,在寫入數據時走Master,在讀取數據時走Slave,大大提升了系統的性能。緩存

 

第一步,咱們重寫這個類 JedisSentinelSlavePool extends Pool<Jedis>負載均衡

全部代碼都拷貝JedisSentinelPool,只修改了下面代碼,建立了JedisSlaveFactory,傳入了哨兵集羣信息,和哨兵的名字。dom

  1.  
    private void initPool(HostAndPort master) {
  2.  
    if (!master.equals(currentHostMaster)) {
  3.  
    currentHostMaster = master;
  4.  
    if (factory == null) {
  5.  
    factory = new JedisSlaveFactory(sentinels, masterName, connectionTimeout,
  6.  
    soTimeout, password, database, clientName, false, null, null, null);
  7.  
    initPool(poolConfig, factory);
  8.  
    } else {
  9.  
    internalPool.clear();
  10.  
    }
  11.  
     
  12.  
    log.info( "Created JedisPool to master at " + master);
  13.  
    }
  14.  
    }

 

第二步,建立JedisSlaveFactory。ide

makeObject這個方法,是Redis鏈接池獲取底層鏈接的地方,我麼只須要在這裏,建立一個鏈接到Slave節點的對象便可,源碼分析

思路就是經過哨兵集羣,獲取到可用的slave節點信息,而後隨機選取一個建立對象,達到負載均衡的效果。

  1.  
    package com.framework.core.redis;
  2.  
     
  3.  
    import lombok.extern.slf4j.Slf4j;
  4.  
    import org.apache.commons.pool2.PooledObject;
  5.  
    import org.apache.commons.pool2.PooledObjectFactory;
  6.  
    import org.apache.commons.pool2.impl.DefaultPooledObject;
  7.  
    import redis.clients.jedis.BinaryJedis;
  8.  
    import redis.clients.jedis.HostAndPort;
  9.  
    import redis.clients.jedis.Jedis;
  10.  
    import redis.clients.jedis.exceptions.JedisConnectionException;
  11.  
    import redis.clients.jedis.exceptions.JedisException;
  12.  
     
  13.  
    import javax.net.ssl.HostnameVerifier;
  14.  
    import javax.net.ssl.SSLParameters;
  15.  
    import javax.net.ssl.SSLSocketFactory;
  16.  
    import java.util.*;
  17.  
     
  18.  
    @Slf4j
  19.  
    public class JedisSlaveFactory implements PooledObjectFactory<Jedis> {
  20.  
     
  21.  
    private final Set<String> sentinels;
  22.  
    private final String masterName;
  23.  
    private final int connectionTimeout;
  24.  
    private final int soTimeout;
  25.  
    private final String password;
  26.  
    private final int database;
  27.  
    private final String clientName;
  28.  
    private final boolean ssl;
  29.  
    private final SSLSocketFactory sslSocketFactory;
  30.  
    private SSLParameters sslParameters;
  31.  
    private HostnameVerifier hostnameVerifier;
  32.  
    private Random random;
  33.  
     
  34.  
    public JedisSlaveFactory(final Set<String> sentinels, final String masterName, final int connectionTimeout,
  35.  
    final int soTimeout, final String password, final int database, final String clientName,
  36.  
    final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters,
  37.  
    final HostnameVerifier hostnameVerifier) {
  38.  
    this.sentinels = sentinels;
  39.  
    this.masterName = masterName;
  40.  
    this.connectionTimeout = connectionTimeout;
  41.  
    this.soTimeout = soTimeout;
  42.  
    this.password = password;
  43.  
    this.database = database;
  44.  
    this.clientName = clientName;
  45.  
    this.ssl = ssl;
  46.  
    this.sslSocketFactory = sslSocketFactory;
  47.  
    this.sslParameters = sslParameters;
  48.  
    this.hostnameVerifier = hostnameVerifier;
  49.  
    this.random = new Random();
  50.  
    }
  51.  
     
  52.  
    @Override
  53.  
    public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
  54.  
    final BinaryJedis jedis = pooledJedis.getObject();
  55.  
    if (jedis.getDB() != database) {
  56.  
    jedis.select(database);
  57.  
    }
  58.  
    }
  59.  
     
  60.  
    /**
  61.  
    * 銷燬redis底層鏈接
  62.  
    */
  63.  
    @Override
  64.  
    public void destroyObject(PooledObject<Jedis> pooledJedis){
  65.  
    log.debug( "destroyObject =" + pooledJedis.getObject());
  66.  
    final BinaryJedis jedis = pooledJedis.getObject();
  67.  
    if (jedis.isConnected()) {
  68.  
    try {
  69.  
    jedis.quit();
  70.  
    jedis.disconnect();
  71.  
    } catch (Exception e) {
  72.  
    }
  73.  
    }
  74.  
    }
  75.  
     
  76.  
    /**
  77.  
    * 建立Redis底層鏈接對象,返回池化對象.
  78.  
    */
  79.  
    @Override
  80.  
    public PooledObject<Jedis> makeObject() {
  81.  
    List<HostAndPort> slaves = this.getAlivedSlaves();
  82.  
     
  83.  
    //在slave節點中隨機選取一個節點進行鏈接
  84.  
    int index = slaves.size() == 1 ? 0 : random.nextInt(slaves.size());
  85.  
    final HostAndPort hostAndPort = slaves.get(index);
  86.  
     
  87.  
    log.debug( "Create jedis instance from slaves=[" + slaves + "] , choose=[" + hostAndPort + "]");
  88.  
     
  89.  
    //建立redis客戶端
  90.  
    final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
  91.  
    soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
  92.  
     
  93.  
    //測試鏈接,設置密碼,數據庫.
  94.  
    try {
  95.  
    jedis.connect();
  96.  
    if (null != this.password) {
  97.  
    jedis.auth( this.password);
  98.  
    }
  99.  
    if (database != 0) {
  100.  
    jedis.select(database);
  101.  
    }
  102.  
    if (clientName != null) {
  103.  
    jedis.clientSetname(clientName);
  104.  
    }
  105.  
    } catch (JedisException je) {
  106.  
    jedis.close();
  107.  
    throw je;
  108.  
    }
  109.  
     
  110.  
    return new DefaultPooledObject<Jedis>(jedis);
  111.  
    }
  112.  
     
  113.  
     
  114.  
    /**
  115.  
    * 獲取可用的RedisSlave節點信息
  116.  
    */
  117.  
    private List<HostAndPort> getAlivedSlaves() {
  118.  
    log.debug( "Get alived salves start...");
  119.  
     
  120.  
    List<HostAndPort> alivedSalaves = new ArrayList<>();
  121.  
    boolean sentinelAvailable = false;
  122.  
     
  123.  
    //循環哨兵,創建鏈接獲取slave節點信息
  124.  
    //當某個哨兵鏈接失敗,會忽略異常鏈接下一個哨兵
  125.  
    for (String sentinel : sentinels) {
  126.  
    final HostAndPort hap = HostAndPort.parseString(sentinel);
  127.  
     
  128.  
    log.debug( "Connecting to Sentinel " + hap);
  129.  
     
  130.  
    Jedis jedis = null;
  131.  
    try {
  132.  
    jedis = new Jedis(hap.getHost(), hap.getPort());
  133.  
     
  134.  
    List<Map<String, String>> slavesInfo = jedis.sentinelSlaves(masterName);
  135.  
     
  136.  
    //能夠鏈接到哨兵
  137.  
    sentinelAvailable = true;
  138.  
     
  139.  
    //沒有查詢到slave信息,循環下一個哨兵
  140.  
    if (slavesInfo == null || slavesInfo.size() == 0) {
  141.  
    log.warn( "Cannot get slavesInfo, master name: " + masterName + ". Sentinel: " + hap
  142.  
    + ". Trying next one.");
  143.  
    continue;
  144.  
    }
  145.  
     
  146.  
    //獲取可用的Slave信息
  147.  
    for (Map<String, String> slave : slavesInfo) {
  148.  
    if(slave.get("flags").equals("slave")) {
  149.  
    String host = slave.get( "ip");
  150.  
    int port = Integer.valueOf(slave.get("port"));
  151.  
    HostAndPort hostAndPort = new HostAndPort(host, port);
  152.  
     
  153.  
    log.info( "Found alived redis slave:[" + hostAndPort + "]");
  154.  
     
  155.  
    alivedSalaves.add(hostAndPort);
  156.  
    }
  157.  
    }
  158.  
     
  159.  
    log.debug( "Get alived salves end...");
  160.  
    break;
  161.  
    } catch (JedisException e) {
  162.  
    //當前哨兵鏈接失敗,忽略錯誤鏈接下一個哨兵
  163.  
    log.warn( "Cannot get slavesInfo from sentinel running @ " + hap + ". Reason: " + e
  164.  
    + ". Trying next one.");
  165.  
    } finally {
  166.  
    if (jedis != null) {
  167.  
    jedis.close();
  168.  
    }
  169.  
    }
  170.  
    }
  171.  
     
  172.  
    //沒有可用的slave節點信息
  173.  
    if (alivedSalaves.isEmpty()) {
  174.  
    if (sentinelAvailable) {
  175.  
    throw new JedisException("Can connect to sentinel, but " + masterName
  176.  
    + " cannot find any redis slave");
  177.  
    } else {
  178.  
    throw new JedisConnectionException("All sentinels down");
  179.  
    }
  180.  
    }
  181.  
     
  182.  
    return alivedSalaves;
  183.  
    }
  184.  
     
  185.  
    @Override
  186.  
    public void passivateObject(PooledObject<Jedis> pooledJedis) {
  187.  
    }
  188.  
     
  189.  
    /**
  190.  
    * 檢查jedis客戶端是否有效
  191.  
    * @param pooledJedis 池中對象
  192.  
    * @return true有效 false無效
  193.  
    */
  194.  
    @Override
  195.  
    public boolean validateObject(PooledObject<Jedis> pooledJedis) {
  196.  
    final BinaryJedis jedis = pooledJedis.getObject();
  197.  
    try {
  198.  
    //是否TCP鏈接 && 是否ping通 && 是否slave角色
  199.  
    boolean result = jedis.isConnected()
  200.  
    && jedis.ping().equals( "PONG")
  201.  
    && jedis.info( "Replication").contains("role:slave");
  202.  
     
  203.  
    log.debug( "ValidateObject Jedis=["+jedis+"] host=[ " + jedis.getClient().getHost() +
  204.  
    "] port=[" + jedis.getClient().getPort() +"] return=[" + result + "]");
  205.  
    return result;
  206.  
    } catch (final Exception e) {
  207.  
    log.warn( "ValidateObject error jedis client cannot use", e);
  208.  
    return false;
  209.  
    }
  210.  
    }
  211.  
     
  212.  
    }

使用的時候跟原來同樣,建立slave鏈接池就能夠了。

  1.  
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
  2.  
    //鏈接池中最大對象數量
  3.  
    jedisPoolConfig.setMaxTotal( 100);
  4.  
    //最大可以保持idel狀態的對象數
  5.  
    jedisPoolConfig.setMaxIdle( 1);
  6.  
    //最小可以保持idel狀態的對象數
  7.  
    jedisPoolConfig.setMinIdle( 1);
  8.  
    //當池內沒有可用資源,最大等待時長
  9.  
    jedisPoolConfig.setMaxWaitMillis( 3000);
  10.  
    //表示有一個idle object evitor線程對object進行掃描,調用validateObject方法.
  11.  
    jedisPoolConfig.setTestWhileIdle( true);
  12.  
    //evitor線程對object進行掃描的時間間隔
  13.  
    jedisPoolConfig.setTimeBetweenEvictionRunsMillis( 30000);
  14.  
    //表示對象的空閒時間,若是超過這個時間對象沒有被使用則變爲idel狀態
  15.  
    //而後才能被idle object evitor掃描並驅逐;
  16.  
    //這一項只有在timeBetweenEvictionRunsMillis大於0時和setTestWhileIdle=true時纔有意義
  17.  
    //-1 表示對象不會變成idel狀態
  18.  
    jedisPoolConfig.setMinEvictableIdleTimeMillis( 60000);
  19.  
    //表示idle object evitor每次掃描的最多的對象數;
  20.  
    jedisPoolConfig.setNumTestsPerEvictionRun( 10);
  21.  
     
  22.  
    //在從池中獲取對象時調用validateObject方法檢查
  23.  
    jedisPoolConfig.setTestOnBorrow( false);
  24.  
    //在把對象放回池中時調用validateObject方法檢查
  25.  
    jedisPoolConfig.setTestOnReturn( false);
  26.  
     
  27.  
    Set<String> sentinels = new HashSet<>(Arrays.asList(
  28.  
    "192.168.80.112:26379",
  29.  
    "192.168.80.113:26379",
  30.  
    "192.168.80.114:26379"
  31.  
    ));
  32.  
     
  33.  
    JedisSentinelSlavePool pool = new JedisSentinelSlavePool("mymaster", sentinels, jedisPoolConfig);

 

與Spring集成,分別建立不一樣的對象便可,在程序中查詢接口能夠先走slave進行查詢,查詢不到在查詢master, master也沒有則寫入緩存,返回數據,下載在查詢slave就同步過去啦,這樣一來redis的性能會大幅度的提高。

  1.  
    @Primary
  2.  
    @Bean(name = "redisTemplateMaster")
  3.  
    public RedisTemplate<Object, Object> redisTemplateMaster() {
  4.  
    RedisTemplate<Object, Object> template = new RedisTemplate<>();
  5.  
    template.setConnectionFactory(redisMasterConnectionFactory());
  6.  
    template.setKeySerializer(new StringRedisSerializer());
  7.  
    template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  8.  
    return template;
  9.  
    }
  10.  
     
  11.  
    @Bean(name = "redisTemplateSlave")
  12.  
    public RedisTemplate<Object, Object> redisTemplateSlave() {
  13.  
    RedisTemplate<Object, Object> template = new RedisTemplate<>();
  14.  
    template.setConnectionFactory(redisSlaveConnectionFactory());
  15.  
    template.setKeySerializer(new StringRedisSerializer());
  16.  
    template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  17.  
    return template;
  18.  
    }
相關文章
相關標籤/搜索