java socket 模擬im 即時通信

本身想了一下怎麼實現,就寫了,沒有深究是否合理.更多處理沒有寫下去,例如收件人不在線,應該保存在數據庫,等下一次鏈接的時候刷新map,再把數據發送過去,圖片發送也沒有作,也沒有用json格式java

socket很奇怪,我用客戶端鏈接上了服務器,沒有發送消息的狀況下,斷開電腦網絡,是不會出現問題,而後在把電腦網絡鏈接上,通信依然正常,正常斷開也不出問題,可是用idea直接按stop鍵,那麼服務端就會出問題了,讀取事件會一直爲true,形成死循環,消耗CPU,因此必需要判斷一下客戶端鏈接是否斷開了數據庫

 

只須要把客戶端代碼啓動幾個,修改一些userName以及收件人,就能夠測試,實現相似QQ微信即時通信,聊天功能json

 

 

服務端代碼服務器

package serversocketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 
 * @author ZhenWeiLai
 *
 */
public class ServerSocketChannelNonBlocking {
    private static ServerSocketChannel serverSocketChannel = null;
    private static Charset charset = Charset.forName("GBK");//設置編碼集,用於編碼,解碼
    private static Selector selector = null;
    //保存客戶端的map
    private static final ConcurrentHashMap<String,SocketChannel> clientSockets = new ConcurrentHashMap<>();
    static{
        try {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(8000));
            serverSocketChannel.configureBlocking(false);//設置爲非阻塞
            selector = Selector.open();//實例化一個選擇器
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
            service();
    }
    
    private static void service(){
        SocketChannel clientChannel = null;
        SelectionKey selectionKey = null;
        SocketChannel targetChannel = null;
        try {
            serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);//服務端監聽鏈接
            while(true){
                selector.select();//阻塞至有新的鏈接就開始處理
                Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
                while(selectionKeys.hasNext()){
                    selectionKey = selectionKeys.next();
                    if(selectionKey.isAcceptable()){//若是事件是鏈接事件
                        ServerSocketChannel serverChannel = (ServerSocketChannel)selectionKey.channel();//獲取事件綁定的channel
                        clientChannel = serverChannel.accept();//鏈接獲取帶客戶端信息的socketChannel
                        clientChannel.configureBlocking(false);//客戶設置爲非阻塞,由於非阻塞才支持選擇器.避免盲等浪費資源
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//做爲每個客戶端的附件緩衝器
                        /**
                         * 只監聽讀事件,這裏千萬別監聽寫事件,由於只要鏈接有效,那麼寫事件會一直爲true,致使死循環,很耗資源
                         * 能夠跟serverSocket用同一個選擇器,由於綁定的channel不一樣
                         */
                        clientChannel.register(selector,SelectionKey.OP_READ,byteBuffer);
                    }else if(selectionKey.isReadable()){//只要有客戶端寫入,那麼就能夠處理
                        //獲取客戶端附件,也就是寫入的數據
                        ByteBuffer byteBuffer = (ByteBuffer)selectionKey.attachment();
                        //從selectionKey獲取客戶端的channel
                        SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
                        //把附件讀出,解碼爲字符串
                        String msg = read(socketChannel,byteBuffer);
                        //這裏用了->分割收件人,->後面跟着的字符串是收件人
                        if(msg.indexOf("->")!=-1){
                            //內容
                            String content = msg.substring(0,msg.lastIndexOf("->"));
                            //從map裏獲取收件人的socket
                            targetChannel = clientSockets.get(msg.substring(msg.lastIndexOf("->")+2));
                            //實例化一個緩衝區,用來寫出到收件人的socketChannel
                            ByteBuffer temp = ByteBuffer.allocate(1024);
                            temp.put(charset.encode(content));
                            //寫出
                            handleWrite(targetChannel,temp);
                        }else{
                            //若是內容沒有收件人,那麼視爲第一次鏈接,客戶端發過來的userName,做爲KEY存入MAP
                            clientSockets.put(msg,socketChannel);
                        }
                    }
                    selectionKeys.remove();
                }
            }
        } catch (IOException e) {
            try {
                if(selectionKey!=null)selectionKey.cancel();
                if(clientChannel!=null){
                    clientChannel.shutdownInput();
                    clientChannel.shutdownOutput();
                    clientChannel.close();
                }
                if(targetChannel!=null){
                    targetChannel.shutdownInput();
                    targetChannel.shutdownOutput();
                    targetChannel.close();
                }
            } catch (IOException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            e.printStackTrace();
        }

    }
    
