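In the post 《Hadoop2源碼分析-RPC機制初識》 we took a first look at the RPC mechanism. Here we explore the Hadoop V2 RPC mechanism further. Studying it requires some Java fundamentals, such as Java NIO, dynamic proxies, and reflection. This post covers the following topics: a short introduction to Java NIO with an example, dynamic proxies and reflection with examples, a worked example of the Hadoop V2 RPC framework, and a summary.
Let's get started.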
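Java NIO, also known as Java New IO, is an alternative to the standard Java IO API and offers a different way of working with IO. Java NIO is built around the following core components: Channel, Buffer, and Selector.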
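To make the Buffer component a little more concrete, here is a minimal sketch of the write, flip, read cycle of a ByteBuffer. The class name BufferDemo and the method roundTrip are made up for this illustration and are not part of the original project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, used only to illustrate how a Buffer is written and read.
class BufferDemo {

    // Writes a string into a buffer, flips it, and reads back exactly the bytes written.
    static String roundTrip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Write mode: every put advances the buffer position
        buffer.put(text.getBytes(StandardCharsets.UTF_8));
        // Switch to read mode: limit = old position, position = 0
        buffer.flip();
        // remaining() is the number of bytes actually written, not the capacity
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Send test info to client"));
    }
}
```

Reading only the bytes between position and limit avoids picking up the unused tail of the backing array, which is what happens when the whole array of a 1024-byte buffer is turned into a string.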
The figure below shows how Java NIO works:
First, let's look at the NIOServer code:
package cn.hadoop.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.conf.ConfigureAPI;

/**
 * @Date May 8, 2015
 *
 * @Author dengjie
 *
 * @Note Defined nio server
 */
public class NIOServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

    // The channel manager
    private Selector selector;

    /**
     * Get the ServerSocket channel and initialize it:
     *
     * 1. Open a ServerSocket channel
     * 2. Set the channel to non-blocking mode
     * 3. Bind the channel's ServerSocket to the given port
     * 4. Open a channel manager (Selector)
     * 5. Register the channel with the selector for the SelectionKey.OP_ACCEPT event
     *
     * @param port
     * @throws IOException
     */
    public void init(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        this.selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Poll the selector and dispatch ready events
     *
     * @throws IOException
     */
    public void listen() throws IOException {
        LOGGER.info("Server has start success");
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false); // non-blocking
                    channel.write(ByteBuffer.wrap("Send test info to client".getBytes(StandardCharsets.UTF_8)));
                    channel.register(this.selector, SelectionKey.OP_READ); // register interest in read events
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /**
     * Handle data sent by the client
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int count = channel.read(buffer);
        if (count < 0) {
            // The client closed the connection; closing the channel also cancels its key
            channel.close();
            return;
        }
        // Decode only the bytes that were actually read
        buffer.flip();
        String info = StandardCharsets.UTF_8.decode(buffer).toString().trim();
        LOGGER.info("Server receive info : " + info);
        ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes(StandardCharsets.UTF_8));
        channel.write(outBuffer); // echo the message back to the client
    }

    public static void main(String[] args) {
        try {
            NIOServer server = new NIOServer();
            server.init(ConfigureAPI.ServerAddress.NIO_PORT);
            server.listen();
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("NIOServer main run error,info is " + ex.getMessage());
        }
    }
}
Next, let's look at the NIOClient code:
package cn.hadoop.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.conf.ConfigureAPI;

/**
 * @Date May 8, 2015
 *
 * @Author dengjie
 *
 * @Note Defined NIO client
 */
public class NIOClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(NIOClient.class);

    private Selector selector;

    /**
     * Get the Socket channel and initialize it
     */
    public void init(String ip, int port) throws Exception {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        this.selector = Selector.open();
        channel.connect(new InetSocketAddress(ip, port));
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    /**
     * Poll the selector and dispatch ready events
     */
    public void listen() throws Exception {
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                    }
                    channel.configureBlocking(false); // non-blocking
                    channel.write(ByteBuffer.wrap("Send test info to server".getBytes(StandardCharsets.UTF_8)));
                    channel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /**
     * Handle data sent by the server
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int count = channel.read(buffer);
        if (count < 0) {
            // The server closed the connection; closing the channel also cancels its key
            channel.close();
            return;
        }
        // Decode only the bytes that were actually read
        buffer.flip();
        String info = StandardCharsets.UTF_8.decode(buffer).toString().trim();
        LOGGER.info("Client receive info : " + info);
        ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes(StandardCharsets.UTF_8));
        channel.write(outBuffer);
    }

    public static void main(String[] args) {
        try {
            NIOClient client = new NIOClient();
            client.init(ConfigureAPI.ServerAddress.NIO_IP, ConfigureAPI.ServerAddress.NIO_PORT);
            client.listen();
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("NIOClient main run has error,info is " + ex.getMessage());
        }
    }
}
The code of the ConfigureAPI class is as follows:
package cn.hadoop.conf;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Defined rpc info
 */
public class ConfigureAPI {

    // Version number shared by the RPC protocol and its implementation
    public interface VersionID {
        public static final long RPC_VERSION = 7788L;
    }

    // Address used by the NIO server and client examples
    public interface ServerAddress {
        public static final int NIO_PORT = 8888;
        public static final String NIO_IP = "127.0.0.1";
    }
}
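In Java, dynamic proxies are mainly used to enhance methods: they let us add behavior to a method without modifying its source code. Another use is remote invocation. Suppose we have a Java interface whose implementation is deployed on a remote server. When writing the client code we cannot create that object directly, and this is where a dynamic proxy comes in.
Reflection, on the other hand, is built on the Class class. To work with an instance we need to call the constructors, fields, and ordinary methods of its class, and all of these operations can be performed through the reflection mechanism. The examples below make these ideas concrete.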
package cn.java.base;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 */
public class JProxy {

    public static void main(String[] args) {
        JInvocationHandler ji = new JInvocationHandler();
        // The returned object is a proxy that implements Subject
        Subject sub = (Subject) ji.bind(new RealSubject());
        System.out.println(sub.say("dengjie", 25));
    }
}

interface Subject {
    public String say(String name, int age);
}

class RealSubject implements Subject {

    @Override
    public String say(String name, int age) {
        return name + "," + age;
    }
}

class JInvocationHandler implements InvocationHandler {

    // The real object that calls are forwarded to
    private Object object = null;

    // Creates a proxy that implements all interfaces of the given object
    public Object bind(Object object) {
        this.object = object;
        return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), this);
    }

    // Every call on the proxy arrives here and is forwarded to the real object
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object tmp = method.invoke(this.object, args);
        return tmp;
    }
}
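The handler above only forwards the call. As a small variation, the sketch below shows what "enhancing a method without touching its source" can look like: the handler records an entry before and after each call. The names EnhancedProxyDemo, Greeter, and SimpleGreeter are invented for this illustration and are not part of the original example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: wraps a Greeter so that every call is recorded in a log list.
class EnhancedProxyDemo {

    static Greeter wrap(Greeter target, List<String> log) {
        InvocationHandler handler = (proxy, method, args) -> {
            log.add("before " + method.getName()); // added behavior before the call
            try {
                return method.invoke(target, args); // forward to the real object
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the original exception of the target
            } finally {
                log.add("after " + method.getName()); // added behavior after the call
            }
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] { Greeter.class }, handler);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Greeter greeter = wrap(new SimpleGreeter(), log);
        System.out.println(greeter.say("dengjie", 25));
        System.out.println(log);
    }
}

interface Greeter {
    String say(String name, int age);
}

// The real implementation stays unchanged; only the proxy adds the logging.
class SimpleGreeter implements Greeter {
    @Override
    public String say(String name, int age) {
        return name + "," + age;
    }
}
```

The same pattern works for timing, permission checks, or transaction handling, because all calls on the proxy pass through the single invoke method.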
package cn.java.base;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 */
public class JReflect {

    public static void main(String[] args) {
        // The class to instantiate is chosen by name at runtime
        Fruit f = Factory.getInstance(Orange.class.getName());
        if (f != null) {
            f.eat();
        }
    }
}

interface Fruit {
    public abstract void eat();
}

class Apple implements Fruit {

    @Override
    public void eat() {
        System.out.println("apple");
    }
}

class Orange implements Fruit {

    @Override
    public void eat() {
        System.out.println("orange");
    }
}

class Factory {

    // Loads the class by name and creates an instance through its no-arg constructor
    public static Fruit getInstance(String className) {
        Fruit f = null;
        try {
            // Class.newInstance() is deprecated since Java 9, so go through the constructor
            f = (Fruit) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return f;
    }
}
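The factory above only uses reflection to call a no-arg constructor. The sketch below extends the idea to the other members mentioned earlier: a constructor with a parameter, a private field, and an ordinary method. The classes ReflectMembersDemo and Person are hypothetical and exist only for this illustration.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Hypothetical demo: accesses a constructor, a field, and a method purely by name.
class ReflectMembersDemo {

    static String run() throws Exception {
        Class<?> clazz = Class.forName(Person.class.getName());

        // Constructor: look it up by parameter types, then create an instance
        Constructor<?> ctor = clazz.getDeclaredConstructor(String.class);
        Object person = ctor.newInstance("dengjie");

        // Field: private fields need setAccessible(true) before they can be written
        Field name = clazz.getDeclaredField("name");
        name.setAccessible(true);
        name.set(person, "hadoop");

        // Method: look it up by name and parameter types, then invoke it
        Method greet = clazz.getMethod("greet", String.class);
        return (String) greet.invoke(person, "reader");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}

class Person {
    private String name;

    public Person(String name) {
        this.name = name;
    }

    public String greet(String other) {
        return "Hi " + other + ", I am " + name;
    }
}
```

Looking up a method by name and parameter types and then invoking it is the same step an RPC server performs when it receives a call from a client.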
This example uses the Hadoop V2 RPC framework to implement a service that adds and subtracts two integers. The service interface is CaculateService, which extends VersionedProtocol:
package cn.hadoop.service;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;

import cn.hadoop.conf.ConfigureAPI;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Data calculate service interface
 */
@ProtocolInfo(protocolName = "", protocolVersion = ConfigureAPI.VersionID.RPC_VERSION)
public interface CaculateService extends VersionedProtocol {

    // defined add function
    public IntWritable add(IntWritable arg1, IntWritable arg2);

    // defined sub function
    public IntWritable sub(IntWritable arg1, IntWritable arg2);
}
Note that this project uses Hadoop 2.6.0. The CaculateService interface needs the @ProtocolInfo annotation to declare its version number.
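To illustrate how a version number declared in an annotation can be read back at runtime, here is a small sketch. It does not use Hadoop's classes: DemoProtocolInfo, CalcProtocol, and versionOf are stand-ins invented for this example, and the sketch makes no claim about how Hadoop implements the lookup internally.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical demo: reads a protocol version from a runtime annotation via reflection.
class AnnotationVersionDemo {

    static long versionOf(Class<?> protocol) {
        DemoProtocolInfo info = protocol.getAnnotation(DemoProtocolInfo.class);
        if (info == null) {
            throw new IllegalArgumentException(protocol.getName() + " declares no version");
        }
        return info.protocolVersion();
    }

    public static void main(String[] args) {
        System.out.println(versionOf(CalcProtocol.class));
    }
}

// Stand-in annotation (an assumption for this sketch, not Hadoop's @ProtocolInfo).
// RUNTIME retention is what makes it visible to getAnnotation().
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoProtocolInfo {
    String protocolName();
    long protocolVersion();
}

// Stand-in protocol interface carrying the same version number as the example above.
@DemoProtocolInfo(protocolName = "demo.CalcProtocol", protocolVersion = 7788L)
interface CalcProtocol {
    int add(int a, int b);
}
```

Because client and server both load the same interface, both sides can derive the same version number from it without passing it around by hand.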
The CaculateServiceImpl class implements the CaculateService interface:
package cn.hadoop.service.impl;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.ProtocolSignature;

import cn.hadoop.conf.ConfigureAPI;
import cn.hadoop.service.CaculateService;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Implements CaculateService class
 */
public class CaculateServiceImpl implements CaculateService {

    /**
     * Return the protocol signature. Delegating to the static helper avoids
     * the endless recursion that a call to this.getProtocolSignature() causes.
     */
    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
            throws IOException {
        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
    }

    /**
     * Check the corresponding version
     */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return ConfigureAPI.VersionID.RPC_VERSION;
    }

    /**
     * Add nums
     */
    @Override
    public IntWritable add(IntWritable arg1, IntWritable arg2) {
        return new IntWritable(arg1.get() + arg2.get());
    }

    /**
     * Sub nums
     */
    @Override
    public IntWritable sub(IntWritable arg1, IntWritable arg2) {
        return new IntWritable(arg1.get() - arg2.get());
    }
}
The CaculateServer class exposes the service to clients:
package cn.hadoop.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.service.CaculateService;
import cn.hadoop.service.impl.CaculateServiceImpl;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Server Main
 */
public class CaculateServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(CaculateServer.class);

    public static final int IPC_PORT = 9090;

    public static void main(String[] args) {
        try {
            // Build the server from the protocol interface and its implementation
            Server server = new RPC.Builder(new Configuration())
                    .setProtocol(CaculateService.class)
                    .setBindAddress("127.0.0.1")
                    .setPort(IPC_PORT)
                    .setInstance(new CaculateServiceImpl())
                    .build();
            server.start();
            LOGGER.info("CaculateServer has started");
            // Block on standard input so the process stays alive
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("CaculateServer server error,message is " + ex.getMessage());
        }
    }
}
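Note that in Hadoop V2 the Server object can no longer be obtained through RPC.getServer(). That method has been removed, and RPC.Builder is used instead to construct a new Server object.
The RPCClient class is the client that calls the server: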
package cn.hadoop.rpc;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.service.CaculateService;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note RPC Client Main
 */
public class RPCClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(RPCClient.class);

    public static void main(String[] args) {
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", CaculateServer.IPC_PORT);
        CaculateService service = null;
        try {
            // Obtain a client-side proxy for the remote service
            service = RPC.getProxy(CaculateService.class, RPC.getProtocolVersion(CaculateService.class), addr,
                    new Configuration());
            int add = service.add(new IntWritable(2), new IntWritable(3)).get();
            int sub = service.sub(new IntWritable(5), new IntWritable(2)).get();
            LOGGER.info("2+3=" + add);
            LOGGER.info("5-2=" + sub);
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("Client has error,info is " + ex.getMessage());
        } finally {
            // Release the connection held by the proxy
            if (service != null) {
                RPC.stopProxy(service);
            }
        }
    }
}
Screenshot of the Hadoop V2 RPC server output:
Screenshot of the Hadoop V2 RPC client output:
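The Hadoop V2 RPC framework wraps socket communication and defines its own base interface, VersionedProtocol. The framework has to send objects over the network in serialized form. For serialization in Hadoop V2, see 《Hadoop2源碼分析-序列化篇》; traditional Java serialization produces relatively large objects. Internally the framework provides its own server and client objects. The server object is obtained with new RPC.Builder(conf)...build(), and the client object is obtained with RPC.getProxy(). Both take a Configuration object, which holds the settings from Hadoop's configuration files.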
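To tie the pieces together, here is a heavily simplified, in-process sketch of the idea behind an RPC proxy: the client side is a dynamic proxy that serializes the method name and arguments, and the server side decodes the request and dispatches it through reflection. All class names here (MiniRpcDemo, MiniRpcServer, MiniRpcClient, Calculator) are hypothetical. The sketch uses a byte array in place of a socket and standard Java serialization in place of Hadoop's own, so it is not how Hadoop implements its RPC layer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical demo: a proxy-based call that travels as bytes and is dispatched by reflection.
class MiniRpcDemo {
    public static void main(String[] args) {
        MiniRpcServer server = new MiniRpcServer(Calculator.class, new CalculatorImpl());
        Calculator calc = MiniRpcClient.getProxy(Calculator.class, server);
        System.out.println("2+3=" + calc.add(2, 3));
        System.out.println("5-2=" + calc.sub(5, 2));
    }
}

interface Calculator {
    int add(int a, int b);
    int sub(int a, int b);
}

class CalculatorImpl implements Calculator {
    public int add(int a, int b) { return a + b; }
    public int sub(int a, int b) { return a - b; }
}

class MiniRpcServer {
    private final Class<?> protocol;
    private final Object instance;

    MiniRpcServer(Class<?> protocol, Object instance) {
        this.protocol = protocol;
        this.instance = instance;
    }

    // Decodes a request, invokes the named method by reflection, and encodes the result.
    byte[] handle(byte[] request) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(request))) {
            String methodName = in.readUTF();
            Class<?>[] paramTypes = (Class<?>[]) in.readObject();
            Object[] args = (Object[]) in.readObject();
            Method method = protocol.getMethod(methodName, paramTypes);
            Object result = method.invoke(instance, args);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(result);
            }
            return bytes.toByteArray();
        }
    }
}

class MiniRpcClient {
    // Returns a proxy whose calls are serialized and handed to the server.
    static <T> T getProxy(Class<T> protocol, MiniRpcServer server) {
        InvocationHandler handler = (proxy, method, args) -> {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeUTF(method.getName());
                out.writeObject(method.getParameterTypes());
                out.writeObject(args);
            }
            // Direct method call standing in for the network round trip
            byte[] response = server.handle(bytes.toByteArray());
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(response))) {
                return in.readObject();
            }
        };
        return protocol.cast(Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] { protocol }, handler));
    }
}
```

The caller only sees the Calculator interface, which mirrors how the RPCClient above works with CaculateService without knowing where the implementation runs.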
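That's all for this post. If you run into any problems while studying this material, feel free to join the discussion group or send me an email. I will do my best to answer. Let's keep learning together!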