教你用 Netty 實現一個簡單的 RPC!

做者:莫那魯道
http://www.javashuo.com/article/p-zizifchg-nx.html
2019-11-14 09:19:00html

衆所周知,dubbo 底層使用了 Netty 做爲網絡通信框架,而 Netty 的高性能咱們以前也分析過源碼,對他也算仍是比較瞭解了。java

今天咱們就本身用 Netty 實現一個簡單的 RPC 框架。spring

一、需求

模仿 dubbo,消費者和提供者約定接口和協議,消費者遠程調用提供者,提供者返回一個字符串,消費者打印提供者返回的數據。底層網絡通訊使用 Netty 4.1.16。bootstrap

二、設計

  1. 建立一個接口,定義抽象方法。用於消費者和提供者之間的約定。緩存

  2. 建立一個提供者,該類須要監聽消費者的請求,並按照約定返回數據。網絡

  3. 建立一個消費者,該類須要透明的調用本身不存在的方法,內部須要使用 Netty 請求提供者返回數據。intellij-idea

三、 實現

1. 建立 maven 項目,導入 Netty 4.1.16。框架

<groupId>cn.thinkinjava</groupId>
<artifactId>rpc-demo</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
   <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.16.Final</version>
   </dependency>
</dependencies>

2. 項目目錄結構以下:maven

3. 設計接口ide

===============

一個簡單的 hello world:

public interface HelloService {
	String hello(String msg);
}

4. 提供者相關實現

==================

4.1. 首先實現約定接口,用於返回客戶端數據:

/**
 * 實現類
 */
public class HelloServiceImpl implements HelloService {
  public String hello(String msg) {
    return msg != null ? msg + " -----> I am fine." : "I am fine.";
  }
}

4.2. 實現 Netty 服務端和自定義 handler

啓動 Netty Server 代碼:

private static void startServer0(String hostName, int port) {    try {
     ServerBootstrap bootstrap = new ServerBootstrap();
     NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
     bootstrap.group(eventLoopGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {            @Override
           protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             p.addLast(new StringDecoder());
             p.addLast(new StringEncoder());
             p.addLast(new HelloServerHandler());
           }
         });
     bootstrap.bind(hostName, port).sync();
   } catch (InterruptedException e) {
     e.printStackTrace();
   }
 }

上面的代碼中添加了 String類型的編解碼 handler,添加了一個自定義 handler。

自定義 handler 邏輯以下:

/**
* 用於處理請求數據
*/public class HelloServerHandler extends ChannelInboundHandlerAdapter {  @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {    // 如何符合約定,則調用本地方法,返回數據
   if (msg.toString().startsWith(ClientBootstrap.providerName)) {
     String result = new HelloServiceImpl()
         .hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
     ctx.writeAndFlush(result);
   }
 }
}

這裏顯示判斷了是否符合約定(並無使用複雜的協議,只是一個字符串判斷),而後建立一個具體實現類,並調用方法寫回客戶端。爲何Netty這麼火?爲何?

還須要一個啓動類:

public class ServerBootstrap {  public static void main(String[] args) {
   NettyServer.startServer("localhost", 8088);
 }
}

好,關於提供者的代碼就寫完了,主要就是建立一個 netty 服務端,實現一個自定義的 handler,自定義 handler 判斷是否符合之間的約定(算是協議吧),若是符合,就建立一個接口的實現類,並調用他的方法返回字符串。

5. 消費者相關實現

消費者有一個須要注意的地方,就是調用須要透明,也就是說,框架使用者不用關心底層的網絡實現。這裏咱們能夠使用 JDK 的動態代理來實現這個目的。

思路:客戶端調用代理方法,返回一個實現了 HelloService 接口的代理對象,調用代理對象的方法,返回結果。

咱們須要在代理中作手腳,當調用代理方法的時候,咱們須要初始化 Netty 客戶端,還須要向服務端請求數據,並返回數據。

5.1. 首先建立代理相關的類