    private static String read(SocketChannel socketChannel,ByteBuffer byteBuffer){
        //重置position limit爲寫入作準備
        byteBuffer.clear();
        try {
            int flag =socketChannel.read(byteBuffer);
            //判斷客戶端是否斷開鏈接
            if(flag==-1){
                //若是客戶端無端斷開,必定要關閉,不然讀取事件一直爲true形成死循環,很是耗資源
                socketChannel.close();
            }
        } catch (IOException e) {
            try {
                socketChannel.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        }
        //position =0 limit等於有效下標,爲寫出作準備
        byteBuffer.flip();
        return charset.decode(byteBuffer).toString();
    }
    
    //寫出
    private static void handleWrite(SocketChannel socketChannel,ByteBuffer byteBuffer){
        synchronized (byteBuffer) {
            byteBuffer.flip();
            try {
                socketChannel.write(byteBuffer);
            } catch (IOException e) {
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                e.printStackTrace();
            }
        }
    }
}

客戶端代碼微信

package socketchannel;


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * Created by lzw on 17-2-28.
 */
public class SocketChannelNonBlockingClient {
    private static Charset charset = Charset.forName("GBK");
    private static ByteBuffer receiveBuffer = ByteBuffer.allocate(10240);
    private static ByteBuffer sendBuffer = ByteBuffer.allocate(10240);
    private static SocketChannel socketChannel = null;
    private static Selector selector = null;
    private static String userName = "client1";//客戶端名
    private static String targetName = "client2";//收件人名

    public static void main(String[] args) {
        try {
            socketChannel = SocketChannel.open();
            //鏈接到服務端
            SocketAddress socketAddress = new InetSocketAddress("19.95.103.112",8000);
            selector = Selector.open();//實例化一個選擇器
            socketChannel.configureBlocking(false);//設置爲非阻塞
            //先監聽一個鏈接事件
            socketChannel.register(selector,SelectionKey.OP_CONNECT);
            //鏈接
            socketChannel.connect(socketAddress);
            //jdk 1.8的lambda表達式,用一個線程監控控制檯輸入
            new Thread(()->{
                    try {
                        receiveFromUser();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }).start();

            talk();

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private static void talk(){
        try {
            while(true){
                selector.select();//阻塞直到鏈接事件
                Iterator<SelectionKey> readyKeys = selector.selectedKeys().iterator();
               while(readyKeys.hasNext()){
                    SelectionKey key =readyKeys.next();
                    if(key.isConnectable()){
                        //非阻塞的狀況下可能沒有鏈接完成,這裏調用finishConnect阻塞至鏈接完成
                        socketChannel.finishConnect();
                        //鏈接完成之後,先發送本身的userName以便保存在服務端的客戶端map裏面
                        synchronized (sendBuffer){
                            SocketChannel socketChannel1 = (SocketChannel)key.channel();
                            sendBuffer.clear();
                            sendBuffer.put(charset.encode(userName));
                            send(socketChannel1);
                            socketChannel.register(selector,SelectionKey.OP_READ);//僅監聽一個讀取事件
                        }

                    }else if(key.isReadable()){
                        //處理讀事件
                        receive(key);
                    }
                    readyKeys.remove();
                }
            }
        } catch (ClosedChannelException e) {
            try {
                socketChannel.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    /**
     * 從控制檯獲取用戶輸入
     * @throws IOException
     */
    private static void receiveFromUser() throws IOException{
        //阻塞直到控制檯有輸入
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        for(String msg = br.readLine();msg!=null&&!msg.equals("bye");msg = br.readLine()){
            //同步鎖避免線程競爭
            synchronized (sendBuffer) {
                sendBuffer.clear();
                //編碼
                sendBuffer.put(charset.encode(msg));
                //分割副
                sendBuffer.put(charset.encode("->"));
                //目標名
                sendBuffer.put(charset.encode(targetName));
                send(socketChannel);
            }
        }
    }
    /**
     * 接收服務端的數據
     * @param key
     */
    private static void receive(SelectionKey key) throws IOException {
        //獲取服務端的channel
        SocketChannel channel = (SocketChannel) key.channel();
        //爲寫入緩衝器作準備position=0,limit=capacity
            receiveBuffer.clear();
            //從服務端的channel把數據讀入緩衝器
            channel.read(receiveBuffer);
            //position=0,limit=有效下標最後一位
            receiveBuffer.flip();
            //解碼
            String msg = charset.decode(receiveBuffer).toString();
            //輸出到控制檯
            System.out.println(msg);
    }

    /**
     * 發送到服務端
     */
    private static void send(SocketChannel sendChannel) throws IOException {
            if(sendBuffer.remaining()!=0){
                synchronized (sendBuffer){
                    sendBuffer.flip();
                    sendChannel.write(sendBuffer);
                }
            }
    }
}
相關文章
相關標籤/搜索