在上一篇的徒手擼一個簡單的RPC框架中再最後的服務器和客戶端鏈接的時候只是簡單的寫了Socket鏈接,以爲有些不妥。正好最近學習了Netty,在平時工做中沒機會運用,因而本身就給本身出需求將以前的項目改造一下。git
在學習Netty以前呢咱們首先得了解IO和NIOgithub
IO編程模型簡單來講就是上一篇我寫的服務端與客戶端的鏈接,客戶端與服務端創建鏈接通訊後,必須等待服務端返回信息才能進行下一步的動做,這期間線程一直是等待狀態。IO模型在客戶端較少的狀況下是沒問題的,可是一旦有大量客戶端與服務端進行鏈接,那麼就會出問題。咱們簡單的分析一下緣由。編程
while
循環中用專門的線程來維護。可是線程是操做系統中很是寶貴的資源,每一個操做系統能建立的線程也是有限的。IO模型有這麼多的問題,因而在JDK1.4中提出了NIO的概念,就是爲了解決以上的問題bootstrap
咱們一一對應上面的問題來看NIO用什麼技術來解決的bash
第一個問題是代碼的問題,咱們就不討論了服務器
線程有限的問題:NIO中提出了Selector
概念,IO中是每一個鏈接都會由一個線程阻塞來維護,NIO中用Selector
來管理這些鏈接,若是有消息的傳入或傳出,那麼就創建相應的線程了處理。這樣服務器只須要阻塞一個Selector
線程,就能夠管理多個鏈接了。框架
具體的Selector
文章能夠看我以前的NIO中選擇器Selector,裏面有介紹詳細的Selector
用法。異步
這裏舉個例子應該就明白的,比如你去釣魚,IO就是一人一個魚竿,等着魚上來,中間哪也不能去,而NIO就是一我的能守着好幾個魚竿。ide
這就是NIO模型解決操做系統中線程有限的問題。函數
CPU在線程之間頻繁切換,因爲NIO中只管理了一個Selector
線程,那麼這個問題也就相應的解決了
NIO中提出了Channel
和Buffer
的概念,就比如在嚮往的生活第一季中摘玉米中,是用竹筐一次一次背快呢?仍是接一輛車子來回運送快?固然是車子來回運送快了,而這裏的Buffer
就比如車子。具體的Channel
和Buffer
的解釋能夠看我以前的文章Java中IO和NIO和JAVA中NIO再深刻。
那麼爲何就和Netty扯上關係了呢?其實我以爲NIO之於Netty的關係就比如Servlet之於Tomcat的關係,Netty只是對於NIO進行了進一步的封裝,讓使用者更加簡便的編程。
此次改造分爲服務端和客戶端的改造
接下來咱們就利用Netty將咱們的服務器端與客戶端鏈接通訊部分進行改造一下,首先咱們先加上對於Netty的依賴
compile 'io.netty:netty-all:4.1.6.Final'
複製代碼
而後編寫服務端的代碼,服務端的代碼很是簡單
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
.group(boos, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//得到實現類處理事後的返回值
String invokeMethodMes = CommonDeal.getInvokeMethodMes(msg);
ByteBuf encoded = ctx.alloc().buffer(4 * invokeMethodMes.length());
encoded.writeBytes(invokeMethodMes.getBytes());
ctx.writeAndFlush(encoded);
}
});
}
}).bind(20006);
複製代碼
這和咱們日常寫的Socket鏈接有些區別,能夠看到咱們建了兩個NioEventLoopGroup
一個boss一個worker,爲何會有兩個呢?
從這個圖裏面咱們能夠看到,boss是專門用來對外鏈接的,worker則是像NIO中Selector
用來處理各類讀寫的請求。
其實難點就是在客戶端,由於Netty是異步事件驅動的框架,什麼是異步呢?
客戶端與服務端的任何I/O操做都將當即返回,等待服務端處理完成之後會調用指定的回調函數進行處理。在這個過程當中客戶端一直沒有阻塞。因此咱們在客戶端與服務端請求處理時,若是得到異步處理的結果呢?Netty提供有一種獲取異步回調結果的,可是那是添加監聽器。而咱們的RPC調用在最後返回結果的時候必須得阻塞等待結果的返回,因此咱們須要本身寫一個簡單的獲取異步回調結果的程序。想法以下。
那麼咱們就圍繞這兩步來進行編碼。
客戶端想要獲取服務端消息時如何等待呢?這裏咱們就能夠用wait()
public Response getMessage(){
synchronized (object){
while (!success){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
複製代碼
那麼讀到消息之後如何喚醒呢?
public void setMessage(Response response){
synchronized (object){
this.response = response;
this.success = true;
object.notify();
}
}
複製代碼
這樣就解決了咱們上面提出的兩個問題了。接下來編寫客戶端的代碼
private final Map<Long,MessageFuture> futureMap = new ConcurrentHashMap<>();
private CountDownLatch countDownLatch = new CountDownLatch(1);
public void connect(String requestJson,Long threadId){
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().
addLast(new StringDecoder()).
addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Gson gson = new Gson();
Response response = gson.fromJson(msg, Response.class);
MessageFuture messageFuture = futureMap.get(threadId);
messageFuture.setMessage(response);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
futureMap.put(threadId,new MessageFuture());
countDownLatch.countDown();
ByteBuf encoded = ctx.alloc().buffer(4 * requestJson.length());
encoded.writeBytes(requestJson.getBytes());
ctx.writeAndFlush(encoded);
}
});
}
}).connect("127.0.0.1", 20006);
}
public Response getResponse(Long threadId){
MessageFuture messageFuture = null;
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
messageFuture = futureMap.get(threadId);
return messageFuture.getMessage();
}
複製代碼
這裏面咱們用到了CountDownLatch
類,即等待發送完消息之後通知我能獲取數據了。這裏面的代碼和服務端的差很少,其中有區別的地方就是在發送數據的時候將線程ID和MessageFuture
放入Map中,在獲得服務端發送的數據時取出並放入獲得的Response。
到目前爲止咱們就完成了咱們的項目改造,只是簡單的應用了一下Netty的客戶端和服務端的通訊,由於在學習的過程當中若是沒有運用的話,那麼感受記憶沒有那麼牢靠,因此就有了這次的項目改造的計劃。雖然完成了簡單的通訊,可是我知道還有些地方須要優化,例如用synchronized
在之後學習了AQS
之後但願也可以學以至用將這裏給改一下。