In previous articles we introduced the Amazon Dynamo system architecture and NetFlix Dynomite.
Today we look at how NetFlix Dynomite's Java client, DynoJedisClient, is implemented. The client is worth analyzing because it plays a role similar to a cluster master: the Java driver exposes multiple strategy interfaces that can be used to tune the driver's behavior, covering load balancing, request retries, node connection management, and so on.
Because Dynomite itself is too large and too low-level for this article, and DynoJedisClient is tightly coupled to Dynomite, we approach DynoJedisClient from its simplest functional points. The points that come to mind are: connection management, topology awareness, automatic node discovery, and failover.
So in what follows we analyze the client around these basic functional points.
While its business was growing, Amazon ran into problems that were mainly limitations in the scalability and high availability of relational databases. It therefore developed a new database based on a key-value storage model, named Dynamo, which adopts a fully distributed, decentralized architecture.
Compared with a traditional relational database such as MySQL, Dynamo's functional goals differ slightly. For example, most of Amazon's business scenarios do not require complex queries, but they do require tolerance of single-node failures, eventual data consistency (sacrificing strong consistency in favor of availability), and strong scalability.
Dynomite is NetFlix's open-source, general-purpose implementation of Amazon's distributed storage engine Dynamo. It supports not only in-memory key-value stores but also persistent stores such as MySQL, BerkeleyDB, and LevelDB, and it is simple, efficient, and supports cross-data-center data replication.
Dynomite's ultimate goal is to provide the simple, efficient, cross-data-center data replication that database storage engines themselves cannot. At present, Dynomite supports Redis and Memcached.
Netflix chose Dynomite because:
it offers good performance, multi-data-center replication, and high availability;
Dynomite provides sharding and pluggable storage engines, allowing vertical and horizontal scaling as data requirements grow;
Dynomite adds high availability, peer-to-peer replication, and consistency on top of Redis, which are used to build distributed cluster queues.
Dyno provides connection pooling for persistent connections;
the Dyno connection pool can be configured to be topology aware;
failover: Dyno gives the application rack affinity. A client in us-east-1a connects to Dynomite/Redis nodes in the same zone, and only fails over to another zone when those nodes are unavailable. This property is used to partition queues by zone (a simplified sketch of this rack-preference logic follows below).
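To make the rack-affinity idea concrete, here is a minimal standalone sketch (illustrative only, not Dyno's selection code; every class and method name in it is made up) of preferring hosts in the client's local rack and failing over to another rack only when the local one has no healthy host:

import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: a simplified rack-aware chooser, not Dyno's implementation.
class RackAwareChooser {
    private final String localRack; // e.g. "us-east-1a"

    RackAwareChooser(String localRack) {
        this.localRack = localRack;
    }

    /** Prefer a healthy host in the local rack; otherwise fail over to any healthy host. */
    String choose(List<SimpleHost> hosts) {
        List<SimpleHost> healthy = hosts.stream().filter(h -> h.up).collect(Collectors.toList());
        return healthy.stream()
                .filter(h -> localRack.equals(h.rack))
                .map(h -> h.name)
                .findFirst()
                // local rack has no healthy host: fail over to another rack
                .orElseGet(() -> healthy.isEmpty() ? null : healthy.get(0).name);
    }

    static class SimpleHost {
        final String name;
        final String rack;
        final boolean up;

        SimpleHost(String name, String rack, boolean up) {
            this.name = name;
            this.rack = rack;
            this.up = up;
        }
    }
}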
Dynomite對於本文來講,過於底層。
So we focus on how DynoJedisClient implements the latter points. Of course, these points cannot be fully separated from Dynomite; we merely try our best to carve them out.
A data center is a logical collection of multiple racks.
A data center can be a machine room, or the combined equipment of a region.
A rack is a logical collection of nodes located close to one another, for example all the physical machines in one rack; it can simply be thought of as the cabinet that holds the servers.
What is the relationship between a data center and racks: N:1, 1:N, or M:N? Since a data center is a logical collection of racks, one data center contains multiple racks, i.e. 1:N.
The data managed by the cluster forms a ring. Each node in the ring is assigned one or more data ranges described by tokens, which determine its position in the ring.
A token is a 64-bit integer ID used to identify each partition, with a range of -2^63 to 2^63-1. The partition key is hashed, and the hash value determines which node stores the data.
Tokens also determine the range of data stored on each node: the keys held by a node fall in the half-open interval (previous node's token, this node's token], and all the nodes form a ring joined end to end.
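The half-open-interval rule is easy to demonstrate with a tiny sketch (a hypothetical helper, not Dyno's token-mapping code): hash the key, then pick the first node whose token is greater than or equal to the hash, wrapping around to the first node of the ring when the hash is larger than every token.

import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative token-ring lookup: each node owns the keys in (previousToken, ownToken].
class TokenRing {
    private final TreeMap<Long, String> ring = new TreeMap<>(); // token -> node name

    void addNode(long token, String node) {
        ring.put(token, node);
    }

    /** Returns the node that owns the given key hash, wrapping around at the end of the ring. */
    String ownerOf(long keyHash) {
        SortedMap<Long, String> tail = ring.tailMap(keyHash); // tokens >= keyHash
        Long owningToken = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(owningToken);
    }
}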
Because it must shield the layers above from this complexity, DynoJedisClient has to deal with all of this information itself and needs a deep understanding of the system, for example the data centers, racks, and token ring just described.
Therefore DynoJedisClient takes the following approach: the Java driver exposes multiple strategy interfaces that can be used to tune the driver's behavior, covering load balancing, request retries, node connection management, and so on.
Sample code is as follows:
public static void main(String[] args) throws IOException {
    final String clusterName = args[0];
    int version = Integer.parseInt(args[1]);
    final DynoQueueDemo demo = new DynoQueueDemo(clusterName, "us-east-1e");
    Properties props = new Properties();
    props.load(DynoQueueDemo.class.getResourceAsStream("/demo.properties"));
    for (String name : props.stringPropertyNames()) {
        System.setProperty(name, props.getProperty(name));
    }

    try {
        demo.initWithRemoteClusterFromEurekaUrl(args[0], 8102, false);

        if (version == 1) {
            demo.runSimpleV1Demo(demo.client);
        } else if (version == 2) {
            demo.runSimpleV2QueueDemo(demo.client);
        }
        Thread.sleep(10000);
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        demo.stop();
        logger.info("Done");
    }
}
And the helper functions:
public void initWithRemoteClusterFromEurekaUrl(final String clusterName, final int port, boolean lock) throws Exception {
    initWithRemoteCluster(clusterName, getHostsFromDiscovery(clusterName), port, lock);
}

private void initWithRemoteCluster(String clusterName, final List<Host> hosts, final int port, boolean lock) throws Exception {
    final HostSupplier clusterHostSupplier = () -> hosts;

    if (lock)
        initDynoLockClient(clusterHostSupplier, null, "test", clusterName);
    else
        init(clusterHostSupplier, port, null);
}

public void initDynoLockClient(HostSupplier hostSupplier, TokenMapSupplier tokenMapSupplier, String appName,
                               String clusterName) {
    dynoLockClient = new DynoLockClient.Builder().withApplicationName(appName)
            .withDynomiteClusterName(clusterName)
            .withTimeoutUnit(TimeUnit.MILLISECONDS)
            .withTimeout(10000)
            .withHostSupplier(hostSupplier)
            .withTokenMapSupplier(tokenMapSupplier).build();
}
DynoJedisClient has the following important configuration classes.
ConnectionPoolConfigurationImpl mainly provides the default configuration.
public class ConnectionPoolConfigurationImpl implements ConnectionPoolConfiguration {
    // DEFAULTS
    private static final LoadBalancingStrategy DEFAULT_LB_STRATEGY = LoadBalancingStrategy.TokenAware;
    private static final CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = CompressionStrategy.NONE;

    private HostSupplier hostSupplier;
    private TokenMapSupplier tokenSupplier;
    private HostConnectionPoolFactory hostConnectionPoolFactory;
    private HashPartitioner hashPartitioner;

    private LoadBalancingStrategy lbStrategy = DEFAULT_LB_STRATEGY;
    private CompressionStrategy compressionStrategy = DEFAULT_COMPRESSION_STRATEGY;
}
ArchaiusConnectionPoolConfiguration mainly adds several strategies, including load balancing, compression, and retry.
The definition is as follows:
public class ArchaiusConnectionPoolConfiguration extends ConnectionPoolConfigurationImpl {
    ......
    private final LoadBalancingStrategy loadBalanceStrategy;
    private final CompressionStrategy compressionStrategy;
    private final ErrorRateMonitorConfig errorRateConfig;
    private final RetryPolicyFactory retryPolicyFactory;
    private final DynamicBooleanProperty failOnStartupIfNoHosts;
    private final DynamicIntProperty lockVotingSize;
    ......
}
DynoJedisClient is defined as follows; we can see that its most important member is the connection pool, ConnectionPool.
public class DynoJedisClient implements JedisCommands, BinaryJedisCommands, MultiKeyCommands,
        ScriptingCommands, MultiKeyBinaryCommands, DynoJedisCommands {
    private final String appName;
    private final String clusterName;
    private final ConnectionPool<Jedis> connPool;
    private final AtomicReference<DynoJedisPipelineMonitor> pipelineMonitor = new AtomicReference<DynoJedisPipelineMonitor>();
    protected final DynoOPMonitor opMonitor;
    protected final ConnectionPoolMonitor cpMonitor;
}
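Before diving into the pool internals, here is a rough sketch of how an application typically obtains such a client. Treat it as a hedged illustration: the builder methods and package names are assumptions based on Dyno's public API and should be checked against the version actually in use.

import java.util.List;

import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.connectionpool.HostSupplier;
import com.netflix.dyno.jedis.DynoJedisClient;

public class DynoClientSketch {

    // Sketch only: hosts would normally come from discovery, as in the demo code above.
    static DynoJedisClient buildClient(String clusterName, List<Host> hosts) {
        HostSupplier hostSupplier = () -> hosts; // same lambda pattern as the demo's clusterHostSupplier

        return new DynoJedisClient.Builder()
                .withApplicationName("demoApp")        // assumed application name
                .withDynomiteClusterName(clusterName)
                .withHostSupplier(hostSupplier)
                .build();
    }
}

Once built, the client is used like a plain Jedis client (get/set and friends), while all routing, retries, and connection management happen inside the ConnectionPool described next.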
Since DynoJedisClient is mostly about managing the connection pool, we first introduce the logical connection pool, ConnectionPoolImpl.
The connection-pool layer abstracts all connection management away from the application. Everything can be configured here, for example pool options, the load-balancing strategy, the retry policy, or the default consistency level.
ConnectionPoolImpl is the core class. As its member variables show, its main responsibilities are: maintaining one HostConnectionPool per host (cpMap), tracking pool health (ConnectionPoolHealthTracker), refreshing the host list (HostsUpdater), and routing operations through a selection strategy (HostSelectionWithFallback).
Its definition is as follows:
public class ConnectionPoolImpl<CL> implements ConnectionPool<CL>, TopologyView {

    private final ConcurrentHashMap<Host, HostConnectionPool<CL>> cpMap = new ConcurrentHashMap<Host, HostConnectionPool<CL>>();
    private final ConnectionPoolHealthTracker<CL> cpHealthTracker;
    private final HostConnectionPoolFactory<CL> hostConnPoolFactory;
    private final ConnectionFactory<CL> connFactory;
    private final ConnectionPoolConfiguration cpConfiguration;
    private final ConnectionPoolMonitor cpMonitor;
    private final ScheduledExecutorService idleThreadPool = Executors.newSingleThreadScheduledExecutor();
    private final HostsUpdater hostsUpdater;
    private final ScheduledExecutorService connPoolThreadPool = Executors.newScheduledThreadPool(1);
    private HostSelectionWithFallback<CL> selectionStrategy;
    private Type poolType;
}
The structure at this point is:
+------------------------+
|DynoJedisClient         |
|                        |
|                        |          +------------------------+
|                        |          |                        |
|     connPool +--------------------> ConnectionPoolImpl     |
|                        |          |                        |
|                        |          +------------------------+
+------------------------+
The pool start-up logic is: refresh the host list through hostsUpdater, create and prime a host connection pool for every active host, initialize the selection strategy, start the health-check tracker, and schedule a periodic task that refreshes the host list and updates active/inactive hosts.
The code is as follows:
@Override
public Future<Boolean> start() throws DynoException {

    HostSupplier hostSupplier = cpConfiguration.getHostSupplier();

    HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
    cpMonitor.setHostCount(hostStatus.getHostCount());
    Collection<Host> hostsUp = hostStatus.getActiveHosts();

    final ExecutorService threadPool = Executors.newFixedThreadPool(Math.max(10, hostsUp.size()));
    final List<Future<Void>> futures = new ArrayList<Future<Void>>();

    // add a connection pool for every host obtained from hostsUpdater
    for (final Host host : hostsUp) {
        // Add host connection pool, but don't init the load balancer yet
        futures.add(threadPool.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                addHost(host, false);
                return null;
            }
        }));
    }

    // start the health check monitor to track error rates
    boolean success = started.compareAndSet(false, true);
    if (success) {
        selectionStrategy = initSelectionStrategy();
        cpHealthTracker.start();

        connPoolThreadPool.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
                cpMonitor.setHostCount(hostStatus.getHostCount());
                logger.debug(hostStatus.toString());
                updateHosts(hostStatus.getActiveHosts(), hostStatus.getInactiveHosts());
            }

        }, 15 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);

        MonitorConsole.getInstance().registerConnectionPool(this);
        registerMonitorConsoleMBean(MonitorConsole.getInstance());
    }
    return getEmptyFutureTask(true);
}
During start-up, a host is added as follows: a HostConnectionPool is created for the host and its connections are primed; if the pool becomes active, it is registered with the selection strategy (optionally refreshing the load balancer), the health tracker, and the pool monitor; otherwise it is removed. The code is as follows:
public boolean addHost(Host host, boolean refreshLoadBalancer) {

    HostConnectionPool<CL> connPool = cpMap.get(host);

    final HostConnectionPool<CL> hostPool = hostConnPoolFactory.createHostConnectionPool(host, this);

    HostConnectionPool<CL> prevPool = cpMap.putIfAbsent(host, hostPool);

    if (prevPool == null) {
        // This is the first time we are adding this pool.
        try {
            int primed = hostPool.primeConnections();

            if (hostPool.isActive()) {
                if (refreshLoadBalancer) {
                    selectionStrategy.addHost(host, hostPool);
                }
                cpHealthTracker.initializePingHealthchecksForPool(hostPool);
                cpMonitor.hostAdded(host, hostPool);
            } else {
                cpMap.remove(host);
            }
            return primed > 0;
        } catch (DynoException e) {
            cpMap.remove(host);
            return false;
        }
    }
}
There are two HostConnectionPool factory implementations, synchronous and asynchronous, as shown below.
private class SyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
    @Override
    public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
        return new HostConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
    }
}

private class AsyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
    @Override
    public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
        return new SimpleAsyncConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
    }
}
The logical connection pool has two execution paths: executeWithRing and executeWithFailover.
executeWithRing is rarely used, so we do not cover it in detail.
executeWithFailover obtains a Connection through the selectionStrategy and executes the operation on that Connection, retrying in various ways on failure.
public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws DynoException {

    RetryPolicy retry = cpConfiguration.getRetryPolicyFactory().getRetryPolicy();
    retry.begin();

    do {
        Connection<CL> connection = null;

        try {
            connection = selectionStrategy.getConnectionUsingRetryPolicy(op,
                    cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS, retry);

            updateConnectionContext(connection.getContext(), connection.getHost());

            OperationResult<R> result = connection.execute(op);

            // Add context to the result from the successful execution
            result.setNode(connection.getHost()).addMetadata(connection.getContext().getAll());

            retry.success();
            cpMonitor.incOperationSuccess(connection.getHost(), System.currentTimeMillis() - startTime);

            return result;

        } finally {
            if (connection != null) {
                if (connection.getLastException() != null
                        && connection.getLastException() instanceof FatalConnectionException) {
                    connection.getParentConnectionPool().recycleConnection(connection);
                    // note - don't increment connection closed metric here;
                    // it's done in closeConnection
                } else {
                    connection.getContext().reset();
                    connection.getParentConnectionPool().returnConnection(connection);
                }
            }
        }

    } while (retry.allowRetry());

    throw lastException;
}
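The loop above is driven entirely by the RetryPolicy obtained from the factory: begin() is called before the first attempt, success() after a successful execution, and allowRetry() decides whether to loop again. As a hedged sketch (the interface below is a local simplification inferred from those calls, not Dyno's exact RetryPolicy; Dyno ships its own run-once and bounded-retry policies), a "retry up to N times" policy could look like this:

// Illustrative only: a simplified retry-policy interface inferred from how executeWithFailover uses it.
interface SimpleRetryPolicy {
    void begin();              // called once before the first attempt
    void success();            // called after a successful execution
    void failure(Exception e); // called when an attempt fails
    boolean allowRetry();      // whether another attempt may be made
}

// A "retry up to N times" policy in the spirit of Dyno's bounded-retry policies.
class RetryNTimesSketch implements SimpleRetryPolicy {
    private final int maxAttempts;
    private int attempts;
    private boolean succeeded;

    RetryNTimesSketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    @Override public void begin() { attempts = 0; succeeded = false; }
    @Override public void success() { attempts++; succeeded = true; }
    @Override public void failure(Exception e) { attempts++; }
    @Override public boolean allowRetry() { return !succeeded && attempts < maxAttempts; }
}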
The structure now looks like this:
+-------------------+          +----------------------+
|DynoJedisClient    |          |ConnectionPoolImpl    |
|                   |          |                      |       +--------------+
|                   |          |   hostsUpdater +------------>+ HostSupplier |
|   connPool +----------------->                      |       +--------------+
|                   |          |                      |       +--------------------------+
|                   |          |   cpMap +------------------->+[Host, HostConnectionPool]|
+-------------------+          |                      |       +------------+-------------+
                               +----------------------+                    |
                                                                            v
                                                               +---------------------+
                                                               | HostConnectionPool  |
                                                               +---------------------+
HostConnectionPool is the concrete connection-pool implementation; it maintains a pool of live connections for each Host node.
Specifically, it keeps the available connections in a LinkedBlockingQueue and models the pool's lifecycle with a small state machine (not initialized, active, reconnecting, down) held in an AtomicReference, as its definition shows:
public class HostConnectionPoolImpl<CL> implements HostConnectionPool<CL> {

    // The connections available for this connection pool
    private final LinkedBlockingQueue<Connection<CL>> availableConnections = new LinkedBlockingQueue<Connection<CL>>();

    // Private members required by this class
    private final Host host;
    private final ConnectionFactory<CL> connFactory;
    private final ConnectionPoolConfiguration cpConfig;
    private final ConnectionPoolMonitor monitor;

    // states that dictate the behavior of the pool

    // cp not inited is the starting state of the pool. The pool will not allow connections to be borrowed in this state
    private final ConnectionPoolState<CL> cpNotInited = new ConnectionPoolNotInited();
    // cp active is where connections of the pool can be borrowed and returned
    private final ConnectionPoolState<CL> cpActive = new ConnectionPoolActive(this);
    // cp reconnecting is where connections cannot be borrowed and all returning connections will be shutdown
    private final ConnectionPoolState<CL> cpReconnecting = new ConnectionPoolReconnectingOrDown();
    // similar to reconnecting
    private final ConnectionPoolState<CL> cpDown = new ConnectionPoolReconnectingOrDown();

    // The thread safe reference to the pool state
    private final AtomicReference<ConnectionPoolState<CL>> cpState = new AtomicReference<ConnectionPoolState<CL>>(cpNotInited);
}
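The four ConnectionPoolState fields implement a small state pattern: the pool keeps the current state in an AtomicReference and delegates borrow/return behavior to it. A minimal standalone sketch of that pattern (illustrative only; nothing here is Dyno code):

import java.util.concurrent.atomic.AtomicReference;

// Illustrative state pattern: the pool delegates borrowing to whatever state it is currently in.
class PoolStateSketch {

    interface State {
        String borrow();
    }

    private final State notInited = () -> { throw new IllegalStateException("pool not initialized"); };
    private final State active    = () -> "connection"; // a real pool would poll availableConnections here
    private final State down      = () -> { throw new IllegalStateException("pool is reconnecting or down"); };

    private final AtomicReference<State> state = new AtomicReference<>(notInited);

    void markActive() { state.set(active); }
    void markDown()   { state.set(down); }

    String borrowConnection() {
        return state.get().borrow(); // behavior depends entirely on the current state
    }
}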
First let us look at how a Connection is created: roughly, it is obtained directly from the connFactory, and then the corresponding bookkeeping such as monitoring is performed.
@Override
public Connection<CL> createConnection() {
    try {
        Connection<CL> connection;
        if (cpConfig.isConnectToDatastore()) {
            // the actual connection establishment
            connection = connFactory.createConnectionWithDataStore(pool);
        } else if (cpConfig.isConnectionPoolConsistencyProvided()) {
            connection = connFactory.createConnectionWithConsistencyLevel(pool, cpConfig.getConnectionPoolConsistency());
        } else {
            connection = connFactory.createConnection(pool);
        }
        connection.open();
        availableConnections.add(connection);

        monitor.incConnectionCreated(host);
        numActiveConnections.incrementAndGet();

        return connection;
    }
}
JedisConnectionFactory's createConnectionWithDataStore method performs the actual connection establishment; the Jedis classes involved should be familiar to many readers.
The abridged code is as follows:
public class JedisConnectionFactory implements ConnectionFactory<Jedis> {

    private final OperationMonitor opMonitor;
    private final SSLSocketFactory sslSocketFactory;

    public JedisConnectionFactory(OperationMonitor monitor, SSLSocketFactory sslSocketFactory) {
        this.opMonitor = monitor;
        this.sslSocketFactory = sslSocketFactory;
    }

    @Override
    public Connection<Jedis> createConnectionWithDataStore(HostConnectionPool<Jedis> pool) {
        return new JedisConnection(pool, true);
    }

    // TODO: raghu compose redisconnection with jedisconnection in it
    public class JedisConnection implements Connection<Jedis> {

        private final HostConnectionPool<Jedis> hostPool;
        private final Jedis jedisClient;

        public JedisConnection(HostConnectionPool<Jedis> hostPool, boolean connectDataStore) {
            this.hostPool = hostPool;
            Host host = hostPool.getHost();

            int port = connectDataStore ? host.getDatastorePort() : host.getPort();
            if (sslSocketFactory == null) {
                JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
                        hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(), Sharded.DEFAULT_WEIGHT);
                jedisClient = new Jedis(shardInfo);
            } else {
                JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
                        hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(), Sharded.DEFAULT_WEIGHT,
                        true, sslSocketFactory, new SSLParameters(), null);
                jedisClient = new Jedis(shardInfo);
            }
        }

        @Override
        public HostConnectionPool<Jedis> getParentConnectionPool() {
            return hostPool;
        }

        public Jedis getClient() {
            return jedisClient;
        }
    }
}
The structure now looks like this:
+-------------------+          +----------------------+
|DynoJedisClient    |          |ConnectionPoolImpl    |
|                   |          |                      |       +--------------+
|                   |          |   hostsUpdater +------------>+ HostSupplier |
|   connPool +----------------->                      |       +--------------+
|                   |          |                      |       +--------------------------+
|                   |          |   cpMap +------------------->+[Host, HostConnectionPool]|
+-------------------+          |                      |       +-----------+--------------+
                               +----------------------+                   |
                                                                           v
+-------------------------------+      +----------------------------------------------------------+
| JedisConnectionFactory        |      | HostConnectionPool                                       |
|                               |      |                                                          |
| createConnectionWithDataStore | <----------+ connFactory                 Host                   |
|                               |      |                                                          |
| sslSocketFactory              |      | LinkedBlockingQueue<Connection<CL>> availableConnections |
+---------------+---------------+      +----------------------------+-----------------------------+
                |                                                    ^
                | return                                             | return
                v                                                    |
+----------------------------------------+                           |
|JedisConnection                         |                           |
|                                        +---------------------------+
|  HostConnectionPool<Jedis> hostPool    |
|  Jedis(shardInfo) jedisClient          |
+----------------------------------------+
Callers use borrowConnection to obtain a connection, with monitoring along the way.
@Override
public Connection<CL> borrowConnection(int duration, TimeUnit unit) {
    // Start recording how long it takes to get the connection - for insight/metrics
    long startTime = System.nanoTime() / 1000;
    Connection<CL> conn = null;
    // wait on the connection pool with a timeout
    conn = availableConnections.poll(duration, unit);

    long delay = System.nanoTime() / 1000 - startTime;
    monitor.incConnectionBorrowed(host, delay);
}
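Putting the pieces together, a caller (in practice the executeWithFailover loop shown earlier) borrows a connection, runs the operation, and hands the connection back to its parent pool. Below is a hedged sketch of that lifecycle as a single helper method, using only methods already shown in this article; the getResult() accessor on OperationResult is an assumption.

// Sketch of the borrow / execute / return lifecycle; <CL> is the underlying client type, e.g. Jedis.
static <CL, R> R executeOnce(HostConnectionPool<CL> hostPool, Operation<CL, R> op) throws Exception {
    Connection<CL> connection = hostPool.borrowConnection(2000, TimeUnit.MILLISECONDS);
    try {
        OperationResult<R> result = connection.execute(op);
        return result.getResult(); // assumed accessor on OperationResult
    } finally {
        // always hand the connection back so it can be reused
        connection.getParentConnectionPool().returnConnection(connection);
    }
}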
Here, topology mainly refers to the token ring; let us review the concept.
In Dynomite, the data managed by the cluster forms a ring. Each node in the ring is assigned one or more data ranges described by tokens, and the token determines the node's position in the ring.
A token is a 64-bit integer ID used to identify each partition, with a range of -2^63 to 2^63-1. The partition key is hashed, and the hash value determines which node stores the data.
Tokens determine the range of data stored on each node: the keys held by a node fall in the half-open interval (previous node's token, this node's token], and all the nodes form a ring joined end to end.
TopologyView represents a read-only view of the server topology.
public interface TopologyView {

    /**
     * Retrieves a read-only view of the server topology
     *
     * @return An unmodifiable map of server-id to list of token status
     */
    Map<String, List<TokenPoolTopology.TokenStatus>> getTopologySnapshot();

    /**
     * Returns the token for the given key.
     *
     * @param key The key of the record stored in dynomite
     * @return Long The token that owns the given key
     */
    Long getTokenForKey(String key);
}
ConnectionPoolImpl implements the TopologyView interface.
So ConnectionPoolImpl itself is a TopologyView.
public class ConnectionPoolImpl<CL> implements ConnectionPool<CL>, TopologyView {

    public TokenPoolTopology getTopology() {
        return selectionStrategy.getTokenPoolTopology();
    }

    @Override
    public Map<String, List<TokenPoolTopology.TokenStatus>> getTopologySnapshot() {
        return Collections.unmodifiableMap(selectionStrategy.getTokenPoolTopology().getAllTokens());
    }

    @Override
    public Long getTokenForKey(String key) {
        if (cpConfiguration
                .getLoadBalancingStrategy() == ConnectionPoolConfiguration.LoadBalancingStrategy.TokenAware) {
            return selectionStrategy.getTokenForKey(key);
        }
        return null;
    }
}
Getting the TopologyView from DynoJedisClient simply returns the ConnectionPoolImpl.
public TopologyView getTopologyView() {
    return this.getConnPool();
}
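A short usage sketch of the read-only view follows. It relies only on the TopologyView methods shown above and assumes client is a fully built DynoJedisClient; note that getTokenForKey is only meaningful when the TokenAware load-balancing strategy is configured.

// Inspect the token topology through the client.
TopologyView view = client.getTopologyView();

Map<String, List<TokenPoolTopology.TokenStatus>> snapshot = view.getTopologySnapshot();
snapshot.forEach((rack, tokens) ->
        System.out.println(rack + " owns " + tokens.size() + " token ranges"));

// Returns null unless the TokenAware strategy is configured.
Long token = view.getTokenForKey("some-key");
System.out.println("some-key maps to token " + token);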
So the diagram now gains a TopologyView.
+-------------------+          +----------------------+
|DynoJedisClient    |          |ConnectionPoolImpl    |
|                   |          |                      |       +--------------+
|                   |          |   hostsUpdater +------------>+ HostSupplier |
|   connPool +----------------->                      |       +--------------+
|                   |          |                      |       +--------------------------+
|   TopologyView +-------------->  cpMap +------------------->+[Host, HostConnectionPool]|
|                   |          |                      |       +-----------+--------------+
+-------------------+          +----------------------+                   |
                                                                           v
+-------------------------------+      +----------------------------------------------------------+
| JedisConnectionFactory        |      | HostConnectionPool                                       |
|                               |      |                                                          |
| createConnectionWithDataStore | <----------+ connFactory                 Host                   |
|                               |      |                                                          |
| sslSocketFactory              |      | LinkedBlockingQueue<Connection<CL>> availableConnections |
+---------------+---------------+      +----------------------------+-----------------------------+
                |                                                    ^
                | return                                             | return
                v                                                    |
+----------------------------------------+                           |
|JedisConnection                         |                           |
|                                        +---------------------------+
|  HostConnectionPool<Jedis> hostPool    |
|  Jedis(shardInfo) jedisClient          |
+----------------------------------------+
TokenPoolTopology is the concrete implementation of the topology.
getTopologySnapshot simply returns the map, i.e. the TokenStatus lists for every rack — and that is the topology.
If you think about it carefully, the topology is just a logical collection of "what currently sits on each rack, and what state each of those things is in".
Its definition is below; it has two core members: map, which records, for each rack, the list of token statuses, and rackTokenHostMap, which records, for each rack, which host owns each token.
This gives two different dimensions along which the tokens can be handled.
public class TokenPoolTopology {

    private final ConcurrentHashMap<String, List<TokenStatus>> map = new ConcurrentHashMap<String, List<TokenStatus>>();
    private final ConcurrentHashMap<String, Map<Long, Host>> rackTokenHostMap = new ConcurrentHashMap<String, Map<Long, Host>>();

    public ConcurrentHashMap<String, List<TokenStatus>> getAllTokens() {
        return map;
    }

    public void addToken(String rack, Long token, HostConnectionPool<?> hostPool) {
        List<TokenStatus> list = map.get(rack);
        if (list == null) {
            list = new ArrayList<TokenStatus>();
            map.put(rack, list);
        }
        list.add(new TokenStatus(token, hostPool));
    }

    public void addHostToken(String rack, Long token, Host host) {
        Map<Long, Host> tokenHostMap = rackTokenHostMap.get(rack);
        if (tokenHostMap == null) {
            tokenHostMap = new HashMap<>();
            rackTokenHostMap.put(rack, tokenHostMap);
        }
        tokenHostMap.put(token, host);
    }
}
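To see why two maps are useful, here is a tiny standalone mirror of the same structure (illustrative only, not the Dyno class): one view answers "which token ranges does this rack own?", the other answers "which host owns this token in this rack?".

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone mirror of TokenPoolTopology's two core members.
class TopologySketch {
    // rack -> tokens owned in that rack (the per-rack token list view)
    private final Map<String, List<Long>> rackTokens = new HashMap<>();
    // rack -> (token -> host) (the "who owns this token" view)
    private final Map<String, Map<Long, String>> rackTokenHost = new HashMap<>();

    void add(String rack, long token, String host) {
        rackTokens.computeIfAbsent(rack, r -> new ArrayList<>()).add(token);
        rackTokenHost.computeIfAbsent(rack, r -> new HashMap<>()).put(token, host);
    }

    List<Long> tokensOf(String rack) {
        return rackTokens.getOrDefault(rack, Collections.emptyList());
    }

    String hostFor(String rack, long token) {
        return rackTokenHost.getOrDefault(rack, Collections.<Long, String>emptyMap()).get(token);
    }
}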
TokenPoolTopology is used in both ConnectionPoolImpl and HostSelectionWithFallback.
ConnectionPoolImpl handles it as follows: it either returns the topology itself for the caller to process further, or returns all the tokens inside the TokenPoolTopology to the caller:
public TokenPoolTopology getTopology() {
    return selectionStrategy.getTokenPoolTopology();
}

public Map<String, List<TokenPoolTopology.TokenStatus>> getTopologySnapshot() {
    return Collections.unmodifiableMap(selectionStrategy.getTokenPoolTopology().getAllTokens());
}
HostSelectionWithFallback also uses TokenPoolTopology, but only for failover/fallback purposes.
public class HostSelectionWithFallback<CL> {
    // Represents the *initial* topology from the token supplier. This does not affect selection of a host connection
    // pool for traffic. It only affects metrics such as failover/fallback
    private final AtomicReference<TokenPoolTopology> topology = new AtomicReference<>(null);
}
HostSelectionWithFallback also uses the host tokens to build, or update, the existing TokenPoolTopology.
/**
 * Create token pool topology from the host tokens
 *
 * @param allHostTokens
 * @return tokenPoolTopology with the host information
 */
public TokenPoolTopology createTokenPoolTopology(List<HostToken> allHostTokens) {
    TokenPoolTopology topology = new TokenPoolTopology(replicationFactor.get());
    for (HostToken hostToken : allHostTokens) {
        String rack = hostToken.getHost().getRack();
        topology.addHostToken(rack, hostToken.getToken(), hostToken.getHost());
    }
    updateTokenPoolTopology(topology);
    return topology;
}
At this point we have finished analyzing connection management and topology awareness; the next article will continue with automatic discovery and failover.