bit0chat 是一個基於 Netty 的 IM 即時通信框架java
PS:bit0chat,bit後面沒有0,開源中國認爲我文章中包含辱罵的信息,想了半天可能只有這個緣由了!git
項目地址: https://github.com/all4you/bit0chat (將0刪掉)github
特性:算法
TODO:數據庫
bit0chat-example 模塊提供了一個服務端與客戶端的實現示例,能夠參照該示例進行本身的業務實現。promise
要啓動服務端,須要獲取一個 Server 的實例,能夠經過 ServerFactory 來獲取。服務器
目前只實現了單機模式下的 Server ,經過 SimpleServerFactory 只須要定義一個端口便可獲取一個單機的 Server 實例,以下所示:架構
public class StandaloneServerApplication { public static void main(String[] args) { Server server = SimpleServerFactory.getInstance() .newServer(8864); server.start(); } }
服務端啓動成功後,將顯示以下信息:框架
目前只實現了直連服務器的客戶端,經過 SimpleClientFactory 只須要指定一個 ServerAttr 便可獲取一個客戶端,而後進行客戶端與服務端的鏈接,以下所示:異步
public class DirectConnectServerClientApplication { public static void main(String[] args) { Client client = SimpleClientFactory.getInstance() .newClient(ServerAttr.getLocalServer(8864)); client.connect(); doClientBiz(client); } }
客戶端鏈接上服務端後,將顯示以下信息:
目前客戶端提供了三種 Func,分別是:登陸,查看在線用戶列表,發送單聊消息,每種 Func 有不一樣的命令格式。
經過在客戶端中執行如下命令 -lo houyi 123456
便可實現登陸,目前用戶中心還未實現,經過 Mock 的方式實現一個假的用戶服務,因此輸入任何的用戶名密碼都會登陸成功,而且會爲用戶建立一個用戶id。
登陸成功後,顯示以下:
再啓動一個客戶端,而且也執行登陸,登陸成功後,能夠執行 -lu
命令,獲取在線用戶列表,目前用戶是保存在內存中,獲取的結果以下所示:
用 gris 這個用戶向 houyi 這個用戶發送單聊信息,只要執行 -pc 1 hello,houyi
命令便可
其中第二個參數數要發送消息給那個用戶的用戶id,第三個參數是消息內容
消息發送方,發送完消息:
消息接收方,接收到消息:
客戶端和服務端之間維持着心跳,雙方都會檢查鏈接是否可用,客戶端每隔5s會向服務端發送一個 PingPacket,而服務端接收到這個 PingPacket 以後,會回覆一個 PongPacket,這樣表示雙方都是健康的。
當由於某種緣由,服務端沒有收到客戶端發送的消息,服務端將會把該客戶端的鏈接斷開,一樣的客戶端也會作這樣的檢查。
當客戶端與服務端之間的鏈接斷開以後,將會觸發客戶端 HealthyChecker 的 channelInactive 方法,從而進行客戶端的斷線重連。
單機版的架構只涉及到服務端、客戶端,另外有二者之間的協議層,以下圖所示:
除了服務端和客戶端以外,還有三大中心:消息中心,用戶中心,連接中心。
單機版沒法作到高可用,性能與可服務的用戶數也有必定的限制,因此須要有可擴展的集羣版,集羣版在單機版的基礎上增長了一個路由層,客戶端經過路由層來得到可用的服務端地址,而後與服務端進行通信,以下圖所示:
客戶端發送消息給另外一個用戶,服務端接收到這個請求後,從 Connection中心中獲取目標用戶「掛」在哪一個服務端下,若是在本身名下,那最簡單直接將消息推送給目標用戶便可,若是在其餘服務端,則須要將該請求轉交給目標服務端,讓目標服務端將消息推送給目標用戶。
經過一個自定義協議來實現服務端與客戶端之間的通信,協議中有以下幾個字段:
* * <p> * The structure of a Packet is like blow: * +----------+----------+----------------------------+ * | size | value | intro | * +----------+----------+----------------------------+ * | 1 bytes | 0xBC | magic number | * | 1 bytes | | serialize algorithm | * | 4 bytes | | packet symbol | * | 4 bytes | | content length | * | ? bytes | | the content | * +----------+----------+----------------------------+ * </p> *
每一個字段的含義
所佔字節 | 用途 |
---|---|
1 | 魔數,默認爲 0xBC |
1 | 序列化的算法 |
4 | Packet 的類型 |
4 | Packet 的內容長度 |
? | Packet 的內容 |
序列化算法將會決定該 Packet 在編解碼時,使用何種序列化方式。
Packet 的類型將會決定到達服務端的字節流將被反序列化爲什麼種 Packet,也決定了該 Packet 將會被哪一個 PacketHandler 進行處理。
內容長度將會解決 Packet 的拆包與粘包問題,服務端在解析字節流時,將會等到字節的長度達到內容的長度時,才進行字節的讀取。
除此以外,Packet 中還會存儲一個 sync 字段,該字段將指定服務端在處理該 Packet 的數據時是否須要使用異步的業務線程池來處理。
服務端與客戶端各自維護了一個健康檢查的服務,即 Netty 爲咱們提供的 IdleStateHandler,經過繼承該類,而且實現 channelIdle 方法便可實現鏈接 「空閒」 時的邏輯處理,當出現空閒時,目前咱們只關心讀空閒,咱們既能夠認爲這條連接出現問題了。
那麼只須要在連接出現問題時,將這條連接關閉便可,以下所示:
public class IdleStateChecker extends IdleStateHandler { private static final int DEFAULT_READER_IDLE_TIME = 15; private int readerTime; public IdleStateChecker(int readerIdleTime) { super(readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime, 0, 0, TimeUnit.SECONDS); readerTime = readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime; } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { log.warn("[{}] Hasn't read data after {} seconds, will close the channel:{}", IdleStateChecker.class.getSimpleName(), readerTime, ctx.channel()); ctx.channel().close(); } }
另外,客戶端須要額外再維護一個健康檢查器,正常狀況下他負責定時向服務端發送心跳,當連接的狀態變成 inActive 時,該檢查器將負責進行重連,以下所示:
public class HealthyChecker extends ChannelInboundHandlerAdapter { private static final int DEFAULT_PING_INTERVAL = 5; private Client client; private int pingInterval; public HealthyChecker(Client client, int pingInterval) { Assert.notNull(client, "client can not be null"); this.client = client; this.pingInterval = pingInterval <= 0 ? DEFAULT_PING_INTERVAL : pingInterval; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); schedulePing(ctx); } private void schedulePing(ChannelHandlerContext ctx) { ctx.executor().schedule(() -> { Channel channel = ctx.channel(); if (channel.isActive()) { log.debug("[{}] Send a PingPacket", HealthyChecker.class.getSimpleName()); channel.writeAndFlush(new PingPacket()); schedulePing(ctx); } }, pingInterval, TimeUnit.SECONDS); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.executor().schedule(() -> { log.info("[{}] Try to reconnecting...", HealthyChecker.class.getSimpleName()); client.connect(); }, 5, TimeUnit.SECONDS); ctx.fireChannelInactive(); } }
咱們知道,Netty 中維護着兩個 IO 線程池,一個 boss 主要負責連接的創建,另一個 worker 主要負責連接上的數據讀寫,咱們不該該使用 IO 線程來處理咱們的業務,由於這樣極可能會對 IO 線程形成阻塞,致使新連接沒法及時創建或者數據沒法及時讀寫。
爲了解決這個問題,咱們須要在業務線程池中來處理咱們的業務邏輯,可是這並非絕對的,若是咱們要執行的邏輯很簡單,不會形成太大的阻塞,則能夠直接在 IO 線程中處理,好比客戶端發送一個 Ping 服務端回覆一個 Pong,這種狀況是沒有必要在業務線程池中進行處理的,由於處理完了最終仍是要交給 IO 線程去寫數據。可是若是一個業務邏輯須要查詢數據庫或者讀取文件,這每每比較耗時間,因此就須要將這些操做封裝起來交給業務線程池去處理。
服務端容許客戶端在傳輸的 Packet 中指定採用何種方式進行業務的處理,服務端在將字節流解碼成 Packet 以後,會根據 Packet 中的 sync 字段的值,肯定怎樣對該 Packet 進行處理,以下所示:
public class ServerPacketDispatcher extends SimpleChannelInboundHandler<Packet> { @Override public void channelRead0(ChannelHandlerContext ctx, Packet request) { // if the packet should be handled async if (request.getAsync() == AsyncHandle.ASYNC) { EventExecutor channelExecutor = ctx.executor(); // create a promise Promise<Packet> promise = new DefaultPromise<>(channelExecutor); // async execute and get a future Future<Packet> future = executor.asyncExecute(promise, ctx, request); future.addListener(new GenericFutureListener<Future<Packet>>() { @Override public void operationComplete(Future<Packet> f) throws Exception { if (f.isSuccess()) { Packet response = f.get(); writeResponse(ctx, response); } } }); } else { // sync execute and get the response packet Packet response = executor.execute(ctx, request); writeResponse(ctx, response); } } }
bit0chat 除了能夠做爲 IM 框架以外,還能夠做爲一個通用的通信框架。
Packet 做爲通信的載體,經過繼承 AbstractPacket 便可快速實現本身的業務,搭配 PacketHandler 做爲數據處理器便可實現客戶端與服務端的通信。