Thrift 基於zookeeper改造模式

對於Thrift服務化的改造,主要是客戶端,能夠從以下幾個方面進行:java

1.服務端的服務註冊,客戶端自動發現,無需手工修改配置,這裏咱們使用zookeeper,但因爲zookeeper自己提供的客戶端使用較爲複雜,所以採用curator-recipes工具類進行處理服務的註冊與發現。node

2.客戶端使用鏈接池對服務調用進行管理,提高性能,這裏咱們使用Apache Commons項目commons-pool,能夠大大減小代碼的複雜度。算法

3.關於Failover/LoadBalance,因爲zookeeper的watcher,當服務端不可用是及時通知客戶端,並移除不可用的服務節點,而LoadBalance有不少算法,這裏咱們採用隨機加權方式,也是常有的負載算法,至於其餘的算法介紹參考:常見的負載均衡的基本算法。spring

4.使thrift服務的註冊和發現能夠基於spring配置,能夠提供不少的便利。apache

5.其餘的改造如:緩存

1)經過動態代理實現client和server端的交互細節透明化,讓用戶只需經過服務方提供的接口進行訪問服務器

2)Thrift經過兩種方式調用服務Client和Iface 網絡

// *) Client API 調用  
(EchoService.Client)client.echo("hello lilei");  ---(1)  
// *) Service 接口 調用  
(EchoService.Iface)service.echo("hello lilei");  ---(2)  
Client API的方式, 不推薦, 咱們推薦Service接口的方式(服務化)。

下面咱們來一一實現:session

1、pom.xml引入依賴jar包負載均衡

 

<dependency>  
            <groupId>org.apache.thrift</groupId>  
            <artifactId>libthrift</artifactId>  
            <version>0.9.2</version>  
        </dependency>  
        <dependency>  
            <groupId>commons-pool</groupId>  
            <artifactId>commons-pool</artifactId>  
            <version>1.6</version>  
        </dependency>  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-context</artifactId>  
            <version>4.0.9.RELEASE</version>  
        </dependency>  
  
        <dependency>  
            <groupId>org.apache.zookeeper</groupId>  
            <artifactId>zookeeper</artifactId>  
            <version>3.4.6</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.curator</groupId>  
            <artifactId>curator-recipes</artifactId>  
            <version>2.7.1</version>  
        </dependency>  
View Code

2、使用zookeeper管理服務節點配置

RPC服務往平臺化的方向發展, 會屏蔽掉更多的服務細節(服務的IP地址集羣, 集羣的擴容和遷移), 只暴露服務接口. 這部分的演化, 使得server端和client端徹底的解耦合. 二者的交互經過ConfigServer(MetaServer)的中介角色來搭線。

注: 該圖源自dubbo的官網
這邊藉助Zookeeper來扮演該角色, server扮演發佈者的角色, 而client扮演訂閱者的角色.

Zookeeper是分佈式應用協做服務. 它實現了paxos的一致性算法, 在命名管理/配置推送/數據同步/主從切換方面扮演重要的角色。 其數據組織相似文件系統的目錄結構: 

每一個節點被稱爲znode, 爲znode節點依據其特性, 又能夠分爲以下類型:
  1). PERSISTENT: 永久節點
  2). EPHEMERAL: 臨時節點, 會隨session(client disconnect)的消失而消失
  3). PERSISTENT_SEQUENTIAL: 永久節點, 其節點的名字編號是單調遞增的
  4). EPHEMERAL_SEQUENTIAL: 臨時節點, 其節點的名字編號是單調遞增的
  注: 臨時節點不能成爲父節點
  Watcher觀察模式, client能夠註冊對節點的狀態/內容變動的事件回調機制. 其Event事件的兩類屬性須要關注下:
  1). KeeperState: Disconnected,SyncConnected,Expired
  2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
