基於P2P的局域網即時通訊應用

前言

這是一個使用java語言開發的基於P2P的局域網即時通訊Android應用,界面是高仿微信的聊天界面,在裏面你將會學到java多線程併發編程、Socket編程、UDP廣播、TCP鏈接、還有圖片加載相關知識等等。java

項目地址:P2Pgit

設計思路

P2P不一樣於C/S方式,它沒有集中式的服務器,在P2P中,程序既是服務器又是客戶端,在同一個局域網內,每一個用戶發送的消息不會通過路由器轉發到其餘局域網,那麼如何保證你們都在同一個局域網內呢?答案是隻要你們都連上同一個WIFI就行,這樣就保證你們在同一個局域網內,這時你手機或電腦就會被路由器分配一個ip地址。github

以下圖:編程

下面是設計思路:json

一、用戶登錄階段

(1)用戶A打開P2P程序的,選擇一個名字和頭像後,點擊登錄,就開始登錄上局域網,用戶A登錄時程序同時會啓動兩個線程,一個線程裏面啓動UDP服務端(端口號9156),用來等待其餘用戶的登錄,另外一個線程裏面啓動TCP服務端(端口號9155),用來等待其餘用戶的Socket鏈接,在登錄同時用戶A還會使用UDP廣播一個UDP包出去。這個UDP包包含了用戶姓名和ip地址等信息,UDP包會發送給同一局域網內全部具備相同端口的UDP服務端程序,包括本程序。數組

(2)這時若是有其餘在線的用戶,那麼每一個在線的用戶程序中的UDP服務端就會收下這個UDP包,而後把用戶信息取出來並把用戶A加入在線列表,由於UDP包中包含用戶A ip地址,因此每一個在線的用戶使用用戶A的ip地址向用戶A發一個回覆。緩存

(3)用戶A等待一段時間後,就會收到全部在線用戶的回覆,而後把全部的在線用戶加入本身的在線列表。服務器

登錄階段如圖:微信

二、 用戶聊天階段

(1)用戶A選擇本身的在線列表中的用戶B聊天,這時用戶A就會向用戶B發起TCP鏈接,與此同時用戶B的TCP服務端中就會收到一個Socket請求,用戶B把這個Socket請求緩存起來,同理用戶A發起鏈接時也會產生一個Socket,用戶A也把這個Socket保存緩存起來,這樣下一次就不用重複創建鏈接。markdown

(2)這樣雙方都擁有一個Socket,雙方基於Socket與Socket之間創建的鏈接上聊天(傳輸文件,文字等)。

聊天階段如圖:

三、用戶退出階段

(1)當用戶A離開程序,退出局域網時,用戶A就像登錄同樣使用廣播地址廣播一個UDP包出去,UDP中包含了要退出登錄的信息,那麼在局域網內的在線用戶收到這個UDP後,就把用戶A移除出在線用戶列表,若是有用戶A的Socket鏈接,就把Socket鏈接關閉掉。

(2)用戶A發出退出廣播後,也把本身緩存的全部Socket鏈接關閉掉。

程序運行截圖

首先用戶A和用戶B登錄,分別選擇一個頭像和姓名,以下

登錄後,雙方正常來說,是隻有一個在線用戶的,可是考慮到平時測試兩臺手機不方便,就沒有把本身過濾掉,因此如今雙方都有兩個在線用戶,用戶A的在線用戶是用戶B和本身,用戶B的在線用戶是用戶A和本身,以下:

下面是雙方聊天的過程,如今能夠發文字、圖片、語音、文件(支持發送大文件,發送大文件有進度顯示)。

關鍵性的代碼

下面紅色方框內的是關鍵性類,以下:

下面講解一些關鍵性代碼:

一、初始化TCP服務端

初始化TCP服務端,在ConnectManager類中,以下:

/** * 初始化ServerSocket監聽,綁定端口號, 等待客戶端鏈接 */
public void initListener(){
    mExecutor.execute(() -> {
        try {
            //建立ServerSocket監聽,並綁定端口號
            mServerSocket = new ServerSocket(PORT);
            LogUtil.d(TAG, "開啓服務端監聽,端口號 = " + PORT);
        } catch (IOException e) {
            e.printStackTrace();
            LogUtil.e(TAG, "綁定端口號失敗,e = " + e.getMessage());
        }
        while (true){
            try {
                //調用accept()開始監聽,等待客戶端的鏈接
                Socket socket = mServerSocket.accept();
                String ipAddress = socket.getInetAddress().getHostAddress();
                if(isClose(ipAddress)){
                    LogUtil.d(TAG, "一個用戶加入聊天,socket = " + socket);
                    //每一個客戶端鏈接用一個線程不斷的讀
                    ReceiveThread receiveThread = new ReceiveThread(socket);
                    //緩存客戶端的鏈接
                    mClients.put(ipAddress, socket);
                    //放到線程池中執行
                    mExecutor.execute(receiveThread);
                    LogUtil.d(TAG, "已鏈接的客戶端數量:" + mClients.size());
                    //簡單的心跳機制
                    heartBeat(ipAddress);
                }
            } catch (IOException e) {
                e.printStackTrace();
                LogUtil.e(TAG, "調用accept()監聽失敗, e = " + e.getMessage());
                break;
            }
        }
        try {
            //釋放掉ServerSocket佔用的端口號
            mServerSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
            LogUtil.e(TAG, "關閉端口號失敗, e = " + e.getMessage());
        }
    });
}
複製代碼

