分佈式Java--基於消息方式實現系統間通訊

分佈式系統之間通訊能夠分爲兩種:java

  • 基於消息方式實現系統間通訊
  • 基於遠程調用方式實現系統間通訊

基於消息方式實現系統間通訊

分佈式子系統之間須要通訊時,就發送消息。通常通訊的兩個要點是:消息處理和消息傳輸。服務器

  • 消息處理:例如讀取數據和寫入數據。基於消息方式實現系統通訊的消息處理能夠分爲同步消息和異步消息。同步消息通常採用的是BIO(Blocking IO)和NIO(Non-Blocking IO);異步消息通常採用AIO方式。
  • 消息傳輸:消息傳輸須要藉助網絡協議來實現,TCP/IP協議和UDP/IP協議能夠用來完成消息傳輸。

術語解釋:

  1. BIO:同步阻塞IO。就是當發生IO的讀或者寫操做時,均爲阻塞操做。只有程序讀到了流或者將流寫入操做系統後,纔會釋放資源。
  2. NIO: 同步非阻塞IO。是基於事件驅動思想的。從程序角度想,當發起IO的讀和寫操做時,是非阻塞的。當Socket有流可讀或者能夠寫Socket時,操做系統會通知應用程序進行處理,應用再將流讀取到緩衝區或操做系統。
  3. AIO: 異步IO。一樣基於事件驅動思想。當有流可讀取時,操做系統會將流讀取到read方法的緩衝區,而後通知應用程序;對於寫操做,操做系統將write方法傳入的流寫入完畢時,操做系統主動通知應用程序。
  4. TCP/IP: 一種可靠的網絡數據傳輸協議。要求通訊雙方先創建鏈接,再進行通訊。
  5. UDP/IP: 一種不可靠的網絡數據傳輸協議。並不直接給通訊雙方創建鏈接,而是發送到網絡上通訊。

四種方法實現基於消息進行系統間通訊

TCP/IP+BIO

在Java中可基於Socket、ServerSocket來實現TCP/IP+BIO的系統通訊。網絡

  • Socket主要用於實現創建鏈接即網絡IO的操做
  • ServerSocket主要用於實現服務器端口的監聽即Socket對象的獲取

爲了知足服務端能夠同時接受多個請求,最簡單的方法是生成多個Socket。但這樣會產生兩個問題:異步

  • 生成太對Socket會消耗過多資源
  • 頻繁建立Socket會致使系統性能的不足

爲了解決上面的問題,一般採用鏈接池的方式來維護Socket。一方面能限制Socket的個數;另外一方面避免重複建立Socket帶來的性能降低問題。這裏有一個問題就是設置合適的相應超時時間。由於鏈接池中Socket個數是有限的,確定會形成激烈的競爭和等待。socket

客戶端代碼:

//建立鏈接
Socket socket = new Socket(目標IP或域名, 目標端口);
//BufferedReader用於讀取服務端返回的數據
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//PrintWriter向服務器寫入流
PrintWriter out = new PrintWriter(socket.getOutputStream(),true);
//像服務端發送流
out.println("hello");
//阻塞讀取服務端的返回信息
in.readLine();

服務端代碼:

//建立對本地端口的監聽
PrintWriter out = new PrintWriter(socket.getOutputStream(),true);
//向服務器發送字符串信息
out.println("hello");
//阻塞讀取服務端的返回信息
in.readLine();

TCP/IP+NIO

Java能夠基於Clannel和Selector的相關類來實現TCP/IP+NIO方式的系統間通訊。Channel有SocketClannel和ServerSocketChannel兩種。分佈式

  • SocketClannel: 用於創建鏈接、監聽事件及操做讀寫。
  • ServerSocketClannel: 用於監聽端口即監聽鏈接事件。
  • Selecter: 獲取是否有要處理的事件。

客戶端代碼

SocketChannel channel = SocketChannel.open();
//設置爲非阻塞模式
channel.configureBlocking(false);
//對於非阻塞模式,當即返回false,表示鏈接正在創建中
channel.connect(SocketAdress);
Selector selector = Selector.open();
//向channel註冊selector以及感興趣的鏈接事件
channel.regester(selector,SelectionKey.OP_CONNECT);
//阻塞至有感興趣的IO事件發生,或到達超時時間
int nKeys = selector.select(超時時間【毫秒計】);
//若是但願一直等待知道有感興趣的事件發生
//int nKeys = selector.select();
//若是但願不阻塞直接返回當前是否有感興趣的事件發生
//int nKeys = selector.selectNow();