public class RpcConsumer {  private static ExecutorService executor = Executors
     .newFixedThreadPool(Runtime.getRuntime().availableProcessors());  private static HelloClientHandler client;  /**
  * 建立一個代理對象
  */
 public Object createProxy(final Class<?> serviceClass,      final String providerName) {    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),        new Class<?>[]{serviceClass}, (proxy, method, args) -> {          if (client == null) {
           initClient();
         }          // 設置參數
         client.setPara(providerName + args[0]);          return executor.submit(client).get();
       });
 }  /**
  * 初始化客戶端
  */
 private static void initClient() {
   client = new HelloClientHandler();
   EventLoopGroup group = new NioEventLoopGroup();
   Bootstrap b = new Bootstrap();
   b.group(group)
       .channel(NioSocketChannel.class)
       .option(ChannelOption.TCP_NODELAY, true)
       .handler(new ChannelInitializer<SocketChannel>() {          @Override
         public void initChannel(SocketChannel ch) throws Exception {
           ChannelPipeline p = ch.pipeline();
           p.addLast(new StringDecoder());
           p.addLast(new StringEncoder());
           p.addLast(client);
         }
       });    try {
     b.connect("localhost", 8088).sync();
   } catch (InterruptedException e) {
     e.printStackTrace();
   }
 }
}

該類有 2 個方法,建立代理和初始化客戶端。

初始化客戶端邏輯:建立一個 Netty 的客戶端,並鏈接提供者,並設置一個自定義 handler,和一些 String 類型的編解碼器。

建立代理邏輯:使用 JDK 的動態代理技術,代理對象中的 invoke 方法實現以下:若是 client 沒有初始化,則初始化 client,這個 client 既是 handler ,也是一個 Callback。將參數設置進 client ,使用線程池調用 client 的 call 方法並阻塞等待數據返回。

看看 HelloClientHandler 的實現:

public class HelloClientHandler extends ChannelInboundHandlerAdapter implements Callable {  private ChannelHandlerContext context;  private String result;  private String para;  @Override
 public void channelActive(ChannelHandlerContext ctx) {
   context = ctx;
 }  /**
  * 收到服務端數據,喚醒等待線程
  */
 @Override
 public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
   result = msg.toString();
   notify();
 }  /**
  * 寫出數據,開始等待喚醒
  */
 @Override
 public synchronized Object call() throws InterruptedException {
   context.writeAndFlush(para);
   wait();    return result;
 }  void setPara(String para) {    this.para = para;
 }
}

該類緩存了 ChannelHandlerContext,用於下次使用,有兩個屬性:返回結果和請求參數。

當成功鏈接後,緩存 ChannelHandlerContext,當調用 call 方法的時候,將請求參數發送到服務端,等待。當服務端收到並返回數據後,調用 channelRead 方法,將返回值賦值個 result,並喚醒等待在 call 方法上的線程。此時,代理對象返回數據。

再看看設計的測試類:

public class ClientBootstrap {  public static final String providerName = "HelloService#hello#";  public static void main(String[] args) throws InterruptedException {

   RpcConsumer consumer = new RpcConsumer();    // 建立一個代理對象
   HelloService service = (HelloService) consumer
       .createProxy(HelloService.class, providerName);    for (; ; ) {
     Thread.sleep(1000);
     System.out.println(service.hello("are you ok ?"));
   }
 }
}

測試類首先建立了一個代理對象,而後每隔一秒鐘調用代理的 hello 方法,並打印服務端返回的結果。

測試結果

成功打印。

四、總結

看了這麼久的 Netty 源碼,咱們終於實現了一個本身的 Netty 應用,雖然這個應用很簡單,甚至代碼寫的有些粗糙,但功能仍是實現了,RPC 的目的就是容許像調用本地服務同樣調用遠程服務,須要對使用者透明,因而咱們使用了動態代理。並使用 Netty 的 handler 發送數據和響應數據,完成了一次簡單的 RPC 調用。

固然,仍是那句話,代碼比較簡單,主要是思路,以及瞭解 RPC 底層的實現。

近期熱文推薦:

1.Java 15 正式發佈, 14 個新特性,刷新你的認知!!

2.終於靠開源項目弄到 IntelliJ IDEA 激活碼了,真香!

3.我用 Java 8 寫了一段邏輯,同事直呼看不懂,你試試看。。

4.吊打 Tomcat ,Undertow 性能很炸!!

5.《Java開發手冊(嵩山版)》最新發布,速速下載!

以爲不錯,別忘了隨手點贊+轉發哦!

相關文章
相關標籤/搜索