近來在學習Java NIO網絡開發知識,寫了一個基於Java NIO的多人在線聊天工具MyChat練練手。源碼公開在Coding上:html
https://coding.net/u/hust_wsh/p/MyChat/git ,開發環境是Ubuntu14.04+Eclipse Mars+JDK1.8。git
編寫一個基於Java NIO的多人在線聊天工具,須要如下幾方面的知識:客戶端服務器模型,Java NIO中的Selector,Channel,ByteBuffer,Collections以及序列化和反序列化的知識。下面來對照源碼逐一剖析MyChat的源碼構成:服務器
一.服務器網絡
爲了便於實時分析服務器在線人數和聊天室列表,須要在服務器端提供一個交互接口,也就是獲取System.in的輸入,執行相應的操做,以下所示: 多線程
System.out.println("===輸入選擇項==="); System.out.println("1.獲取用戶列表;2.獲取聊天室列表;3.獲取指定聊天室成員;4.關閉服務器"); boolean isExit=false; Scanner scanner=new Scanner(System.in);
因爲主線程須要運行交互界面,這樣一來執行與客戶端交互任務的代碼就要放在另一個線程中了:函數
ChatServer server=new ChatServer(); Thread serverThread=new Thread(server,"聊天服務器"); serverThread.setDaemon(true);//後臺進程 serverThread.start();
接下來將分別介紹服務器端的實現類ChatServer的關鍵成員:工具
private Selector mSelector=null;//用於註冊全部鏈接到服務器的SocketChannel對象
//保存全部用戶的Map private Map<String,UserEntity> mUsers=Collections.synchronizedMap(new HashMap<String,UserEntity>());
//保存全部聊天室的Map
private Map<String,ChatRoom> mRooms=Collections.synchronizedMap(new HashMap<String,ChatRoom>());//聊天室
第一個成員變量mSelector是一個Selector對象,用於管理全部鏈接到服務器的Channel,爲了管理多個通道的讀寫,要將不一樣的通道註冊到一個Selector對象上。每一個通道分配有一個SelectionKey。而後程序能夠詢問這個Selector對象,哪些通道已經準備就緒能夠無阻塞的完成你但願完成的操做,能夠請求Selector對象返回相應的鍵集合。經過調用Selector類的惟一構造函數:靜態工廠方法open()來建立新的選擇器,並經過register()方法註冊通道。性能
mSelector=Selector.open(); ServerSocketChannel server=ServerSocketChannel.open(); InetSocketAddress isa=new InetSocketAddress(mHost, mPort); server.bind(isa);//綁定指定端口 server.configureBlocking(false); server.register(mSelector, SelectionKey.OP_ACCEPT); System.out.println("服務器在"+mPort+"端口啓動成功");
註冊成功後,就能夠經過Selector的select()方法查詢已經就緒的通道。學習
while(mSelector.select()>0) { Iterator<SelectionKey> iterator=mSelector.selectedKeys().iterator(); while(iterator.hasNext()) { SelectionKey sk=iterator.next(); iterator.remove();
select()方法用於查詢註冊到Selector上的待處理的就緒Channel,是一個阻塞方法,直到至少有一個註冊的Channel準備好以後就能夠進行處理。SelectionKey對象至關於通道的指針,能夠保存通道的鏈接狀態。Selector對象的selectedKeys()方法能夠返回全部註冊Channel的SelectionKey。接下來能夠經過isAccetable(),isReadable(),isWritable()等方法測試該鍵能進行的操做。測試
ServerSocketChannel類只有一個目的:接受入站鏈接。經過註冊到Selector對象來獲取入站鏈接通知,以下所示:
if(sk.isAcceptable()) { SocketChannel sc=server.accept();//開始接收客戶端鏈接 sc.configureBlocking(false); sc.register(mSelector, SelectionKey.OP_READ); sk.interestOps(SelectionKey.OP_ACCEPT); }
接下來能夠經過sk.isReadable()進入處理客戶端數據的代碼塊:
if(sk.isReadable())//有數據 { SocketChannel sc=(SocketChannel)sk.channel(); ByteBuffer buffer=ByteBuffer.allocate(1024); ByteArrayOutputStream boStream=new ByteArrayOutputStream(); try { while(sc.read(buffer)>0)//TODO:性能問題 { buffer.flip(); boStream.write(Arrays.copyOfRange(buffer.array(), 0, buffer.limit())); } byte[] frame=boStream.toByteArray(); boStream.close();
爲了能進一步講明白爲何須要上面這種方式讀取客戶端信息,這裏先插入講解一下服務器和客戶端交互的信使類Message。爲了提高擴展性,我定義了一個Serializable類Message,用於服務器和客戶端之間進行交互(如登陸,返回結果,建立聊天室等)。Message類的定義以下:
1 class Message implements Serializable 2 { 3 private static final long serialVersionUID = 1L; 4 private Map<FieldType,String> fields=new HashMap<>();//TODO:泛型支持,任意消息類型,包括文本,圖片,語音,視頻,文件等 5 private Commands command; 6 public Message(Commands command) 7 { 8 this.command=command; 9 } 10 public Commands getCommand() 11 { 12 return this.command; 13 } 14 public Message set(FieldType key,String value) 15 { 16 if(key!=null&&value!=null) 17 { 18 fields.put(key,value); 19 } 20 return this; 21 } 22 public String get(FieldType key) 23 { 24 return fields.get(key); 25 } 26 27 public byte[] toBytes() 28 { 29 return SerializeHelper.serialize(this); 30 } 31 32 public ByteBuffer wrap() 33 { 34 byte[] frame=toBytes(); 35 return ByteBuffer.wrap(frame); 36 } 37 }
其中有兩個關鍵的成員:一個Map型的用於保存數據的field成員和一個枚舉類型的用於代表命令類型的command成員。其中Command枚舉定義以下:
enum Commands{ LOG_IN, LOG_OUT, QUERY_USERS, QUERY_ALL_CHAT_ROOMS, QUERY_MY_CHAT_ROOMS, QUERY_ROOM_MEMBERS, HEART_BEAT, MSG_P2P,//我的對我的的消息 MSG_P2R,//聊天室消息 CREATE_CHAT_ROOM, JOIN_CHAT_ROOM, LEAVE_CHAT_ROOM, SET_USER_NAME; };
另外,爲了指名攜帶數據的類型,定義了一個FieldType枚舉,以下:
enum FieldType{ USER_ID, USER_NAME, PASS_WD, PEER_ID,//單聊對象的ID ROOM_ID,//聊天室ID USER_LIST,//用戶列表 ROOM_LIST_ALL,//全部房間列表 ROOM_LIST_ME,//個人聊天室列表 ROOM_MEMBERS,//用戶列表 MSG_TXT, RESPONSE_STATUS, ENCODING; };
這樣一來,服務器和客戶端就能夠經過這種可序列化的Message相互通訊了。具體就是客戶端將要發送給服務器的數據封裝在Message對象中後,經過SocketChanne發送到服務器,服務器收到數據後經過反序列化獲取原始的Message對象,並根據command成員來判斷接收到的是什麼類型的Message,如登陸,點對點消息等。
Message msg=(Message)SerializeHelper.deSerialize(frame); if(msg!=null) { String userId=msg.get(FieldType.USER_ID); switch (msg.getCommand()) { case LOG_IN: { System.out.println("用戶"+userId+"請求登陸..."); Message message=new Message(Commands.LOG_IN); //TODO:檢查用戶名密碼,暫時沒有註冊功能,就只檢測用戶名是否重複 if(!mUsers.containsKey(userId)) { message.set(FieldType.RESPONSE_STATUS,"成功"); System.out.println("用戶"+userId+"登陸成功"); UserEntity user=new UserEntity(userId,sc); mUsers.put(userId,user); } else { message.set(FieldType.RESPONSE_STATUS,"該賬號已經登陸"); } //發送登陸結果 sendRawMessage(sc, message); break; }
這裏出現的mUsers對象,就是我要介紹的服務器端第二個重要的成員變量,mUsers是一個用Collections.synchronizedSet封裝的支持多線程訪問的HashSet,用於保存[用戶ID->用戶對象]的映射。所謂用戶對象就是另外定義的一個用於保存用戶基本信息的類,其中包含了用戶的id,passwd,對應的SocketChannel和所加入的聊天室集合。以下所示:
class UserEntity{ private String mUserId; private String mPassWd; private SocketChannel mSocketChannel; private Set<String> mJoinedRooms=Collections.synchronizedSet(new HashSet<String>());
服務器端還有一個重要的成員變量,用於保存服務器端全部聊天室的集合,也是一個用Collections.synchronizedSet封裝的HashSet,用於保存[聊天室ID->聊天室對象]的映射。聊天室對象是專門定義的一個保存聊天室基本信息的類,其中包含了聊天室id,聊天室成員集合。以下所示:
final class ChatRoom { private String mRoomId=null; private Set<String> mUsers=Collections.synchronizedSet(new HashSet<String>());
到此,服務器端的代碼基本剖析完畢,接下來咱們看看客戶端的代碼。
二.客戶端
客戶端的代碼相對服務器來講要簡單許多,一個典型的NIO客戶端程序鏈接服務器流程以下所示:
mSelector=Selector.open(); InetSocketAddress remote=new InetSocketAddress(host, port); mSocketChannel=SocketChannel.open(remote); mSocketChannel.configureBlocking(false); mSocketChannel.register(mSelector, SelectionKey.OP_READ);
其中註冊Selector的接口幾乎與服務器一致,除了傳遞給register方法的第二個參數不一樣。註冊完通道後就能夠向服務器發送登陸請求了:
Message message=new Message(Commands.LOG_IN); message.set(FieldType.USER_ID, userid); message.set(FieldType.PASS_WD, passwd); sendRawMessage(message);
其中sendRawMessage是一個私有方法,用於將Message序列化後使用ByteBuffer經過SocketChannel發送到服務器端,具體代碼以下:
private void sendRawMessage(Message message) { if(mSocketChannel!=null&&message!=null) { try { mSocketChannel.write(message.wrap()); } catch (Exception e) { e.printStackTrace(); } } }
我爲Message類設計了一個wrap()方法能夠將Message序列化後的byte[]包裝成ByteBuffer返回,從而能夠直接做爲SocketChannel.write()方法的參數。具體代碼能夠參考文章開頭的Git倉庫。
與服務器同樣,客戶端須要接收用戶輸入,從而也將與服務器交互的部分放在單獨的線程運行。我將這個線程類放在ChatClient類的內部做爲嵌套類,這樣能夠直接訪問外部類的成員變量,爲線程之間通訊提供便利。
三.實例分析
介紹完服務器和客戶端的設計以後,下面以建立聊天室爲例詳細介紹客戶端和服務器端的通訊流程。
當客戶端登陸到服務器中後,服務器會保存客戶端的用戶ID以及對應的SocketChannel信息,客戶端經過一條建立聊天室的Message向服務器申請建立聊天室:
Message message=new Message(Commands.CREATE_CHAT_ROOM); message.set(FieldType.USER_ID,mUserId ); message.set(FieldType.ROOM_ID, roomId); sendRawMessage(message);
如上所示,該Message的命令字是Commands.CREATE_CHAT_ROOM,包含了兩個域,分別是建立者的ID和待建立的房間ID(這裏爲了設計簡便,將ID和名稱等同爲一個概念,實際中ID應該是一個惟一的整型量,名稱是聊天室的名字,能夠重複)。服務器端經過反序列化Message,並提取對應的命令字進入對應的處理邏輯:
case CREATE_CHAT_ROOM: { System.out.println("用戶"+userId+"請求建立聊天室"); String roomId=msg.get(FieldType.ROOM_ID); Message message=new Message(Commands.CREATE_CHAT_ROOM); if(!StringHelper.isNullOrTrimEmpty(roomId)) { if(!mRooms.containsKey(roomId)) { ChatRoom room=new ChatRoom(roomId); room.addUser(userId); mRooms.put(roomId, room); UserEntity user=mUsers.get(userId); if(user!=null) user.joinRoom(roomId); message.set(FieldType.RESPONSE_STATUS, "成功"); } else { message.set(FieldType.RESPONSE_STATUS, "建立失敗,已存在同名聊天室"); } } else//返回錯誤消息 { message.set(FieldType.RESPONSE_STATUS, "建立失敗,聊天室名稱不能爲空"); } sendRawMessage(sc, message); break; }
咱們來仔細分析下上面的代碼。首先從Message中提取到了userId和roomId,而後判斷服務器端mRooms集合是否已經存在同名聊天室,若是不存在,則建立一個新的聊天室:ChatRoom room=new Chat(roomId)。並將建立者本人加入到聊天室用戶列表中:room.addUser(userId)。同時,爲了方便查找用戶加入的全部聊天室,還將該聊天室的ID經過UserEntity的joinRoom()方法保存到了UserEntity的聊天室集合中,最後將表示正確結果的Message發送給請求客戶端;反之若是已經存在同名聊天室,則將包含錯誤信息的Message發送給客戶端。而客戶端負責與服務器端交互的線程則經過反序列化Message獲取操做結果,並顯示給用戶。
爲了更加直觀地展現MyChat的工做流程,將終端運行的結果整了幾張截圖附在下面:
客戶端1:
客戶端2:
服務器端:
本文爲原創,轉載請聲明:轉載自hust_wsh的技術博客:http://www.cnblogs.com/hust_wsh/p/5166001.html