輕量級RPC框架開發

一、掌握RPC原理 java

二、掌握nio操做 spring

三、掌握netty簡單的api bootstrap

四、掌握自定義RPC框架 api

1. RPC原理學習 緩存

1.1. 什麼是RPC 服務器

RPC(Remote Procedure Call Protocol)——遠程過程調用協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI網絡通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分佈式多程序在內的應用程序更加容易。 網絡

RPC採用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,而後等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息到達爲止。當一個調用信息到達,服務器得到進程參數,計算結果,發送答覆信息,而後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,得到進程結果,而後調用執行繼續進行。 app

1.2. RPC原理 框架

運行時,一次客戶機對服務器的RPC調用,其內部操做大體有以下十步: 異步

1.調用客戶端句柄;執行傳送參數

2.調用本地系統內核發送網絡消息

3.消息傳送到遠程主機

4.服務器句柄獲得消息並取得參數

5.執行遠程過程

6.執行的過程將結果返回服務器句柄

7.服務器句柄返回結果,調用遠程系統內核

8.消息傳回本地主機

9.客戶句柄由內核接收消息

10.客戶接收句柄返回的數據

1.3. hadoopRPC演示

見代碼

2. nio原理學習(nio的優點不在於數據傳送的速度)

2.1. 簡介

nio 是New IO 的簡稱,在jdk1.4 裏提供的新api 。Sun 官方標榜的特性以下: 爲全部的原始類型提供(Buffer)緩存支持。字符集編碼解碼解決方案。 Channel :一個新的原始I/O 抽象。 支持鎖和內存映射文件的文件訪問接口。 提供多路(non-bloking) 非阻塞式的高伸縮性網絡I/O 。

2.2. 傳統socket和socket nio代碼

見代碼

2.3. socket nio原理

2.3.1. 傳統的I/O

使用傳統的I/O程序讀取文件內容, 並寫入到另外一個文件(或Socket), 以下程序:

File.read(fileDesc, buf, len);

Socket.send(socket, buf, len);

會有較大的性能開銷, 主要表如今一下兩方面:

1. 上下文切換(context switch), 此處有4次用戶態和內核態的切換

2. Buffer內存開銷, 一個是應用程序buffer, 另外一個是系統讀取buffer以及socket buffer

其運行示意圖以下

1) 先將文件內容從磁盤中拷貝到操做系統buffer

2) 再從操做系統buffer拷貝到程序應用buffer

3) 從程序buffer拷貝到socket buffer

4) 從socket buffer拷貝到協議引擎.

2.3.2. NIO

NIO技術省去了將操做系統的read buffer拷貝到程序的buffer, 以及從程序buffer拷貝到socket buffer的步驟, 直接將 read buffer 拷貝到 socket buffer. java 的 FileChannel.transferTo() 方法就是這樣的實現, 這個實現是依賴於操做系統底層的sendFile()實現的.

publicvoid transferTo(long position, long count, WritableByteChannel target);

他的底層調用的是系統調用sendFile()方法

sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

以下圖

3. netty經常使用API學習

3.1. netty簡介

Netty是基於Java NIO的網絡應用框架.

    Netty是一個NIO client-server(客戶端服務器)框架,使用Netty能夠快速開發網絡應用,例如服務器和客戶端協議。Netty提供了一種新的方式來使開發網絡應用程序,這種新的方式使得它很容易使用和有很強的擴展性。Netty的內部實現時很複雜的,可是Netty提供了簡單易用的api從網絡處理代碼中解耦業務邏輯。Netty是徹底基於NIO實現的,因此整個Netty都是異步的。

        網絡應用程序一般須要有較高的可擴展性,不管是Netty仍是其餘的基於Java NIO的框架,都會提供可擴展性的解決方案。Netty中一個關鍵組成部分是它的異步特性.

3.2. netty的helloworld

3.2.1. 下載netty包

•        下載netty包,下載地址http://netty.io/

3.2.2. 服務端啓動類

package com.netty.demo.server;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