ConnectManager是用來管理每一個用戶的鏈接,ConnectManager的initListener()方法裏面會綁定一個端口號,而後調用accept()方法等待其餘客戶端的鏈接,若是有客戶端的鏈接請求,就會爲每個客戶端的鏈接建立一個Thread,這個Thread會不停等待接收客戶端的消息。以下:

public class ReceiveThread implements Runnable{
    //...
    @Override
    public void run() {
        while (true){
            Mes mes;
            try{
                InputStream in = mSocket.getInputStream();
                mes = receiveMessageByType(in);
               //...
            } catch (Exception e) {
                e.printStackTrace();
                LogUtil.e(TAG, "獲取客戶端消息失敗,e = " + e.getMessage());
                //兩端的Socker鏈接都要關閉
                ConnectManager.getInstance().removeConnect(mClientIp);
                ConnectManager.getInstance().removeReceiveCallback(mClientIp);
                ConnectManager.getInstance().cancelScheduledTask(mClientIp);
                break;
            }
        }
    }
//...
}
複製代碼

二、初始化UDP服務端

初始化UDP服務端,在OnlineUserManager類中,以下:

/** * 初始化監聽,綁定指定端口, 等待接受廣播 */
public void initListener(){
    mExecutor.execute(() -> {
        try {
            mDatagramSocket = new DatagramSocket(PORT);
            LogUtil.d(TAG, "開啓廣播監聽,端口號 = " + PORT);
        } catch (SocketException e) {
            e.printStackTrace();
            LogUtil.e(TAG, "建立DatagramSocket監聽失敗, e = " + e.getMessage());
        }
        while (true){
            try {
                byte[] buffer = new byte[MAX_RECEIVE_DATA];
                DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                mDatagramSocket.receive(datagramPacket);
                byte[] data = datagramPacket.getData();
                //得到發送方的ip地址
                String receiveIp = datagramPacket.getAddress().getHostAddress();
                //解析數據
                Data datas = resolveData(data);
                if(datas != null){
                    //用戶數據
                    int code = datas.getCode();
                    User user = datas.getUser();
                    user.setIp(receiveIp);
                    if(code == 0){
                        //把它加入在線用戶列表
                        if(!mOnlineUsers.containsKey(receiveIp)){
                            mOnlineUsers.put(receiveIp, user);
                            //通知主活動用用戶加入
                            if(mUserCallback != null){
                                mHandler.obtainMessage(TYPE_JOIN_USER, mOnlineUsers.get(receiveIp)).sendToTarget();
                            }
                            LogUtil.d(TAG, "一個用戶加入,地址 = " + receiveIp);
                        }
                        //回覆它
                        reply(receiveIp);
                    }else if(code == 1){
                        //用戶退出在線用戶列表
                        if(mOnlineUsers.containsKey(receiveIp)){
                            User exitUser = mOnlineUsers.remove(receiveIp);
                            if(mUserCallback != null){
                                mHandler.obtainMessage(TYPE_EXIT_USER, exitUser).sendToTarget();
                            }
                            LogUtil.d(TAG, "一個用戶退出,地址 = " + receiveIp);
                        }

                    }else {
                        //獲得全部在線用戶列表
                        if(!mOnlineUsers.containsKey(receiveIp)) {
                            mOnlineUsers.put(receiveIp, user);
                            //通知主活動用用戶加入
                            if(mUserCallback != null){
                                mHandler.obtainMessage(TYPE_JOIN_USER, mOnlineUsers.get(receiveIp)).sendToTarget();
                            }
                            LogUtil.d(TAG, "得到一個用戶信息,地址 = " + receiveIp);
                        }
                    }
                }
                LogUtil.d(TAG, "當前在線用戶,count = " + mOnlineUsers.size());
            } catch (IOException e) {
                e.printStackTrace();
                LogUtil.e(TAG, "接受廣播失敗, e = " + e.getMessage());
                break;
            }
        }
        if(mDatagramSocket != null){
            mDatagramSocket.close();
        }
    });
}
複製代碼

