Scoket通訊--------這是一個例子,能夠在這個例子的基礎上進行相應的拓展,核心也是在多線程任務上進行修改javascript
package cn.itcast.bigdata.socket;
import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; public class ServiceServer { public static void main(String[] args) throws Exception { // 建立一個serversocket,綁定到本機的8899端口上 ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress("localhost", 8899)); // 接受客戶端的鏈接請求;accept是一個阻塞方法,會一直等待,到有客戶端請求鏈接才返回 while (true) { Socket socket = server.accept(); new Thread(new ServiceServerTask(socket)).start(); } } }
package cn.itcast.bigdata.socket;
import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; import java.net.Socket; public class ServiceClient { public static void main(String[] args) throws Exception { /*ServiceIterface service = ProxyUtils.getProxy(ServiceIterface.class,"methodA",hostname,port); Result = service.methodA(parameters);*/ // 向服務器發出請求創建鏈接 Socket socket = new Socket("localhost", 8899); // 從socket中獲取輸入輸出流 InputStream inputStream = socket.getInputStream(); OutputStream outputStream = socket.getOutputStream(); PrintWriter pw = new PrintWriter(outputStream); pw.println("hello"); pw.flush(); BufferedReader br = new BufferedReader(new InputStreamReader(inputStream)); String result = br.readLine(); System.out.println(result); inputStream.close(); outputStream.close(); socket.close(); } }
package cn.itcast.bigdata.socket;
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; import java.net.Socket; public class ServiceServerTask implements Runnable{ Socket socket ; InputStream in=null; OutputStream out = null; public ServiceServerTask(Socket socket) { this.socket = socket; } //業務邏輯:跟客戶端進行數據交互 @Override public void run() { try { //從socket鏈接中獲取到與client之間的網絡通訊輸入輸出流 in = socket.getInputStream(); out = socket.getOutputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(in)); //從網絡通訊輸入流中讀取客戶端發送過來的數據 //注意:socketinputstream的讀數據的方法都是阻塞的 String param = br.readLine();// 按理說這裏應該改成多行的就是while(true) /** * 做業: * 將如下業務調用邏輯寫成更加通用的:能夠根據客戶端發過來的調用類名、調用方法名、調用該參數來靈活調用 * * 《反射》 * */ GetDataServiceImpl getDataServiceImpl = new GetDataServiceImpl(); String result = getDataServiceImpl.getData(param); //將調用結果寫到sokect的輸出流中,以發送給客戶端 PrintWriter pw = new PrintWriter(out); pw.println(result); pw.flush(); } catch (IOException e) { e.printStackTrace(); }finally{ try { in.close(); out.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
package cn.itcast.bigdata.socket;
public class GetDataServiceImpl { public String getData(String param){ return "ok-"+param; } }
PS:這是傳統的調用方式,當線程多了之後就不同了。 NIO 非阻塞IO,是新的 可是不知道怎麼使用,如今知道了 NIO就是使用Netty框架,性能高不少
PS : RPC是一種遠程調用的協議,有不少的實現,WebService就是一種實現。
-------------------------------------自定義RPC的框架-
PS:1.正常一個應用,藍線上面是用戶須要配置的。配置好怎樣交換呢?首先spring啓動好後,會掃描註解,而後把javabean放在hashmap結構中。
2.當有客戶端有請求過來時,傳統是用socket通訊,可是這種方式性能會出現問題。如今一般使用Netty來解決這個問題(而不是使用傳統的socket通訊)。
3.客戶端也是註解,他是經過動態代理包裝整合向外暴漏接口。而後二者經過socket通訊。
---------------
4.若是不該用zookeeper的話,這就是一個完成的sp ring的使用。使用zookeeper之後就能夠 遠程調用了。(由於註冊了服務器的地址和端口)
PS:在實現上,服務器端經過spring自定義自定義註解 獲取相應的 service, 而後經過 netty (NIO)進行異步傳輸,使用zookeeper進行 使用RPC服務器配置。
PS :netty是爲了解決傳統的阻塞的問題,netty是nio的一種實現。
----------------------------------------------------------------------------html
PS:傳統的要在內核之間交互java
PS :Netty是一個很龐大的體系, 若是精通netty的話,薪水是普通屌絲的好幾倍。hadoop、spark都是有用netty
PS:
0.首先從操做系統的底層是支持異步IO的,可是以前的異步IO是C++的多,Java的異步IO少,源生的socket是同步的性能很差,急需異步框架出現
1.傳統BIO模式是這樣的,來一個請求,建立一個線程;
後來出現了線程池或者使用消息隊列來實現一個線程或多個線程處理N個請求的模型,其實底層仍是同步IO,稱之爲「僞異步IO ,這樣其實仍是會出現問題,若是隊列滿的話,仍是會出現鏈接超時。只能用NIO出手了。web
NIO
PS : 低負載、低併發應該使用同步阻塞; 高併發進行 NIOspring
1.類庫的簡介: buffer、channel、selectorbootstrap
2.看了源碼之後,發現NIO的源碼比較複雜瀏覽器
總結: 由於原聲NIO java代碼編寫起來比較費勁,因此不太鼓勵用java原聲代碼(須要對多線程比較熟悉),服務器
而是推薦使用netty,由於netty就是高併發的解決方案websocket
PS : Netty打包部署也很簡單就是一個jar文件,具體代碼看下面網絡
PS:
PS :
PS:日常Java網絡傳輸數據的時候比較慢,後來出現了幾種框架
PS : 上面是netty的位置
PS:WebSocket他是基於Tcp的新的,比傳統的更好
PS : 案例-----看慕課網的培訓教程代碼
package com.imooc.netty; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * 存儲整個工程的全局配置 * @author liuyazhuang * */ public class NettyConfig { /** * 存儲每個客戶端接入進來時的channel對象 */ public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); }
package com.imooc.netty; import java.util.Date; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; /** * 接收/處理/響應客戶端websocket請求的核心業務處理類 * @author liuyazhuang * */ public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> { private WebSocketServerHandshaker handshaker; private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket"; //客戶端與服務端建立鏈接的時候調用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { NettyConfig.group.add(ctx.channel());//group像是集合類 System.out.println("客戶端與服務端鏈接開啓..."); } //客戶端與服務端斷開鏈接的時候調用 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { NettyConfig.group.remove(ctx.channel()); System.out.println("客戶端與服務端鏈接關閉..."); } //服務端接收客戶端發送過來的數據結束以後調用 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } //工程出現異常的時候調用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } //服務端處理客戶端websocket請求的核心方法 @Override protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception { //處理客戶端向服務端發起http握手請求的業務 if (msg instanceof FullHttpRequest) { handHttpRequest(context, (FullHttpRequest)msg); }else if (msg instanceof WebSocketFrame) { //處理websocket鏈接業務 handWebsocketFrame(context, (WebSocketFrame)msg); } } /** * 處理客戶端與服務端以前的websocket業務 * @param ctx * @param frame */ private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){ //判斷是不是關閉websocket的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain()); } //判斷是不是ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } //判斷是不是二進制消息,若是是二進制消息,拋出異常 if( ! (frame instanceof TextWebSocketFrame) ){ System.out.println("目前咱們不支持二進制消息"); throw new RuntimeException("【"+this.getClass().getName()+"】不支持消息"); } //返回應答消息 //獲取客戶端向服務端發送的消息 String request = ((TextWebSocketFrame) frame).text(); System.out.println("服務端收到客戶端的消息====>>>" + request); if(request.equals("CXLL")){ request = "正在辦理查詢流量"; } TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + " ===>>> " + request); //羣發,服務端向每一個鏈接上來的客戶端 羣發 消息 NettyConfig.group.writeAndFlush(tws);//響應到瀏覽器中的內容!!!! } /** * 處理客戶端向服務端發起http握手請求的業務 * @param ctx * @param req */ private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){ if (!req.getDecoderResult().isSuccess() || ! ("websocket".equals(req.headers().get("Upgrade")))) {//不是websocket的請求 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( WEB_SOCKET_URL, null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); }else{ handshaker.handshake(ctx.channel(), req); } } /** * 服務端向客戶端響應消息 * @param ctx * @param req * @param res */ private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res){ if (res.getStatus().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } //服務端向客戶端發送數據 ChannelFuture f = ctx.channel().writeAndFlush(res); if (res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } }
package com.imooc.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; /** * 初始化鏈接時候的各個組件 * @author liuyazhuang * */ public class MyWebSocketChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel e) throws Exception { e.pipeline().addLast("http-codec", new HttpServerCodec()); e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); e.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); e.pipeline().addLast("handler", new MyWebSocketHandler()); } }
package com.imooc.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 程序的入口,負責啓動應用 * @author liuyazhuang * */ public class Main { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new MyWebSocketChannelHandler()); System.out.println("服務端開啓等待客戶端鏈接...."); Channel ch = b.bind(8888).sync().channel(); ch.closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally{ //優雅的退出程序 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
<html> <head> <meta http-equiv="Content-Type" content="text/html; charset = utf-8"/> <title>WebSocket客戶端</title> <script type="text/javascript"> var socket; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket = new WebSocket("ws://localhost:8888/websocket"); socket.onmessage = function(event){ var ta = document.getElementById('responseContent'); ta.value += event.data + "\r\n"; }; socket.onopen = function(event){ var ta = document.getElementById('responseContent'); ta.value = "你當前的瀏覽器支持WebSocket,請進行後續操做\r\n"; }; socket.onclose = function(event){ var ta = document.getElementById('responseContent'); ta.value = ""; ta.value = "WebSocket鏈接已經關閉\r\n"; }; }else{ alert("您的瀏覽器不支持WebSocket"); } function send(message){ if(!window.WebSocket){ return; } if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("WebSocket鏈接沒有創建成功!!"); } } </script> </head> <body> <form onSubmit="return false;"> <input type = "text" name = "message" value = ""/> <br/><br/> <input type = "button" value = "發送WebSocket請求消息" onClick = "send(this.form.message.value)"/> <hr color="red"/> <h2>客戶端接收到服務端返回的應答消息</h2> <textarea id = "responseContent" style = "width:1024px; height:300px"></textarea> </form> </body> </html>
------------------------------------------------------------------------------------------------------
Dubbo底層用的就是Netty
3.2. netty的helloworld 3.2.1. 下載netty包 • 下載netty包,下載地址http://netty.io/ 3.2.2. 服務端啓動類 package com.netty.demo.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * • 配置服務器功能,如線程、端口 • 實現服務器處理程序,它包含業務邏輯,決定當有一個請求鏈接或接收數據時該作什麼 * * @author wilson * */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup eventLoopGroup = null; try { //建立ServerBootstrap實例來引導綁定和啓動服務器 ServerBootstrap serverBootstrap = new ServerBootstrap(); //建立NioEventLoopGroup對象來處理事件,如接受新鏈接、接收數據、寫數據等等 eventLoopGroup = new NioEventLoopGroup(); //指定通道類型爲NioServerSocketChannel,設置InetSocketAddress讓服務器監聽某個端口已等待客戶端鏈接。 serverBootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress("localhost",port).childHandler(new ChannelInitializer<Channel>() { //設置childHandler執行全部的鏈接請求 @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }); // 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,而後服務器等待通道關閉,由於使用sync(),因此關閉操做也會被阻塞。 ChannelFuture channelFuture = serverBootstrap.bind().sync();//主要用於異步操做通知回調 System.out.println("開始監聽,端口爲:" + channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync();//方法柱塞,等待服務端鏈路關閉後,Main函數才退出 } finally { eventLoopGroup.shutdownGracefully().sync(); //優雅的退出,釋放全部的資源 } } public static void main(String[] args) throws Exception { new EchoServer(20000).start(); } } 3.2.3. 服務端回調方法 package com.netty.demo.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class EchoServerHandler extends ChannelInboundHandlerAdapter {// channelInboundHandlerAdapter 對於網絡事件進行讀寫操做 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server 讀取數據……"); //讀取數據 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("接收客戶端數據:" + body); //向客戶端寫數據 System.out.println("server向client發送數據"); String currentTime = new Date(System.currentTimeMillis()).toString(); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("server 讀取數據完畢.."); ctx.flush();//刷新後纔將數據發出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 3.2.4. 客戶端啓動類 package com.netty.demo.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; /** * • 鏈接服務器 • 寫數據到服務器 • 等待接受服務器返回相同的數據 • 關閉鏈接 * * @author wilson * */ public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup nioEventLoopGroup = null; try { //建立Bootstrap對象用來引導啓動客戶端 Bootstrap bootstrap = new Bootstrap(); //建立EventLoopGroup對象並設置到Bootstrap中,EventLoopGroup能夠理解爲是一個線程池,這個線程池用來處理鏈接、接受數據、發送數據;專門用來網絡數據的處理 nioEventLoopGroup = new NioEventLoopGroup(); //建立InetSocketAddress並設置到Bootstrap中,InetSocketAddress是指定鏈接的服務器地址 bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { //添加一個ChannelHandler,客戶端成功鏈接服務器後就會被執行 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); // • 調用Bootstrap.connect()來鏈接服務器 ChannelFuture f = bootstrap.connect().sync(); // • 最後關閉EventLoopGroup來釋放資源 f.channel().closeFuture().sync(); } finally { nioEventLoopGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient("localhost", 20000).start(); } } 3.2.5. 客戶端回調方法 package com.netty.demo.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { //客戶端鏈接服務器後被調用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端鏈接服務器,開始發送數據……"); byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuf firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); ctx.writeAndFlush(firstMessage); } //• 從服務器接收到數據後調用 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client 讀取server數據.."); //服務端返回消息後 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("服務端數據爲 :" + body); } //• 發生異常時被調用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exceptionCaught.."); // 釋放資源 ctx.close(); } }
PS:談談我的的理解
圖上面是傳統的IO交互方式,會出現io阻塞。而後又下圖NIO,我也看不太明白具體是怎樣的交互方式,可是在netty引入的地方我知道和傳統IO流差很少,感受更方便一些。
速度應該也更快速一些。 後面的框架都是根據這個完成的。 還有就是不少hadoop的框架並不難,都是業務邏輯。
---------------------------------
PS : 這些實現是爲了 講解其實就是dubbo的簡單實現,