//若是有感興趣的事件
SelectionKey sKey = null;
if(nKeys>0){
    Set<SelectionKey> keys = selector.selectedKeys();
    for(SelectionKey key:keys){
        //對於發生鏈接的事件
        if(key.isConnectable()){
            SocketChannel sc = (SocketChannel)key.channel();
            sc.configureBlocking(false);
            //註冊感興趣的IO讀事件
            sKey = sc.register(selector,SelectionKey.OP_READ);
            //完成鏈接的創建
            sc.finishConnect();
        }
        //有流可讀取
        else if(key.isReadable()){
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel sc = (SocketChannel) key.channel();
            int readBytes = 0;
            try{
                int ret = 0;
                try{
                    //讀取目前可讀取的值,此步爲阻塞操做
                    while((ret=sc.read(buffer))>0){
                        readBytes += ret;
                    }
                }
                fanally{
                    buffer.flip();
                }
             }
             finally{
                 if(buffer!=null){
                        buffer.clear();
                 }
             }
        }
        //可寫入流
        else if(key.isWritable()){
            //取消對OP_WRITE事件的註冊
            key.interestOps(key.interestOps() & (!SelectionKey.OP_WRITE));
            SocketChannel sc = (SocketChannel) key.channel();
            //此步爲阻塞操做
            int writtenedSize = sc.write(ByteBuffer);
            //如未寫入,則繼續註冊感興趣的OP_WRITE事件
            if(writtenedSize==0){
                key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
            }
        }
    }
    Selector.selectedKeys().clear();
}
//對於要寫入的流,可直接調用channel.write來完成。只有在未寫入成功時纔要註冊OP_WRITE事件
int wSize = channel.write(ByteBuffer);
if(wSize == 0){
    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}

服務端代碼

ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket serverSocket = ssc.socket();
//綁定要監聽的接口
serverSocket.bind(new InetSocketAdress(port));
ssc.configureBlocking(false);
//註冊感興趣的鏈接創建事件
ssc.register(selector,SelectionKey.OP_ACCEPT);

UDP/IP+BIO

Java對UDP/IP方式的網絡數據傳輸一樣採用Socket機制,只是UDP/IP下的Socket沒有創建鏈接,所以沒法雙向通訊。若是須要雙向通訊,必須兩端都生成UDP Server。性能

Java中經過DatagramSocket和DatagramPacket來實現UDP/IP+BIO方式和系統間通訊。spa

  • DatagramSocket:負責監聽端口和讀寫數據
  • DatagramPacket:做爲數據流對象進行傳輸

因爲UDP雙端不創建鏈接,因此也就不存在競爭問題,只是最終讀寫流的動做是同步的。操作系統

關鍵代碼(服務端和客戶端基本同樣)

//若是但願雙向通訊,必須啓動一個監聽端口承擔服務器的職責
//若是不能綁定到指定端口,則拋出SocketException
DatagramSocket serverSocket = new DatagramSocket(監聽的端口);
byte[] buffer = new byte[65507];
DatagramPacket receivePacket = new DatagramPacket(buffer,buffer.length);
DatagramSocket socket = new DatagramSocket();
DatagramPacket packet = new DatagramPacket(datas,datas.length,server.length);
//阻塞方式發送packet到指定的服務器和端口
socket.send(packet);
//阻塞並同步讀取流消息,若是讀取的流消息比packet長,則刪除更長的消息
//當鏈接不上目標地址和端口時,拋出PortUnreachableException
DatagramSocket.setSoTimeout(超時時間--毫秒級);
serverSocket.receive(receivePacket);

UDP/IP+NIO

Java中能夠經過DatagramClannel和ByteBuffer來實現UDP/IP方式的系統間通訊。rest

  • DatagramClannel:負責監聽端口及進行讀寫
  • ByteBuffer:用於數據傳輸

關鍵代碼(客戶端和服務端都相似)

//讀取流信息
DatagramChannel receiveChannel = DatagramChannel.open();
receiveChannel.configureBlocking(false);
DatagramSocket socket = receiveChannel.socket();
socket.bind(new InetSocketAddress(rport));
Selector selector = Selector.open();
receiveChannel.register(selector, SelectionKey.OP_REEAD);
//以後便可像TCP/IP+NIO中對selector遍歷同樣的方式進行流信息的讀取
//...


//寫入流信息
DatagramChannel sendChannel = DatagramChannel.open();
sendChannel.configureBlocking(false);
SocketAdress target = new InetSocketAdress("127.0.0.1",sport);
sendChannel.connect(target);
//阻塞寫入流
sendChannel.write(ByteBuffer);
相關文章
相關標籤/搜索