架構設計:系統間通訊(14)——RPC實例Apache Thrift 下篇(2)

(接上篇《架構設計:系統間通訊(13)——RPC實例Apache Thrift 下篇(1)》)java

三、正式開始編碼

我已經在CSDN的資源區上傳了這個示例工程的全部代碼(http://download.csdn.net/detail/yinwenjie/9289999)。讀者能夠直接到資源下載站進行下載(不收積分哦~~^_^)。這篇文章將緊接上文,主要介紹這個工程幾個主要的類代碼。node

3-一、編寫服務端主程序

服務端主程序的類名:processor.MainProcessor,它負責在服務端啓動Apache Thrift而且在服務監聽啓動成功後,鏈接到zookeeper,註冊這個服務的基本信息。apache

這裏要注意一下,Apache Thrift的服務監聽是阻塞式的,因此processor.MainProcessor的Apache Thrift操做應該另起線程進行(processor.MainProcessor.StartServerThread),而且經過線程間的鎖定操做,保證zookeeper的鏈接必定是在Apache Thrift成功啓動後才進行json

package processor; import java.io.IOException; import java.util.Set; import java.util.concurrent.Executors; import net.sf.json.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.server.TThreadPoolServer.Args; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import business.BusinessServicesMapping; import thrift.iface.DIYFrameworkService; import thrift.iface.DIYFrameworkService.Iface; public class MainProcessor { static {
        BasicConfigurator.configure();
    } /**
     * 日誌
     */ private static final Log LOGGER = LogFactory.getLog(MainProcessor.class); private static final Integer SERVER_PORT = 8090; /**
     * 專門用於鎖定以保證這個主線程不退出的一個object對象
     */ private static final Object WAIT_OBJECT = new Object(); /**
     * 標記apache thrift是否啓動成功了
     * 只有apache thrift啓動成功了,才須要鏈接到zk
     */ private boolean isthriftStart = false; public static void main(String[] args) { /*
         * 主程序要作的事情:
         * 
         * 一、啓動thrift服務。而且服務調用者的請求
         * 二、鏈接到zk,並向zk註冊本身提供的服務名稱,告知zk真實的訪問地址、訪問端口
         * (向zk註冊的服務,存儲在BusinessServicesMapping這個類的K-V常量中)
         * */ //一、========啓動thrift服務 MainProcessor mainProcessor = new MainProcessor();
        mainProcessor.startServer(); // 一直等待,apache thrift啓動完成 synchronized (mainProcessor) { try { while(!mainProcessor.isthriftStart) {
                    mainProcessor.wait();
                }
            } catch (InterruptedException e) {
                MainProcessor.LOGGER.error(e);
                System.exit(-1);
            }
        } //二、========鏈接到zk try {
            mainProcessor.connectZk();
        } catch (IOException | KeeperException | InterruptedException e) {
            MainProcessor.LOGGER.error(e);
            System.exit(-1);
        } // 這個wait在業務層面,沒有任何意義。只是爲了保證這個守護線程不會退出 synchronized (MainProcessor.WAIT_OBJECT) { try {
                MainProcessor.WAIT_OBJECT.wait();
            } catch (InterruptedException e) {
                MainProcessor.LOGGER.error(e);
                System.exit(-1);
            }
        }
    } /**
     * 這個私有方法用於鏈接到zk上,而且註冊相關服務
     * @throws IOException 
     * @throws InterruptedException 
     * @throws KeeperException 
     */ private void connectZk() throws IOException, KeeperException, InterruptedException { // 讀取這個服務提供者,須要在zk上註冊的服務 Set<String> serviceNames = BusinessServicesMapping.SERVICES_MAPPING.keySet(); // 若是沒有任何服務須要註冊到zk,那麼這個服務提供者就沒有繼續註冊的必要了 if(serviceNames == null || serviceNames.isEmpty()) { return;
        } // 默認的監聽器 MyDefaultWatcher defaultWatcher = new MyDefaultWatcher(); // 鏈接到zk服務器集羣,添加默認的watcher監聽 ZooKeeper zk = new ZooKeeper("192.168.61.128:2181", 120000, defaultWatcher); //建立一個父級節點Service Stat pathStat = null; try {
            pathStat = zk.exists("/Service", defaultWatcher); //若是條件成立,說明節點不存在(只須要判斷一個節點的存在性便可) //建立的這個節點是一個「永久狀態」的節點 if(pathStat == null) {
                zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch(Exception e) {
            System.exit(-1);
        } // 開始添加子級節點,每個子級節點都表示一個這個服務提供者提供的業務服務 for (String serviceName : serviceNames) {
            JSONObject nodeData = new JSONObject();
            nodeData.put("ip", "127.0.0.1");
            nodeData.put("port", MainProcessor.SERVER_PORT);
            zk.create("/Service/" + serviceName, nodeData.toString().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } //執行到這裏,說明全部的service都啓動完成了 MainProcessor.LOGGER.info("===================全部service都啓動完成了,主線程開始啓動===================");
    } /**
     * 這個私有方法用於開啓Apache thrift服務端,並進行持續監聽
     * @throws TTransportException
     */ private void startServer() {
        Thread startServerThread = new Thread(new StartServerThread());
        startServerThread.start();
    } private class StartServerThread implements Runnable { @Override public void run() {
            MainProcessor.LOGGER.info("看到這句就說明thrift服務端準備工做 ...."); // 服務執行控制器(只要是調度服務的具體實現該如何運行) TProcessor tprocessor = new DIYFrameworkService.Processor<Iface>(new DIYFrameworkServiceImpl()); // 基於阻塞式同步IO模型的Thrift服務,正式生產環境不建議用這個 TServerSocket serverTransport = null; try {
                serverTransport = new TServerSocket(MainProcessor.SERVER_PORT);
            } catch (TTransportException e) {
                MainProcessor.LOGGER.error(e);
                System.exit(-1);
            } // 爲這個服務器設置對應的IO網絡模型、設置使用的消息格式封裝、設置線程池參數 Args tArgs = new Args(serverTransport);
            tArgs.processor(tprocessor);
            tArgs.protocolFactory(new TBinaryProtocol.Factory());
            tArgs.executorService(Executors.newFixedThreadPool(100)); // 啓動這個thrift服務 TThreadPoolServer server = new TThreadPoolServer(tArgs);
            server.setServerEventHandler(new StartServerEventHandler());
            server.serve();
        }
    } /**
     * 爲這個TThreadPoolServer對象,設置是一個事件處理器。
     * 以便在TThreadPoolServer正式開始監聽服務請求前,通知mainProcessor:
     * 「Apache Thrift已經成功啓動了」
     * @author yinwenjie
     *
     */ private class StartServerEventHandler implements TServerEventHandler { @Override public void preServe() { /*
             * 須要實現這個方法,以便在服務啓動成功後,
             * 通知mainProcessor: 「Apache Thrift已經成功啓動了」
             * */ MainProcessor.this.isthriftStart = true; synchronized (MainProcessor.this) {
                MainProcessor.this.notify();
            }
        } /* (non-Javadoc)
         * @see org.apache.thrift.server.TServerEventHandler#createContext(org.apache.thrift.protocol.TProtocol, org.apache.thrift.protocol.TProtocol)
         */ @Override public ServerContext createContext(TProtocol input, TProtocol output) { /*
             * 無需實現
             * */ return null;
        } @Override public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { /*
             * 無需實現
             * */ } @Override public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) { /*
             * 無需實現
             * */ }
    } /**
     * 這是默認的watcher,什麼也沒有,也不須要有什麼<br>
     * 由於按照功能需求,服務器端並不須要監控zk上的任何目錄變化事件
     * @author yinwenjie
     */ private class MyDefaultWatcher implements Watcher { public void process(WatchedEvent event) {

        }
    }
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241