RPC服務端:
  做爲具體業務服務的RPC服務發佈方, 對其自身的服務描述由如下元素構成.
  1). namespace: 命名空間,來區分不一樣應用 
  2). service: 服務接口, 採用發佈方的類全名來表示
  3). version: 版本號
  借鑑了Maven的GAV座標系, 三維座標系更符合服務平臺化的大環境. 
  *) 數據模型的設計
  具體RPC服務的註冊路徑爲: /rpc/{namespace}/{service}/{version}, 該路徑上的節點都是永久節點
  RPC服務集羣節點的註冊路徑爲: /rpc/{namespace}/{service}/{version}/{ip:port:weight}, 末尾的節點是臨時節點.

1.定義Zookeeper的客戶端的管理

ZookeeperFactory.Java

package cn.slimsmart.thrift.rpc.zookeeper;  
  
import org.apache.curator.framework.CuratorFramework;  
import org.apache.curator.framework.CuratorFrameworkFactory;  
import org.apache.curator.retry.ExponentialBackoffRetry;  
import org.springframework.beans.factory.FactoryBean;  
import org.springframework.util.StringUtils;  
  
/** 
 * 獲取zookeeper客戶端連接 
 */  
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {  
  
    private String zkHosts;  
    // session超時  
    private int sessionTimeout = 30000;  
    private int connectionTimeout = 30000;  
  
    // 共享一個zk連接  
    private boolean singleton = true;  
  
    // 全局path前綴,經常使用來區分不一樣的應用  
    private String namespace;  
  
    private final static String ROOT = "rpc";  
  
    private CuratorFramework zkClient;  
  
    public void setZkHosts(String zkHosts) {  
        this.zkHosts = zkHosts;  
    }  
  
    public void setSessionTimeout(int sessionTimeout) {  
        this.sessionTimeout = sessionTimeout;  
    }  
  
    public void setConnectionTimeout(int connectionTimeout) {  
        this.connectionTimeout = connectionTimeout;  
    }  
  
    public void setSingleton(boolean singleton) {  
        this.singleton = singleton;  
    }  
  
    public void setNamespace(String namespace) {  
        this.namespace = namespace;  
    }  
  
    public void setZkClient(CuratorFramework zkClient) {  
        this.zkClient = zkClient;  
    }  
  
    @Override  
    public CuratorFramework getObject() throws Exception {  
        if (singleton) {  
            if (zkClient == null) {  
                zkClient = create();  
                zkClient.start();  
            }  
            return zkClient;  
        }  
        return create();  
    }  
  
    @Override  
    public Class<?> getObjectType() {  
        return CuratorFramework.class;  
    }  
  
    @Override  
    public boolean isSingleton() {  
        return singleton;  
    }  
  
    public CuratorFramework create() throws Exception {  
        if (StringUtils.isEmpty(namespace)) {  
            namespace = ROOT;  
        } else {  
            namespace = ROOT +"/"+ namespace;  
        }  
        return create(zkHosts, sessionTimeout, connectionTimeout, namespace);  
    }  
  
    public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {  
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();  
        return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)  
                .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))  
                .defaultData(null).build();  
    }  
  
    public void close() {  
        if (zkClient != null) {  
            zkClient.close();  
        }  
    }  
}  
View Code

2.服務端註冊服務

因爲服務端配置須要獲取本機的IP地址,所以定義IP獲取接口

ThriftServerIpResolve.java

package cn.slimsmart.thrift.rpc.zookeeper;  
  
/** 
 *  
 * 解析thrift-server端IP地址,用於註冊服務 
 * 1) 能夠從一個物理機器或者虛機的特殊文件中解析 
 * 2) 能夠獲取指定網卡序號的Ip 
 * 3) 其餘 
 */  
public interface ThriftServerIpResolve {  
      
    String getServerIp() throws Exception;  
      
    void reset();  
      
    //當IP變動時,將會調用reset方法  
    static interface IpRestCalllBack{  
        public void rest(String newIp);  
    }  
}  
View Code

