[Source Code Analysis] Dynomite Distributed Storage Engine: DynoJedisClient (1)

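0x00 Abstract

Earlier articles introduced the Amazon Dynamo system architecture and Netflix Dynomite.

Today we look at how DynoJedisClient, the Java client for Netflix Dynomite, is implemented. The client is worth analyzing because it plays a role much like that of a cluster master: the Java driver exposes several policy interfaces that can be used to tune the driver's behavior, including load balancing, request retries, and node connection management.

Dynomite itself is too large and too low-level for this article, and DynoJedisClient is tightly coupled to it, so we start from the simplest features of DynoJedisClient. The features that come to mind are:

  • how it provides the basic function, namely a database connection pool;
  • how it manages node connections;
  • how it becomes topology aware;
  • how it balances load;
  • how it fails over.

The analysis below is organized around these basic features.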

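0x01 Background

1.1 Amazon Dynamo

As its business grew, Amazon ran into problems caused mainly by the limited scalability and availability of relational databases. It therefore developed a new database based on a KV storage model and named it Dynamo. Its architecture is fully distributed and decentralized.

Compared with a traditional relational database such as MySQL, Dynamo has somewhat different goals. For example, most of Amazon's business scenarios do not need complex queries, but they do require tolerance of single-node failures, eventual consistency (that is, giving up strong consistency to put availability first), and strong scalability.

1.2 Netflix Dynomite

Dynomite is Netflix's open-source, general-purpose implementation of Amazon's distributed storage engine Dynamo. It supports not only in-memory K/V databases but also persistent databases such as MySQL, BerkeleyDB, and LevelDB, and it is simple, efficient, and capable of cross-datacenter replication.

Dynomite's ultimate goal is to provide the simple, efficient, cross-datacenter data replication that database storage engines do not offer on their own. At present, Dynomite supports Redis and Memcached.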

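0x02 Why Netflix chose Dynomite

Netflix chose Dynomite because:

  • it offers performance, multi-datacenter replication, and high availability;

  • Dynomite provides sharding and a pluggable data storage engine, which allows vertical and horizontal scaling as data needs grow;

  • on top of Redis, Dynomite provides high availability, peer-to-peer replication, and consistency features, which are used to build a distributed clustered queue;

  • Dyno provides connection pooling for persistent connections;

  • Dyno's connection pool can be configured to be topology aware;

  • failover: Dyno gives the application a specific local rack. A client in us-east-1a connects to the Dynomite/Redis nodes in the same zone unless that node is unavailable, in which case the client fails over. This property is used to partition queues by zone.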
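The rack-local behavior in the last bullet can be pictured with a small sketch. It is only an illustration under assumed names: RackFailoverSketch, setHost, and selectHost are hypothetical and not part of Dyno. The idea is to prefer a live host in the local rack and to fail over to another rack only when the local rack has none.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of rack-local selection with failover; not Dyno's implementation.
class RackFailoverSketch {
    // rack name -> (host name -> whether the host is currently up)
    private final Map<String, Map<String, Boolean>> racks = new LinkedHashMap<>();
    private final String localRack;

    RackFailoverSketch(String localRack) {
        this.localRack = localRack;
    }

    void setHost(String rack, String host, boolean up) {
        racks.computeIfAbsent(rack, r -> new LinkedHashMap<>()).put(host, up);
    }

    // Prefer a live host in the local rack; otherwise fail over to the
    // first live host found in any other rack (in insertion order).
    String selectHost() {
        String local = firstLiveHost(localRack);
        if (local != null) {
            return local;
        }
        for (String rack : racks.keySet()) {
            if (!rack.equals(localRack)) {
                String remote = firstLiveHost(rack);
                if (remote != null) {
                    return remote;
                }
            }
        }
        throw new IllegalStateException("no live host in any rack");
    }

    private String firstLiveHost(String rack) {
        Map<String, Boolean> hosts = racks.get(rack);
        if (hosts == null) {
            return null;
        }
        for (Map.Entry<String, Boolean> e : hosts.entrySet()) {
            if (e.getValue()) {
                return e.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RackFailoverSketch selector = new RackFailoverSketch("us-east-1a");
        selector.setHost("us-east-1a", "host-a1", true);
        selector.setHost("us-east-1c", "host-c1", true);
        System.out.println(selector.selectHost()); // local rack is healthy
        selector.setHost("us-east-1a", "host-a1", false);
        System.out.println(selector.selectHost()); // local rack is down, so it fails over
    }
}
```

The real client makes this decision per operation and combines it with token awareness; the sketch keeps only the "local first, remote as fallback" ordering.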

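Dynomite itself is too low-level for this article.

So we focus on how DynoJedisClient implements the last few points. These points cannot be fully separated from Dynomite, of course; we simply try to isolate them as far as possible.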

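0x03 Basics

3.1 Data Center

A Data Center is a logical collection made up of multiple Racks.

A Data Center can be a machine room or the set of equipment in one region.

3.2 Rack

A Rack is a logical collection made up of multiple nodes that are close to each other, for example all the physical machines on one rack. It can be thought of simply as the cabinet that holds the servers.

What is the relationship between data centers and racks? It can be N:1, 1:N, or M:N.

  • If a few servers are enough to meet the business need, yet those servers must span at least two data centers, then several data centers can sit on one rack. This is not very safe for disaster recovery, though.
  • In the second case, one data center corresponds to one machine room, and the machine room holds multiple racks.
  • In the third case, M:N, multiple data centers in multiple machine rooms are spread over multiple racks.

3.3 Rings and Tokens

The data managed by a cluster forms a ring. Each node in the ring is assigned one or more data ranges described by tokens, which determine the node's position in the ring.

A token is a 64-bit integer ID that identifies a partition; its range is -2^63 to 2^63-1. The hash of the partition key is computed with a hash algorithm, and that hash decides which node stores the data.

Tokens also determine the range of data each node stores: a node holds the keys whose tokens fall in the half-open interval (previous node's token, this node's token], and all nodes together form a ring that joins end to end.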
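The interval rule above maps naturally onto a sorted map. The following is a minimal sketch, assuming one token per node and using hypothetical names (TokenRingSketch, addNode, ownerOf); it is not how Dyno or Dynomite store the ring. The owner of a key token is the first node token greater than or equal to it, wrapping around to the smallest token past the end.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a token ring lookup; names are illustrative, not Dyno APIs.
class TokenRingSketch {
    // Sorted map from a node's token to the node's name.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(long token, String node) {
        ring.put(token, node);
    }

    // A key token t belongs to the node whose range (previousToken, nodeToken]
    // contains t. That is the first node token >= t; past the largest token
    // the ring wraps around to the node with the smallest token.
    String ownerOf(long keyToken) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(keyToken);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        TokenRingSketch ring = new TokenRingSketch();
        ring.addNode(-1000L, "node-a");
        ring.addNode(0L, "node-b");
        ring.addNode(1000L, "node-c");
        System.out.println(ring.ownerOf(-1000L)); // boundary token belongs to node-a
        System.out.println(ring.ownerOf(1L));     // falls in (0, 1000], so node-c
        System.out.println(ring.ownerOf(5000L));  // beyond the largest token, wraps to node-a
    }
}
```

A TreeMap keeps the tokens ordered, so the lookup is a single ceilingEntry call and the wrap-around is the only special case.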

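0x04 Requirements & approach

Because DynoJedisClient has to hide these details from the upper layers, it has to deal with all of this complexity itself and needs a thorough understanding of the system, for example:

  • how to maintain connections and provide a connection pool for persistent connections;
  • how to maintain the topology;
  • how to balance load;
  • how to fail over;
  • how to retry and discover automatically, for example retrying hosts that went down and discovering the other hosts in the cluster;
  • how to monitor the state of the underlying racks.

DynoJedisClient's approach is therefore this: the Java driver exposes several policy interfaces that can be used to tune the driver's behavior, including load balancing, request retries, and node connection management.

0x05 Usage

Sample code is as follows: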

public static void main(String[] args) throws IOException {
    final String clusterName = args[0];
    int version = Integer.parseInt(args[1]);
    final DynoQueueDemo demo = new DynoQueueDemo(clusterName, "us-east-1e");
    Properties props = new Properties();
    props.load(DynoQueueDemo.class.getResourceAsStream("/demo.properties"));
    for (String name : props.stringPropertyNames()) {
        System.setProperty(name, props.getProperty(name));
    }
    try {
        demo.initWithRemoteClusterFromEurekaUrl(args[0], 8102, false);
        if (version == 1) {
            demo.runSimpleV1Demo(demo.client);
        } else if (version == 2) {
            demo.runSimpleV2QueueDemo(demo.client);
        }
        Thread.sleep(10000);
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        demo.stop();
        logger.info("Done");
    }
}

The helper functions are:

public void initWithRemoteClusterFromEurekaUrl(final String clusterName, final int port, boolean lock) throws Exception {
        initWithRemoteCluster(clusterName, getHostsFromDiscovery(clusterName), port, lock);
}
    
private void initWithRemoteCluster(String clusterName, final List<Host> hosts, final int port, boolean lock) throws Exception {
        final HostSupplier clusterHostSupplier = () -> hosts;

        if (lock)
            initDynoLockClient(clusterHostSupplier, null, "test", clusterName);
        else
            init(clusterHostSupplier, port, null);
}
    
public void initDynoLockClient(HostSupplier hostSupplier, TokenMapSupplier tokenMapSupplier, String appName,
                                   String clusterName) {
        dynoLockClient = new DynoLockClient.Builder().withApplicationName(appName)
                .withDynomiteClusterName(clusterName)
                .withTimeoutUnit(TimeUnit.MILLISECONDS)
                .withTimeout(10000)
                .withHostSupplier(hostSupplier)
                .withTokenMapSupplier(tokenMapSupplier).build();
}

0x06 Configuration

DynoJedisClient has the following important configuration classes.

6.1 Default configuration

ConnectionPoolConfigurationImpl mainly provides the default configuration.

public class ConnectionPoolConfigurationImpl implements ConnectionPoolConfiguration {
    // DEFAULTS
    private static final LoadBalancingStrategy DEFAULT_LB_STRATEGY = LoadBalancingStrategy.TokenAware;
    private static final CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = CompressionStrategy.NONE;

    private HostSupplier hostSupplier;
    private TokenMapSupplier tokenSupplier;
    private HostConnectionPoolFactory hostConnectionPoolFactory;
    private HashPartitioner hashPartitioner;
    private LoadBalancingStrategy lbStrategy = DEFAULT_LB_STRATEGY;
    private CompressionStrategy compressionStrategy = DEFAULT_COMPRESSION_STRATEGY;
}

6.2 Policy configuration

ArchaiusConnectionPoolConfiguration mainly provides several policies, covering load balancing, compression, and retries:

  • LoadBalancingStrategy parseLBStrategy(String propertyPrefix) is the load-balancing strategy;
  • CompressionStrategy parseCompressionStrategy(String propertyPrefix) is the compression strategy;
  • RetryPolicyFactory parseRetryPolicyFactory(String propertyPrefix) is the retry policy.

In detail:

public class ArchaiusConnectionPoolConfiguration extends ConnectionPoolConfigurationImpl {
    ......

    private final LoadBalancingStrategy loadBalanceStrategy;
    private final CompressionStrategy compressionStrategy;
    private final ErrorRateMonitorConfig errorRateConfig;
    private final RetryPolicyFactory retryPolicyFactory;
    private final DynamicBooleanProperty failOnStartupIfNoHosts;
    private final DynamicIntProperty lockVotingSize;
    
    ......
}
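To make the idea of parsing a strategy from a property prefix concrete, here is a minimal sketch that uses plain java.util.Properties instead of Archaius. The property suffix ".lbStrategy" and the class name StrategyConfigSketch are assumptions made for illustration; the only detail taken from the excerpts above is that TokenAware is the default.

```java
import java.util.Properties;

// Hypothetical sketch: resolving a load-balancing strategy from a property prefix.
class StrategyConfigSketch {
    enum LoadBalancingStrategy { RoundRobin, TokenAware }

    // Looks up "<prefix>.lbStrategy" (an assumed property name) and falls back
    // to TokenAware when the property is missing or not recognized.
    static LoadBalancingStrategy parseLBStrategy(Properties props, String propertyPrefix) {
        String value = props.getProperty(propertyPrefix + ".lbStrategy");
        if (value != null) {
            for (LoadBalancingStrategy s : LoadBalancingStrategy.values()) {
                if (s.name().equalsIgnoreCase(value.trim())) {
                    return s;
                }
            }
        }
        return LoadBalancingStrategy.TokenAware;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(parseLBStrategy(props, "dyno.demo")); // nothing configured
        props.setProperty("dyno.demo.lbStrategy", "RoundRobin");
        System.out.println(parseLBStrategy(props, "dyno.demo")); // explicit override
    }
}
```

Falling back to the default on an unknown value is a design choice of this sketch; a stricter configuration class could throw instead.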

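0x07 Definition

DynoJedisClient is defined as follows. We can see that its most important member variable is the connection pool, connPool, of type ConnectionPool.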

public class DynoJedisClient implements JedisCommands, BinaryJedisCommands, MultiKeyCommands, ScriptingCommands, MultiKeyBinaryCommands, DynoJedisCommands {

    private final String appName;
    private final String clusterName;
    
    private final ConnectionPool<Jedis> connPool;
    
    private final AtomicReference<DynoJedisPipelineMonitor> pipelineMonitor = new AtomicReference<DynoJedisPipelineMonitor>();

    protected final DynoOPMonitor opMonitor;
    protected final ConnectionPoolMonitor cpMonitor;
}

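0x08 Logical connection pool

Because DynoJedisClient's main job is managing the connection pool, we first introduce the logical connection pool, ConnectionPoolImpl.

The connection pool layer abstracts all connection management away from the application. Everything can be configured here, for example pool options, the load-balancing strategy, the retry policy, or the default consistency level.

ConnectionPoolImpl is the core class. Its main functions are:

  • it maintains the HostConnectionPools obtained via the HostSupplier as a collection of HostConnectionPools;
  • it adds and removes the hosts detected by the HostSupplier;
  • it takes a Connection from a HostConnectionPool and executes an Operation on it;
  • when executing an Operation it uses a HostSelectionStrategy, for example basic Round Robin or the TokenAware strategy;
  • it uses a health check monitor to track error rates. The health check monitor can decide to recycle a HostConnectionPool and to fall back to the HostConnectionPools of a remote data center;
  • it uses a RetryPolicy when executing an operation.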
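Of the selection strategies named in the list, Round Robin is the simplest to picture. The sketch below is a generic round-robin selector under assumed names (RoundRobinSketch, select); it is not Dyno's HostSelectionStrategy, and it ignores tokens and host health entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin host selection; not Dyno's implementation.
class RoundRobinSketch {
    private final List<String> hosts;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinSketch(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = new ArrayList<>(hosts); // defensive copy
    }

    // Each call returns the next host in order. floorMod keeps the index
    // non-negative even after the counter overflows.
    String select() {
        int index = Math.floorMod(next.getAndIncrement(), hosts.size());
        return hosts.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch(Arrays.asList("host-1", "host-2", "host-3"));
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select());
        }
    }
}
```

A token-aware strategy would instead hash the key and pick the host that owns the resulting token, which is why it needs the topology discussed later.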

The definition is as follows:

public class ConnectionPoolImpl<CL> implements ConnectionPool<CL>, TopologyView {

    private final ConcurrentHashMap<Host, HostConnectionPool<CL>> cpMap = new ConcurrentHashMap<Host, HostConnectionPool<CL>>();
  
    private final ConnectionPoolHealthTracker<CL> cpHealthTracker;

    private final HostConnectionPoolFactory<CL> hostConnPoolFactory;
    private final ConnectionFactory<CL> connFactory;
    private final ConnectionPoolConfiguration cpConfiguration;
    private final ConnectionPoolMonitor cpMonitor;

    private final ScheduledExecutorService idleThreadPool = Executors.newSingleThreadScheduledExecutor();

    private final HostsUpdater hostsUpdater;
    private final ScheduledExecutorService connPoolThreadPool = Executors.newScheduledThreadPool(1);

    private HostSelectionWithFallback<CL> selectionStrategy;

    private Type poolType;
}

The logic at this point is as follows:

+------------------------+
|DynoJedisClient         |
|                        |
|                        |            +------------------------+
|                        |            |                        |
|          connPool +-------------->  |   ConnectionPoolImpl   |
|                        |            |                        |
|                        |            +------------------------+
+------------------------+

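8.1 Startup

The startup logic of the connection pool is:

  • use hostsUpdater to fetch the hosts, then configure and add them;
  • start the health check monitor to track error rates.

In detail: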

@Override
public Future<Boolean> start() throws DynoException {

        HostSupplier hostSupplier = cpConfiguration.getHostSupplier();
        HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
        cpMonitor.setHostCount(hostStatus.getHostCount());

        Collection<Host> hostsUp = hostStatus.getActiveHosts();
        final ExecutorService threadPool = Executors.newFixedThreadPool(Math.max(10, hostsUp.size()));
        final List<Future<Void>> futures = new ArrayList<Future<Void>>();

        // Add and configure the hosts obtained via hostsUpdater
        for (final Host host : hostsUp) {
            // Add host connection pool, but don't init the load balancer yet
            futures.add(threadPool.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    addHost(host, false);
                    return null;
                }
            }));
        }

        // Start the health check monitor to track error rates
        boolean success = started.compareAndSet(false, true);
        if (success) {
       
            selectionStrategy = initSelectionStrategy();
            cpHealthTracker.start();
            connPoolThreadPool.scheduleWithFixedDelay(new Runnable() {

                @Override
                public void run() {
                        HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
                        cpMonitor.setHostCount(hostStatus.getHostCount());
                        Logger.debug(hostStatus.toString());
                        updateHosts(hostStatus.getActiveHosts(), hostStatus.getInactiveHosts());
                }

            }, 15 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);

            MonitorConsole.getInstance().registerConnectionPool(this);
            registerMonitorConsoleMBean(MonitorConsole.getInstance());
        }
        return getEmptyFutureTask(true);
}
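The scheduled task in start() keeps calling updateHosts with the currently active and inactive hosts. A minimal sketch of what such a reconciliation step might look like follows; HostRefreshSketch and its String "pools" are hypothetical stand-ins, and the real updateHosts in Dyno does more work (priming connections, updating the selection strategy).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of reconciling the pool map with a host refresh.
class HostRefreshSketch {
    // host name -> placeholder standing in for that host's connection pool
    private final ConcurrentHashMap<String, String> pools = new ConcurrentHashMap<>();

    // Hosts reported up get a pool if they do not have one yet;
    // hosts reported down lose their pool.
    void updateHosts(Collection<String> hostsUp, Collection<String> hostsDown) {
        for (String host : hostsUp) {
            pools.putIfAbsent(host, "pool-for-" + host);
        }
        for (String host : hostsDown) {
            pools.remove(host);
        }
    }

    // Sorted copy of the hosts that currently have a pool.
    Set<String> activeHosts() {
        return new TreeSet<>(pools.keySet());
    }

    public static void main(String[] args) {
        HostRefreshSketch sketch = new HostRefreshSketch();
        sketch.updateHosts(Arrays.asList("host-1", "host-2"), Collections.<String>emptyList());
        sketch.updateHosts(Arrays.asList("host-2", "host-3"), Arrays.asList("host-1"));
        System.out.println(sketch.activeHosts());
    }
}
```

In the excerpt this step runs on connPoolThreadPool with an initial delay of 15 seconds and a fixed delay of 30 seconds, so the pool map converges on the supplier's view without blocking callers.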

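8.2 Configuring a host

During startup, the logic for adding a host is:

  • obtain a HostConnectionPool for the host;
  • add the HostConnectionPool to the collection;
  • add the host and its HostConnectionPool to the selection strategy, selectionStrategy;
  • set up the health check monitor for the host.

In detail: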

public boolean addHost(Host host, boolean refreshLoadBalancer) {

        HostConnectionPool<CL> connPool = cpMap.get(host);

        final HostConnectionPool<CL> hostPool = hostConnPoolFactory.createHostConnectionPool(host, this);

        HostConnectionPool<CL> prevPool = cpMap.putIfAbsent(host, hostPool);
        if (prevPool == null) {
            // This is the first time we are adding this pool.
            try {
                int primed = hostPool.primeConnections();
                if (hostPool.isActive()) {
                    if (refreshLoadBalancer) {
                        selectionStrategy.addHost(host, hostPool);
                    }
                    cpHealthTracker.initializePingHealthchecksForPool(hostPool);
                    cpMonitor.hostAdded(host, hostPool);
                } else {
                    cpMap.remove(host);
                }
                return primed > 0;
            } catch (DynoException e) {
                cpMap.remove(host);
                return false;
            }
        } 
}
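The core of addHost is a register-then-prime pattern: putIfAbsent decides which caller owns the new pool, and the entry is removed again if priming leaves the pool inactive. The sketch below isolates that pattern with a hypothetical Pool stand-in whose primeConnections simply reports a preset number of opened connections; it is not the Dyno class.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the putIfAbsent + prime + rollback pattern.
class AddHostSketch {
    // Stand-in for a per-host pool: "reachable" is how many connections
    // priming manages to open.
    static class Pool {
        private final int reachable;

        Pool(int reachable) {
            this.reachable = reachable;
        }

        int primeConnections() {
            return reachable;
        }

        boolean isActive() {
            return reachable > 0;
        }
    }

    private final ConcurrentHashMap<String, Pool> cpMap = new ConcurrentHashMap<>();

    boolean addHost(String host, Pool candidate) {
        Pool previous = cpMap.putIfAbsent(host, candidate);
        if (previous != null) {
            return false; // another caller already registered a pool for this host
        }
        int primed = candidate.primeConnections();
        if (!candidate.isActive()) {
            cpMap.remove(host, candidate); // roll back the registration
        }
        return primed > 0;
    }

    boolean hasPool(String host) {
        return cpMap.containsKey(host);
    }

    public static void main(String[] args) {
        AddHostSketch sketch = new AddHostSketch();
        System.out.println(sketch.addHost("host-1", new Pool(3)));
        System.out.println(sketch.addHost("host-1", new Pool(3)));
        System.out.println(sketch.addHost("host-2", new Pool(0)));
    }
}
```

Using the two-argument remove(host, candidate) only deletes the entry if it still maps to this caller's pool, which avoids removing a pool that someone else registered in the meantime.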

8.3 Obtaining a HostConnectionPool

There are two ways to obtain a HostConnectionPool, synchronous and asynchronous, as shown below.

private class SyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
        @Override
        public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
            return new HostConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
        }
}

private class AsyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
        @Override
        public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
            return new SimpleAsyncConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
        }
}

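8.4 Execution

The logical connection pool has two ways to execute: executeWithRing and executeWithFailover.

  • executeWithRing is rarely used, so it is not covered in detail.

  • executeWithFailover obtains a Connection through selectionStrategy and executes the operation on that Connection. If it fails, it retries according to the retry policy.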

public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws DynoException {

        RetryPolicy retry = cpConfiguration.getRetryPolicyFactory().getRetryPolicy();
        retry.begin();

        do {
            Connection<CL> connection = null;

            try {
                connection = selectionStrategy.getConnectionUsingRetryPolicy(op,
                        cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS, retry);

                updateConnectionContext(connection.getContext(), connection.getHost());

                OperationResult<R> result = connection.execute(op);

                // Add context to the result from the successful execution
                result.setNode(connection.getHost()).addMetadata(connection.getContext().getAll());

                retry.success();
                cpMonitor.incOperationSuccess(connection.getHost(), System.currentTimeMillis() - startTime);

                return result;

            } finally {
                if (connection != null) {
                    if (connection.getLastException() != null
                            && connection.getLastException() instanceof FatalConnectionException) {
                        connection.getParentConnectionPool().recycleConnection(connection);
                        // note - don't increment connection closed metric here;
                        // it's done in closeConnection
                    } else {
                        connection.getContext().reset();
                        connection.getParentConnectionPool().returnConnection(connection);
                    }
                }
            }

        } while (retry.allowRetry());
        throw lastException;
    }
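The excerpt above is abridged (the catch blocks and the startTime and lastException variables are elided), so the control flow is easier to see in a stripped-down form. The following sketch keeps only the retry loop. RetryNTimes here is a hypothetical policy modelled on the begin/success/allowRetry calls visible in the excerpt, plus an assumed failure(e) callback; it is not the Dyno class of that name.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of a do/while retry loop driven by a retry policy.
class RetrySketch {
    static class RetryNTimes {
        private final int maxRetries;
        private int failures;
        private boolean succeeded;

        RetryNTimes(int maxRetries) {
            this.maxRetries = maxRetries;
        }

        void begin() {
            failures = 0;
            succeeded = false;
        }

        void success() {
            succeeded = true;
        }

        void failure(Exception e) {
            failures++;
        }

        // One initial attempt plus up to maxRetries retries.
        boolean allowRetry() {
            return !succeeded && failures <= maxRetries;
        }
    }

    static <R> R executeWithRetry(Callable<R> op, RetryNTimes retry) throws Exception {
        Exception lastException = null;
        retry.begin();
        do {
            try {
                R result = op.call();
                retry.success();
                return result;
            } catch (Exception e) {
                retry.failure(e);
                lastException = e;
            }
        } while (retry.allowRetry());
        throw lastException; // every attempt failed
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = executeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure " + calls[0]);
            }
            return "ok";
        }, new RetryNTimes(2));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the real method each iteration also borrows a connection before the call and returns or recycles it in the finally block, so a retry can land on a different host.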

The logic at this point is as follows:

                           +----------------------+
+-------------------+      |ConnectionPoolImpl    |
|DynoJedisClient    |      |                      |
|                   |      |                      |         +--------------+
|                   |      |       hostsUpdater +-------->  | HostSupplier |
|                   |      |                      |         +--------------+
|     connPool +---------> |                      |
|                   |      |                      |         +--------------------------+
|                   |      |              cpMap +-------->  |[Host, HostConnectionPool]|
+-------------------+      |                      |         |               +          |
                           +----------------------+         |               |          |
                                                            +--------------------------+
                                                                            |
                                                                            |
                                                                            |
                                                                            v
                                                            +---------------+-----+
                                                            |                     |
                                                            | HostConnectionPool  |
                                                            |                     |
                                                            +---------------------+

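0x09 Concrete connection pool

HostConnectionPool is the concrete connection pool implementation. It maintains a pool of valid connections for each Host node.

Specifically:

  • HostConnectionPool uses a LinkedBlockingQueue, availableConnections, to hold all valid connections. When a client needs a connection, it takes one from the queue.
  • So availableConnections is the pool of valid connections.
  • Each entry in availableConnections is a Connection.
  • This Connection (a JedisConnection) is created by JedisConnectionFactory.
  • In addition, each JedisConnection contains:
    • HostConnectionPool hostPool;
    • Jedis jedisClient;

In detail: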

public class HostConnectionPoolImpl<CL> implements HostConnectionPool<CL> {

    // The connections available for this connection pool
    private final LinkedBlockingQueue<Connection<CL>> availableConnections = new LinkedBlockingQueue<Connection<CL>>(); 

    // Private members required by this class
    private final Host host;
    private final ConnectionFactory<CL> connFactory;
    private final ConnectionPoolConfiguration cpConfig;
    private final ConnectionPoolMonitor monitor;

    // states that dictate the behavior of the pool

    // cp not inited is the starting state of the pool. The pool will not allow connections to be borrowed in this state
    private final ConnectionPoolState<CL> cpNotInited = new ConnectionPoolNotInited();
    // cp active is where connections of the pool can be borrowed and returned
    private final ConnectionPoolState<CL> cpActive = new ConnectionPoolActive(this);
    // cp reconnecting is where connections cannot be borrowed and all returning connections will be shutdown
    private final ConnectionPoolState<CL> cpReconnecting = new ConnectionPoolReconnectingOrDown();
    // similar to reconnecting
    private final ConnectionPoolState<CL> cpDown = new ConnectionPoolReconnectingOrDown();

    // The thread safe reference to the pool state
    private final AtomicReference<ConnectionPoolState<CL>> cpState = new AtomicReference<ConnectionPoolState<CL>>(cpNotInited);
}
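The class above keeps its current state in an AtomicReference and swaps between state objects. A reduced sketch of the same idea follows, using an enum instead of state objects. The transition methods (prime, markReconnecting, reconnected, shutdown) are assumptions chosen to match the comments in the excerpt, not Dyno's actual method set.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a pool lifecycle guarded by an AtomicReference.
class PoolStateSketch {
    enum State { NOT_INITED, ACTIVE, RECONNECTING, DOWN }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_INITED);

    // Only a pool that has not been initialized yet can be primed into ACTIVE.
    boolean prime() {
        return state.compareAndSet(State.NOT_INITED, State.ACTIVE);
    }

    // An active pool that needs to rebuild its connections stops lending them out.
    boolean markReconnecting() {
        return state.compareAndSet(State.ACTIVE, State.RECONNECTING);
    }

    boolean reconnected() {
        return state.compareAndSet(State.RECONNECTING, State.ACTIVE);
    }

    void shutdown() {
        state.set(State.DOWN);
    }

    // Connections may be borrowed only while the pool is ACTIVE.
    boolean canBorrow() {
        return state.get() == State.ACTIVE;
    }

    public static void main(String[] args) {
        PoolStateSketch pool = new PoolStateSketch();
        System.out.println(pool.canBorrow());
        pool.prime();
        System.out.println(pool.canBorrow());
        pool.markReconnecting();
        System.out.println(pool.canBorrow());
    }
}
```

compareAndSet makes each transition atomic, so two threads cannot both believe they moved the pool out of the same state.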

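9.1 Creating a Connection

First, let us see how a Connection is created. Roughly, it is obtained directly from connFactory, after which monitoring and related bookkeeping are performed.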

@Override
public Connection<CL> createConnection() {

            try {
                Connection<CL> connection;
                if (cpConfig.isConnectToDatastore()) {
                    
                    // The actual connection setup
                    connection = connFactory.createConnectionWithDataStore(pool);
                    
                } else if (cpConfig.isConnectionPoolConsistencyProvided()) {
                    connection = connFactory.createConnectionWithConsistencyLevel(pool, cpConfig.getConnectionPoolConsistency());
                } else {
                    connection = connFactory.createConnection(pool);
                }

                connection.open();
                availableConnections.add(connection);

                monitor.incConnectionCreated(host);
                numActiveConnections.incrementAndGet();

                return connection;
            } 
}

9.2 JedisConnectionFactory

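The createConnectionWithDataStore function of JedisConnectionFactory performs the actual connection setup. It involves Jedis, which many readers will already know well.

An abridged version of the code: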

public class JedisConnectionFactory implements ConnectionFactory<Jedis> {

    private final OperationMonitor opMonitor;
    private final SSLSocketFactory sslSocketFactory;

    public JedisConnectionFactory(OperationMonitor monitor, SSLSocketFactory sslSocketFactory) {
        this.opMonitor = monitor;
        this.sslSocketFactory = sslSocketFactory;
    }

    @Override
    public Connection<Jedis> createConnectionWithDataStore(HostConnectionPool<Jedis> pool) {
        return new JedisConnection(pool, true);
    }

    // TODO: raghu compose redisconnection with jedisconnection in it
    public class JedisConnection implements Connection<Jedis> {

        private final HostConnectionPool<Jedis> hostPool;
        private final Jedis jedisClient;

        public JedisConnection(HostConnectionPool<Jedis> hostPool, boolean connectDataStore) {
            this.hostPool = hostPool;
            Host host = hostPool.getHost();

            int port = connectDataStore ? host.getDatastorePort() : host.getPort();

            if (sslSocketFactory == null) {
                JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
                        hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(), Sharded.DEFAULT_WEIGHT);

                jedisClient = new Jedis(shardInfo);
            } else {
                JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
                        hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(), Sharded.DEFAULT_WEIGHT,
                        true, sslSocketFactory, new SSLParameters(), null);

                jedisClient = new Jedis(shardInfo);
            }
        }

        @Override
        public HostConnectionPool<Jedis> getParentConnectionPool() {
            return hostPool;
        }

        public Jedis getClient() {
            return jedisClient;
        }
    }
}

The logic at this point is as follows:

                                              +----------------------+
                   +-------------------+      |ConnectionPoolImpl    |
                   |DynoJedisClient    |      |                      |
                   |                   |      |                      |         +--------------+
                   |                   |      |       hostsUpdater +-------->  | HostSupplier |
                   |                   |      |                      |         +--------------+
                   |     connPool +---------> |                      |
                   |                   |      |                      |         +--------------------------+
                   |                   |      |              cpMap +-------->  |[Host, HostConnectionPool]|
                   +-------------------+      |                      |         |               +          |
                                              +----------------------+         |               |          |
                                                                               +--------------------------+
                                                                                               |
                                                                                               |
+-----------------------------+                                                                |
| JedisConnectionFactory      |                                                                v
|                             |                                                +---------------+-------------------------------------------+
|                             |          createConnectionWithDataStore         | HostConnectionPool                                        |
|                             |                                                |                                                           |
|  sslSocketFactory           |  <------------------------------------------------+ connFactory      Host                                  |
|                             |                                                |                                                           |
|                             |                                                |  LinkedBlockingQueue<Connection<CL>> availableConnections |
+-----------------------------+                                                |                                                           |
                                                                               +------------------------------+----------------------------+
               +                                                                                              ^
               |                  +----------------------------------------+                                  |
               |                  |JedisConnection                         |                                  |
               |                  |                                        |                                  |
               |   return         |                                        |   return                         |
               |                  |     HostConnectionPool<Jedis> hostPool |                                  |
               +--------------->  |                                        | +--------------------------------+
                                  |     Jedis(shardInfo) jedisClient       |
                                  |                                        |
                                  +----------------------------------------+

9.3 Obtaining a Connection

Callers use borrowConnection to obtain a connection, and the call is monitored.

@Override
public Connection<CL> borrowConnection(int duration, TimeUnit unit) {
            // Start recording how long it takes to get the connection - for insight/metrics
            long startTime = System.nanoTime() / 1000;
            Connection<CL> conn = null;
            // wait on the connection pool with a timeout
            conn = availableConnections.poll(duration, unit);
            long delay = System.nanoTime() / 1000 - startTime;
            monitor.incConnectionBorrowed(host, delay);
}
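The excerpt above omits the return path, so here is a minimal, self-contained sketch of borrowing from and returning to a LinkedBlockingQueue with a timeout. BorrowSketch uses plain strings as connections and throws IllegalStateException on exhaustion; both are simplifications made for illustration, not what Dyno does.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a queue-backed pool with a timed borrow.
class BorrowSketch {
    private final LinkedBlockingQueue<String> availableConnections = new LinkedBlockingQueue<>();

    void returnConnection(String connection) {
        availableConnections.offer(connection);
    }

    // Waits up to the given timeout for a free connection.
    // poll returns null on timeout, which is treated here as pool exhaustion.
    String borrowConnection(long duration, TimeUnit unit) throws InterruptedException {
        String connection = availableConnections.poll(duration, unit);
        if (connection == null) {
            throw new IllegalStateException("no connection available within " + duration + " " + unit);
        }
        return connection;
    }

    public static void main(String[] args) throws InterruptedException {
        BorrowSketch pool = new BorrowSketch();
        pool.returnConnection("conn-1");
        String connection = pool.borrowConnection(100, TimeUnit.MILLISECONDS);
        System.out.println("borrowed " + connection);
        pool.returnConnection(connection);
    }
}
```

Because the queue blocks only up to the timeout, a caller under load waits briefly for a returned connection instead of failing immediately or hanging forever.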

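0x10 Topology

Topology here mainly means the token ring. Let us review the concepts.

In Dynomite, the data managed by a cluster forms a ring. Each node in the ring is assigned one or more data ranges described by tokens, and a token determines the position in the ring.

A token is a 64-bit integer ID that identifies a partition; its range is -2^63 to 2^63-1. The hash of the partition key is computed with a hash algorithm, and that hash decides which node stores the data.

Tokens determine the range of data each node stores: a node holds the keys whose tokens fall in the half-open interval (previous node's token, this node's token], and all nodes together form a ring that joins end to end.

10.1 Read-only view

TopologyView represents a read-only view of the server topology.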

public interface TopologyView {
    /**
     * Retrieves a read-only view of the server topology
     *
     * @return An unmodifiable map of server-id to list of token status
     */
    Map<String, List<TokenPoolTopology.TokenStatus>> getTopologySnapshot();

    /**
     * Returns the token for the given key.
     *
     * @param key The key of the record stored in dynomite
     * @return Long The token that owns the given key
     */
    Long getTokenForKey(String key);

}

ConnectionPoolImpl implements TopologyView, so ConnectionPoolImpl is itself a TopologyView.

public class ConnectionPoolImpl<CL> implements ConnectionPool<CL>, TopologyView {
    public TokenPoolTopology getTopology() {
        return selectionStrategy.getTokenPoolTopology();
    }

    @Override
    public Map<String, List<TokenPoolTopology.TokenStatus>> getTopologySnapshot() {
        return Collections.unmodifiableMap(selectionStrategy.getTokenPoolTopology().getAllTokens());
    }

    @Override
    public Long getTokenForKey(String key) {
        if (cpConfiguration
                .getLoadBalancingStrategy() == ConnectionPoolConfiguration.LoadBalancingStrategy.TokenAware) {
            return selectionStrategy.getTokenForKey(key);
        }

        return null;
    }
}

Obtaining the TopologyView in DynoJedisClient simply returns the ConnectionPoolImpl.

public TopologyView getTopologyView() {
		return this.getConnPool();
}

So TopologyView is now added to the diagram.

                                              +----------------------+
                   +-------------------+      |ConnectionPoolImpl    |
                   |DynoJedisClient    |      |                      |
                   |                   |      |                      |         +--------------+
                   |                   |      |       hostsUpdater +-------->  | HostSupplier |
                   |                   |      |                      |         +--------------+
                   |     connPool +---------> |                      |
                   |                   |      |                      |         +--------------------------+
                   |    TopologyView +------> |              cpMap +-------->  |[Host, HostConnectionPool]|
                   |                   |      |                      |         |               +          |
                   +-------------------+      +----------------------+         |               |          |
                                                                               +--------------------------+
                                                                                               |
                                                                                               |
+-----------------------------+                                                                |
| JedisConnectionFactory      |                                                                v
|                             |                                                +---------------+-------------------------------------------+
|                             |          createConnectionWithDataStore         | HostConnectionPool                                        |
|                             |                                                |                                                           |
|  sslSocketFactory           |  <------------------------------------------------+ connFactory      Host                                  |
|                             |                                                |                                                           |
|                             |                                                |  LinkedBlockingQueue<Connection<CL>> availableConnections |
+-----------------------------+                                                |                                                           |
                                                                               +------------------------------+----------------------------+
               +                                                                                              ^
               |                  +----------------------------------------+                                  |
               |                  |JedisConnection                         |                                  |
               |                  |                                        |                                  |
               |   return         |                                        |   return                         |
               |                  |     HostConnectionPool<Jedis> hostPool |                                  |
               +--------------->  |                                        | +--------------------------------+
                                  |     Jedis(shardInfo) jedisClient       |
                                  |                                        |
                                  +----------------------------------------+

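10.2 Implementation

TokenPoolTopology is the concrete implementation of the topology.

getTopologySnapshot simply returns the map, that is, the TokenStatus entries for every rack. That is the topology.

If you think about it, a topology is just a logical collection describing "what is on each rack right now, and what state each of those things is in".

The definition is below. It has two core members:

  • map: the key is a rack and the value is a list, that is, "the token statuses on that rack collected into a list";
  • rackTokenHostMap: the key is a rack and the value is a map, that is, "the token-to-host relationship on that rack collected into a map".

This gives two different dimensions along which the tokens can be handled.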

public class TokenPoolTopology {
    
    private final ConcurrentHashMap<String, List<TokenStatus>> map = new ConcurrentHashMap<String, List<TokenStatus>>();
    
    private final ConcurrentHashMap<String, Map<Long, Host>> rackTokenHostMap = new ConcurrentHashMap<String, Map<Long, Host>>();
  
    public ConcurrentHashMap<String, List<TokenStatus>> getAllTokens() {
        return map;
    }
  
    public void addToken(String rack, Long token, HostConnectionPool<?> hostPool) {
        List<TokenStatus> list = map.get(rack);
        if (list == null) {
            list = new ArrayList<TokenStatus>();
            map.put(rack, list);
        }

        list.add(new TokenStatus(token, hostPool));
    }

    public void addHostToken(String rack, Long token, Host host) {
        Map<Long, Host> tokenHostMap = rackTokenHostMap.get(rack);
        if (tokenHostMap == null) {
            tokenHostMap = new HashMap<>();
            rackTokenHostMap.put(rack, tokenHostMap);
        }
        tokenHostMap.put(token, host);
    }
}
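The two maps can be reproduced in a few lines to see how they are filled and read. The sketch below is a simplified stand-in (TopologySketch, with Long tokens and String hosts instead of TokenStatus and Host), and it swaps the get-then-put in the excerpt for computeIfAbsent, which performs the "create the inner collection if missing" step atomically. That substitution is a choice of this sketch, not something the Dyno source does.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of a rack-indexed token topology.
class TopologySketch {
    // rack -> tokens on that rack
    private final ConcurrentHashMap<String, List<Long>> tokensByRack = new ConcurrentHashMap<>();
    // rack -> (token -> host that owns the token on that rack)
    private final ConcurrentHashMap<String, Map<Long, String>> hostByRackAndToken = new ConcurrentHashMap<>();

    void addHostToken(String rack, long token, String host) {
        tokensByRack.computeIfAbsent(rack, r -> new CopyOnWriteArrayList<>()).add(token);
        hostByRackAndToken.computeIfAbsent(rack, r -> new ConcurrentHashMap<>()).put(token, host);
    }

    // Read-only view of the rack -> tokens map; the outer map cannot be modified through it.
    Map<String, List<Long>> snapshot() {
        return Collections.unmodifiableMap(tokensByRack);
    }

    // Returns the host for a token on a rack, or null if unknown.
    String hostFor(String rack, long token) {
        Map<Long, String> byToken = hostByRackAndToken.get(rack);
        return byToken == null ? null : byToken.get(token);
    }

    public static void main(String[] args) {
        TopologySketch topology = new TopologySketch();
        topology.addHostToken("us-east-1a", 100L, "host-a1");
        topology.addHostToken("us-east-1a", 200L, "host-a2");
        topology.addHostToken("us-east-1c", 100L, "host-c1");
        System.out.println(topology.snapshot());
        System.out.println(topology.hostFor("us-east-1c", 100L));
    }
}
```

The first map answers "which tokens does this rack have", the second answers "which host serves this token on this rack", which are the two dimensions mentioned above.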

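10.3 How it is used

TokenPoolTopology is used in both ConnectionPoolImpl and HostSelectionWithFallback.

10.3.1 ConnectionPoolImpl

ConnectionPoolImpl either returns the TokenPoolTopology directly for the upper layer to process, or returns all the tokens in the TokenPoolTopology to the upper layer: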

public TokenPoolTopology getTopology() {
        return selectionStrategy.getTokenPoolTopology();
}

public Map<String, List<TokenPoolTopology.TokenStatus>> getTopologySnapshot() {
        return Collections.unmodifiableMap(selectionStrategy.getTokenPoolTopology().getAllTokens());
}

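10.3.2 HostSelectionWithFallback

HostSelectionWithFallback also uses TokenPoolTopology, but only for failover/fallback purposes. As the code comment notes, it affects metrics and not the selection of a host connection pool for traffic.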

public class HostSelectionWithFallback<CL> {
    // Represents the *initial* topology from the token supplier. This does not affect selection of a host connection
    // pool for traffic. It only affects metrics such as failover/fallback
    private final AtomicReference<TokenPoolTopology> topology = new AtomicReference<>(null);
}

HostSelectionWithFallback also uses the host tokens to create or update the TokenPoolTopology.

/**
* Create token pool topology from the host tokens
*
* @param allHostTokens
* @return tokenPoolTopology with the host information
*/
public TokenPoolTopology createTokenPoolTopology(List<HostToken> allHostTokens) {
        TokenPoolTopology topology = new TokenPoolTopology(replicationFactor.get());
    
        for (HostToken hostToken : allHostTokens) {
            String rack = hostToken.getHost().getRack();
            topology.addHostToken(rack, hostToken.getToken(), hostToken.getHost());
        }
    
        updateTokenPoolTopology(topology);
        return topology;
}

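This completes the analysis of connection management and topology awareness. The next article continues with automatic discovery and failover.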
