Hadoop2源碼分析-RPC探索實戰

1.概述

  在《Hadoop2源碼分析-RPC機制初識》博客中,咱們對RPC機制有了初步的認識和了解,下面咱們對Hadoop V2的RPC機制作進一步探索,在研究Hadoop V2的RPC機制,咱們須要掌握相關的Java基礎知識,如:Java NIO、動態代理與反射等。本篇博客介紹的內容目錄以下所示:html

  • Java NIO簡述
  • Java NIO實例演示
  • 動態代理與反射簡述
  • 動態代理與反射實例演示
  • Hadoop V2 RPC框架使用實例

  下面開始今天的博客介紹。java

2.Java NIO簡述

  Java NIO又稱Java New IO,它替代了Java IO API,提供了與標準IO不一樣的IO工做方式。Java NIO由一下核心組件組成:apache

  • Channels:鏈接通道,即能從通道讀取數據,又能寫數據到通道。能夠異步讀寫,讀寫從Buffer開始。
  • Buffers:消息緩衝區,用於和NIO通道進行交互。所謂緩衝區,它是一塊能夠讀寫的內存,該內存被封裝成NIO的Buffer對象,並提供相應的方法,以便於訪問。
  • Selectors:通道管理器,它能檢測到Java NIO中多個通道,單獨的線程能夠管理多個通道,間接的管理多個網絡鏈接。

  下圖爲Java NIO的工做原理圖,以下圖所示:服務器

3.Java NIO實例演示

  • NIOServer

  首先,咱們來看NIOServer的代碼塊。代碼內容以下所示:網絡

package cn.hadoop.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.conf.ConfigureAPI;

/**
 * @Date May 8, 2015
 *
 * @Author dengjie
 *
 * @Note Defined nio server
 */
public class NIOServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

    // The channel manager
    private Selector selector;

    /**
     * Get ServerSocket channel and initialize
     * 
     * 1.Get a ServerSocket channel
     * 
     * 2.Set channel for non blocking
     * 
     * 3.The channel corresponding to the ServerSocket binding to port port
     * 
     * 4.Get a channel manager
     * 
     * 5.The channel manager and the channel binding, and the channel registered
     * SelectionKey.OP_ACCEPT event
     * 
     * @param port
     * @throws IOException
     */
    public void init(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        this.selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * listen selector
     * 
     * @throws IOException
     */
    public void listen() throws IOException {
        LOGGER.info("Server has start success");
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                ite.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);// 非阻塞
                    channel.write(ByteBuffer.wrap(new String("Send test info to client").getBytes()));
                    channel.register(this.selector, SelectionKey.OP_READ);// 設置讀的權限
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /**
     * Deal client send event
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        byte[] data = buffer.array();
        String info = new String(data).trim();
        LOGGER.info("Server receive info : " + info);
        ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());
        channel.write(outBuffer);// 將消息回送給客戶端
    }

    public static void main(String[] args) {
        try {
            NIOServer server = new NIOServer();
            server.init(ConfigureAPI.ServerAddress.NIO_PORT);
            server.listen();
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("NIOServer main run error,info is " + ex.getMessage());
        }
    }
}
  • NIOClient

  而後,咱們在來看NIOClient的代碼塊,代碼具體內容以下所示:app

package cn.hadoop.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.conf.ConfigureAPI;

/**
 * @Date May 8, 2015
 *
 * @Author dengjie
 *
 * @Note Defined NIO client
 */
public class NIOClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(NIOClient.class);

    private Selector selector;

    /**
     * Get ServerSocket channel and initialize
     */
    public void init(String ip, int port) throws Exception {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        this.selector = Selector.open();
        channel.connect(new InetSocketAddress(ip, port));
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    /**
     * listen selector
     */
    public void listen() throws Exception {
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                ite.remove();
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                    }
                    channel.configureBlocking(false);// 非阻塞

                    channel.write(ByteBuffer.wrap(new String("Send test info to server").getBytes()));
                    channel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    read(key);
                }

            }

        }
    }

    /**
     * Deal client send event
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        byte[] data = buffer.array();
        String info = new String(data).trim();
        LOGGER.info("Client receive info : " + info);
        ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());
        channel.write(outBuffer);
    }

    public static void main(String[] args) {
        try {
            NIOClient client = new NIOClient();
            client.init(ConfigureAPI.ServerAddress.NIO_IP, ConfigureAPI.ServerAddress.NIO_PORT);
            client.listen();
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("NIOClient main run has error,info is " + ex.getMessage());
        }
    }
}
  • ConfigureAPI

  下面給出ConfigureAPI類的代碼,內容以下所示:框架

package cn.hadoop.conf;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Defined rpc info
 */