能夠對該接口作不通的實現,下面咱們基於網卡獲取IP地址,也能夠經過配置serverIp

ThriftServerIpLocalNetworkResolve.java

package cn.slimsmart.thrift.rpc.zookeeper;  
  
import java.net.Inet6Address;  
import java.net.InetAddress;  
import java.net.NetworkInterface;  
import java.net.SocketException;  
import java.util.Enumeration;  
  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
  
/** 
 * 解析網卡Ip 
 * 
 */  
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {  
      
    private Logger logger = LoggerFactory.getLogger(getClass());  
  
    //緩存  
    private String serverIp;  
      
    public void setServerIp(String serverIp) {  
        this.serverIp = serverIp;  
    }  
  
    @Override  
    public String getServerIp() {  
        if (serverIp != null) {  
            return serverIp;  
        }  
        // 一個主機有多個網絡接口  
        try {  
            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();  
            while (netInterfaces.hasMoreElements()) {  
                NetworkInterface netInterface = netInterfaces.nextElement();  
                // 每一個網絡接口,都會有多個"網絡地址",好比必定會有lookback地址,會有siteLocal地址等.以及IPV4或者IPV6 .  
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();  
                while (addresses.hasMoreElements()) {  
                    InetAddress address = addresses.nextElement();  
                    if(address instanceof Inet6Address){  
                        continue;  
                    }  
                    if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {  
                        serverIp = address.getHostAddress();  
                        logger.info("resolve server ip :"+ serverIp);  
                        continue;  
                    }  
                }  
            }  
        } catch (SocketException e) {  
            e.printStackTrace();  
        }  
        return serverIp;  
    }  
  
    @Override  
    public void reset() {  
        serverIp = null;  
    }  
}  
View Code

接下來咱們定義發佈服務接口,並實現將服務信息(服務接口、版本號,IP、port、weight)發佈到zookeeper中。
ThriftServerAddressRegister.java 

package cn.slimsmart.thrift.rpc.zookeeper;  
  
/** 
 * 發佈服務地址及端口到服務註冊中心,這裏是zookeeper服務器 
 */  
public interface ThriftServerAddressRegister {  
    /** 
     * 發佈服務接口 
     * @param service 服務接口名稱,一個產品中不能重複 
     * @param version 服務接口的版本號,默認1.0.0 
     * @param address 服務發佈的地址和端口 
     */  
    void register(String service,String version,String address);  
}  
View Code

實現:ThriftServerAddressRegisterZookeeper.java

package cn.slimsmart.thrift.rpc.zookeeper;  
  
import java.io.UnsupportedEncodingException;  
  
import org.apache.curator.framework.CuratorFramework;  
import org.apache.curator.framework.imps.CuratorFrameworkState;  
import org.apache.zookeeper.CreateMode;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.util.StringUtils;  
  
import cn.slimsmart.thrift.rpc.ThriftException;  
  
/** 
 *  註冊服務列表到Zookeeper 
 */  
public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{  
      
    private Logger logger = LoggerFactory.getLogger(getClass());  
      
    private CuratorFramework zkClient;  
      
    public ThriftServerAddressRegisterZookeeper(){}  
      
    public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){  
        this.zkClient = zkClient;  
    }  
  
    public void setZkClient(CuratorFramework zkClient) {  
        this.zkClient = zkClient;  
    }  
  
    @Override  
    public void register(String service, String version, String address) {  
        if(zkClient.getState() == CuratorFrameworkState.LATENT){  
            zkClient.start();  
        }  
        if(StringUtils.isEmpty(version)){  
            version="1.0.0";  
        }  
        //臨時節點  
        try {  
            zkClient.create()  
                .creatingParentsIfNeeded()  
                .withMode(CreateMode.EPHEMERAL)  
                .forPath("/"+service+"/"+version+"/"+address);  
        } catch (UnsupportedEncodingException e) {  
            logger.error("register service address to zookeeper exception:{}",e);  
            throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);  
        } catch (Exception e) {  
            logger.error("register service address to zookeeper exception:{}",e);  
            throw new ThriftException("register service address to zookeeper exception:{}", e);  
        }  
    }  
      
    public void close(){  
        zkClient.close();  
    }  
}  
View Code

