本文將分享一個高可用的池化 Thrift Client 及其源碼實現,歡迎閱讀源碼(Github)並使用,同時歡迎提出寶貴的意見和建議,本人將持續完善。html
本文的主要目標讀者是對 Thrift 有必定了解並使用的童鞋,如對 Thrift 的基礎知識瞭解很少或者想重溫一下基礎知識,推薦先閱讀本站文章《和 Thrift 的一場美麗邂逅》。java
下面進入正題。git
咱們知道,Thrift 是一個 RPC 框架體系,能夠很是方便的進行跨語言 RPC 服務的開發和調用。然而,它並無提供針對多個 Server 的 Smart Client【1】。好比,你有一個服務 service,分別部署在 116.31.1.1 和 116.31.1.2 兩臺服務器上,當你須要從 Client 端調用該 service 的某個遠程方法的時候,你只能在代碼中顯式指定使用 116.31.1.1 或者 116.31.1.2 其中的一個。這種狀況下,你調用的時候沒法預知所指定 IP 對應的服務是否可用,而且當該服務不可用時,沒法隱式自動切換到調用另一個 IP 對應的服務。也就是說,服務的狀態對你並非透明的,而且沒法作到服務的負載均衡和高可用。github
此外,當你調用遠程方法時,每次你都得新建一個鏈接,當請求量很大時,不斷的建立、刪除鏈接所耗費的服務資源是巨大的。算法
所以,咱們須要這麼一個組件,使服務狀態透明化並底層實現負載均衡和高可用,讓你能夠專一於業務邏輯的實現,提高工做效率和服務的質量。下面咱們就對該組件(ThrifJ)進行詳細的剖析。服務器
目前最新版本爲1.0.1(點此關注最新版本的更新),首先在項目中引入 thriftj-1.0.1.jar,或在 Maven 依賴中加入:app
<dependency> <groupId>com.github.cyfonly</groupId> <artifactId>thriftj</artifactId> <version>1.0.1</version> </dependency>
須要注意的是,ThriftJ 基於 slf4j 構建,所以你須要在項目中增長具體日誌實現的依賴,好比 log4j 或 logback。負載均衡
而後在項目中,參照如下這段代碼進行調用:框架
//Thrift server 列表 private static final String servers = "127.0.0.1:10001,127.0.0.1:10002"; //TTransport 驗證器 ConnectionValidator validator = new ConnectionValidator() { @Override public boolean isValid(TTransport object) { return object.isOpen(); } }; //鏈接對象池配置 GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig(); //failover 策略 FailoverStrategy failoverStrategy = new FailoverStrategy(); //構造 ThriftClient 對象並配置 final ThriftClient thriftClient = new ThriftClient(); thriftClient.servers(servers) .loadBalance(Constant.LoadBalance.RANDOM) .connectionValidator(validator) .poolConfig(poolConfig) .failoverStrategy(failoverStrategy) .connTimeout(5) .backupServers("") .serviceLevel(Constant.ServiceLevel.NOT_EMPTY) .start(); //打印從 ThriftClient 獲取到的可用服務列表 List<ThriftServer> servers = thriftClient.getAvailableServers(); for(ThriftServer server : servers){ System.out.println(server.getHost() + ":" + server.getPort()); } //服務調用 if(servers.size()>0){ try{ TestThriftJ.Client client = thriftClient.iface(TestThriftJ.Client.class); QryResult result = client.qryTest(1); System.out.println("result[code=" + result.code + " msg=" + result.msg + "]"); }catch(Throwable t){ logger.error("-------------exception happen", t); } }
友情提示:除 servers 必須配置外,其餘配置均爲可選(使用默認配置)異步
基於 commons-pool2 中的 KeyedPooledObjectFactory,以 ThriftServer 爲 key,TTransport 爲 value 進行實現。關鍵代碼以下:
@Override public PooledObject<TTransport> makeObject(ThriftServer thriftServer) throws Exception { TSocket tsocket = new TSocket(thriftServer.getHost(), thriftServer.getPort()); tsocket.setTimeout(timeout); TFramedTransport transport = new TFramedTransport(tsocket); transport.open(); DefaultPooledObject<TTransport> result = new DefaultPooledObject<TTransport>(transport); logger.trace("Make new thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort()); return result; } @Override public boolean validateObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) { boolean isValidate; try { if (failoverChecker == null) { isValidate = pooledObject.getObject().isOpen(); } else { ConnectionValidator validator = failoverChecker.getConnectionValidator(); isValidate = pooledObject.getObject().isOpen() && (validator == null || validator.isValid(pooledObject.getObject())); } } catch (Throwable e) { logger.warn("Fail to validate tsocket: {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e}); isValidate = false; } if (failoverChecker != null && !isValidate) { failoverChecker.getFailoverStrategy().fail(thriftServer); } logger.info("ValidateObject isValidate:{}", isValidate); return isValidate; } @Override public void destroyObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) throws Exception { TTransport transport = pooledObject.getObject(); if (transport != null) { transport.close(); logger.trace("Close thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort()); } }
在使用鏈接對象時,根據用戶的自定義鏈接池配置建立鏈接池,並實現鏈接對象的獲取、回池、清除以及鏈接池的關閉操做。關鍵代碼以下:
public DefaultThriftConnectionPool(KeyedPooledObjectFactory<ThriftServer, TTransport> factory, GenericKeyedObjectPoolConfig config) { connections = new GenericKeyedObjectPool<>(factory, config); } @Override public TTransport getConnection(ThriftServer thriftServer) { try { return connections.borrowObject(thriftServer); } catch (Exception e) { logger.warn("Fail to get connection for {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e}); throw new RuntimeException(e); } } @Override public void returnConnection(ThriftServer thriftServer, TTransport transport) { connections.returnObject(thriftServer, transport); } @Override public void returnBrokenConnection(ThriftServer thriftServer, TTransport transport) { try { connections.invalidateObject(thriftServer, transport); } catch (Exception e) { logger.warn("Fail to invalid object:{},{}", new Object[] { thriftServer, transport, e }); } } @Override public void close() { connections.close(); } @Override public void clear(ThriftServer thriftServer) { connections.clear(thriftServer); }
須要實現服務狀態的透明化,就必須在底層實現服務的監測、隔離和恢復。在 ThriftJ 中,調用 ThriftClient 時會啓動一個線程對服務進行異步監測,用戶能夠指定檢驗規則(對應配置爲 ConnectionValidator)以及 failover 策略(對應配置爲 FailoverStrategy,能夠指定失敗的次數、失效持續時間和恢復持續時間)。默認狀況下,服務驗證規則爲判斷 TTransport 是否處於開啓狀態,即:
if (this.validator == null) { this.validator = new ConnectionValidator() { @Override public boolean isValid(TTransport object) { return object.isOpen(); } }; }
而默認的 failover 策略爲
以上功能基於 Guava cache 實現,關鍵代碼以下:
/** * 使用默認 failover 策略 */ public FailoverStrategy() { this(DEFAULT_FAIL_COUNT, DEFAULT_FAIL_DURATION, DEFAULT_RECOVER_DURATION); } /** * 自定義 failover 策略 * @param failCount 失敗次數 * @param failDuration 失效持續時間 * @param recoverDuration 恢復持續時間 */ public FailoverStrategy(final int failCount, long failDuration, long recoverDuration) { this.failDuration = failDuration; this.failedList = CacheBuilder.newBuilder().weakKeys().expireAfterWrite(recoverDuration, TimeUnit.MILLISECONDS).build(); this.failCountMap = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<T, EvictingQueue<Long>>() { @Override public EvictingQueue<Long> load(T key) throws Exception { return EvictingQueue.create(failCount); } }); } public void fail(T object) { logger.info("Server {}:{} failed.", ((ThriftServer)object).getHost(),((ThriftServer)object).getPort()); boolean addToFail = false; try { EvictingQueue<Long> evictingQueue = failCountMap.get(object); synchronized (evictingQueue) { evictingQueue.add(System.currentTimeMillis()); if (evictingQueue.remainingCapacity() == 0 && evictingQueue.element() >= (System.currentTimeMillis() - failDuration)) { addToFail = true; } } } catch (ExecutionException e) { logger.error("Ops.", e); } if (addToFail) { failedList.put(object, Boolean.TRUE); logger.info("Server {}:{} failed. Add to fail list.", ((ThriftServer)object).getHost(), ((ThriftServer)object).getPort()); } } public Set<T> getFailed() { return failedList.asMap().keySet(); }
ThriftJ 提供了四種可選的負載均衡策略:
在用戶不顯式指定的狀況下,默認採用隨機算法。具體算法的實如今此就再也不進行過多的描述了。
須要注意的是,ThriftJ 嚴格規範了調用的語義,好比使用哈希策略時,必需要指定 hash key;當使用非哈希的其餘策略時,必定不能指定 key,避免形成理解的二義性。
ThriftJ 提供了多種可配置的服務級別,並根據服務級別進行服務降級處理,其對應關係以下:
其中 ThriftJ 默認使用的服務級別是 NOT_EMPTY。服務降級處理的關鍵代碼以下:
private List<ThriftServer> getAvailableServers(boolean all) { List<ThriftServer> returnList = new ArrayList<>(); Set<ThriftServer> failedServers = failoverStrategy.getFailed(); for (ThriftServer thriftServer : serverList) { if (!failedServers.contains(thriftServer)) returnList.add(thriftServer); } if (this.serviceLevel == Constant.ServiceLevel.SERVERS_ONLY) { return returnList; } if ((all || returnList.isEmpty()) && !backupServerList.isEmpty()) { for (ThriftServer thriftServer : backupServerList) { if (!failedServers.contains(thriftServer)) returnList.add(thriftServer); } } if (this.serviceLevel == Constant.ServiceLevel.ALL_SERVERS) { return returnList; } if(returnList.isEmpty()){ returnList.addAll(serverList); } return returnList; }
技術的提高源自無私的分享,好的技術或工具分享出來,並不會讓本身失去什麼,反而能夠在你們共同研究和溝通後使之得到更好的完善。不要擔憂本身寫的工具不夠好,不要懼怕本身的技術不夠牛,誰能一步就登天呢?
請熱愛你的熱愛!
【1】Smart Client:好比 MongoClient,可自動發現集羣服務節點、自動故障轉移和負載均衡。