/**

* • 配置服務器功能,如線程、端口 • 實現服務器處理程序,它包含業務邏輯,決定當有一個請求鏈接或接收數據時該作什麼

*

* @author ll

*

*/

public class EchoServer {

private final int port;

public EchoServer(int port) {

this.port = port;

}

public void start() throws Exception {

EventLoopGroup eventLoopGroup = null;

try {

//建立ServerBootstrap實例來引導綁定和啓動服務器

ServerBootstrap serverBootstrap = new ServerBootstrap();

//建立NioEventLoopGroup對象來處理事件,如接受新鏈接、接收數據、寫數據等等

eventLoopGroup = new NioEventLoopGroup();

//指定通道類型爲NioServerSocketChannel,設置InetSocketAddress讓服務器監聽某個端口已等待客戶端鏈接。

serverBootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress("localhost",port).childHandler(new ChannelInitializer<Channel>() {

//設置childHandler執行全部的鏈接請求

@Override

protected void initChannel(Channel ch) throws Exception {

ch.pipeline().addLast(new EchoServerHandler());

}

});

// 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,而後服務器等待通道關閉,由於使用sync(),因此關閉操做也會被阻塞。

ChannelFuture channelFuture = serverBootstrap.bind().sync();

System.out.println("開始監聽,端口爲:" + channelFuture.channel().localAddress());

channelFuture.channel().closeFuture().sync();

} finally {

eventLoopGroup.shutdownGracefully().sync();

}

}

public static void main(String[] args) throws Exception {

new EchoServer(20000).start();

}

}

3.2.3. 服務端回調方法

package com.netty.demo.server;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

System.out.println("server 讀取數據……");

//讀取數據

        ByteBuf buf = (ByteBuf) msg;

        byte[] req = new byte[buf.readableBytes()];

        buf.readBytes(req);

        String body = new String(req, "UTF-8");

        System.out.println("接收客戶端數據:" + body);

        //向客戶端寫數據

        System.out.println("server向client發送數據");

        String currentTime = new Date(System.currentTimeMillis()).toString();

        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());

        ctx.write(resp);

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

System.out.println("server 讀取數據完畢..");

        ctx.flush();//刷新後纔將數據發出到SocketChannel

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

cause.printStackTrace();

ctx.close();

}

}

3.2.4. 客戶端啓動類

package com.netty.demo.client;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**

* • 鏈接服務器 • 寫數據到服務器 • 等待接受服務器返回相同的數據 • 關閉鏈接

*

* @author ll

*

*/

public class EchoClient {

private final String host;

private final int port;

public EchoClient(String host, int port) {

this.host = host;

this.port = port;

}

public void start() throws Exception {

EventLoopGroup nioEventLoopGroup = null;

try {

//建立Bootstrap對象用來引導啓動客戶端

Bootstrap bootstrap = new Bootstrap();

//建立EventLoopGroup對象並設置到Bootstrap中,EventLoopGroup能夠理解爲是一個線程池,這個線程池用來處理鏈接、接受數據、發送數據

nioEventLoopGroup = new NioEventLoopGroup();

//建立InetSocketAddress並設置到Bootstrap中,InetSocketAddress是指定鏈接的服務器地址

bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port))

.handler(new ChannelInitializer<SocketChannel>() {

//添加一個ChannelHandler,客戶端成功鏈接服務器後就會被執行

@Override

protected void initChannel(SocketChannel ch)

throws Exception {

ch.pipeline().addLast(new EchoClientHandler());

}

});

// • 調用Bootstrap.connect()來鏈接服務器

ChannelFuture f = bootstrap.connect().sync();

// • 最後關閉EventLoopGroup來釋放資源

f.channel().closeFuture().sync();

} finally {

nioEventLoopGroup.shutdownGracefully().sync();

}

}

public static void main(String[] args) throws Exception {

new EchoClient("localhost", 20000).start();

}

}

3.2.5. 客戶端回調方法