3.客戶端發現服務

定義獲取服務地址接口

ThriftServerAddressProvider.java

package cn.slimsmart.thrift.rpc.zookeeper;  
  
import java.net.InetSocketAddress;  
import java.util.List;  
  
/** 
 * thrift server-service地址提供者,以便構建客戶端鏈接池 
 */  
public interface ThriftServerAddressProvider {  
      
    //獲取服務名稱  
    String getService();  
  
    /** 
     * 獲取全部服務端地址 
     * @return 
     */  
    List<InetSocketAddress> findServerAddressList();  
  
    /** 
     * 選取一個合適的address,能夠隨機獲取等' 
     * 內部可使用合適的算法. 
     * @return 
     */  
    InetSocketAddress selector();  
  
    void close();  
}  
View Code

基於zookeeper服務地址自動發現實現:ThriftServerAddressProviderZookeeper.java

package cn.slimsmart.thrift.rpc.zookeeper;  
  
import java.net.InetSocketAddress;  
import java.util.ArrayList;  
import java.util.Collections;  
import java.util.HashSet;  
import java.util.LinkedList;  
import java.util.List;  
import java.util.Queue;  
import java.util.Set;  
  
import org.apache.curator.framework.CuratorFramework;  
import org.apache.curator.framework.imps.CuratorFrameworkState;  
import org.apache.curator.framework.recipes.cache.ChildData;  
import org.apache.curator.framework.recipes.cache.PathChildrenCache;  
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;  
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;  
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.beans.factory.InitializingBean;  
  
/** 
 * 使用zookeeper做爲"config"中心,使用apache-curator方法庫來簡化zookeeper開發 
 */  
