學習 Doug Lea 大神寫的——Scalable IO in Java

學習 Doug Lea 大神寫的——Scalable IO in Javajava


網絡服務

Web services、分佈式對象等等都具備相同的處理結構react

  1. Read request
  2. Decode request
  3. Process service
  4. Encode reply
  5. Send reply

基礎的網絡設計
在這裏插入圖片描述
每個處理的 handler 都在各自的線程中處理。web

代碼示例spring

public class Server01 implements Runnable {
	@Override public void run() {
		try {
			ServerSocket serverSocket = new ServerSocket(9898);
			while (!Thread.interrupted()) {
				// serverSocket.accept() 會阻塞到有客戶端鏈接,以後 Handler 會處理
				new Thread(new Handler(serverSocket.accept())).start();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private static class Handler implements Runnable {
		private final Socket socket;

		Handler(Socket socket) {
			this.socket = socket;
		}

		@Override public void run() {
			try {
				byte[] input = new byte[1024];
				// 假設能所有讀取出來
				socket.getInputStream().read(input);
				byte[] output = process(input);
				socket.getOutputStream().write(output);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		private byte[] process(byte[] input) {
			// 裏面處理邏輯
			return new byte[0];
		}
	}
}

這樣作的好處是經過 accept 事件來觸發任務的執行,將每一個任務單獨的去執行。可是缺點也很明顯若是客戶端連接過大那麼須要新建若干個線程去執行,每臺服務器能夠運行的線程數是有限的。那麼多線程的上下文切換的消耗也是巨大的。設計模式

Reactor Pattern

首先咱們先來看下什麼是事件驅動,在 java AWT 包中普遍的獲得了使用。用戶在點擊一個 button 按鈕的時候就會觸發一個事件,而後會使用觀察者模式來觸發 Listener 中的處理事件。
在這裏插入圖片描述服務器

Reactor 設計模式是基於事件驅動的一種實現方式,處理多個客戶端併發的向服務端請求服務的場景。每種服務在服務端可能由多個方法組成。reactor 會解耦併發請求的服務並分發給對應的事件處理器來處理。目前,許多流行的開源框架都用到了。相似 AWT 中的 Thread。網絡

Handlers 執行非阻塞操做的具體類,相似 AWT 中的 ActionListeners。多線程

Reactor 單線程處理任務的設計
在這裏插入圖片描述併發

代碼示例框架

public class Reactor implements Runnable {
	private final Selector            selector;
	private final ServerSocketChannel serverSocketChannel;

	public Reactor(int port) throws IOException {
		selector = Selector.open();
		serverSocketChannel = ServerSocketChannel.open();
		serverSocketChannel.socket().bind(new InetSocketAddress(port));
		serverSocketChannel.configureBlocking(false);
		SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		selectionKey.attach(new Acceptor());
	}

	@Override public void run() {
		try {
			while (!Thread.interrupted()) {
				selector.select();
				Set<SelectionKey> selectionKeys = selector.selectedKeys();
				for (SelectionKey selectionKey : selectionKeys) {
					dispatch(selectionKey);
					selectionKeys.clear();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private void dispatch(SelectionKey selectionKey) {
		Runnable runnable = (Runnable) selectionKey.attachment();
		if (null != runnable) {
			runnable.run();
		}
	}

	private class Acceptor implements Runnable {
		@Override public void run() {
			try {
				SocketChannel socketChannel = serverSocketChannel.accept();
				if (null != socketChannel) {
					new Handler(selector, socketChannel);
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	private class Handler implements Runnable {
		private final        SocketChannel socketChannel;
		private final        SelectionKey  selectionKey;
		private              ByteBuffer    input   = ByteBuffer.allocate(1024);
		private              ByteBuffer    output  = ByteBuffer.allocate(1024);
		private static final int           READING = 0, SENDING = 1;
		private int state = READING;

		Handler(Selector selector, SocketChannel socketChannel) throws IOException {
			this.socketChannel = socketChannel;
			this.socketChannel.configureBlocking(false);
			selectionKey = this.socketChannel.register(selector, 0);
			selectionKey.attach(this);
			selectionKey.interestOps(SelectionKey.OP_READ);
			selector.wakeup();
		}

		void process() {
		}

		@Override public void run() {
			try {
				if (state == READING) {
					socketChannel.read(input);
					process();
					state = SENDING;
					selectionKey.interestOps(SelectionKey.OP_WRITE);
				}
				if (state == READING) {
					socketChannel.write(output);
					selectionKey.cancel();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

這個程序的重點在於 selectionKey.attach(); 方法每次把須要的對象傳入進去,以後在有事件觸發的時候會在 dispatch 中 attachment() 獲取到這個對象,以後直接調用 run 方法。

Reactor 多線程處理任務的設計
在這裏插入圖片描述

只須要稍微修改下 Handler 這個類

// 添加一個線程池開發時請使用自定義或 spring 的線程池
		private final ExecutorService executorService = Executors.newCachedThreadPool();
    
// 修改 run 方法
@Override public void run() {
  try {
    if (state == READING) {
      socketChannel.read(input);
      executorService.execute(new Runnable() {
        @Override public void run() {
          process();
          state = SENDING;
          selectionKey.interestOps(SelectionKey.OP_WRITE);
        }
      });
    }
    if (state == READING) {
      socketChannel.write(output);
      selectionKey.cancel();
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}

多個 Reactor
在這裏插入圖片描述
當看到這幅圖的時候感受這不就是 Netty EventLoopGroup 的工做模式嗎

  1. mainReactor 不就是 bossGroup
  2. subReactor 不就是 workGroup

至此粗略的看完了這篇文章,感受太 6 了,須要後面重複學習,此次只是瞭解大概。後面學習完會持續更新這篇文章!

相關文章
相關標籤/搜索