OnlineUserManager是用來管理在線用戶的,OnlineUserManager的initListener()方法裏面也是會綁定一個端口號,而後調用receive()方法等待用戶的廣播信息,若是有用戶的廣播信息,就根據用戶的廣播信息類型作出不一樣的動做,如把用戶加入在線用戶列表。

三、Mes類的設計

Mes類是用戶之間創建鏈接後傳輸消息的實體類,以下:

public class Mes<T>{

    public ItemType itemType;//Mes的Item類型
    public MesType mesType;//Mes的類型
    public String userIp;//發送Mes的用戶的ip
    public T data;//具體消息
    //...
}
複製代碼

其中T是一個泛型,它能夠表明着文本、音頻、文件、圖片的類型,因此在構造一個Mes時,就要肯定它是屬於什麼類型,而後文本、音頻、文件、圖片分別在對應一個實體類。

四、User類的設計

User就表明着用戶,以下:

public class User implements Serializable {
    private String mName;//名字
    private String mIp;//ip
    private String mImagePath;//頭像路徑
    private int mImageLen;//頭像長度
    //...
}
複製代碼

它在傳輸前中會轉成一個Json字符串,收到後再把它轉成User類,這樣就很容易的把它裏面的數據解析出來也方便了傳輸。

五、關於心跳機制的實現

心跳機制是什麼?它就每隔一段事件發一個探測,探測在線的用戶是否存活。有些在線用戶因爲手機關機,不正常退出應用等會致使它沒法正常退出登錄,這時就須要每隔一段時間探測它是否存活。

P2P中實現了一個簡單的心跳機制,其實它就是一個定時任務,線程池中能夠提交週期執行的任務,以下:

/** * 簡單心跳機制 */
private void heartBeat(String ipAddress) {
    if(!mScheduledTasks.containsKey(ipAddress)){
        ScheduledFuture task = mScheduledExecutor.scheduleAtFixedRate(() -> {
            int result = PingManager.getInstance().ping(ipAddress);
            Log.d(TAG, "探測對方是否在線, result = " + result + ", ipAddress = " + ipAddress);
            if(result != 0){
                removeConnect(ipAddress);
                cancelScheduledTask(ipAddress);
            }
        }, 10, 10, TimeUnit.SECONDS);
        mScheduledTasks.put(ipAddress, task);
    }
}
複製代碼

它每隔10秒就會執行一次,而後會ping一下用戶的ip地址,若是它不連通了,就要把它從在線用戶中移除。

開發過程當中遇到的問題及解決辦法

  • 問題1:獲取得到在線用戶列表和若是告訴別人我上線了?

由於第一次開發P2P應用,因此不知道用戶體系創建的邏輯,嘗試的第一種方法是:使用Ping命令把ip地址的最後三位用循環從0~255不斷的ping,若是ping通,就說明這個ip地址的用戶鏈接着局域網的同一個WIFI,把它記錄下來,可是這有一個缺點,能ping通的ip地址只是說明這個用戶連着WIFI,並無說明這個用戶打開了P2P應用,也並不表明這個用戶上線了,因此這種方法不行;後來想到了一種改進辦法:就是把ping通的ip地址列表逐個創建Socket鏈接,若是可以鏈接上,就說明這個用戶打開了P2P應用而且上線了,這個辦法能夠,可是逐個創建鏈接有很麻煩,耗時。

解決辦法:就是使用UDP的廣播,UDP廣播可以告訴同一局域網內的全部打開了同一端口的在線用戶我上線了,而且收到他們的回覆。

  • 問題2:用戶頭像的發送?

由於使用UDP廣播,可是UDP廣播每次最大隻能發送64Kb數據,一個頭像就算壓縮了,也有幾百Kb,因此如何把頭像發送出去是一個問題,嘗試的第一種方法是:把頭像轉化成字節數組和用戶信息一塊兒轉化爲json數據,json數據再轉化爲字節數組,而後把json數據的字節數組分段發送出去,可是這有一個缺點,就是會額外增大發送時UDP的字節數組的長度致使發送額外多的字節,耗時,這種方法不行;嘗試的第二種方法是:把頭像和用戶信息分開發送,先發送用戶信息,而後再把頭像轉化爲字節數組分段發送,可是有一個沒法解決的問題,就是UDP是不可靠,很難保證分段後的字節再從新組合成一個完整的頭像字節數組,會有順序問題,因此這種辦法不行。

解決辦法:用戶信息用UDP廣播發送,由於用戶信息短,不用分段,而後等獲取到在線用戶列表後再逐一創建TCP鏈接把用戶頭像發送給在線用戶列表,TCP可靠。

相關文章
相關標籤/搜索