public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {  
  
    private Logger logger = LoggerFactory.getLogger(getClass());  
  
    // 註冊服務  
    private String service;  
    // 服務版本號  
    private String version = "1.0.0";  
  
    private PathChildrenCache cachedPath;  
  
    private CuratorFramework zkClient;  
  
    // 用來保存當前provider所接觸過的地址記錄  
    // 當zookeeper集羣故障時,可使用trace中地址,做爲"備份"  
    private Set<String> trace = new HashSet<String>();  
  
    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();  
  
    private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();  
  
    private Object lock = new Object();  
  
    // 默認權重  
    private static final Integer DEFAULT_WEIGHT = 1;  
  
    public void setService(String service) {  
        this.service = service;  
    }  
  
    public void setVersion(String version) {  
        this.version = version;  
    }  
  
    public ThriftServerAddressProviderZookeeper() {  
    }  
  
    public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {  
        this.zkClient = zkClient;  
    }  
  
    public void setZkClient(CuratorFramework zkClient) {  
        this.zkClient = zkClient;  
    }  
  
    @Override  
    public void afterPropertiesSet() throws Exception {  
        // 若是zk還沒有啓動,則啓動  
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {  
            zkClient.start();  
        }  
        buildPathChildrenCache(zkClient, getServicePath(), true);  
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);  
    }  
  
    private String getServicePath(){  
        return "/" + service + "/" + version;  
    }  
    private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {  
        cachedPath = new PathChildrenCache(client, path, cacheData);  
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {  
            @Override  
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
                PathChildrenCacheEvent.Type eventType = event.getType();  
                switch (eventType) {  
                case CONNECTION_RECONNECTED:  
                    logger.info("Connection is reconection.");  
                    break;  
                case CONNECTION_SUSPENDED:  
                    logger.info("Connection is suspended.");  
                    break;  
                case CONNECTION_LOST:  
                    logger.warn("Connection error,waiting...");  
                    return;  
                default:  
                    //  
                }  
                // 任何節點的時機數據變更,都會rebuild,此處爲一個"簡單的"作法.  
                cachedPath.rebuild();  
                rebuild();  
            }  
  
            protected void rebuild() throws Exception {  
                List<ChildData> children = cachedPath.getCurrentData();  
                if (children == null || children.isEmpty()) {  
                    // 有可能全部的thrift server都與zookeeper斷開了連接  
                    // 可是,有可能,thrift client與thrift server之間的網絡是良好的  
                    // 所以此處是否須要清空container,是須要多方面考慮的.  
                    container.clear();  
                    logger.error("thrift server-cluster error....");  
                    return;  
                }  
                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();  
                String path = null;  
                for (ChildData data : children) {  
                    path = data.getPath();  
                    logger.debug("get path:"+path);  
                    path = path.substring(getServicePath().length()+1);  
                    logger.debug("get serviceAddress:"+path);  
                    String address = new String(path.getBytes(), "utf-8");  
                    current.addAll(transfer(address));  
                    trace.add(address);  
                }  
                Collections.shuffle(current);  
                synchronized (lock) {  
                    container.clear();  
                    container.addAll(current);  
                    inner.clear();  
                    inner.addAll(current);  
  
                }  
            }  
        });  
    }  
  
    private List<InetSocketAddress> transfer(String address) {  
        String[] hostname = address.split(":");  
        Integer weight = DEFAULT_WEIGHT;  
        if (hostname.length == 3) {  
            weight = Integer.valueOf(hostname[2]);  
        }  
        String ip = hostname[0];  
        Integer port = Integer.valueOf(hostname[1]);  
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();  
        // 根據優先級,將ip:port添加屢次到地址集中,而後隨機取地址實現負載  
        for (int i = 0; i < weight; i++) {  
            result.add(new InetSocketAddress(ip, port));  
        }  
        return result;  
    }  
  
    @Override  
    public List<InetSocketAddress> findServerAddressList() {  
        return Collections.unmodifiableList(container);  
    }  
  
    @Override  
    public synchronized InetSocketAddress selector() {  
        if (inner.isEmpty()) {  
            if (!container.isEmpty()) {  
                inner.addAll(container);  
            } else if (!trace.isEmpty()) {  
                synchronized (lock) {  
                    for (String hostname : trace) {  
                        container.addAll(transfer(hostname));  
                    }  
                    Collections.shuffle(container);  
                    inner.addAll(container);  
                }  
            }  
        }  
        return inner.poll();  
    }  
  
    @Override  
    public void close() {  
        try {  
            cachedPath.close();  
            zkClient.close();  
        } catch (Exception e) {  
        }  
    }  
  
    @Override  
    public String getService() {  
        return service;  
    }  
  
}  
View Code

對此接口還作了一種實現,經過配置獲取服務地址,參考附件:FixedAddressProvider.java

3、服務端服務註冊實現

ThriftServiceServerFactory.java

package cn.slimsmart.thrift.rpc;  
  
import java.lang.instrument.IllegalClassFormatException;  
import java.lang.reflect.Constructor;  
  
import org.apache.thrift.TProcessor;  
import org.apache.thrift.TProcessorFactory;  
import org.apache.thrift.protocol.TBinaryProtocol;  
import org.apache.thrift.server.TServer;  
import org.apache.thrift.server.TThreadedSelectorServer;  
import org.apache.thrift.transport.TFramedTransport;  
import org.apache.thrift.transport.TNonblockingServerSocket;  
import org.springframework.beans.factory.InitializingBean;  
import org.springframework.util.StringUtils;  
  
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;  
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;  
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;  
  
/** 
 * 服務端註冊服務工廠 
 */  
