快速開始算法
bitchat-example模塊提供了一個服務端與客戶端的實現示例,能夠參照該示例進行本身的業務實現。數據庫
啓動服務端promise
要啓動服務端,須要獲取一個 Server 的實例,能夠經過 ServerFactory 來獲取。服務器
目前只實現了單機模式下的 Server ,經過 SimpleServerFactory 只須要定義一個端口便可獲取一個單機的 Server 實例,以下所示:架構
public class StandaloneServerApplication {
public static void main(String[] args) {
Server server = SimpleServerFactory.getInstance()
.newServer(8864);
server.start();
}
} 框架
服務端啓動成功後,將顯示以下信息:異步
啓動客戶端async
目前只實現了直連服務器的客戶端,經過 SimpleClientFactory 只須要指定一個 ServerAttr 便可獲取一個客戶端,而後進行客戶端與服務端的鏈接,以下所示:ide
public class DirectConnectServerClientApplication { 性能
public static void main(String[] args) {
Client client = SimpleClientFactory.getInstance()
.newClient(ServerAttr.getLocalServer(8864));
client.connect();
doClientBiz(client);
}
}
客戶端鏈接上服務端後,將顯示以下信息:
體驗客戶端的功能
目前客戶端提供了三種 Func,分別是:登陸,查看在線用戶列表,發送單聊消息,每種 Func 有不一樣的命令格式。
登陸
經過在客戶端中執行如下命令 -lo houyi 123456 便可實現登陸,目前用戶中心還未實現,經過 Mock 的方式實現一個假的用戶服務,因此輸入任何的用戶名密碼都會登陸成功,而且會爲用戶建立一個用戶id。
登陸成功後,顯示以下:
查看在線用戶
再啓動一個客戶端,而且也執行登陸,登陸成功後,能夠執行 -lu 命令,獲取在線用戶列表,目前用戶是保存在內存中,獲取的結果以下所示:
發送單聊信息
用 gris 這個用戶向 houyi 這個用戶發送單聊信息,只要執行 -pc 1 hello,houyi 命令便可
其中第二個參數數要發送消息給那個用戶的用戶id,第三個參數是消息內容
消息發送方,發送完消息:
消息接收方,接收到消息:
客戶端斷線重連
客戶端和服務端之間維持着心跳,雙方都會檢查鏈接是否可用,客戶端每隔5s會向服務端發送一個 PingPacket,而服務端接收到這個 PingPacket 以後,會回覆一個 PongPacket,這樣表示雙方都是健康的。
當由於某種緣由,服務端沒有收到客戶端發送的消息,服務端將會把該客戶端的鏈接斷開,一樣的客戶端也會作這樣的檢查。
當客戶端與服務端之間的鏈接斷開以後,將會觸發客戶端 HealthyChecker 的 channelInactive 方法,從而進行客戶端的斷線重連。
總體架構單機版
單機版的架構只涉及到服務端、客戶端,另外有二者之間的協議層,以下圖所示:
除了服務端和客戶端以外,還有三大中心:消息中心,用戶中心,連接中心。
集羣版
單機版沒法作到高可用,性能與可服務的用戶數也有必定的限制,因此須要有可擴展的集羣版,集羣版在單機版的基礎上增長了一個路由層,客戶端經過路由層來得到可用的服務端地址,而後與服務端進行通信,以下圖所示:
客戶端發送消息給另外一個用戶,服務端接收到這個請求後,從 Connection中心中獲取目標用戶「掛」在哪一個服務端下,若是在本身名下,那最簡單直接將消息推送給目標用戶便可,若是在其餘服務端,則須要將該請求轉交給目標服務端,讓目標服務端將消息推送給目標用戶。
自定義協議
經過一個自定義協議來實現服務端與客戶端之間的通信,協議中有以下幾個字段:
*
* <p>
* The structure of a Packet is like blow:
* +----------+----------+----------------------------+
* | size | value | intro |
* +----------+----------+----------------------------+
* | 1 bytes | 0xBC | magic number |
* | 1 bytes | | serialize algorithm |
* | 4 bytes | | packet symbol |
* | 4 bytes | | content length |
* | ? bytes | | the content |
* +----------+----------+----------------------------+
* </p>
*
每一個字段的含義
[td]
所佔字節
用途
1
魔數,默認爲 0xBC
1
序列化的算法
4
Packet 的類型
4
Packet 的內容長度
?
Packet 的內容
序列化算法將會決定該 Packet 在編解碼時,使用何種序列化方式。
Packet 的類型將會決定到達服務端的字節流將被反序列化爲什麼種 Packet,也決定了該 Packet 將會被哪一個 PacketHandler 進行處理。
內容長度將會解決 Packet 的拆包與粘包問題,服務端在解析字節流時,將會等到字節的長度達到內容的長度時,才進行字節的讀取。
除此以外,Packet 中還會存儲一個 sync 字段,該字段將指定服務端在處理該 Packet 的數據時是否須要使用異步的業務線程池來處理。
健康檢查
服務端與客戶端各自維護了一個健康檢查的服務,即 Netty 爲咱們提供的 IdleStateHandler,經過繼承該類,而且實現 channelIdle 方法便可實現鏈接 「空閒」 時的邏輯處理,當出現空閒時,目前咱們只關心讀空閒,咱們既能夠認爲這條連接出現問題了。
那麼只須要在連接出現問題時,將這條連接關閉便可,以下所示:
public class IdleStateChecker extends IdleStateHandler {
private static final int DEFAULT_READER_IDLE_TIME = 15;
private int readerTime;
public IdleStateChecker(int readerIdleTime) {
super(readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime, 0, 0, TimeUnit.SECONDS);
readerTime = readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime;
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
log.warn("[{}] Hasn't read data after {} seconds, will close the channel:{}",
IdleStateChecker.class.getSimpleName(), readerTime, ctx.channel());
ctx.channel().close();
}
}
另外,客戶端須要額外再維護一個健康檢查器,正常狀況下他負責定時向服務端發送心跳,當連接的狀態變成 inActive 時,該檢查器將負責進行重連,以下所示:
public class HealthyChecker extends ChannelInboundHandlerAdapter {
private static final int DEFAULT_PING_INTERVAL = 5;
private Client client;
private int pingInterval;
public HealthyChecker(Client client, int pingInterval) {
Assert.notNull(client, "client can not be null");
this.client = client;
this.pingInterval = pingInterval <= 0 ? DEFAULT_PING_INTERVAL : pingInterval;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
schedulePing(ctx);
}
private void schedulePing(ChannelHandlerContext ctx) {
ctx.executor().schedule(() -> {
Channel channel = ctx.channel();
if (channel.isActive()) {
log.debug("[{}] Send a PingPacket", HealthyChecker.class.getSimpleName());
channel.writeAndFlush(new PingPacket());
schedulePing(ctx);
}
}, pingInterval, TimeUnit.SECONDS);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(() -> {
log.info("[{}] Try to reconnecting...", HealthyChecker.class.getSimpleName());
client.connect();
}, 5, TimeUnit.SECONDS);
ctx.fireChannelInactive();
}
}
業務線程池
咱們知道,Netty 中維護着兩個 IO 線程池,一個 boss 主要負責連接的創建,另一個 worker 主要負責連接上的數據讀寫,咱們不該該使用 IO 線程來處理咱們的業務,由於這樣極可能會對 IO 線程形成阻塞,致使新連接沒法及時創建或者數據沒法及時讀寫。
爲了解決這個問題,咱們須要在業務線程池中來處理咱們的業務邏輯,可是這並非絕對的,若是咱們要執行的邏輯很簡單,不會形成太大的阻塞,則能夠直接在 IO 線程中處理,好比客戶端發送一個 Ping 服務端回覆一個 Pong,這種狀況是沒有必要在業務線程池中進行處理的,由於處理完了最終仍是要交給 IO 線程去寫數據。可是若是一個業務邏輯須要查詢數據庫或者讀取文件,這種操做每每比較耗時間,因此就須要將這些操做封裝起來交給業務線程池去處理。
服務端容許客戶端在傳輸的 Packet 中指定採用何種方式進行業務的處理,服務端在將字節流解碼成 Packet 以後,會根據 Packet 中的 sync 字段的值,肯定怎樣對該 Packet 進行處理,以下所示:
public class ServerPacketDispatcher extends
SimpleChannelInboundHandler<Packet> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Packet request) {
// if the packet should be handled async
if (request.getAsync() == AsyncHandle.ASYNC) {
EventExecutor channelExecutor = ctx.executor();
// create a promise
Promise<Packet> promise = new DefaultPromise<>(channelExecutor);
// async execute and get a future
Future<Packet> future = executor.asyncExecute(promise, ctx, request);
future.addListener(new GenericFutureListener<Future<Packet>>() {
@Override
public void operationComplete(Future<Packet> f) throws Exception {
if (f.isSuccess()) {
Packet response = f.get();
writeResponse(ctx, response);
}
}
});
} else {
// sync execute and get the response packet
Packet response = executor.execute(ctx, request);
writeResponse(ctx, response);
}
}
}
不止是IM框架
bitchat除了能夠做爲 IM 框架以外,還能夠做爲一個通用的通信框架。
Packet 做爲通信的載體,經過繼承 AbstractPacket 便可快速實現本身的業務,搭配 PacketHandler 做爲數據處理器便可實現客戶端與服務端的通信。