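Kafka's networking layer uses the Reactor pattern, which is event-driven. Readers familiar with Java will know that Java NIO provides APIs for implementing the Reactor pattern. A common single-threaded Java NIO programming model is shown in the figure.
Anyone who has done NIO programming knows the Selector: we poll it to obtain the events we are listening for, then handle each event differently, for example OP_ACCEPT for accepting connections, OP_READ for reading data, and so on.
This simple approach is fine for a client, but it has drawbacks on the server side. On the server, reading requests, processing requests and sending responses must each complete quickly, and they should interfere with one another as little as possible. So we need to modify the simple model above.
To handle high concurrency and make full use of the server's resources, we adjust the architecture slightly: the network I/O logic is separated from the business-processing logic, and each is handled by its own thread pool, as shown in the figure.
If you don't want to read the rather clumsy Reactor model below, you can go straight to the Kafka source code. If a few Chinese comments would help, I have annotated one version very heavily, a source-code walkthrough based on Kafka 0.10.0.1, which you can read directly; of course, you can also read the official version.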
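SocketServer is its entry point.
Its inner class Acceptor is responsible for creating and configuring new connections.
The inner class Processor handles I/O events.
The KafkaRequestHandler class is responsible for business processing.
The bridge between business processing and I/O is RequestChannel.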
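A disclaimer up front: the clumsy (but simple) Reactor model below only aims to work, and its approach is roughly the same as Kafka's. It does very little exception handling, and many details are rough around the edges!
3.1 Recalling how a Selector is used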
```java
// 1. Obtain the server-side channel
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.bind(new InetSocketAddress(9898));
// 2. Switch to non-blocking mode
ssChannel.configureBlocking(false);
// 3. Open a selector
Selector selector = Selector.open();
// 4. Register interest in accept events with the selector
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
    // 5. Get all ready events from the selector
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    // 6. While there are any left
    while (it.hasNext()) {
        // 7. Take the next SelectionKey
        SelectionKey key = it.next();
        // 8. Check which event the key represents
        if (key.isAcceptable()) {
            // 9. Accept the new connection
            SocketChannel socketChannel = ssChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            // 10. Handle a readable event
            SocketChannel channel = (SocketChannel) key.channel();
            readMsg(channel);
        }
        // 11. Remove the current key
        it.remove();
    }
}
```
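The loop above calls readMsg(channel) without defining it. A minimal, hypothetical version is sketched below; the class name MsgReader, the 256-byte read buffer, and treating a message as "whatever bytes are available right now" (no framing at all) are assumptions. It drains the channel until read() returns 0 (a non-blocking socket has nothing more for now) or -1 (the peer closed the connection), and closes the channel in the latter case, which also cancels its SelectionKey. The parameter is typed as ReadableByteChannel, which SocketChannel implements, so the helper can also be tried without a real socket.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for the readMsg(channel) call above; not taken from the original article.
class MsgReader {
    // Reads everything currently available and returns it as UTF-8 text.
    // A SocketChannel is a ReadableByteChannel, so it can be passed in directly.
    static String readMsg(ReadableByteChannel channel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(256);
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        int n;
        // A non-blocking SocketChannel returns 0 once nothing more is available right now,
        // and -1 once the peer has closed its side of the connection.
        while ((n = channel.read(buffer)) > 0) {
            buffer.flip();
            collected.write(buffer.array(), 0, buffer.limit());
            buffer.clear();
        }
        if (n == -1) {
            // Closing a registered channel also cancels its SelectionKey.
            channel.close();
        }
        // Decode only after collecting, so multi-byte characters split across reads stay intact.
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Note that this helper only knows about bytes that have already arrived: a message split across two select() rounds would come back as two separate strings, which is exactly the framing problem that a length header solves.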
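This is the model from the first figure above. Notice that its I/O operations and business processing are mixed together. Of course, a simple improvement would be to hand the business processing to a thread pool.
What we want here, though, is to implement the model from the second figure.
3.2 Implementing the Acceptor, which establishes connections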
```java
public Acceptor(InetSocketAddress inetSocketAddress, Processor[] processors) throws IOException {
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.socket().bind(inetSocketAddress);
    this.serverSocketChannel = serverSocketChannel;
    this.selector = Selector.open();
    this.processors = processors; // ignore this for now
}
```
```java
@Override
public void run() {
    if (init) {
        System.out.println("Ready to accept connections");
        init = false;
    }

    try {
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (ClosedChannelException e) {
        e.printStackTrace();
    }

    int currentProcessors = 0;
    while (true) {
        try {
            int ready = selector.select(500); // poll every half second
            if (ready > 0) {
                Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
                while (selectionKeys.hasNext()) {
                    SelectionKey selectionKey = selectionKeys.next();
                    selectionKeys.remove();
                    if (selectionKey.isAcceptable()) {
                        this.accept(selectionKey, processors[currentProcessors]);
                        currentProcessors = (currentProcessors + 1) % processors.length;
                    } else {
                        throw new RuntimeException("Should not happen: only OP_ACCEPT is registered");
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// Accept the connection and assign it round-robin to a Processor, the component responsible for I/O
public void accept(SelectionKey selectionKey, Processor processor) throws IOException {
    SelectableChannel channel = selectionKey.channel();
    SocketChannel socketChannel = ((ServerSocketChannel) channel).accept();
    if (socketChannel == null) {
        return; // in non-blocking mode, accept() returns null when no connection is pending
    }
    socketChannel.configureBlocking(false);
    socketChannel.socket().setTcpNoDelay(true);
    socketChannel.socket().setKeepAlive(true);

    // Hand the new socketChannel over to the processor
    processor.accept(socketChannel);
}
```
3.3 Implementing the Processor, which handles I/O
```java
public Processor(String name, RequestChannel requestChannel,
                 ConcurrentHashMap<SelectionKey, ArrayBlockingQueue<ByteBuffer>> inFlightResponse) throws IOException {
    this.name = name;
    this.newConnection = new ConcurrentLinkedQueue<>();
    this.selector = Selector.open();
    this.inFlightResponse = inFlightResponse;
    this.requestChannel = requestChannel;
}

protected void accept(SocketChannel socketChannel) {
    try {
        // getRemoteAddress() is the client's address; getLocalAddress() would print the server's own
        System.out.println(name + " is establishing a connection with " + socketChannel.getRemoteAddress());
    } catch (IOException e) {
        e.printStackTrace();
    }
    newConnection.add(socketChannel);
    // We also need wakeup(): if select() is currently blocked, this tells it to stop blocking
    selector.wakeup();
}
```
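Why queue the channel and call selector.wakeup() instead of registering it directly from the Acceptor thread? On older JDKs (Java 8, for example) register() may block while another thread is inside select() on the same Selector, and on newer JDKs a registration made during a select() only takes effect in the next one. The hypothetical sketch below therefore keeps all registrations on the selector's own thread and uses wakeup() to cut a blocking select() short. The class name HandoffLoop and its method runOnce are made up for illustration, and the sketch simply discards I/O events instead of handling them.

```java
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, stripped-down Processor loop illustrating the queue + wakeup() handoff.
class HandoffLoop {
    final Selector selector;
    private final ConcurrentLinkedQueue<SelectableChannel> newConnections = new ConcurrentLinkedQueue<>();

    HandoffLoop() throws IOException {
        this.selector = Selector.open();
    }

    // Called from the Acceptor thread: queue the channel, then end any blocking select().
    void accept(SelectableChannel channel) {
        newConnections.add(channel);
        selector.wakeup();
    }

    // One iteration of the Processor thread's loop; returns how many channels it registered.
    int runOnce(long selectTimeoutMs) throws IOException {
        int registered = 0;
        SelectableChannel channel;
        // Only this thread changes the selector's registrations.
        while ((channel = newConnections.poll()) != null) {
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            registered++;
        }
        selector.select(selectTimeoutMs);
        // This sketch does not handle I/O events; it just discards them.
        selector.selectedKeys().clear();
        return registered;
    }
}
```

Without the wakeup() call, a connection handed over while the Processor sits in select() would wait until the timeout expires (500 ms in the article's Processor) before being registered.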
```java
@Override
public void run() {
    while (true) {
        /*
         * Register newly handed-over connections
         */
        while (!newConnection.isEmpty()) {
            SocketChannel socketChannel = newConnection.poll();
            try {
                socketChannel.register(selector, SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        }
        // ... the rest of the while (true) loop follows
```
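Newly received data is put into the RequestChannel, and we stop watching OP_READ on that connection, which ensures the same connection cannot push several requests in at the same time.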
```java
requestChannel.sendRequest(new Request(selectionKey, byteBuffer)); // once the data is received, put it on the queue
```
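Freshly processed results, in turn, are buffered in inFlightResponse, and we register interest in OP_WRITE. This imitates Kafka's inFlightRequest, albeit very crudely.
Kafka's inFlightRequest keeps, per node, a queue of requests that have been sent but not yet answered, which lets Kafka control how many requests are outstanding to a node at once (on the broker side, the Processor likewise stops reading from a connection until the response to its current request has been sent). This also neatly sidesteps the TCP packet splitting and merging ("sticky packet") problem: first, Kafka controls how many requests are in flight to each node; second, Kafka uses its own protocol (essentially a header stating the length of the whole packet, plus CRC checksums) to guarantee that each request is read completely.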
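The "length in the header" idea is what lets a reader know where one request ends and the next begins, however TCP splits or merges the bytes. Below is a minimal sketch under assumed conditions: a made-up frame format of a 4-byte big-endian length followed by the payload, similar in spirit to the size field at the start of every Kafka request but without CRCs and without any of Kafka's actual classes. LengthPrefixedFramer, encode and feed are hypothetical names. Incoming bytes are appended to a pending buffer, and complete frames are cut off its front.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical framing helper (not from the article or from Kafka):
// every frame is a 4-byte big-endian payload length followed by the payload itself.
class LengthPrefixedFramer {
    // Bytes already received that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Wraps one payload in a frame, ready to be written to a channel.
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length).put(payload);
        frame.flip();
        return frame;
    }

    // Feeds freshly read bytes (a buffer in read mode) and returns every frame completed so far.
    // Partial frames are kept and finished by later calls.
    List<byte[]> feed(ByteBuffer incoming) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + incoming.remaining());
        merged.put(pending).put(incoming);
        merged.flip();

        List<byte[]> frames = new ArrayList<>();
        while (merged.remaining() >= 4) {
            int length = merged.getInt(merged.position()); // peek at the header without consuming it
            if (length < 0) {
                throw new IllegalStateException("corrupt frame length: " + length);
            }
            if (merged.remaining() < 4 + length) {
                break; // the rest of this frame has not arrived yet
            }
            merged.getInt(); // consume the header
            byte[] payload = new byte[length];
            merged.get(payload);
            frames.add(payload);
        }
        pending = merged.slice(); // keep the unconsumed tail for the next call
        return frames;
    }
}
```

With this in place, a Processor could call feed() after each read() and hand only complete frames to the RequestChannel, instead of assuming (as the 1024-byte read in the code below does) that one read returns exactly one request.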
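In our Selector loop, we take each SelectionKey that registered interest in OP_WRITE in the previous step, together with the data to return, and process it. When that is done, we drop interest in OP_WRITE and register interest in OP_READ again.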
```java
        // ... still inside run()'s while (true) loop (IOException handling is not shown)
        /*
         * Move new responses into the in-flight buffer
         */
        Response response = requestChannel.receiveResponse();
        while (response != null) {
            SelectionKey key = response.getSelectionKey();
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            ArrayBlockingQueue<ByteBuffer> inFlight =
                    inFlightResponse.getOrDefault(response.getSelectionKey(), new ArrayBlockingQueue<>(100));
            inFlightResponse.put(response.getSelectionKey(), inFlight);
            try {
                inFlight.put(response.getByteBuffer());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            response = requestChannel.receiveResponse();
        }

        int ready = selector.select(500); // poll every half second
        if (ready > 0) {
            Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
            while (selectionKeys.hasNext()) {
                SelectionKey selectionKey = selectionKeys.next();
                selectionKeys.remove();
                /*
                 * Handle a new request
                 */
                if (selectionKey.isReadable()) {
                    System.out.println(name + " is handling a new request");
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // no protocol defined, so just read up to this many bytes
                    if (socketChannel.read(byteBuffer) == -1) { // TODO key point
                        // The client closed the connection: drop the key and any buffered responses
                        inFlightResponse.remove(selectionKey);
                        selectionKey.cancel();
                        socketChannel.close();
                        continue;
                    }
                    byteBuffer.flip();
                    requestChannel.sendRequest(new Request(selectionKey, byteBuffer)); // once the data is received, put it on the queue
                    selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_READ); // stop watching for reads
                }
                /*
                 * Handle a new response
                 */
                if (selectionKey.isWritable()) {
                    System.out.println(name + " is handling a new response");
                    ByteBuffer send = inFlightResponse.get(selectionKey).poll(); // TODO key point
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    socketChannel.write(send); // assumes the small response is written in one call
                    selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
                    selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
                }
            }
        }
        // (the closing braces of while (true) and run() are omitted)
```
```java
/**
 * Created by Anur IjuoKaruKas on 2018/12/13
 */
public class RequestChannel {

    private ArrayBlockingQueue<Request> requestQueue;

    private ArrayBlockingQueue<Response> responseQueue;

    public RequestChannel() {
        requestQueue = new ArrayBlockingQueue<>(100);
        responseQueue = new ArrayBlockingQueue<>(100);
    }

    ..........
}
```
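The body of RequestChannel is elided above. Judging only from how it is called elsewhere in this article (sendRequest, receiveRequest, sendResponse, receiveResponse), one plausible completion is sketched below; it is a guess, not the author's code. Request and Response are reduced to minimal hypothetical holders of a SelectionKey and a ByteBuffer. The design choice worth noting is the asymmetry: receiveResponse() never waits, because the Processor has to get back to its select() loop, while receiveRequest() waits briefly so that an idle Handler does not spin; both return null when nothing is queued, which matches the null checks in the Processor and Handler code.

```java
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Minimal stand-ins for the article's Request/Response (assumed shape: a key plus a buffer).
class Request {
    private final SelectionKey selectionKey;
    private final ByteBuffer byteBuffer;

    Request(SelectionKey selectionKey, ByteBuffer byteBuffer) {
        this.selectionKey = selectionKey;
        this.byteBuffer = byteBuffer;
    }

    SelectionKey getSelectionKey() { return selectionKey; }
    ByteBuffer getByteBuffer() { return byteBuffer; }
}

class Response {
    private final SelectionKey selectionKey;
    private final ByteBuffer byteBuffer;

    Response(SelectionKey selectionKey, ByteBuffer byteBuffer) {
        this.selectionKey = selectionKey;
        this.byteBuffer = byteBuffer;
    }

    SelectionKey getSelectionKey() { return selectionKey; }
    ByteBuffer getByteBuffer() { return byteBuffer; }
}

// A guessed completion of RequestChannel, not the author's implementation.
class RequestChannel {
    private final ArrayBlockingQueue<Request> requestQueue = new ArrayBlockingQueue<>(100);
    private final ArrayBlockingQueue<Response> responseQueue = new ArrayBlockingQueue<>(100);

    // Processor -> Handler: blocks while 100 requests are already waiting (simple back-pressure).
    void sendRequest(Request request) {
        try {
            requestQueue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the request is dropped in this sketch
        }
    }

    // Handler side: waits up to 100 ms so an idle Handler does not spin; null means "nothing yet".
    Request receiveRequest() {
        try {
            return requestQueue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Handler -> Processor: blocks while 100 responses are already waiting.
    void sendResponse(Response response) {
        try {
            responseQueue.put(response);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the response is dropped in this sketch
        }
    }

    // Processor side: never waits; null means no response is ready.
    Response receiveResponse() {
        return responseQueue.poll();
    }
}
```

One caveat of this sketch: if both queues are full at the same moment, a Processor blocked in sendRequest and Handlers blocked in sendResponse can end up waiting on each other. Kafka 0.10's own RequestChannel also keeps a separate response queue for each Processor, whereas the single shared responseQueue here is why the article's Processors share one inFlightResponse map.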
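3.4 Implementing the Handler, which handles business logic
As you would expect, the Handler simply pulls pending work from RequestChannel's requestQueue and puts the results back into RequestChannel's responseQueue.
We can apply different business logic depending on the data received. If we need to extend it, we could even, like Netty, treat the Handler merely as a "boss" and let dedicated thread pools do the actual work: for example, Fetch is relatively time-consuming, so I could create a larger thread pool to run Fetch, whereas for Hello a simple Executors.newSingleThreadExecutor() is enough.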
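The "Handler as boss" idea could look roughly like the hypothetical sketch below: the Handler thread only inspects the request type and hands the actual work to a pool chosen per type, a larger pool for slow Fetch work and single-thread executors for the rest. The class name DispatchingHandler, the pool size of 4, and the reply strings are assumptions; CompletableFuture.supplyAsync(supplier, executor) is the standard JDK method for running a task on a given executor.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical dispatcher: the Handler thread only routes requests; the pools do the work.
class DispatchingHandler {
    private final ExecutorService fetchPool = Executors.newFixedThreadPool(4);      // slow Fetch work
    private final ExecutorService helloPool = Executors.newSingleThreadExecutor();  // trivial Hello work
    private final ExecutorService defaultPool = Executors.newSingleThreadExecutor(); // everything else

    // Routes by message prefix and returns a future holding the reply text.
    CompletableFuture<String> dispatch(byte[] payload) {
        String msg = new String(payload, StandardCharsets.UTF_8);
        if (msg.startsWith("Fetch")) {
            return CompletableFuture.supplyAsync(() -> "Fetch ~ " + msg, fetchPool);
        } else if (msg.startsWith("Hello")) {
            return CompletableFuture.supplyAsync(() -> "Hi ~ " + msg, helloPool);
        }
        return CompletableFuture.supplyAsync(() -> "Woww ~ " + msg, defaultPool);
    }

    // Lets the JVM exit once outstanding work is done.
    void shutdown() {
        fetchPool.shutdown();
        helloPool.shutdown();
        defaultPool.shutdown();
    }
}
```

In the article's setup, the future's callback (for example via thenAccept) would wrap the reply in a Response and pass it to requestChannel.sendResponse(...), so a slow Fetch no longer holds up the Handler thread that pulls the next request.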
```java
@Override
public void run() {
    while (true) {
        Request request = requestChannel.receiveRequest();
        if (request != null) {
            System.out.println("The request will be handled by " + name);
            handler(request.getSelectionKey(), request.getByteBuffer());
        }
    }
}

public void handler(SelectionKey selectionKey, ByteBuffer byteBuffer) {
    // Copy only the bytes that were actually read (position..limit);
    // byteBuffer.array() would return the whole 1024-byte backing array, trailing zeros included
    byte[] bytes = new byte[byteBuffer.remaining()];
    byteBuffer.get(bytes);
    String msg = new String(bytes, StandardCharsets.UTF_8);

    try {
        Thread.sleep(500); // simulate business processing
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    String prefix;
    if (msg.startsWith("Fetch")) {
        prefix = "Fetch ~~~~~~~~~~";
    } else if (msg.startsWith("Hello")) {
        prefix = "Hi ~~~~~~~~~~";
    } else {
        prefix = "Woww ~~~~~~~~~~";
    }
    ByteBuffer response = ByteBuffer.allocate(2048);
    response.put(prefix.getBytes(StandardCharsets.UTF_8));
    response.put(bytes); // echo the request back after the prefix
    response.flip();

    System.out.println(name + " is done; returning the result to the Processor");
    requestChannel.sendResponse(new Response(selectionKey, response));
}
```
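3.5 Running our clumsy model
Notice that this clumsy Reactor model nevertheless scales out well: its two main components, Processor and Handler, can both be created in whatever numbers we like. Kafka does the same, except that it takes the counts from the broker configuration (num.network.threads for processors and num.io.threads for request handlers), which are usually tuned to the number of cores on the server: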
```scala
// creating the processors
val protocol = endpoint.protocolType // network protocol
val processorEndIndex = processorBeginIndex + numProcessorThreads
for (i <- processorBeginIndex until processorEndIndex)
  processors(i) = newProcessor(i, connectionQuotas, protocol) // create a Processor

// inside this constructor,
// the processor threads are started in a loop
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
  processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) // create the Acceptor

// creating the handlers
// threads that run the KafkaRequestHandlers
val threads = new Array[Thread](numThreads)
// the KafkaRequestHandler instances
val runnables = new Array[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
  runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
  threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
  threads(i).start()
}
```
To keep things simple, I just throw everything into a single thread pool.
Let's run the whole model and use Hercules to simulate clients sending requests to our server.
```java
/**
 * Created by Anur IjuoKaruKas on 2018/12/12
 */
public class Reactor {

    public static final int PORT = 9999;

    public static void main(String[] args) throws IOException {
        RequestChannel requestChannel = new RequestChannel();

        ConcurrentHashMap<SelectionKey, ArrayBlockingQueue<ByteBuffer>> inFlightResponse = new ConcurrentHashMap<>();

        Processor processor1 = new Processor("p1", requestChannel, inFlightResponse);
        Processor processor2 = new Processor("p2", requestChannel, inFlightResponse);
        Acceptor acceptor = new Acceptor(new InetSocketAddress(PORT), new Processor[] {processor1, processor2});

        ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.execute(acceptor);

        executorService.execute(processor1);
        executorService.execute(processor2);

        Handler handler1 = new Handler("h1", requestChannel);
        Handler handler2 = new Handler("h2", requestChannel);
        executorService.execute(handler1);
        executorService.execute(handler2);
    }
}
```
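After the connections are established, we simulate two clients and send, in turn, 'hello baby', 'Fetch msg' and '感謝gaojingyu_gw發現問題' ("thanks to gaojingyu_gw for finding the problem").
We get the following responses:
And the server log is as follows:
We can see that p1 and p2 take turns picking up new connections from the Acceptor, and h1 and h2 likewise take turns taking tasks from the RequestChannel and executing them.
Extra thanks to gaojingyu_gw for spotting a problem and reporting that no further connections could be established. After going back and forth over many parts of the code, I finally confirmed that the original version really could not accept more connections, because the polling code in the Acceptor and the Processor was wrong. The faulty code was: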
```java
// Buggy: keys are never removed from the selected-key set
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
    if (selectionKey.isAcceptable()) {
        this.accept(selectionKey, processors[currentProcessors]);
        currentProcessors = (currentProcessors + 1) % processors.length;
    } else {
        throw new RuntimeException("Should not happen: only OP_ACCEPT is registered");
    }
}
```
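When looping over the selected keys, we must not iterate over the set directly. Instead, we obtain its iterator and remove each element right after fetching it with next(). The reason direct iteration fails is given in the Selector documentation: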
Keys are added to the selected-key set by selection operations. A key may be removed directly from the selected-key set by invoking the set's remove method or by invoking the remove method of an iterator obtained from the set. Keys are never removed from the selected-key set in any other way; they are not, in particular, removed as a side effect of selection operations. Keys may not be added directly to the selected-key set.
The corrected code is as follows:
```java
Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
while (selectionKeys.hasNext()) {
    SelectionKey selectionKey = selectionKeys.next();
    selectionKeys.remove(); // take the key out of the selected-key set before handling it
    if (selectionKey.isAcceptable()) {
        this.accept(selectionKey, processors[currentProcessors]);
        currentProcessors = (currentProcessors + 1) % processors.length;
    } else {
        throw new RuntimeException("Should not happen: only OP_ACCEPT is registered");
    }
}
```
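The quoted Javadoc is the crux: the Selector never takes keys out of the selected-key set by itself, and a key left there keeps its old ready set. The hypothetical experiment below makes this visible; a Pipe stands in for a client socket, and the class and field names are made up for illustration. After the event has been handled and the data drained, a second select() still leaves the key in the set and still marks it readable even though a read finds nothing. Only removing the key, as the corrected loop does, empties the set. A loop that never removes keys therefore keeps acting on stale readiness rather than on what the channel is actually ready for (for an Acceptor, a stale OP_ACCEPT means calling accept() when no connection may be pending).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical experiment: what happens to the selected-key set when keys are not removed.
class SelectedKeysDemo {
    int afterFirstSelect;      // selected-key set size once data has arrived
    int afterDrainAndSelect;   // size after reading the data and selecting again, without remove()
    boolean staleLooksReadable; // does the stale key still claim to be readable?
    int bytesFromStaleRead;    // what a read triggered by the stale key actually gets
    int afterRemoveAndSelect;  // size after removing the key and selecting again

    static SelectedKeysDemo run() throws IOException {
        SelectedKeysDemo r = new SelectedKeysDemo();
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open()) {
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);

            pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));
            selector.select(1000);
            r.afterFirstSelect = selector.selectedKeys().size();

            // Handle the event (drain the byte) but, like the buggy loop, do not remove the key.
            ByteBuffer buf = ByteBuffer.allocate(16);
            pipe.source().read(buf);
            selector.selectNow();
            r.afterDrainAndSelect = selector.selectedKeys().size();
            r.staleLooksReadable = key.isReadable();
            buf.clear();
            r.bytesFromStaleRead = pipe.source().read(buf); // nothing left: 0 on a non-blocking channel

            // Removing the key, as the corrected loop does, empties the set until new events arrive.
            selector.selectedKeys().remove(key);
            selector.selectNow();
            r.afterRemoveAndSelect = selector.selectedKeys().size();
        } finally {
            pipe.source().close();
            pipe.sink().close();
        }
        return r;
    }
}
```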
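For the complete code, click here; you can pull it and run it directly. The main class is src/reactor/Reactor.
If you found this useful, feel free to give the article a like. Thank you, dear readers!
References:
Kafka source code, version 0.10.0.1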