package com.netty.demo.client;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.ByteBufUtil;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {  

      //客戶端鏈接服務器後被調用

    @Override  

    public void channelActive(ChannelHandlerContext ctx) throws Exception {  

            System.out.println("客戶端鏈接服務器,開始發送數據……");

            byte[] req = "QUERY TIME ORDER".getBytes();

            ByteBuf  firstMessage = Unpooled.buffer(req.length);

        firstMessage.writeBytes(req);

        ctx.writeAndFlush(firstMessage);  

    }  

  //•        從服務器接收到數據後調用

    @Override  

    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {  

             System.out.println("client 讀取server數據..");

         //服務端返回消息後

         ByteBuf buf = (ByteBuf) msg;

         byte[] req = new byte[buf.readableBytes()];

         buf.readBytes(req);

         String body = new String(req, "UTF-8");

         System.out.println("服務端數據爲 :" + body);

}  

  //•        發生異常時被調用

    @Override  

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  

             System.out.println("client exceptionCaught..");

         // 釋放資源

         ctx.close();

    }  

}  

3.3. netty中handler的執行順序

3.3.1. 簡介

Handler在netty中,無疑佔據着很是重要的地位。Handler與Servlet中的filter很像,經過Handler能夠完成通信報文的解碼編碼、攔截指定的報文、統一對日誌錯誤進行處理、統一對請求進行計數、控制Handler執行與否。一句話,沒有它作不到的只有你想不到的。

Netty中的全部handler都實現自ChannelHandler接口。按照輸出輸出來分,分爲ChannelInboundHandler、ChannelOutboundHandler兩大類。ChannelInboundHandler對從客戶端發往服務器的報文進行處理,通常用來執行解碼、讀取客戶端數據、進行業務處理等;ChannelOutboundHandler對從服務器發往客戶端的報文進行處理,通常用來進行編碼、發送報文到客戶端。

Netty中,能夠註冊多個handler。ChannelInboundHandler按照註冊的前後順序執行;ChannelOutboundHandler按照註冊的前後順序逆序執行,以下圖所示,按照註冊的前後順序對Handler進行排序,request進入Netty後的執行順序爲:

3.3.2. 總結

在使用Handler的過程當中,須要注意:

一、ChannelInboundHandler之間的傳遞,經過調用 ctx.fireChannelRead(msg) 實現;調用ctx.write(msg) 將傳遞到ChannelOutboundHandler。

二、ctx.write()方法執行後,須要調用flush()方法才能令它當即執行。

三、流水線pipeline中outhandler不能放在最後,不然不生效

四、Handler的消費處理放在最後一個處理。

3.4. netty發送對象

3.4.1. 簡介

Netty中,通信的雙方創建鏈接後,會把數據按照ByteBuf的方式進行傳輸,例如http協議中,就是經過HttpRequestDecoder對ByteBuf數據流進行處理,轉換成http的對象。基於這個思路,我自定義一種通信協議:Server和客戶端直接傳輸java對象。

實現的原理是經過Encoder把java對象轉換成ByteBuf流進行傳輸,經過Decoder把ByteBuf轉換成java對象進行處理,處理邏輯以下圖所示:

3.4.2. 代碼

見代碼

4. Spring(IOC/AOP)註解學習

4.1. spring的初始化順序

在spring的配置文件中配置bean,以下

<bean class="com.spring3.hello.BeanFactory.One">
        <constructor-arg name="one" value ="one">
</bean>  

<bean class="com.spring3.hello.BeanFactory.Two">
        <constructor-arg name="two" value ="two">
</bean>  

在One類和Two類中,分別實現一個參數的構造以下

加載spring配置文件,初始化bean以下

那麼。結果如何呢?

結論:spring會按照bean的順序依次初始化xml中配置的全部bean

4.1.1. 經過ApplicationContextAware加載Spring上下文環境

在One中實現ApplicationContextAware接口會出現如何的變換呢?

結果

4.1.2. InitializingBean的做用

在One中實現InitializingBean接口呢?

結果:

4.1.3. 若是使用註解@Component

使用@Component注入類,那麼它的順序是如何呢?

4.1.4. 結論

一、 spring先檢查註解注入的bean,並將它們實例化

二、 而後spring初始化bean的順序是按照xml中配置的順序依次執行構造