3-二、編寫服務具體實現

服務端具體實現的代碼很簡單,就是在IDL腳本生成了java代碼後,對DIYFrameworkService接口進行的實現。服務器

package processor; import java.nio.ByteBuffer; import net.sf.json.JSONObject; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.thrift.TException; import business.BusinessService; import business.BusinessServicesMapping; import business.exception.BizException; import business.exception.ResponseCode; import business.pojo.AbstractPojo; import business.pojo.BusinessResponsePojo; import business.pojo.DescPojo; import thrift.iface.DIYFrameworkService.Iface; import thrift.iface.EXCCODE; import thrift.iface.RESCODE; import thrift.iface.Reponse; import thrift.iface.Request; import thrift.iface.ServiceException; import utils.JSONUtils; /**
 * IDL文件中,咱們定義的惟一服務接口DIYFrameworkService.Iface的惟一實現
 * @author yinwenjie
 *
 */ public class DIYFrameworkServiceImpl implements Iface { /**
     * 日誌
     */ public static final Log LOGGER = LogFactory.getLog(DIYFrameworkServiceImpl.class); /* (non-Javadoc)
     * @see thrift.iface.DIYFrameworkService.Iface#send(thrift.iface.Request)
     */ @SuppressWarnings("unchecked") @Override public Reponse send(Request request) throws ServiceException, TException { /*
         * 因爲MainProcessor中,在Apache Thrift 服務端啓動時已經加入了線程池,因此這裏就不須要再使用線程池了
         * 這個服務方法的實現,須要作如下事情:
         * 
         * 一、根據request中,描述的具體服務名稱,在配置信息中查找具體的服務類
         * 二、使用java的反射機制,調用具體的服務類(BusinessService接口的實現類)。
         * 三、根據具體的業務處理結構,構造Reponse對象,並進行返回
         * */ //一、=================== String serviceName = request.getServiceName();
        String className = BusinessServicesMapping.SERVICES_MAPPING.get(serviceName); //未發現服務 if(StringUtils.isEmpty(className)) { return this.buildErrorReponse("無效的服務" , null);
        } //二、=================== // 首先獲得以json爲描述格式的請求參數信息 JSONObject paramJSON = null; try { byte [] paramJSON_bytes = request.getParamJSON(); if(paramJSON_bytes != null && paramJSON_bytes.length > 0) {
                String paramJSON_string = new String(paramJSON_bytes);
                paramJSON = JSONObject.fromObject(paramJSON_string);
            }
        } catch(Exception e) {
            DIYFrameworkServiceImpl.LOGGER.error(e); // 向調用者拋出異常 throw new ServiceException(EXCCODE.PARAMNOTFOUND, e.getMessage());
        } // 試圖進行反射 BusinessService<AbstractPojo> businessServiceInstance = null; try {
            businessServiceInstance = (BusinessService<AbstractPojo>)Class.forName(className).newInstance();
        } catch (Exception e) {
            DIYFrameworkServiceImpl.LOGGER.error(e); // 向調用者拋出異常 throw new ServiceException(EXCCODE.SERVICENOTFOUND, e.getMessage());
        } // 進行調用 AbstractPojo returnPojo = null; try {
            returnPojo = businessServiceInstance.handle(paramJSON);
        } catch (BizException e) {
            DIYFrameworkServiceImpl.LOGGER.error(e); return this.buildErrorReponse(e.getMessage() , e.getResponseCode());
        } // 構造處理成功狀況下的返回信息 BusinessResponsePojo responsePojo = new BusinessResponsePojo();
        responsePojo.setData(returnPojo);
        DescPojo descPojo = new DescPojo("", ResponseCode._200);
        responsePojo.setDesc(descPojo); // 生成json String returnString = JSONUtils.toString(responsePojo); byte[] returnBytes = returnString.getBytes();
        ByteBuffer returnByteBuffer = ByteBuffer.allocate(returnBytes.length);
        returnByteBuffer.put(returnBytes); // 構造response Reponse reponse = new Reponse(RESCODE._200, returnByteBuffer); return reponse;
    } /**
     * 這個私有方法,用於構造「Thrift中錯誤的返回信息」
     * @param erroe_mess
     * @return */ private Reponse buildErrorReponse(String erroe_mess , ResponseCode responseCode) { // 構造返回信息 BusinessResponsePojo responsePojo = new BusinessResponsePojo();
        responsePojo.setData(null);
        DescPojo descPojo = new DescPojo(erroe_mess, responseCode == null?ResponseCode._504:responseCode);
        responsePojo.setDesc(descPojo); // 存儲byteBuffer; String responseJSON = JSONUtils.toString(responsePojo); byte[] responseJSON_bytes = responseJSON.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(responseJSON_bytes.length);
        byteBuffer.put(byteBuffer);
        Reponse reponse = new Reponse(RESCODE._500, byteBuffer); return reponse;
    }
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133