public class ThriftServiceServerFactory implements InitializingBean {  
    // 服務註冊本機端口  
    private Integer port = 8299;  
    // 優先級  
    private Integer weight = 1;// default  
    // 服務實現類  
    private Object service;// serice實現類  
    //服務版本號  
    private String version;  
    // 解析本機IP  
    private ThriftServerIpResolve thriftServerIpResolve;  
    //服務註冊  
    private ThriftServerAddressRegister thriftServerAddressRegister;  
  
    private ServerThread serverThread;  
      
    public void setPort(Integer port) {  
        this.port = port;  
    }  
  
    public void setWeight(Integer weight) {  
        this.weight = weight;  
    }  
  
    public void setService(Object service) {  
        this.service = service;  
    }  
  
    public void setVersion(String version) {  
        this.version = version;  
    }  
  
    public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {  
        this.thriftServerIpResolve = thriftServerIpResolve;  
    }  
  
    public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {  
        this.thriftServerAddressRegister = thriftServerAddressRegister;  
    }  
  
    @Override  
    public void afterPropertiesSet() throws Exception {  
        if (thriftServerIpResolve == null) {  
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();  
        }  
        String serverIP = thriftServerIpResolve.getServerIp();  
        if (StringUtils.isEmpty(serverIP)) {  
            throw new ThriftException("cant find server ip...");  
        }  
  
        String hostname = serverIP + ":" + port + ":" + weight;  
        Class<?> serviceClass = service.getClass();  
        // 獲取實現類接口  
        Class<?>[] interfaces = serviceClass.getInterfaces();  
        if (interfaces.length == 0) {  
            throw new IllegalClassFormatException("service-class should implements Iface");  
        }  
        // reflect,load "Processor";  
        TProcessor processor = null;  
        String serviceName = null;  
        for (Class<?> clazz : interfaces) {  
            String cname = clazz.getSimpleName();  
            if (!cname.equals("Iface")) {  
                continue;  
            }  
            serviceName = clazz.getEnclosingClass().getName();  
            String pname = serviceName + "$Processor";  
            try {  
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
                Class<?> pclass = classLoader.loadClass(pname);  
                if (!TProcessor.class.isAssignableFrom(pclass)) {  
                    continue;  
                }  
                Constructor<?> constructor = pclass.getConstructor(clazz);  
                processor = (TProcessor) constructor.newInstance(service);  
                break;  
            } catch (Exception e) {  
                //  
            }  
        }  
        if (processor == null) {  
            throw new IllegalClassFormatException("service-class should implements Iface");  
        }  
        //須要單獨的線程,由於serve方法是阻塞的.  
        serverThread = new ServerThread(processor, port);  
        serverThread.start();  
        // 註冊服務  
        if (thriftServerAddressRegister != null) {  
            thriftServerAddressRegister.register(serviceName, version, hostname);  
        }  
  
    }  
    class ServerThread extends Thread {  
        private TServer server;  
        ServerThread(TProcessor processor, int port) throws Exception {  
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);  
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);    
            TProcessorFactory processorFactory = new TProcessorFactory(processor);  
            tArgs.processorFactory(processorFactory);  
            tArgs.transportFactory(new TFramedTransport.Factory());    
            tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));   
            server = new TThreadedSelectorServer(tArgs);  
        }  
  
        @Override  
        public void run(){  
            try{  
                //啓動服務  
                server.serve();  
            }catch(Exception e){  
                //  
            }  
        }  
          
        public void stopServer(){  
            server.stop();  
        }  
    }  
      
    public void close() {  
        serverThread.stopServer();  
    }  
}  
View Code

4、客戶端獲取服務代理及鏈接池實現
客戶端鏈接池實現:ThriftClientPoolFactory.java

package cn.slimsmart.thrift.rpc;  
  
import java.net.InetSocketAddress;  
  