三、 若是某個類實現了ApplicationContextAware接口,會在類初始化完成後調用setApplicationContext()方法進行操做

四、 若是某個類實現了InitializingBean接口,會在類初始化完成後,並在setApplicationContext()方法執行完畢後,調用afterPropertiesSet()方法進行操做

4.2. 註解使用回顧

一、在spring中,用註解來向Spring容器註冊Bean。須要在applicationContext.xml中註冊<context:component-scan base-package=」pagkage1[,pagkage2,…,pagkageN]」/>。

二、若是某個類的頭上帶有特定的註解@Component/@Repository/@Service/@Controller,就會將這個對象做爲Bean註冊進Spring容器

三、在使用spring管理的bean時,無需在對調用的對象進行new的過程,只需使用@Autowired將須要的bean注入本類便可

4.3. 自定義註解

4.3.1. 解釋

一、自定義註解的做用:在反射中獲取註解,以取得註解修飾的「類、方法、屬性」的相關解釋。

二、java內置註解

     @Target 表示該註解用於什麼地方,可能的 ElemenetType 參數包括:  

            ElemenetType.CONSTRUCTOR   構造器聲明  

            ElemenetType.FIELD   域聲明(包括 enum 實例)  

            ElemenetType.LOCAL_VARIABLE   局部變量聲明  

            ElemenetType.METHOD   方法聲明  

            ElemenetType.PACKAGE   包聲明  

            ElemenetType.PARAMETER   參數聲明  

            ElemenetType.TYPE   類,接口(包括註解類型)或enum聲明

      @Retention 表示在什麼級別保存該註解信息。可選的 RetentionPolicy 參數包括:  

           RetentionPolicy.SOURCE   註解將被編譯器丟棄  

           RetentionPolicy.CLASS   註解在class文件中可用,但會被VM丟棄  

           RetentionPolicy.RUNTIME   JVM將在運行期也保留註釋,所以能夠經過反射機制讀取註解的信息。

4.3.2. 實現

定義自定義註解

@Target({ ElementType.TYPE })//註解用在接口上

@Retention(RetentionPolicy.RUNTIME)//VM將在運行期也保留註釋,所以能夠經過反射機制讀取註解的信息

@Component

public @interface RpcService {

String value();

}

二、將直接類加到須要使用的類上,咱們能夠經過獲取註解,來獲得這個類

@RpcService("HelloService")

public class HelloServiceImpl implements HelloService {

    public String hello(String name) {

        return "Hello! " + name;

    }

}

三、類實現的接口

public interface HelloService {

    String hello(String name);

}

四、經過ApplicationContext獲取全部標記這個註解的類

@Component

public class MyServer implements ApplicationContextAware {

@SuppressWarnings("resource")

public static void main(String[] args) {

       new ClassPathXmlApplicationContext("spring2.xml");

    }

public void setApplicationContext(ApplicationContext ctx)

throws BeansException {

Map<String, Object> serviceBeanMap = ctx

.getBeansWithAnnotation(RpcService.class);

for (Object serviceBean : serviceBeanMap.values()) {

try {

Method method = serviceBean.getClass().getMethod("hello", new Class[]{String.class});

Object invoke = method.invoke(serviceBean, "bbb");

System.out.println(invoke);

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

五、結合spring實現junit測試

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = "classpath:spring2.xml")

public class MyServer implements ApplicationContextAware {

@Test

public void helloTest1() {

}

public void setApplicationContext(ApplicationContext ctx)

throws BeansException {

Map<String, Object> serviceBeanMap = ctx

.getBeansWithAnnotation(RpcService.class);

for (Object serviceBean : serviceBeanMap.values()) {

try {

Method method = serviceBean.getClass().getMethod("hello",

new Class[] { String.class });

Object invoke = method.invoke(serviceBean, "bbb");

System.out.println(invoke);

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

---------------------
做者:醬g
來源:CSDN
原文:https://blog.csdn.net/qq_33283716/article/details/80375179 版權聲明:本文爲博主原創文章,轉載請附上博文連接!

相關文章
相關標籤/搜索