public class ConfigureAPI {

    public interface VersionID {
        public static final long RPC_VERSION = 7788L;
    }

    public interface ServerAddress {
        public static final int NIO_PORT = 8888;
        public static final String NIO_IP = "127.0.0.1";
    }

}

4.動態代理和反射簡述

  在Java中,動態代理主要用來作方法的加強,能夠在不修改源碼的狀況下,加強一些方法。另外,還有一個做用就是作遠程調用,好比如今有Java接口,該接口的實現部署在非本地服務器上,在編寫客戶端代碼時,因爲無法直接生成該對象,這個時候就須要考慮使用動態代理了。異步

  而反射,利用了Class類做爲反射實例化對象的基本應用,對於一個實例化對象而言,它須要調用類中的構造方法,屬性和通常方法,這些操做均可以經過反射機制來完成。下面咱們用一個實例來理解這些理論。socket

5.動態代理和反射實例演示

5.1動態代理

  • JProxy
package cn.java.base;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @Date May 7, 2015
 * 
 * @Author dengjie
 */
public class JProxy {
    
    public static void main(String[] args) {
        JInvocationHandler ji = new JInvocationHandler();
        Subject sub = (Subject) ji.bind(new RealSubject());
        System.out.println(sub.say("dengjie", 25));
    }

}

interface Subject {
    public String say(String name, int age);
}

class RealSubject implements Subject {

    @Override
    public String say(String name, int age) {
        return name + "," + age;
    }

}

class JInvocationHandler implements InvocationHandler {

    private Object object = null;

    public Object bind(Object object) {
        this.object = object;
        return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object tmp = method.invoke(this.object, args);
        return tmp;
    }

}

5.2反射

  • JReflect
package cn.java.base;

/**
 * @Date May 7, 2015
 * 
 * @Author dengjie
 */
public class JReflect {
    public static void main(String[] args) {
        Fruit f = Factory.getInstance(Orange.class.getName());
        if (f != null) {
            f.eat();
        }
    }
}

interface Fruit {
    public abstract void eat();
}

class Apple implements Fruit {

    @Override
    public void eat() {
        System.out.println("apple");
    }

}

class Orange implements Fruit {

    @Override
    public void eat() {
        System.out.println("orange");
    }

}

class Factory {
    public static Fruit getInstance(String className) {
        Fruit f = null;
        try {
            f = (Fruit) Class.forName(className).newInstance();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return f;
    }
}

6.Hadoop V2 RPC框架使用實例

  本實例主要演示經過Hadoop V2的RPC框架實現一個計算兩個整數的Add和Sub,服務接口爲 CaculateService ,繼承於 VersionedProtocol ,具體代碼以下所示:ide

  • CaculateService
package cn.hadoop.service;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;

import cn.hadoop.conf.ConfigureAPI;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Data calculate service interface
 */
@ProtocolInfo(protocolName = "", protocolVersion = ConfigureAPI.VersionID.RPC_VERSION)
public interface CaculateService extends VersionedProtocol {

    // defined add function
    public IntWritable add(IntWritable arg1, IntWritable arg2);

