出自:https://my.oschina.net/hosee/blog/711632html
在學校期間你們都寫過很多程序,好比寫個hello world服務類,而後本地調用下,以下所示。這些程序的特色是服務消費方和服務提供方是本地調用關係。java
public class Test { public static void main(String[] args) { HelloWorldService helloWorldService = new HelloWorldServiceImpl(); helloWorldService.sayHello("test"); } }
而一旦踏入公司尤爲是大型互聯網公司就會發現,公司的系統都由成千上萬大大小小的服務組成,各服務部署在不一樣的機器上,由不一樣的團隊負責。node
這時就會遇到兩個問題:web
因爲各服務部署在不一樣機器,服務間的調用免不了網絡通訊過程,服務消費方每調用一個服務都要寫一坨網絡通訊相關的代碼,不只複雜並且極易出錯。編程
若是有一種方式能讓咱們像調用本地服務同樣調用遠程服務,而讓調用者對網絡通訊這些細節透明,那麼將大大提升生產力,好比服務消費方在執行helloWorldService.sayHello("test")時,實質上調用的是遠端的服務。這種方式其實就是RPC(Remote Procedure Call Protocol),在各大互聯網公司中被普遍使用,如阿里巴巴的hsf、dubbo(開源)、Facebook的thrift(開源)、Google grpc(開源)、Twitter的finagle(開源)等。網絡
要讓網絡通訊細節對使用者透明,咱們須要對通訊細節進行封裝,咱們先看下一個RPC調用的流程涉及到哪些通訊細節:數據結構
RPC的目標就是要2~8這些步驟都封裝起來,讓用戶對這些細節透明。併發
怎麼封裝通訊細節才能讓用戶像以本地調用方式調用遠程服務呢?對java來講就是使用代理!java代理有兩種方式:負載均衡
儘管字節碼生成方式實現的代理更爲強大和高效,但代碼維護不易,大部分公司實現RPC框架時仍是選擇動態代理方式。框架
下面簡單介紹下動態代理怎麼實現咱們的需求。咱們須要實現RPCProxyClient代理類,代理類的invoke方法中封裝了與遠端服務通訊的細節,消費方首先從RPCProxyClient得到服務提供方的接口,當執行helloWorldService.sayHello("test")方法時就會調用invoke方法。
public class RPCProxyClient implements java.lang.reflect.InvocationHandler{ private Object obj; public RPCProxyClient(Object obj){ this.obj=obj; } /** * 獲得被代理對象; */ public static Object getProxy(Object obj){ return java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(), obj.getClass().getInterfaces(), new RPCProxyClient(obj)); } /** * 調用此方法執行 */ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //結果參數; Object result = new Object(); // ...執行通訊相關邏輯 // ... return result; } }
public class Test { public static void main(String[] args) { HelloWorldService helloWorldService = (HelloWorldService)RPCProxyClient.getProxy(HelloWorldService.class); helloWorldService.sayHello("test"); } }
上節講了invoke裏須要封裝通訊細節(通訊細節再後面幾章詳細探討),而通訊的第一步就是要肯定客戶端和服務端相互通訊的消息結構。客戶端的請求消息結構通常須要包括如下內容:
1)接口名稱
在咱們的例子裏接口名是「HelloWorldService」,若是不傳,服務端就不知道調用哪一個接口了;
2)方法名
一個接口內可能有不少方法,若是不傳方法名服務端也就不知道調用哪一個方法;
3)參數類型&參數值
參數類型有不少,好比有bool、int、long、double、string、map、list,甚至如struct(class);以及相應的參數值;
4)超時時間
5)requestID,標識惟一請求id,在下面一節會詳細描述requestID的用處。
同理服務端返回的消息結構通常包括如下內容。
1)返回值
2)狀態code
3)requestID
一旦肯定了消息的數據結構後,下一步就是要考慮序列化與反序列化了。
什麼是序列化?序列化就是將數據結構或對象轉換成二進制串的過程,也就是編碼的過程。
什麼是反序列化?將在序列化過程當中所生成的二進制串轉換成數據結構或者對象的過程。
爲何須要序列化?轉換爲二進制串後纔好進行網絡傳輸嘛!
爲何須要反序列化?將二進制轉換爲對象纔好進行後續處理!
現現在序列化的方案愈來愈多,每種序列化方案都有優勢和缺點,它們在設計之初有本身獨特的應用場景,那到底選擇哪一種呢?從RPC的角度上看,主要看三點:
目前互聯網公司普遍使用Protobuf、Thrift、Avro等成熟的序列化解決方案來搭建RPC框架,這些都是久經考驗的解決方案。
消息數據結構被序列化爲二進制串後,下一步就要進行網絡通訊了。目前有兩種經常使用IO通訊模型:1)BIO;2)NIO。通常RPC框架須要支持這兩種IO模型。
如何實現RPC的IO通訊框架呢?
若是使用netty的話,通常會用channel.writeAndFlush()方法來發送消息二進制串,這個方法調用後對於整個遠程調用(從發出請求到接收到結果)來講是一個異步的,即對於當前線程來講,將請求發送出來後,線程就能夠日後執行了,至於服務端的結果,是服務端處理完成後,再以消息的形式發送給客戶端的。因而這裏出現如下兩個問題:
以下圖所示,線程A和線程B同時向client socket發送請求requestA和requestB,socket前後將requestB和requestA發送至server,而server可能將responseA先返回,儘管requestA請求到達時間更晚。咱們須要一種機制保證responseA丟給ThreadA,responseB丟給ThreadB。
怎麼解決呢?
public Object get() { synchronized (this) { // 旋鎖 while (!isDone) { // 是否有結果了 wait(); //沒結果是釋放鎖,讓當前線程處於等待狀態 } } }
private void setDone(Response res) { this.res = res; isDone = true; synchronized (this) { //獲取鎖,由於前面wait()已經釋放了callback的鎖了 notifyAll(); // 喚醒處於等待的線程 } }
如何讓別人使用咱們的服務呢?有同窗說很簡單嘛,告訴使用者服務的IP以及端口就能夠了啊。確實是這樣,這裏問題的關鍵在因而自動告知仍是人肉告知。
人肉告知的方式:若是你發現你的服務一臺機器不夠,要再添加一臺,這個時候就要告訴調用者我如今有兩個ip了,大家要輪詢調用來實現負載均衡;調用者咬咬牙改了,結果某天一臺機器掛了,調用者發現服務有一半不可用,他又只能手動修改代碼來刪除掛掉那臺機器的ip。現實生產環境固然不會使用人肉方式。
有沒有一種方法能實現自動告知,即機器的增添、剔除對調用方透明,調用者再也不須要寫死服務提供方地址?固然能夠,現現在zookeeper被普遍用於實現服務自動註冊與發現功能!
簡單來說,zookeeper能夠充當一個服務註冊表
(Service Registry),讓多個服務提供者
造成一個集羣,讓服務消費者
經過服務註冊表獲取具體的服務訪問地址(ip+端口)去訪問具體的服務提供者。以下圖所示:
具體來講,zookeeper就是個分佈式文件系統,每當一個服務提供者部署後都要將本身的服務註冊到zookeeper的某一路徑上: /{service}/{version}/{ip:port}, 好比咱們的HelloWorldService部署到兩臺機器,那麼zookeeper上就會建立兩條目錄:分別爲/HelloWorldService/1.0.0/100.19.20.01:16888 /HelloWorldService/1.0.0/100.19.20.02:16888。
zookeeper提供了「心跳檢測」功能,它會定時向各個服務提供者發送一個請求(實際上創建的是一個 Socket 長鏈接),若是長期沒有響應,服務中心就認爲該服務提供者已經「掛了」,並將其剔除,好比100.19.20.02這臺機器若是宕機了,那麼zookeeper上的路徑就會只剩/HelloWorldService/1.0.0/100.19.20.01:16888。
服務消費者會去監聽相應路徑(/HelloWorldService/1.0.0),一旦路徑上的數據有任務變化(增長或減小),zookeeper都會通知服務消費方服務提供者地址列表已經發生改變,從而進行更新。
更爲重要的是zookeeper與生俱來的容錯容災能力(好比leader選舉),能夠確保服務註冊表的高可用性。
ipc.RPC類中有一些內部類,爲了你們對RPC類有個初步的印象,就先羅列幾個咱們感興趣的分析一下吧:
Invocation :用於封裝方法名和參數,做爲數據傳輸層。
ClientCache :用於存儲client對象,用socket factory做爲hash key,存儲結構爲hashMap <SocketFactory, Client>。
Invoker :是動態代理中的調用實現類,繼承了InvocationHandler.
Server :是ipc.Server的實現類。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ••• ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); ••• return value.get(); }
若是你發現這個invoke()方法實現的有些奇怪的話,那你就對了。通常咱們看到的動態代理的invoke()方法中總會有 method.invoke(ac, arg); 這句代碼。而上面代碼中卻沒有,這是爲何呢?其實使用 method.invoke(ac, arg); 是在本地JVM中調用;而在hadoop中,是將數據發送給服務端,服務端將處理的結果再返回給客戶端,因此這裏的invoke()方法必然須要進行網絡通訊。而網絡通訊就是下面的這段代碼實現的:
ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
Invocation類在這裏封裝了方法名和參數。其實這裏網絡通訊只是調用了Client類的call()方法。那咱們接下來分析一下ipc.Client源碼吧。和第一章同樣,一樣是3個問題
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); //將傳入的數據封裝成call對象 Connection connection = getConnection(remoteId, call); //得到一個鏈接 connection.sendParam(call); // 向服務端發送call對象 boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // 等待結果的返回,在Call類的callComplete()方法裏有notify()方法用於喚醒線程 } catch (InterruptedException ie) { // 因中斷異常而終止,設置標誌interrupted爲true interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // 本地異常 throw wrapException(remoteId.getAddress(), call.error); } } else { return call.value; //返回結果數據 } } }
具體代碼的做用我已作了註釋,因此這裏再也不贅述。但到目前爲止,你依然不知道RPC機制底層的網絡鏈接是怎麼創建的。分析代碼後,咱們會發現和網絡通訊有關的代碼只會是下面的兩句了:
Connection connection = getConnection(remoteId, call); //得到一個鏈接 connection.sendParam(call); // 向服務端發送call對象
先看看是怎麼得到一個到服務端的鏈接吧,下面貼出ipc.Client類中的getConnection()方法。
private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException { if (!running.get()) { // 若是client關閉了 throw new IOException("The client is stopped"); } Connection connection; //若是connections鏈接池中有對應的鏈接對象,就不需從新建立了;若是沒有就需從新建立一個鏈接對象。 //但請注意,該//鏈接對象只是存儲了remoteId的信息,其實還並無和服務端創建鏈接。 do { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId); connections.put(remoteId, connection); } } } while (!connection.addCall(call)); //將call對象放入對應鏈接中的calls池,就不貼出源碼了 //這句代碼纔是真正的完成了和服務端創建鏈接哦~ connection.setupIOstreams(); return connection; }
下面貼出Client.Connection類中的setupIOstreams()方法:
private synchronized void setupIOstreams() throws InterruptedException { ••• try { ••• while (true) { setupConnection(); //創建鏈接 InputStream inStream = NetUtils.getInputStream(socket); //得到輸入流 OutputStream outStream = NetUtils.getOutputStream(socket); //得到輸出流 writeRpcHeader(outStream); ••• this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(inStream))); //將輸入流裝飾成DataInputStream this.out = new DataOutputStream (new BufferedOutputStream(outStream)); //將輸出流裝飾成DataOutputStream writeHeader(); // 跟新活動時間 touch(); //當鏈接創建時,啓動接受線程等待服務端傳回數據,注意:Connection繼承了Tread start(); return; } } catch (IOException e) { markClosed(e); close(); } }
再有一步咱們就知道客戶端的鏈接是怎麼創建的啦,下面貼出Client.Connection類中的setupConnection()方法:
private synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; while (true) { try { this.socket = socketFactory.createSocket(); //終於看到建立socket的方法了 this.socket.setTcpNoDelay(tcpNoDelay); ••• // 設置鏈接超時爲20s NetUtils.connect(this.socket, remoteId.getAddress(), 20000); this.socket.setSoTimeout(pingInterval); return; } catch (SocketTimeoutException toe) { /* 設置最多鏈接重試爲45次。 * 總共有20s*45 = 15 分鐘的重試時間。 */ handleConnectionFailure(timeoutFailures++, 45, toe); } catch (IOException ie) { handleConnectionFailure(ioFailures++, maxRetries, ie); } } }
終於,咱們知道了客戶端的鏈接是怎樣創建的了,其實就是建立一個普通的socket進行通訊。
下面貼出Client.Connection類的sendParam()方法吧:
public void sendParam(Call call) { if (shouldCloseConnection.get()) { return; } DataOutputBuffer d=null; try { synchronized (this.out) { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); //建立一個緩衝區 d = new DataOutputBuffer(); d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); //首先寫出數據的長度 out.write(data, 0, dataLength); //向服務端寫數據 out.flush(); } } catch(IOException e) { markClosed(e); } finally { IOUtils.closeStream(d); } }
下面貼出Client.Connection類和Client.Call類中的相關方法:
方法一: public void run() { ••• while (waitForWork()) { receiveResponse(); //具體的處理方法 } close(); ••• } 方法二: private void receiveResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { int id = in.readInt(); // 阻塞讀取id if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); Call call = calls.get(id); //在calls池中找到發送時的那個對象 int state = in.readInt(); // 阻塞讀取call對象的狀態 if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // 讀取數據 //將讀取到的值賦給call對象,同時喚醒Client等待線程,貼出setValue()代碼方法三 call.setValue(value); calls.remove(id); //刪除已處理的call } else if (state == Status.ERROR.state) { ••• } else if (state == Status.FATAL.state) { ••• } } catch (IOException e) { markClosed(e); } } 方法三: public synchronized void setValue(Writable value) { this.value = value; callComplete(); //具體實現 } protected synchronized void callComplete() { this.done = true; notify(); // 喚醒client等待線程 }
完成的功能主要是:啓動一個處理線程,讀取從服務端傳來的call對象,將call對象讀取完畢後,喚醒client處理線程。就這麼簡單,客戶端就獲取了服務端返回的數據了哦~。客戶端的源碼分析就到這裏了哦,下面咱們來分析Server端的源碼吧。
爲了讓你們對ipc.Server有個初步的瞭解,咱們先分析一下它的幾個內部類吧:
Call :用於存儲客戶端發來的請求
Listener : 監聽類,用於監聽客戶端發來的請求,同時Listener內部還有一個靜態類,Listener.Reader,當監聽器監聽到用戶請求,便讓Reader讀取用戶請求。
Responder :響應RPC請求類,請求處理完畢,由Responder發送給請求客戶端。
Connection :鏈接類,真正的客戶端請求讀取邏輯在這個類中。
Handler :請求處理類,會循環阻塞讀取callQueue中的call對象,並對其進行操做。
private void initialize(Configuration conf) throws IOException { ••• // 建立 rpc server InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf); if (dnSocketAddr != null) { int serviceHandlerCount = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); //得到serviceRpcServer this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); setRpcServiceServerAddress(conf); } //得到server this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(), handlerCount, false, conf, namesystem .getDelegationTokenSecretManager()); ••• this.server.start(); //啓動 RPC server Clients只容許鏈接該server if (serviceRpcServer != null) { serviceRpcServer.start(); //啓動 RPC serviceRpcServer 爲HDFS服務的server } startTrashEmptier(conf); }
查看Namenode初始化源碼得知:RPC的server對象是經過ipc.RPC類的getServer()方法得到的。下面我們去看看ipc.RPC類中的getServer()源碼吧:
public static Server getServer(final Object instance, final String bindAddress, final int port, final int numHandlers, final boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager); }
這時咱們發現getServer()是一個建立Server對象的工廠方法,但建立的倒是RPC.Server類的對象。哈哈,如今你明白了我前面說的「RPC.Server是ipc.Server的實現類」了吧。不過RPC.Server的構造函數仍是調用了ipc.Server類的構造函數的,因篇幅所限,就不貼出相關源碼了。
初始化Server後,Server端就運行起來了,看看ipc.Server的start()源碼吧:
/** 啓動服務 */ public synchronized void start() { responder.start(); //啓動responder listener.start(); //啓動listener handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); //逐個啓動Handler } }
分析過ipc.Client源碼後,咱們知道Client端的底層通訊直接採用了阻塞式IO編程,當時咱們曾作出猜想:Server端是否是也採用了阻塞式IO。如今咱們仔細地分析一下吧,若是Server端也採用阻塞式IO,當鏈接進來的Client端不少時,勢必會影響Server端的性能。hadoop的實現者們考慮到了這點,因此他們採用了java NIO來實現Server端,那Server端採用java NIO是怎麼創建鏈接的呢?分析源碼得知,Server端採用Listener監聽客戶端的鏈接,下面先分析一下Listener的構造函數吧:
public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // 建立ServerSocketChannel,並設置成非阻塞式 acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // 將server socket綁定到本地端口 bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); // 得到一個selector selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); //啓動多個reader線程,爲了防止請求多時服務端響應延時的問題 for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); } // 註冊鏈接事件 acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); }
在啓動Listener線程時,服務端會一直等待客戶端的鏈接,下面貼出Server.Listener類的run()方法:
public void run() { ••• while (running) { SelectionKey key = null; try { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); //具體的鏈接方法 } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { ••• }
下面貼出Server.Listener類中doAccept()方法中的關鍵源碼吧:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { //創建鏈接 channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); //從readers池中得到一個reader try { reader.startAdd(); // 激活readSelector,設置adding爲true SelectionKey readKey = reader.registerChannel(channel);//將讀事件設置成興趣事件 c = new Connection(readKey, channel, System.currentTimeMillis());//建立一個鏈接對象 readKey.attach(c); //將connection對象注入readKey synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } ••• } finally { //設置adding爲false,採用notify()喚醒一個reader,其實代碼十三中啓動的每一個reader都使 //用了wait()方法等待。因篇幅有限,就不貼出源碼了。 reader.finishAdd(); } } }
當reader被喚醒,reader接着執行doRead()方法。
下面貼出Server.Listener.Reader類中的doRead()方法和Server.Connection類中的readAndProcess()方法源碼:
方法一: void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); //得到connection對象 if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); // 接受並處理請求 } catch (InterruptedException ieo) { ••• } ••• } 方法二: public int readAndProcess() throws IOException, InterruptedException { while (true) { ••• if (!rpcHeaderRead) { if (rpcHeaderBuffer == null) { rpcHeaderBuffer = ByteBuffer.allocate(2); } //讀取請求頭 count = channelRead(channel, rpcHeaderBuffer); if (count < 0 || rpcHeaderBuffer.remaining() > 0) { return count; } // 讀取請求版本號 int version = rpcHeaderBuffer.get(0); byte[] method = new byte[] {rpcHeaderBuffer.get(1)}; ••• data = ByteBuffer.allocate(dataLength); } // 讀取請求 count = channelRead(channel, data); if (data.remaining() == 0) { ••• if (useSasl) { ••• } else { processOneRpc(data.array());//處理請求 } ••• } } return count; } }
下面貼出Server.Connection類中的processOneRpc()方法和processData()方法的源碼。
方法一: private void processOneRpc(byte[] buf) throws IOException, InterruptedException { if (headerRead) { processData(buf); } else { processHeader(buf); headerRead = true; if (!authorizeConnection()) { throw new AccessControlException("Connection from " + this + " for protocol " + header.getProtocol() + " is unauthorized for user " + user); } } } 方法二: private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // 嘗試讀取id Writable param = ReflectionUtils.newInstance(paramClass, conf);//讀取參數 param.readFields(dis); Call call = new Call(id, param, this); //封裝成call callQueue.put(call); // 將call存入callQueue incRpcCount(); // 增長rpc請求的計數 }
RPC:
Web service
web service接口就是RPC中的stub組件,規定了server可以提供的服務(web service),這在server和client上是一致的,可是也是跨語言跨平臺的。同時,因爲web service規範中的WSDL文件的存在,如今各平臺的web service框架,均可以基於WSDL文件,自動生成web service接口 。
其實二者差很少,只是傳輸的協議不一樣。
1. http://www.cnblogs.com/LBSer/p/4853234.html
2. http://weixiaolu.iteye.com/blog/1504898
3. http://kyfxbl.iteye.com/blog/1745550