對於Thrift服務化的改造,主要是客戶端,能夠從以下幾個方面進行:java
1.服務端的服務註冊,客戶端自動發現,無需手工修改配置,這裏咱們使用zookeeper,但因爲zookeeper自己提供的客戶端使用較爲複雜,所以採用curator-recipes工具類進行處理服務的註冊與發現。node
2.客戶端使用鏈接池對服務調用進行管理,提高性能,這裏咱們使用Apache Commons項目commons-pool,能夠大大減小代碼的複雜度。git
3.關於Failover/LoadBalance,因爲zookeeper的watcher,當服務端不可用是及時通知客戶端,並移除不可用的服務節點,而LoadBalance有不少算法,這裏咱們採用隨機加權方式,也是常有的負載算法,至於其餘的算法介紹參考:常見的負載均衡的基本算法。算法
4.使thrift服務的註冊和發現能夠基於spring配置,能夠提供不少的便利。spring
5.其餘的改造如:apache
1)經過動態代理實現client和server端的交互細節透明化,讓用戶只需經過服務方提供的接口進行訪問api
2)Thrift經過兩種方式調用服務Client和Iface,Client API的方式, 不推薦, 咱們推薦Service接口的方式(服務化)。緩存
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency> <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.11.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.2.RELEASE</version> </dependency> <!-- 使用Netflix開源的zk開發--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency> </dependencies>
PC服務往平臺化的方向發展, 會屏蔽掉更多的服務細節(服務的IP地址集羣, 集羣的擴容和遷移), 只暴露服務接口. 這部分的演化, 使得server端和client端徹底的解耦合. 二者的交互經過ConfigServer(MetaServer)的中介角色來搭線。服務器
注: 該圖源自dubbo的官網
這邊藉助Zookeeper來扮演該角色, server扮演發佈者的角色, 而client扮演訂閱者的角色.網絡
Zookeeper是分佈式應用協做服務. 它實現了paxos的一致性算法, 在命名管理/配置推送/數據同步/主從切換方面扮演重要的角色。 其數據組織相似文件系統的目錄結構:
每一個節點被稱爲znode, 爲znode節點依據其特性, 又能夠分爲以下類型:
1). PERSISTENT: 永久節點
2). EPHEMERAL: 臨時節點, 會隨session(client disconnect)的消失而消失
3). PERSISTENT_SEQUENTIAL: 永久節點, 其節點的名字編號是單調遞增的
4). EPHEMERAL_SEQUENTIAL: 臨時節點, 其節點的名字編號是單調遞增的
注: 臨時節點不能成爲父節點
Watcher觀察模式, client能夠註冊對節點的狀態/內容變動的事件回調機制. 其Event事件的兩類屬性須要關注下:
1). KeeperState: Disconnected,SyncConnected,Expired
2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
RPC服務端:
做爲具體業務服務的RPC服務發佈方, 對其自身的服務描述由如下元素構成.
1). namespace: 命名空間,來區分不一樣應用
2). service: 服務接口, 採用發佈方的類全名來表示
3). version: 版本號
借鑑了Maven的GAV座標系, 三維座標系更符合服務平臺化的大環境.
*) 數據模型的設計
具體RPC服務的註冊路徑爲: /rpc/{namespace}/{service}/{version}, 該路徑上的節點都是永久節點
RPC服務集羣節點的註冊路徑爲: /rpc/{namespace}/{service}/{version}/{ip:port:weight}, 末尾的節點是臨時節點.
1.定義Zookeeper的客戶端的管理
ZookeeperFactory.java
package com.lpf.thrift.rpc.zookeeper; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.beans.factory.FactoryBean; /** * Created by lpf on 2018-06-11. * 獲取zookeeper客戶端連接 */ public class ZookeeperFactory implements FactoryBean<CuratorFramework>{ private String zkHosts; // session超時 private int sessionTimeout = 30000; private int connectionTimeout = 30000; // 共享一個zk連接 private boolean singleton = true; private CuratorFramework zkClient; // 全局path前綴,經常使用來區分不一樣的應用 private String namespace; private final static String ROOT = "rpc"; //注意必定要實現屬性的set方法,不然在spring bean注入的地方會拿不到值 public void setZkHosts(String zkHosts) { this.zkHosts = zkHosts; } public void setSessionTimeout(int sessionTimeout) { this.sessionTimeout = sessionTimeout; } public void setConnectionTimeout(int connectionTimeout) { this.connectionTimeout = connectionTimeout; } public void setSingleton(boolean singleton) { this.singleton = singleton; } public void setNamespace(String namespace) { this.namespace = namespace; } public void setZkClient(CuratorFramework zkClient) { this.zkClient = zkClient; } @Override public CuratorFramework getObject() throws Exception { if (singleton){ if (zkClient == null){ zkClient = create(); zkClient.start(); } return zkClient; } return create(); } private CuratorFramework create() throws Exception { if (namespace == null || namespace == ""){ namespace = ROOT; }else { namespace = ROOT+"/"+namespace; } return create(zkHosts, sessionTimeout, connectionTimeout, namespace); } public static CuratorFramework create(String zkHosts, int sessionTimeout, int connectionTimeout, String namespace) { return CuratorFrameworkFactory.builder() .connectString(zkHosts) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .canBeReadOnly(true) .namespace(namespace) .retryPolicy(new ExponentialBackoffRetry(1000,Integer.MAX_VALUE)) .defaultData(null) .build(); } @Override public Class<?> getObjectType() { return CuratorFramework.class; } @Override public boolean isSingleton() { return singleton; } public void close() { if (zkClient != null) { zkClient.close(); } } }
2.服務端註冊服務
因爲服務端配置須要獲取本機的IP地址,所以定義IP獲取接口
ThriftServerIpResolve.java
package com.lpf.thrift.rpc.zookeeper; /** * 解析thrift-server端IP地址,用於註冊服務 * 1) 能夠從一個物理機器或者虛機的特殊文件中解析 * 2) 能夠獲取指定網卡序號的Ip * 3) 其餘 * Created by lpf on 2018-06-11. */ public interface ThriftServerIpResolve { String getServerIp() throws Exception; void reset(); //當IP變動時,將會調用reset方法 static interface IpRestCalllBack{ public void rest(String newIp); } }
能夠對該接口作不通的實現,下面咱們基於網卡獲取IP地址,也能夠經過配置serverIp
ThriftServerIpLocalNetworkResolve.java
package com.lpf.thrift.rpc.zookeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; import java.util.Enumeration; /** * 基於網卡獲取IP地址 * Created by lpf on 2018-06-11. */ public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve{ private Logger logger = LoggerFactory.getLogger(getClass()); //緩存 private String serverIp; @Override public String getServerIp() throws Exception { if (serverIp != null){ return serverIp; } // 一個主機有多個網絡接口 try { Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces(); while (netInterfaces.hasMoreElements()) { NetworkInterface netInterface = netInterfaces.nextElement(); // 每一個網絡接口,都會有多個"網絡地址",好比必定會有lookback地址,會有siteLocal地址等.以及IPV4或者IPV6 . Enumeration<InetAddress> addresses = netInterface.getInetAddresses(); while (addresses.hasMoreElements()) { InetAddress address = addresses.nextElement(); if(address instanceof Inet6Address){ continue; } if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) { serverIp = address.getHostAddress(); logger.info("resolve rpc ip :"+ serverIp); return serverIp; } } } } catch (SocketException e) { e.printStackTrace(); } return serverIp; } @Override public void reset() { serverIp = null; } }
接下來咱們定義發佈服務接口,並實現將服務信息(服務接口、版本號,IP、port、weight)發佈到zookeeper中。
ThriftServerAddressRegister.java
package com.lpf.thrift.rpc.zookeeper; /** * 發佈服務地址及端口到服務註冊中心,這裏是zookeeper服務器 * Created by lpf on 2018-06-11. */ public interface ThriftServerAddressRegister { /** * 發佈服務接口 * @param service 服務接口名稱,一個產品中不能重複 * @param version 服務接口的版本號,默認1.0.0 * @param address 服務發佈的地址和端口 */ void register(String service,String version,String address); }
實現:ThriftServerAddressRegisterZookeeper.java
package com.lpf.thrift.rpc.zookeeper; import com.lpf.thrift.rpc.ThriftException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 註冊服務列表到Zookeeper * Created by lpf on 2018-06-11. */ public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{ private Logger logger = LoggerFactory.getLogger(getClass()); private CuratorFramework zkClient; public ThriftServerAddressRegisterZookeeper(){} public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){ this.zkClient = zkClient; } public void setZkClient(CuratorFramework zkClient) { this.zkClient = zkClient; } @Override public void register(String service, String version, String address) { if(zkClient.getState() == CuratorFrameworkState.LATENT){ zkClient.start(); } if (version == null || version == ""){ version ="1.0.0"; } //建立臨時節點 try { zkClient.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath("/"+service+"/"+version+"/"+address); } catch (Exception e) { logger.error("register api address to zookeeper exception:{}",e); throw new ThriftException("register api address to zookeeper exception:{}", e); } } public void close(){ zkClient.close(); } }
3.客戶端發現服務
定義獲取服務地址接口
ThriftServerAddressProvider.java
package com.lpf.thrift.rpc.zookeeper; import java.net.InetSocketAddress; import java.util.List; /** * 獲取服務地址接口 * Created by lpf on 2018-06-11. */ public interface ThriftServerAddressProvider { //獲取服務名稱 String getService(); /** * 獲取全部服務端地址 * @return */ List<InetSocketAddress> findServerAddressList(); /** * 選取一個合適的address,能夠隨機獲取等' * 內部可使用合適的算法. * @return */ InetSocketAddress selector(); void close(); }
基於zookeeper服務地址自動發現實現:ThriftServerAddressProviderZookeeper.java
package com.lpf.thrift.rpc.zookeeper; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import java.io.Closeable; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.CountDownLatch; /** * @author lpf * 使用zookeeper做爲"config"中心,使用apache-curator方法庫來簡化zookeeper開發 */ public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean ,Closeable{ private Logger logger = LoggerFactory.getLogger(getClass()); // 註冊服務 private String service; // 服務版本號 private String version = "1.0.0"; //zk客戶端 private CuratorFramework zkClient; private CountDownLatch countDownLatch= new CountDownLatch(1);//避免 zk尚未鏈接上,就去調用服務 private PathChildrenCache cachedPath; // 用來保存當前provider所接觸過的地址記錄,當zookeeper集羣故障時,可使用trace中地址,做爲"備份" private Set<String> trace = new HashSet<String>(); private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>(); private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>(); private Object lock = new Object(); // 默認權重 private static final Integer DEFAULT_WEIGHT = 1; public void setService(String service) { this.service = service; } public void setVersion(String version) { this.version = version; } public void setZkClient(CuratorFramework zkClient) { this.zkClient = zkClient; } public ThriftServerAddressProviderZookeeper() { } public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) { this.zkClient = zkClient; } @Override public void afterPropertiesSet() throws Exception { // 若是zk還沒有啓動,則啓動 if (zkClient.getState() == CuratorFrameworkState.LATENT) { zkClient.start(); } buildPathChildrenCache(zkClient, getServicePath(), true); cachedPath.start(StartMode.POST_INITIALIZED_EVENT); countDownLatch.await(); } private String getServicePath(){ return "/" + service + "/" + version; } private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception { cachedPath = new PathChildrenCache(client, path, cacheData); cachedPath.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { PathChildrenCacheEvent.Type eventType = event.getType(); switch (eventType) { case CONNECTION_RECONNECTED: logger.info("Connection is reconection."); break; case CONNECTION_SUSPENDED: logger.info("Connection is suspended."); break; case CONNECTION_LOST: logger.warn("Connection error,waiting..."); return; case INITIALIZED: // countDownLatch.countDown(); logger.warn("Connection init ..."); default: // } // 任何節點的時機數據變更,都會rebuild,此處爲一個"簡單的"作法. cachedPath.rebuild(); rebuild(); countDownLatch.countDown(); } protected void rebuild() throws Exception { List<ChildData> children = cachedPath.getCurrentData(); if (children == null || children.isEmpty()) { // 有可能全部的thrift server都與zookeeper斷開了連接 // 可是,有可能,thrift client與thrift server之間的網絡是良好的 // 所以此處是否須要清空container,是須要多方面考慮的. container.clear(); logger.error("thrift rpc-cluster error...."); return; } List<InetSocketAddress> current = new ArrayList<InetSocketAddress>(); String path = null; for (ChildData data : children) { path = data.getPath(); logger.debug("get path:"+path); path = path.substring(getServicePath().length()+1); logger.debug("get serviceAddress:"+path); String address = new String(path.getBytes(), "utf-8"); current.addAll(transfer(address)); trace.add(address); } Collections.shuffle(current); synchronized (lock) { container.clear(); container.addAll(current); inner.clear(); inner.addAll(current); } } }); } private List<InetSocketAddress> transfer(String address) { String[] hostname = address.split(":"); Integer weight = DEFAULT_WEIGHT; if (hostname.length == 3) { weight = Integer.valueOf(hostname[2]); } String ip = hostname[0]; Integer port = Integer.valueOf(hostname[1]); List<InetSocketAddress> result = new ArrayList<InetSocketAddress>(); // 根據優先級,將ip:port添加屢次到地址集中,而後隨機取地址實現負載 for (int i = 0; i < weight; i++) { result.add(new InetSocketAddress(ip, port)); } return result; } @Override public List<InetSocketAddress> findServerAddressList() { return Collections.unmodifiableList(container); } @Override public synchronized InetSocketAddress selector() { if (inner.isEmpty()) { if (!container.isEmpty()) { inner.addAll(container); } else if (!trace.isEmpty()) { synchronized (lock) { for (String hostname : trace) { container.addAll(transfer(hostname)); } Collections.shuffle(container); inner.addAll(container); } } } return inner.poll(); } @Override public void close() { try { cachedPath.close(); zkClient.close(); } catch (Exception e) { } } @Override public String getService() { return service; } }
還能夠經過配置獲取服務地址
ThriftServiceServerFactory.java
package com.lpf.thrift.rpc; import com.lpf.thrift.rpc.zookeeper.ThriftServerAddressRegister; import com.lpf.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve; import com.lpf.thrift.rpc.zookeeper.ThriftServerIpResolve; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.springframework.beans.factory.InitializingBean; import java.lang.instrument.IllegalClassFormatException; import java.lang.reflect.Constructor; /** * 服務端註冊服務工廠 * Created by lpf on 2018-06-11. */ public class ThriftServiceServerFactory implements InitializingBean{ // 服務註冊本機端口 private Integer port = 8299; // 優先級 private Integer weight = 1;// default // 服務實現類 private Object service;// serice實現類 //服務版本號 private String version; //服務註冊 private ThriftServerAddressRegister thriftServerAddressRegister; // 解析本機IP private ThriftServerIpResolve thriftServerIpResolve; private ServerThread serverThread; public void setPort(Integer port) { this.port = port; } public void setWeight(Integer weight) { this.weight = weight; } public void setService(Object service) { this.service = service; } public void setVersion(String version) { this.version = version; } public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) { this.thriftServerAddressRegister = thriftServerAddressRegister; } @Override public void afterPropertiesSet() throws Exception { if (thriftServerIpResolve == null){ thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve(); } String serverIP = thriftServerIpResolve.getServerIp(); if (serverIP == null || serverIP.equals("")){ throw new ThriftException("cant find rpc ip..."); } String hostname = serverIP + ":" + port + ":" + weight; Class<?> serviceClass = service.getClass(); // 獲取實現類接口 Class<?>[] interfaces = serviceClass.getInterfaces(); if (interfaces.length == 0) { throw new IllegalClassFormatException("api-class should implements Iface"); } // reflect,load "Processor"; TProcessor processor = null; String serviceName = null; for (Class<?> clazz : interfaces) { String cname = clazz.getSimpleName(); if (!cname.equals("Iface")) { continue; } serviceName = clazz.getEnclosingClass().getName(); String pname = serviceName + "$Processor"; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Class<?> pclass = classLoader.loadClass(pname); if (!TProcessor.class.isAssignableFrom(pclass)) { continue; } Constructor<?> constructor = pclass.getConstructor(clazz); processor = (TProcessor) constructor.newInstance(service); break; } catch (Exception e) { // } } if (processor == null) { throw new IllegalClassFormatException("api-class should implements Iface"); } //須要單獨的線程,由於serve方法是阻塞的. serverThread = new ServerThread(processor, port); serverThread.start(); // 註冊服務 if (thriftServerAddressRegister != null) { thriftServerAddressRegister.register(serviceName, version, hostname); } } class ServerThread extends Thread { private TServer server; ServerThread(TProcessor processor, int port) throws Exception { TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port); TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport); TProcessorFactory processorFactory = new TProcessorFactory(processor); tArgs.processorFactory(processorFactory); tArgs.transportFactory(new TFramedTransport.Factory()); tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true)); server = new TThreadedSelectorServer(tArgs); } @Override public void run(){ try{ //啓動服務 server.serve(); }catch(Exception e){ // } } public void stopServer(){ server.stop(); } } public void close() { serverThread.stopServer(); } }
客戶端鏈接池實現:ThriftClientPoolFactory.java
package com.lpf.thrift.rpc; import com.lpf.thrift.rpc.zookeeper.ThriftServerAddressProvider; import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.thrift.TServiceClient; import org.apache.thrift.TServiceClientFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; /** * 鏈接池,thrift-client for spring */ public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> { private Logger logger = LoggerFactory.getLogger(getClass()); private final ThriftServerAddressProvider serverAddressProvider; private final TServiceClientFactory<TServiceClient> clientFactory; private PoolOperationCallBack callback; protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception { this.serverAddressProvider = addressProvider; this.clientFactory = clientFactory; } protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory, PoolOperationCallBack callback) throws Exception { this.serverAddressProvider = addressProvider; this.clientFactory = clientFactory; this.callback = callback; } static interface PoolOperationCallBack { // 銷燬client以前執行 void destroy(TServiceClient client); // 建立成功是執行 void make(TServiceClient client); } @Override public void destroyObject(TServiceClient client) throws Exception { if (callback != null) { try { callback.destroy(client); } catch (Exception e) { logger.warn("destroyObject:{}", e); } } logger.info("destroyObject:{}", client); TTransport pin = client.getInputProtocol().getTransport(); pin.close(); TTransport pout = client.getOutputProtocol().getTransport(); pout.close(); } @Override public void activateObject(TServiceClient client) throws Exception { } @Override public void passivateObject(TServiceClient client) throws Exception { } @Override public boolean validateObject(TServiceClient client) { TTransport pin = client.getInputProtocol().getTransport(); logger.info("validateObject input:{}", pin.isOpen()); TTransport pout = client.getOutputProtocol().getTransport(); logger.info("validateObject output:{}", pout.isOpen()); return pin.isOpen() && pout.isOpen(); } @Override public TServiceClient makeObject() throws Exception { InetSocketAddress address = serverAddressProvider.selector(); if(address==null){ new ThriftException("No provider available for remote api"); } TSocket tsocket = new TSocket(address.getHostName(), address.getPort()); TTransport transport = new TFramedTransport(tsocket); TProtocol protocol = new TBinaryProtocol(transport); TServiceClient client = this.clientFactory.getClient(protocol); transport.open(); if (callback != null) { try { callback.make(client); } catch (Exception e) { logger.warn("makeObject:{}", e); } } return client; } }
客戶端服務代理工廠實現:ThriftServiceClientProxyFactory.java
package com.lpf.thrift.rpc; import com.lpf.thrift.rpc.zookeeper.ThriftServerAddressProvider; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.thrift.TServiceClient; import org.apache.thrift.TServiceClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; import java.io.Closeable; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; /** * 客戶端代理工廠 */ public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean,Closeable { private Logger logger = LoggerFactory.getLogger(getClass()); private Integer maxActive = 32;// 最大活躍鏈接數 private Integer idleTime = 180000;// ms,default 3 min,連接空閒時間, -1,關閉空閒檢測 private ThriftServerAddressProvider serverAddressProvider; private Object proxyClient; private Class<?> objectClass; private GenericObjectPool<TServiceClient> pool; private ThriftClientPoolFactory.PoolOperationCallBack callback = new ThriftClientPoolFactory.PoolOperationCallBack() { @Override public void make(TServiceClient client) { logger.info("create"); } @Override public void destroy(TServiceClient client) { logger.info("destroy"); } }; public void setMaxActive(Integer maxActive) { this.maxActive = maxActive; } public void setIdleTime(Integer idleTime) { this.idleTime = idleTime; } public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) { this.serverAddressProvider = serverAddressProvider; } @Override public void afterPropertiesSet() throws Exception { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); // 加載Iface接口 objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface"); // 加載Client.Factory類 Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory"); TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance(); ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback); pool = new GenericObjectPool<TServiceClient>(clientPool, makePoolConfig()); // InvocationHandler handler = makeProxyHandler();//方式1 InvocationHandler handler = makeProxyHandler2();//方式2 proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, handler); } private InvocationHandler makeProxyHandler() throws Exception{ ThriftServiceClient2Proxy handler = null; TServiceClient client = pool.borrowObject(); try { handler = new ThriftServiceClient2Proxy(client); pool.returnObject(client); }catch (Exception e){ pool.invalidateObject(client); throw new ThriftException("獲取代理對象失敗"); } return handler; } private InvocationHandler makeProxyHandler2() throws Exception{ ThriftServiceClientProxy handler = new ThriftServiceClientProxy(pool); return handler; } private GenericObjectPool.Config makePoolConfig() { GenericObjectPool.Config poolConfig = new GenericObjectPool.Config(); poolConfig.maxActive = maxActive; poolConfig.maxIdle = 1; poolConfig.minIdle = 0; poolConfig.minEvictableIdleTimeMillis = idleTime; poolConfig.timeBetweenEvictionRunsMillis = idleTime * 2L; poolConfig.testOnBorrow=true; poolConfig.testOnReturn=false; poolConfig.testWhileIdle=false; return poolConfig; } @Override public Object getObject() throws Exception { return proxyClient; } @Override public Class<?> getObjectType() { return objectClass; } @Override public boolean isSingleton() { return true; } @Override public void close() { if(pool!=null){ try { pool.close(); } catch (Exception e) { e.printStackTrace(); } } if (serverAddressProvider != null) { serverAddressProvider.close(); } } }
面咱們看一下服務端和客戶端的配置;
服務端spring-context-thrift-server.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd " default-lazy-init="false"> <!-- zookeeper --> <bean id="thriftZookeeper" class="com.lpf.thrift.rpc.zookeeper.ZookeeperFactory" destroy-method="close"> <property name="zkHosts" value="192.168.10.42:2181"/> <property name="namespace" value="com.lpf.thrift.rpc" /> <property name="connectionTimeout" value="3000" /> <property name="sessionTimeout" value="3000" /> <property name="singleton" value="true" /> </bean> <bean id="serviceAddressRegister" class="com.lpf.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper" destroy-method="close"> <property name="zkClient" ref="thriftZookeeper" /> </bean> <bean id="smartService" class="com.lpf.thrift.server.service.SmartServiceImpl"/> <bean id="helloService" class="com.lpf.thrift.server.service.HelloServiceImpl"/> <bean class="com.lpf.thrift.rpc.ThriftServiceServerFactory" destroy-method="close"> <property name="service" ref="smartService" /> <property name="port" value="9000" /> <property name="version" value="1.0.0" /> <property name="weight" value="1" /> <property name="thriftServerAddressRegister" ref="serviceAddressRegister" /> </bean> <bean class="com.lpf.thrift.rpc.ThriftServiceServerFactory" destroy-method="close"> <property name="service" ref="helloService" /> <property name="port" value="9001" /> <property name="version" value="1.0.0" /> <property name="weight" value="1" /> <property name="thriftServerAddressRegister" ref="serviceAddressRegister" /> </bean> </beans>
客戶端:spring-context-thrift-client.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd" default-lazy-init="false"> <!-- zookeeper --> <bean id="thriftZookeeper" class="com.lpf.thrift.rpc.zookeeper.ZookeeperFactory" destroy-method="close"> <property name="zkHosts" value="192.168.10.42:2181" /> <property name="namespace" value="com.lpf.thrift.rpc"/> <property name="connectionTimeout" value="3000" /> <property name="sessionTimeout" value="3000" /> <property name="singleton" value="true" /> </bean> <bean id="smartService" class="com.lpf.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close"> <property name="maxActive" value="5" /> <property name="idleTime" value="1800000" /> <property name="serverAddressProvider"> <bean class="com.lpf.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper"> <property name="service" value="com.lpf.thrift.api.SmartService" /> <property name="version" value="1.0.0" /> <property name="zkClient" ref="thriftZookeeper" /> </bean> </property> </bean> <bean id="helloService" class="com.lpf.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close"> <property name="maxActive" value="5" /> <property name="idleTime" value="1800000" /> <property name="serverAddressProvider"> <bean class="com.lpf.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper"> <property name="service" value="com.lpf.thrift.api.HelloService" /> <property name="version" value="1.0.0" /> <property name="zkClient" ref="thriftZookeeper" /> </bean> </property> </bean> </beans>
運行服務端後,咱們能夠看見zookeeper註冊了多個服務地址。
詳細實例這裏就不詳述了,請參考實例代碼 https://gitee.com/lemonLove/thrift_zk