3-三、編寫客戶端實現

在上文中已經介紹過了,客戶端有兩件事情須要作:鏈接到zookeeper查詢註冊的服務該如何訪問;而後向真實的服務提供者發起請求。代碼以下:網絡

package client; import java.nio.ByteBuffer; import java.util.List; import net.sf.json.JSONObject; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import thrift.iface.DIYFrameworkService.Client; import thrift.iface.Reponse; import thrift.iface.Request; import utils.JSONUtils; public class ThriftClient { /**
     * 日誌
     */ private static final Log LOGGER = LogFactory.getLog(ThriftClient.class); private static final String SERVCENAME = "queryUserDetailService"; static {
        BasicConfigurator.configure();
    } public static final void main(String[] main) throws Exception { /*
         * 服務治理框架的客戶端示例,要作如下事情:
         * 
         * 一、鏈接到zk,查詢當前zk下提供的服務列表中是否有本身須要的服務名稱(queryUserDetailService)
         * 二、若是沒有找到須要的服務名稱,則客戶端終止工做
         * 三、若是找到了服務,則經過服務給出的ip,port,基於Thrift進行正式請求
         * (這時,和zookeeper是否斷開,關係就不大了)
         * */ // 一、=========================== // 默認的監聽器 ClientDefaultWatcher defaultWatcher = new ClientDefaultWatcher(); // 鏈接到zk服務器集羣,添加默認的watcher監聽 ZooKeeper zk = new ZooKeeper("192.168.61.128:2181", 120000, defaultWatcher); /*
         * 爲何客戶端鏈接上來之後,也可能建立一個Service根目錄呢?
         * 由於正式的環境下,不能保證客戶端一點就在服務器端所有準備好的狀況下,再來作調用請求
         * */ Stat pathStat = null; try {
            pathStat = zk.exists("/Service", defaultWatcher); //若是條件成立,說明節點不存在(只須要判斷一個節點的存在性便可) //建立的這個節點是一個「永久狀態」的節點 if(pathStat == null) {
                zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch(Exception e) {
            System.exit(-1);
        } // 二、=========================== //獲取服務列表(不須要作任何的事件監聽,因此第二個參數能夠爲false) List<String> serviceList = zk.getChildren("/Service", false); if(serviceList == null || serviceList.isEmpty()) {
            ThriftClient.LOGGER.info("未發現相關服務,客戶端退出"); return;
        } //而後查看要找尋的服務是否在存在 boolean isFound = false; byte[] data; for (String serviceName : serviceList) { if(StringUtils.equals(serviceName, ThriftClient.SERVCENAME)) {
                isFound = true; break;
            }
        } if(!isFound) {
            ThriftClient.LOGGER.info("未發現相關服務,客戶端退出"); return;
        } else {
            data = zk.getData("/Service/" + ThriftClient.SERVCENAME, false, null);
        } /*
         * 執行到這裏,zk的工做就完成了,接下來zk是否斷開,就不重要了
         * */ zk.close(); if(data == null || data.length == 0) {
            ThriftClient.LOGGER.info("未發現有效的服務端地址,客戶端退出"); return;
        } // 獲得服務器地值說明 JSONObject serverTargetJSON = null;
        String serverIp;
        String serverPort; try {
            serverTargetJSON = JSONObject.fromObject(new String(data));
            serverIp = serverTargetJSON.getString("ip");
            serverPort = serverTargetJSON.getString("port");
        } catch(Exception e) {
            ThriftClient.LOGGER.error(e); return;
        } //三、=========================== TSocket transport = new TSocket(serverIp, Integer.parseInt(serverPort));
        TProtocol protocol = new TBinaryProtocol(transport); // 準備調用參數 JSONObject jsonParam = new JSONObject();
        jsonParam.put("username", "yinwenjie"); byte[] params = jsonParam.toString().getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(params.length);
        buffer.put(params);
        Request request = new Request(buffer, ThriftClient.SERVCENAME); // 開始調用 Client client = new Client(protocol); // 準備傳輸 transport.open(); // 正式調用接口 Reponse reponse = client.send(request); // 必定要記住關閉 transport.close(); // 將返回信息顯示出來 ThriftClient.LOGGER.info(JSONUtils.toString(reponse));
    }
} /**
 * 這是默認的watcher,什麼也沒有,也不須要有什麼<br>
 * 由於按照功能需求,客戶端並不須要監控zk上的任何目錄變化事件
 * @author yinwenjie
 */ class ClientDefaultWatcher implements Watcher { public void process(WatchedEvent event) {

    }
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149

3-四、工程結構說明

以上代碼是服務器端、客戶端的主要代碼。整個工程還有其餘的輔助代碼,爲了讓各位讀者可以看得清楚直接,咱們將整個工程結構進行一下說明,下載後導入的工程結構以下圖所示:架構

這裏寫圖片描述

  1. 這是一個典型的JAVA工程。請使用 JDK 1.6+ 版本。咱們將講解整個工程結構。首先來看看這個工程中主要的package和它們的做用。app

  2. business:具體的業務層邏輯都在這個包裏面,其中exception包含了一個業務層異常的定義BizException,還有錯誤代碼ResponseCode;impl包中放置具體的業務層實現,它們都必須實現BusinessService接口;Pojo是業務層對象模型。client:爲了簡單起見,我將服務端的實現和客戶端的實現放置在一個工程中,client這個包就是客戶端的實現代碼了;utils包放置了兩個工具類,用來進行日期格式化的DataUtils和用來進行json轉換的JSONUtils。框架

  3. 定義的apache thrift IDL文件放置在thrift文件夾下面,名字叫作:demoHello.thrift;您可使用它生成各類語言的代碼;maven

  4. 工程須要maven的支持。

相關文章
相關標籤/搜索