目錄html
瘋狂創客圈 Java 分佈式聊天室【 億級流量】實戰系列之 17【 博客園 總入口 】java
源碼IDEA工程獲取連接:Java 聊天室 實戰 源碼面試
你們好,我是做者尼恩。目前和幾個小夥伴一塊兒,組織了一個高併發的實戰社羣【瘋狂創客圈】。正在開始高併發、億級流程的 IM 聊天程序 學習和實戰bootstrap
前面,已經完成一個高性能的 Java 聊天程序的四件大事:安全
完成了協議選型,選擇了性能更佳的 Protobuf協議。具體的文章爲: Netty+Protobuf 整合一:實戰案例,帶源碼服務器
介紹了 通信消息數據包的幾條設計準則。具體的文章爲: Netty +Protobuf 整合二:protobuf 消息通信協議設計的幾個準則session
解決了一個很是基礎的問題,這就是通信的 粘包和半包問題。具體的文章爲:Netty 粘包/半包 全解 | 史上最全解讀併發
前一篇文件,已經完成了 系統三大組成模塊的組成介紹。 具體的文章爲:Netty聊天程序(實戰一):從0開始實戰100w級流量應用分佈式
今天介紹很是重要的一個內容:ide
客戶端的通信、登陸請求和登陸響應設計。
下面,開啓今天的 驚險和刺激實戰之旅。
什麼是會話?
爲了方便客戶端的開發,管理與服務器的鏈接,這裏引入一個很是重要的中間角色——Session (會話)。有點兒像Web開發中的Tomcat的服務器 Session,可是又有很大的不一樣。
客戶端的會話概念圖,以下圖所示:
客戶端會話有兩個很重的成員,一個是user,表明了擁有會話的用戶。一個是channel,表明了鏈接的通道。兩個成員的做用是:
經過user,能夠得到當前的用戶信息
經過channel,能夠向服務器發送消息
因此,會話左擁右抱,左手用戶資料,右手服務器的鏈接。在本例的開發中,會常常用到。
從邏輯上來講,客戶端有三個子的功能模塊。
模塊一:Handler
入站處理器。
在Netty 中很是重要,負責處理入站消息。比方,服務器發送過來登陸響應,服務器發送過來的聊天消息。
模塊二:MsgBuilder
消息組裝器。
將 Java 內部的 消息 Bean 對象,轉成發送出去的 Protobuf 消息。
模塊三:Sender
消息發送器。
Handler 負責收的工做。Sender 則是負責將消息發送出去。
三大子模塊的類關係圖:
介紹完成了主要的組成部分後,開始服務器的鏈接和Session 的建立。
經過bootstrap 幫助類,設置完成線程組、通道類型,向管道流水線加入處理器Handler後,就能夠開始鏈接服務器的工做。
本小節須要重點介紹的,是鏈接成功以後,建立 Session,而且將 Session和 channel 相互綁定。
代碼以下:
package com.crazymakercircle.chat.client; //... @Data @Service("EchoClient") public class ChatClient { static final Logger LOGGER = LoggerFactory.getLogger(ChatClient.class); //.. private Channel channel; private ClientSender sender; public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) { ChannelFuture f = null; try { if (bootstrap != null) { bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.remoteAddress(host, port); // 設置通道初始化 bootstrap.handler( new ChannelInitializer<SocketChannel>() { public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufDecoder()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(chatClientHandler); } } ); LOGGER.info(new Date() + "客戶端開始登陸[瘋狂創客圈IM]"); f = bootstrap.connect().addListener((ChannelFuture futureListener) -> { final EventLoop eventLoop = futureListener.channel().eventLoop(); if (!futureListener.isSuccess()) { LOGGER.info("與服務端斷開鏈接!在10s以後準備嘗試重連!"); eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS); initFalg = false; } else { initFalg = true; } if (initFalg) { LOGGER.info("EchoClient客戶端鏈接成功!"); LOGGER.info(new Date() + ": 鏈接成功,啓動控制檯線程……"); channel = futureListener.channel(); // 建立會話 ClientSession session = new ClientSession(channel); channel.attr(ClientSession.SESSION).set(session); session.setUser(ChatClient.this.getUser()); startConsoleThread(); } }); // 阻塞 f.channel().closeFuture().sync(); } } catch (Exception e) { LOGGER.info("客戶端鏈接失敗!" + e.getMessage()); } } //... }
Session和 channel 相互綁定,再截取出來,分析一下。
ClientSession session = new ClientSession(channel); channel.attr(ClientSession.SESSION).set(session); session.setUser(ChatClient.this.getUser());
爲何要Session和 channel 相互綁定呢?
Netty 中的 channel ,實現了AttributeMap接口 ,至關於一個 Map容器。 反向的綁定,利用了channel 的這個特色。
看一下AttributeMap接口 如何使用的?
AttributeMap 是一個接口,而且只有一個attr()方法,接收一個AttributeKey類型的key,返回一個Attribute類型的value。按照Javadoc,AttributeMap實現必須是線程安全的。
AttributeMap內部結構看起來像下面這樣:
不要被嚇着了,其實很簡單。
AttributeMap 的使用,主要是設置和取值。
AttributeMap 的設值的方法,舉例以下:
channel.attr(ClientSession.SESSION).set(session);
這個是鏈式調用,attr() 方法中的是 Key, set()方法中的是Value。 這樣就完成了 Key-> Value 的設置。
AttributeMap 的取值的方法,舉例以下:
ClientSession session = ctx.channel().attr(ClientSession.SESSION).get();
這個是鏈式調用,attr() 方法中的是 Key, get()方法返回 的是Value。 這樣就完成了 取值。
關鍵是,這個key比較特殊。
通常的Map,Key 的類型多半爲字符串。可是這裏的Key不行,有特殊的約定。
Key的類型必須是 AttributeKey 類型,並且這是一個泛型類,它的優點是,不須要對值進行強制的類型轉換。
Key的例子以下:
public static final AttributeKey<ClientSession> SESSION = AttributeKey.valueOf("session");
登陸的請求,大體以下:
ClientSender的 代碼以下:
package com.crazymakercircle.chat.client; @Service("ClientSender") public class ClientSender { static final Logger LOGGER = LoggerFactory.getLogger(ClientSender.class); private User user; private ClientSession session; public void sendLoginMsg() { LOGGER.info("開始登錄"); ProtoMsg.Message message = LoginMsgBuilder.buildLoginMsg(user); session.writeAndFlush(message); } //... public boolean isLogin() { return session.isLogin(); } }
Sender 首先經過 LoginMsgBuilder,構造一個protobuf 消息。而後調用session發送消息。
session 會經過綁定的channel ,將消息發送出去。
session的代碼,以下:
public synchronized void writeAndFlush(Object pkg) { channel.writeAndFlush(pkg); }
其餘的客戶端請求流程,大體也是相似的。
一個客戶端的請求大體的流程有三步,分別從Sender 到session到channel。
這是從服務器過來的入站消息。 若是登陸成功,服務器會發送一個登陸成功的響應過來。 這個響應,會從channel 傳遞到Handler。
處理器 LoginResponceHandler 的代碼以下:
package com.crazymakercircle.chat.clientHandler; //... public class LoginResponceHandler extends ChannelInboundHandlerAdapter { static final Logger LOGGER = LoggerFactory.getLogger(LoginResponceHandler.class); /** * 業務邏輯處理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { LOGGER.info("msg:{}", msg.toString()); if (msg != null && msg instanceof ProtoMsg.Message) { ProtoMsg.Message pkg = (ProtoMsg.Message) msg; ProtoMsg.LoginResponse info = pkg.getLoginResponse(); ProtoInstant.ResultCodeEnum result = ProtoInstant.ResultCodeEnum.values()[info.getCode()]; if (result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) { ClientSession session = ctx.channel().attr(ClientSession.SESSION).get(); session.setLogin(true); LOGGER.info("登陸成功"); } } } }
LoginResponceHandler 對消息類型進行判斷,若是是請求響應消息,而且登陸成功。 則取出綁定的session,經過session,進一步完成登陸成功後的業務處理。
好比設置成功的狀態,完成一些成功的善後處理操做等等。
其餘的客戶端響應處理流程,大體也是相似的。
至此爲止,能夠看到,客戶端登陸的完整流程。
下一篇:服務器的請求處理和通信的全流程閉環介紹。
Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
瘋狂創客圈 【 博客園 總入口 】