import org.apache.commons.pool.BasePoolableObjectFactory;  
import org.apache.thrift.TServiceClient;  
import org.apache.thrift.TServiceClientFactory;  
import org.apache.thrift.protocol.TBinaryProtocol;  
import org.apache.thrift.protocol.TProtocol;  
import org.apache.thrift.transport.TFramedTransport;  
import org.apache.thrift.transport.TSocket;  
import org.apache.thrift.transport.TTransport;  
  
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;  
  
/** 
 * 鏈接池,thrift-client for spring 
 */  
public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {  
  
    private final ThriftServerAddressProvider serverAddressProvider;  
    private final TServiceClientFactory<TServiceClient> clientFactory;  
    private PoolOperationCallBack callback;  
  
    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {  
        this.serverAddressProvider = addressProvider;  
        this.clientFactory = clientFactory;  
    }  
  
    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,  
            PoolOperationCallBack callback) throws Exception {  
        this.serverAddressProvider = addressProvider;  
        this.clientFactory = clientFactory;  
        this.callback = callback;  
    }  
  
    static interface PoolOperationCallBack {  
        // 銷燬client以前執行  
        void destroy(TServiceClient client);  
  
        // 建立成功是執行  
        void make(TServiceClient client);  
    }  
  
    public void destroyObject(TServiceClient client) throws Exception {  
        if (callback != null) {  
            try {  
                callback.destroy(client);  
            } catch (Exception e) {  
                //  
            }  
        }  
        TTransport pin = client.getInputProtocol().getTransport();  
        pin.close();  
    }  
  
    public boolean validateObject(TServiceClient client) {  
        TTransport pin = client.getInputProtocol().getTransport();  
        return pin.isOpen();  
    }  
  
    @Override  
    public TServiceClient makeObject() throws Exception {  
        InetSocketAddress address = serverAddressProvider.selector();  
        TSocket tsocket = new TSocket(address.getHostName(), address.getPort());  
        TTransport transport = new TFramedTransport(tsocket);  
        TProtocol protocol = new TBinaryProtocol(transport);  
        TServiceClient client = this.clientFactory.getClient(protocol);  
        transport.open();  
        if (callback != null) {  
            try {  
                callback.make(client);  
            } catch (Exception e) {  
                //  
            }  
        }  
        return client;  
    }  
  
}  
View Code
 

客戶端服務代理工廠實現:ThriftServiceClientProxyFactory.java

package cn.slimsmart.thrift.rpc;  
  
import java.lang.reflect.InvocationHandler;  
import java.lang.reflect.Method;  
import java.lang.reflect.Proxy;  
  
import org.apache.commons.pool.impl.GenericObjectPool;  
import org.apache.thrift.TServiceClient;  
import org.apache.thrift.TServiceClientFactory;  
import org.springframework.beans.factory.FactoryBean;  
import org.springframework.beans.factory.InitializingBean;  
  
import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;  
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;  
  
/** 
 * 客戶端代理 
 */  
