分佈式系統之間通訊能夠分爲兩種:java
分佈式子系統之間須要通訊時,就發送消息。通常通訊的兩個要點是:消息處理和消息傳輸。服務器
在Java中可基於Socket、ServerSocket來實現TCP/IP+BIO的系統通訊。網絡
爲了知足服務端能夠同時接受多個請求,最簡單的方法是生成多個Socket。但這樣會產生兩個問題:異步
爲了解決上面的問題,一般採用鏈接池的方式來維護Socket。一方面能限制Socket的個數;另外一方面避免重複建立Socket帶來的性能降低問題。這裏有一個問題就是設置合適的相應超時時間。由於鏈接池中Socket個數是有限的,確定會形成激烈的競爭和等待。socket
//建立鏈接 Socket socket = new Socket(目標IP或域名, 目標端口); //BufferedReader用於讀取服務端返回的數據 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); //PrintWriter向服務器寫入流 PrintWriter out = new PrintWriter(socket.getOutputStream(),true); //像服務端發送流 out.println("hello"); //阻塞讀取服務端的返回信息 in.readLine();
//建立對本地端口的監聽 PrintWriter out = new PrintWriter(socket.getOutputStream(),true); //向服務器發送字符串信息 out.println("hello"); //阻塞讀取服務端的返回信息 in.readLine();
Java能夠基於Clannel和Selector的相關類來實現TCP/IP+NIO方式的系統間通訊。Channel有SocketClannel和ServerSocketChannel兩種。分佈式
SocketChannel channel = SocketChannel.open(); //設置爲非阻塞模式 channel.configureBlocking(false); //對於非阻塞模式,當即返回false,表示鏈接正在創建中 channel.connect(SocketAdress); Selector selector = Selector.open(); //向channel註冊selector以及感興趣的鏈接事件 channel.regester(selector,SelectionKey.OP_CONNECT); //阻塞至有感興趣的IO事件發生,或到達超時時間 int nKeys = selector.select(超時時間【毫秒計】); //若是但願一直等待知道有感興趣的事件發生 //int nKeys = selector.select(); //若是但願不阻塞直接返回當前是否有感興趣的事件發生 //int nKeys = selector.selectNow(); //若是有感興趣的事件 SelectionKey sKey = null; if(nKeys>0){ Set<SelectionKey> keys = selector.selectedKeys(); for(SelectionKey key:keys){ //對於發生鏈接的事件 if(key.isConnectable()){ SocketChannel sc = (SocketChannel)key.channel(); sc.configureBlocking(false); //註冊感興趣的IO讀事件 sKey = sc.register(selector,SelectionKey.OP_READ); //完成鏈接的創建 sc.finishConnect(); } //有流可讀取 else if(key.isReadable()){ ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel sc = (SocketChannel) key.channel(); int readBytes = 0; try{ int ret = 0; try{ //讀取目前可讀取的值,此步爲阻塞操做 while((ret=sc.read(buffer))>0){ readBytes += ret; } } fanally{ buffer.flip(); } } finally{ if(buffer!=null){ buffer.clear(); } } } //可寫入流 else if(key.isWritable()){ //取消對OP_WRITE事件的註冊 key.interestOps(key.interestOps() & (!SelectionKey.OP_WRITE)); SocketChannel sc = (SocketChannel) key.channel(); //此步爲阻塞操做 int writtenedSize = sc.write(ByteBuffer); //如未寫入,則繼續註冊感興趣的OP_WRITE事件 if(writtenedSize==0){ key.interestOps(key.interestOps()|SelectionKey.OP_WRITE); } } } Selector.selectedKeys().clear(); } //對於要寫入的流,可直接調用channel.write來完成。只有在未寫入成功時纔要註冊OP_WRITE事件 int wSize = channel.write(ByteBuffer); if(wSize == 0){ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); }
ServerSocketChannel ssc = ServerSocketChannel.open(); ServerSocket serverSocket = ssc.socket(); //綁定要監聽的接口 serverSocket.bind(new InetSocketAdress(port)); ssc.configureBlocking(false); //註冊感興趣的鏈接創建事件 ssc.register(selector,SelectionKey.OP_ACCEPT);
Java對UDP/IP方式的網絡數據傳輸一樣採用Socket機制,只是UDP/IP下的Socket沒有創建鏈接,所以沒法雙向通訊。若是須要雙向通訊,必須兩端都生成UDP Server。性能
Java中經過DatagramSocket和DatagramPacket來實現UDP/IP+BIO方式和系統間通訊。spa
因爲UDP雙端不創建鏈接,因此也就不存在競爭問題,只是最終讀寫流的動做是同步的。操作系統
//若是但願雙向通訊,必須啓動一個監聽端口承擔服務器的職責 //若是不能綁定到指定端口,則拋出SocketException DatagramSocket serverSocket = new DatagramSocket(監聽的端口); byte[] buffer = new byte[65507]; DatagramPacket receivePacket = new DatagramPacket(buffer,buffer.length); DatagramSocket socket = new DatagramSocket(); DatagramPacket packet = new DatagramPacket(datas,datas.length,server.length); //阻塞方式發送packet到指定的服務器和端口 socket.send(packet); //阻塞並同步讀取流消息,若是讀取的流消息比packet長,則刪除更長的消息 //當鏈接不上目標地址和端口時,拋出PortUnreachableException DatagramSocket.setSoTimeout(超時時間--毫秒級); serverSocket.receive(receivePacket);
Java中能夠經過DatagramClannel和ByteBuffer來實現UDP/IP方式的系統間通訊。rest
//讀取流信息 DatagramChannel receiveChannel = DatagramChannel.open(); receiveChannel.configureBlocking(false); DatagramSocket socket = receiveChannel.socket(); socket.bind(new InetSocketAddress(rport)); Selector selector = Selector.open(); receiveChannel.register(selector, SelectionKey.OP_REEAD); //以後便可像TCP/IP+NIO中對selector遍歷同樣的方式進行流信息的讀取 //... //寫入流信息 DatagramChannel sendChannel = DatagramChannel.open(); sendChannel.configureBlocking(false); SocketAdress target = new InetSocketAdress("127.0.0.1",sport); sendChannel.connect(target); //阻塞寫入流 sendChannel.write(ByteBuffer);