    // defined sub function
    public IntWritable sub(IntWritable arg1, IntWritable arg2);

}

  注意,本工程使用的是Hadoop-2.6.0版本,這裏CaculateService接口須要加入註解,來聲明版本號。

  CaculateServiceImpl類實現CaculateService接口。代碼以下所示:

  • CaculateServiceImpl
package cn.hadoop.service.impl;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.ProtocolSignature;

import cn.hadoop.conf.ConfigureAPI;
import cn.hadoop.service.CaculateService;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Implements CaculateService class
 */
public class CaculateServiceImpl implements CaculateService {

    public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException {
        return this.getProtocolSignature(arg0, arg1, arg2);
    }

    /**
     * Check the corresponding version
     */
    public long getProtocolVersion(String arg0, long arg1) throws IOException {
        return ConfigureAPI.VersionID.RPC_VERSION;
    }

    /**
     * Add nums
     */
    public IntWritable add(IntWritable arg1, IntWritable arg2) {
        return new IntWritable(arg1.get() + arg2.get());
    }

    /**
     * Sub nums
     */
    public IntWritable sub(IntWritable arg1, IntWritable arg2) {
        return new IntWritable(arg1.get() - arg2.get());
    }

}

  CaculateServer服務類,對外提供服務,具體代碼以下所示:

  • CaculateServer
package cn.hadoop.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

import cn.hadoop.service.CaculateService;
import cn.hadoop.service.impl.CaculateServiceImpl;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Server Main
 */
public class CaculateServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(CaculateServer.class);

    public static final int IPC_PORT = 9090;

    public static void main(String[] args) {
        try {
            Server server = new RPC.Builder(new Configuration()).setProtocol(CaculateService.class)
                    .setBindAddress("127.0.0.1").setPort(IPC_PORT).setInstance(new CaculateServiceImpl()).build();
            server.start();
            LOGGER.info("CaculateServer has started");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("CaculateServer server error,message is " + ex.getMessage());
        }
    }

}

  注意,在Hadoop V2版本中,獲取RPC下的Server對象不能在使用RPC.getServer()方法了,該方法已被移除,取而代之的是使用Builder方法來構建新的Server對象。

  RPCClient客戶端類,用於訪問Server端,具體代碼實現以下所示:

  • RPCClient
package cn.hadoop.rpc;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.service.CaculateService;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note RPC Client Main
 */
public class RPCClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(RPCClient.class);

    public static void main(String[] args) {
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", CaculateServer.IPC_PORT);
        try {
            RPC.getProtocolVersion(CaculateService.class);
            CaculateService service = (CaculateService) RPC.getProxy(CaculateService.class,
                    RPC.getProtocolVersion(CaculateService.class), addr, new Configuration());
            int add = service.add(new IntWritable(2), new IntWritable(3)).get();
            int sub = service.sub(new IntWritable(5), new IntWritable(2)).get();
            LOGGER.info("2+3=" + add);
            LOGGER.info("5-2=" + sub);
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("Client has error,info is " + ex.getMessage());
        }
    }

}

  Hadoop V2 RPC服務端截圖預覽,以下所示:

  Hadoop V2 RPC客戶端截圖預覽,以下所示:

7.總結

  Hadoop V2 RPC框架對Socket通訊進行了封裝,定義了本身的基類接口VersionProtocol。該框架須要經過網絡以序列化的方式傳輸對象,關於Hadoop V2的序列化能夠參考《Hadoop2源碼分析-序列化篇》,傳統序列化對象較大。框架內部實現了基於Hadoop本身的服務端對象和客戶端對象。服務端對象經過new RPC.Builder().builder()的方式來獲取,客戶端對象經過RPC.getProxy()的方式來獲取。而且都須要接受Configuration對象,該對象實現了Hadoop相關文件的配置。

8.結束語

  這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!

相關文章
相關標籤/搜索