上一篇博客,講述Hadoop V2的序列化機制,這爲咱們學習Hadoop V2的RPC機制奠基了基礎。RPC的內容涵蓋的信息有點多,包含Hadoop的序列化機制,RPC,代理,NIO等。若對Hadoop序列化不瞭解的同窗,能夠參考《Hadoop2源碼分析-序列化篇》。今天這篇博客爲你們介紹的內容目錄以下:html
那麼,下面開始今天的學習之路。java
首先,咱們要弄明白,什麼是RPC?RPC能用來作什麼?git
RPC的全程是Remote Procedure Call,中文釋爲遠程過程調用。也就是說,調用的過程代碼(業務服務代碼)並不在調用者本地運行,而是要實現調用着和被調用着之間的鏈接通訊,有同窗可能已經發現,這個和C/S模式很像。沒錯,RPC的基礎通訊模式是基於C/S進程間相互通訊的模式來實現的,它對Client端提供遠程接口服務,其RPC原理圖以下所示:apache
咱們都知道,在過去的編程概念中,過程是由開發人員在本地編譯完成的,而且只能侷限在本地運行的某一段代碼,即主程序和過程程序是一種本地調用關係。所以,這種結構在現在網絡飛速發展的狀況下已沒法適應實際的業務需求。並且,傳統過程調用模式沒法充分利用網絡上其餘主機的資源,如CPU,內存等,也沒法提升代碼在Bean之間的共享,使得資源浪費較大。編程
而RPC的出現,正好有效的解決了傳統過程當中存在的這些不足。經過RPC,咱們能夠充分利用非共享內存的機器,能夠簡便的將應用分佈在多臺機器上,相似集羣分佈。這樣方便的實現過程代碼共享,提升系統資源的利用率。減小單個集羣的壓力,實現負載均衡。網絡
在學習Hadoop V2的RPC機制以前,咱們先來熟悉第三方的RPC機制是如何工做的,下面我以Thrift框架爲例子。app
Thrift是一個軟件框架,用來進行可擴展且跨語言的服務開發協議。它擁有強大的代碼生成引擎,支持C++,Java,Python,PHP,Ruby等編程語言。Thrift容許定義一個簡單的定義文件(以.thirft結尾),文件中包含數據類型和服務接口。用以做爲輸入文件,編譯器生成代碼用來方便的生成RPC客戶端和服務端通訊的編程語言。具體Thrift安裝過程請參考《Mac OS X 下搭建thrift環境》。負載均衡
下面給出Thrift的原理圖,以下所示:
框架
下面爲你們解釋一下上面的原理圖,首先,咱們編譯完thrift定義文件後(這裏我使用的是Java語言),會生成對應的Java類文件,該類的Iface接口定義了咱們所規範的接口函數。在服務端,實現Iface接口,編寫對應函數下的業務邏輯,啓動服務。客戶端一樣須要生成的Java類文件,以供Client端調用相應的接口函數,監聽服務端的IP和PORT來獲取鏈接對象。dom
package cn.rpc.main; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.rpc.service.StatQueryService; import cn.rpc.service.impl.StatQueryServiceImpl; /** * @Date Mar 23, 2015 * * @Author dengjie */ public class StatsServer { private static Logger logger = LoggerFactory.getLogger(StatsServer.class); private final int PORT = 9090; @SuppressWarnings({ "rawtypes", "unchecked" }) private void start() { try { TNonblockingServerSocket socket = new TNonblockingServerSocket(PORT); final StatQueryService.Processor processor = new StatQueryService.Processor(new StatQueryServiceImpl()); THsHaServer.Args arg = new THsHaServer.Args(socket); /* * Binary coded format efficient, intensive data transmission, The * use of non blocking mode of transmission, according to the size * of the block, similar to the Java of NIO */ arg.protocolFactory(new TCompactProtocol.Factory()); arg.transportFactory(new TFramedTransport.Factory()); arg.processorFactory(new TProcessorFactory(processor)); TServer server = new THsHaServer(arg); server.serve(); } catch (Exception ex) { ex.printStackTrace(); } } public static void main(String[] args) { try { logger.info("start thrift server..."); StatsServer stats = new StatsServer(); stats.start(); } catch (Exception ex) { ex.printStackTrace(); logger.error(String.format("run thrift server has error,msg is %s", ex.getMessage())); } } }
package cn.rpc.test; import java.util.Map; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import cn.rpc.service.StatQueryService; /** * @Date Mar 23, 2015 * * @Author dengjie * * @Note Test thrift client */ public class StatsClient { public static final String ADDRESS = "127.0.0.1"; public static final int PORT = 9090; public static final int TIMEOUT = 30000; public static void main(String[] args) { if (args.length < 4) { System.out.println("args length must >= 4,current length is " + args.length); System.out.println("<info>****************</info>"); System.out.println("ADDRESS,beginDate,endDate,kpiCode,..."); System.out.println("<info>****************</info>"); return; } TTransport transport = new TFramedTransport(new TSocket(args[0], PORT, TIMEOUT)); TProtocol protocol = new TCompactProtocol(transport); StatQueryService.Client client = new StatQueryService.Client(protocol); String beginDate = args[1]; // "20150308" String endDate = args[2]; // "20150312" String kpiCode = args[3]; // "login_times" String userName = ""; int areaId = 0; String type = ""; String fashion = ""; try { transport.open(); Map<String, String> map = client.queryConditionDayKPI(beginDate, endDate, kpiCode, userName, areaId, type, fashion); System.out.println(map.toString()); } catch (Exception e) { e.printStackTrace(); } finally { transport.close(); } } }
這個類的代碼量太大,暫不貼出。須要的同窗請到如下地址下載。
下載地址:git@gitlab.com:dengjie/Resource.git
下面實現其中一個函數的內容,代碼以下所示:
package cn.rpc.service.impl; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.thrift.TException; import cn.rpc.conf.ConfigureAPI; import cn.rpc.dao.KpiDao; import cn.rpc.domain.ReportParam; import cn.rpc.domain.ReportResult; import cn.rpc.service.StatQueryService; import cn.rpc.util.MapperFactory; /** * @Date Mar 23, 2015 * * @Author dengjie */ public class StatQueryServiceImpl implements StatQueryService.Iface { public Map<String, String> queryDayKPI(String beginDate, String endDate, String kpiCode) throws TException { return null; } public Map<String, String> queryConditionDayKPI(String beginDate, String endDate, String kpiCode, String userName, int areaId, String type, String fashion) throws TException { Map<String, String> res = new HashMap<String, String>(); ReportParam param = new ReportParam(); param.setBeginDate(beginDate + ""); param.setEndDate(endDate + ""); param.setKpiCode(kpiCode); param.setUserName(userName == "" ? null : userName); param.setDistrictId(areaId < 0 ? 0 : areaId); param.setProductStyle(fashion == "" ? null : fashion); param.setCustomerProperty(type == "" ? null : type); List<ReportResult> chart = ((KpiDao) MapperFactory.createMapper(KpiDao.class)).getChartAmount(param); Map<String, Integer> title = ((KpiDao) MapperFactory.createMapper(KpiDao.class)).getTitleAmount(param); List<Map<String, Integer>> tableAmount = ((KpiDao) MapperFactory.createMapper(KpiDao.class)) .getTableAmount(param); String avgTime = kpiCode.split("_")[0]; param.setKpiCode(avgTime + "_avg_time"); List<Map<String, Integer>> tableAvgTime = ((KpiDao) MapperFactory.createMapper(KpiDao.class)) .getTableAmount(param); res.put(ConfigureAPI.RESMAPKEY.CHART, chart.toString()); res.put(ConfigureAPI.RESMAPKEY.TITLE, title.toString()); res.put(ConfigureAPI.RESMAPKEY.TABLEAMOUNT, tableAmount.toString()); res.put(ConfigureAPI.RESMAPKEY.TABLEAVG, tableAvgTime.toString()); return res; } public Map<String, String> queryDetail(String beginDate, String endDate, String userName) throws TException { // TODO Auto-generated method stub return null; } }
Hadoop V2中的RPC採用的是本身獨立開發的協議,其核心內容包含服務端,客戶端,交互協議。源碼內容都在hadoop-common-project項目的org.apache.hadoop.ipc包下面。
package org.apache.hadoop.ipc; import java.io.IOException; /** * Superclass of all protocols that use Hadoop RPC. * Subclasses of this interface are also supposed to have * a static final long versionID field. */ public interface VersionedProtocol { /** * Return protocol version corresponding to protocol interface. * @param protocol The classname of the protocol interface * @param clientVersion The version of the protocol that the client speaks * @return the version that the server will speak * @throws IOException if any IO error occurs */ public long getProtocolVersion(String protocol, long clientVersion) throws IOException; /** * Return protocol version corresponding to protocol interface. * @param protocol The classname of the protocol interface * @param clientVersion The version of the protocol that the client speaks * @param clientMethodsHash the hashcode of client protocol methods * @return the server protocol signature containing its version and * a list of its supported methods * @see ProtocolSignature#getProtocolSignature(VersionedProtocol, String, * long, int) for a default implementation */ public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException; }
該類中的兩個方法一個是做爲版本,另外一個做爲簽名用。
/** An RPC Server. */ public abstract static class Server extends org.apache.hadoop.ipc.Server { boolean verbose; static String classNameBase(String className) { String[] names = className.split("\\.", -1); if (names == null || names.length == 0) { return className; } return names[names.length-1]; }
對外提供服務,處理Client端的請求,並返回處理結果。
至於Client端,監聽Server端的IP和PORT,封裝請求數據,並接受Response。
這篇博客贅述了RPC的相關內容,讓你們先熟悉一下RPC的相關機制和流程,並簡述了Hadoop V2的RPC機制,關於Hadoop V2的RPC詳細內容會在下一篇博客中給你們分享。這裏只是讓你們先對Hadoop V2的RPC機制有個初步的認識。
這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!