@SuppressWarnings({ "unchecked", "rawtypes" })  
public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {  
  
    private Integer maxActive = 32;// 最大活躍鏈接數  
  
    // ms,default 3 min,連接空閒時間  
    // -1,關閉空閒檢測  
    private Integer idleTime = 180000;  
    private ThriftServerAddressProvider serverAddressProvider;  
  
    private Object proxyClient;  
    private Class<?> objectClass;  
  
    private GenericObjectPool<TServiceClient> pool;  
  
    private PoolOperationCallBack callback = new PoolOperationCallBack() {  
        @Override  
        public void make(TServiceClient client) {  
            System.out.println("create");  
        }  
  
        @Override  
        public void destroy(TServiceClient client) {  
            System.out.println("destroy");  
        }  
    };  
      
    public void setMaxActive(Integer maxActive) {  
        this.maxActive = maxActive;  
    }  
  
    public void setIdleTime(Integer idleTime) {  
        this.idleTime = idleTime;  
    }  
  
    public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {  
        this.serverAddressProvider = serverAddressProvider;  
    }  
  
    @Override  
    public void afterPropertiesSet() throws Exception {  
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
        // 加載Iface接口  
        objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");  
        // 加載Client.Factory類  
        Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");  
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();  
        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);  
        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();  
        poolConfig.maxActive = maxActive;  
        poolConfig.minIdle = 0;  
        poolConfig.minEvictableIdleTimeMillis = idleTime;  
        poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;  
        pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);  
        proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {  
            @Override  
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
                //  
                TServiceClient client = pool.borrowObject();  
                try {  
                    return method.invoke(client, args);  
                } catch (Exception e) {  
                    throw e;  
                } finally {  
                    pool.returnObject(client);  
                }  
            }  
        });  
    }  
  
    @Override  
    public Object getObject() throws Exception {  
        return proxyClient;  
    }  
  
    @Override  
    public Class<?> getObjectType() {  
        return objectClass;  
    }  
  
    @Override  
    public boolean isSingleton() {  
        return true;  
    }  
  
    public void close() {  
        if (serverAddressProvider != null) {  
            serverAddressProvider.close();  
        }  
    }  
}  
View Code

下面咱們看一下服務端和客戶端的配置;

服務端spring-context-thrift-server.xml

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd  
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"  
    default-lazy-init="false">  
  
    <!-- zookeeper -->  
    <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"  
        destroy-method="close">  
        <property name="zkHosts"  
            value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />  
        <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />  
        <property name="connectionTimeout" value="3000" />  
        <property name="sessionTimeout" value="3000" />  
        <property name="singleton" value="true" />  
    </bean>  
    <bean id="sericeAddressRegister"  
        class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"  
        destroy-method="close">  
        <property name="zkClient" ref="thriftZookeeper" />  
    </bean>  
    <bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />  
  
    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"  
        destroy-method="close">  
        <property name="service" ref="echoSerivceImpl" />  
        <property name="port" value="9000" />  
        <property name="version" value="1.0.0" />  
        <property name="weight" value="1" />  
        <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />  
    </bean>  
      
    <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"  
        destroy-method="close">  
        <property name="service" ref="echoSerivceImpl" />  
        <property name="port" value="9001" />  
        <property name="version" value="1.0.0" />  
        <property name="weight" value="1" />  
        <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />  
    </bean>  
      
    <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"  
        destroy-method="close">  
        <property name="service" ref="echoSerivceImpl" />  
        <property name="port" value="9002" />  
        <property name="version" value="1.0.0" />  
        <property name="weight" value="1" />  
        <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />  
    </bean>  
</beans>  
View Code

客戶端:spring-context-thrift-client.xml

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd  
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"  
    default-lazy-init="false">  
      
    <!-- fixedAddress -->  
    <!--   
    <bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">  
         <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />  
         <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />  
    </bean>  
    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">  
        <property name="maxActive" value="5" />  
        <property name="idleTime" value="10000" />  
        <property name="serverAddressProvider" ref="fixedAddressProvider" />  
    </bean>  
   -->  
    <!-- zookeeper   -->  
    <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"  
        destroy-method="close">  
        <property name="zkHosts"  
            value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />  
        <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />  
        <property name="connectionTimeout" value="3000" />  
        <property name="sessionTimeout" value="3000" />  
        <property name="singleton" value="true" />  
    </bean>  
    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">  
        <property name="maxActive" value="5" />  
        <property name="idleTime" value="1800000" />  
        <property name="serverAddressProvider">  
            <bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">  
                <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />  
                <property name="version" value="1.0.0" />  
                <property name="zkClient" ref="thriftZookeeper" />  
            </bean>  
        </property>  
    </bean>  
</beans>  
View Code

運行服務端後,咱們能夠看見zookeeper註冊了多個服務地址。 

相關文章
相關標籤/搜索