基於Apache Mina框架實現Android Socket長鏈接通訊

前言

Socket長鏈接是Android開發中很基礎的功能,不少的App都會用到這個功能,實現的方式有不少,我今天就只寫基於Apache Mina框架的實現方式,至於TCP/IP協議相關的知識就不涉及了,想了解更多關於Socket長鏈接知識的,能夠去看看剛哥的文章java

Apache Mina介紹

Mina是一個基於NIO的網絡框架,使用它編寫程序時,能夠專一於業務處理,而不用過於關心IO操做。不論應用程序採用什麼協議(TCP、UDP)或者其它的,Mina提供了一套公用的接口,來支持這些協議。目前能夠處理的協議有:HTTP, XML, TCP, LDAP, DHCP, NTP, DNS, XMPP, SSH, FTP… 。從這一點來講,Mina不單單是一個基於NIO的框架,更是一個網絡層協議的實現。git

基於Mina實現Socket長鏈接

1. 先添加相關依賴Jar包

目錄結構

2. 客戶端代碼

完整項目代碼我會上傳到GitHub 添加相關編碼協議,日誌,設置心跳和Handlergithub

NioSocketConnector mSocketConnector = new NioSocketConnector();

// mSocketConnector.setConnectTimeoutMillis(Constants.TIMEOUT);

                //設置協議封裝解析處理
                mSocketConnector.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new FrameCodecFactory()));
                // 設置日誌輸出工廠
                mSocketConnector.getFilterChain().addLast("logger", new LoggingFilter());

                //設置心跳包
                /*KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartBeatMessageFactory()); //每 1 分鐘發送一個心跳包 heartFilter.setRequestInterval(1 * 60); //心跳包超時時間 10s heartFilter.setRequestTimeout(10); // heartFilter.setRequestTimeoutHandler(new HeartBeatTimeoutHandler()); mSocketConnector.getFilterChain().addLast("heartbeat", heartFilter);*/

                //設置 handler 處理業務邏輯
                mSocketConnector.setHandler(new MessageHandler(context));
                mSocketConnector.addListener(new MessageListener(mSocketConnector));

                // 設置接收和發送緩衝區大小
                mSocketConnector.getSessionConfig().setReceiveBufferSize(1024);
// mSocketConnector.getSessionConfig().setSendBufferSize(1024);
                // 設置讀取空閒時間:單位爲s
                mSocketConnector.getSessionConfig().setReaderIdleTime(60);


                //配置服務器地址
                InetSocketAddress mSocketAddress = new InetSocketAddress(Constants.HOST, Constants.PORT);
                //發起鏈接
                ConnectFuture mFuture = mSocketConnector.connect(mSocketAddress);
複製代碼

斷線重連處理,有一點不一樣,mSocketConnector在添加相關協議的時候,要先判斷apache

int count = 0;// 記錄嘗試重連的次數
                NioSocketConnector mSocketConnector = null;
                while (!isRepeat[0] && count < 10) {
                    try {
                        count++;
                        if (mSocketConnector == null) {
                            mSocketConnector = new NioSocketConnector();
                        }

// mSocketConnector.setConnectTimeoutMillis(Constants.TIMEOUT);

                        if (!mSocketConnector.getFilterChain().contains("protocol")) {
                            //設置協議封裝解析處理
                            mSocketConnector.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new FrameCodecFactory()));
                        }

                        if (!mSocketConnector.getFilterChain().contains("logger")) {
                            // 設置日誌輸出工廠
                            mSocketConnector.getFilterChain().addLast("logger", new LoggingFilter());
                        }

                        /*if (!mSocketConnector.getFilterChain().contains("heartbeat")) { //設置心跳包 KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartBeatMessageFactory()); //每 1 分鐘發送一個心跳包 heartFilter.setRequestInterval(1 * 60); //心跳包超時時間 10s heartFilter.setRequestTimeout(10); // heartFilter.setRequestTimeoutHandler(new HeartBeatTimeoutHandler()); mSocketConnector.getFilterChain().addLast("heartbeat", heartFilter); }*/

                        //設置 handler 處理業務邏輯
                        mSocketConnector.setHandler(new MessageHandler(context));
                        mSocketConnector.addListener(new MessageListener(mSocketConnector));

                        // 設置接收和發送緩衝區大小
                        mSocketConnector.getSessionConfig().setReceiveBufferSize(1024);
// mSocketConnector.getSessionConfig().setSendBufferSize(1024);
                        // 設置讀取空閒時間:單位爲s
                        mSocketConnector.getSessionConfig().setReaderIdleTime(60);

                        //配置服務器地址
                        InetSocketAddress mSocketAddress = new InetSocketAddress(Constants.HOST, Constants.PORT);
                        //發起鏈接
                        ConnectFuture mFuture = mSocketConnector.connect(mSocketAddress);
                        mFuture.awaitUninterruptibly();
                        IoSession mSession = mFuture.getSession();
                        if (mSession.isConnected()) {
                            isRepeat[0] = true;
                            e.onNext(mSession);
                            e.onComplete();
                            break;
                        }
                    } catch (Exception e1) {
                        e1.printStackTrace();
                        if (count == Constants.REPEAT_TIME) {
                            System.out.println(Constants.stringNowTime() + " : 斷線重連"
                                    + Constants.REPEAT_TIME + "次以後仍然未成功,結束重連.....");
                            break;
                        } else {
                            System.out.println(Constants.stringNowTime() + " : 本次斷線重連失敗,5s後進行第" + (count + 1) + "次重連.....");
                            try {
                                Thread.sleep(5000);
                                System.out.println(Constants.stringNowTime() + " : 開始第" + (count + 1) + "次重連.....");
                            } catch (InterruptedException e12) {
                            }
                        }
                    }

                }
複製代碼

編碼,解碼協議,根據實際狀況而定服務器

public class FrameDecoder extends CumulativeProtocolDecoder {


    private final static Charset charset = Charset.forName("UTF-8");

    @Override
    protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {

        //數據粘包,斷包處理
        int startPosition = ioBuffer.position();
        while (ioBuffer.hasRemaining()) {
            byte b = ioBuffer.get();
            if (b == '\n') {//讀取到\n時候認爲一行已經讀取完畢
                int currentPosition = ioBuffer.position();
                int limit = ioBuffer.limit();
                ioBuffer.position(startPosition);
                ioBuffer.limit(limit);
                IoBuffer buffer = ioBuffer.slice();
                byte[] bytes = new byte[buffer.limit()];
                buffer.get(bytes);
                String message = new String(bytes, charset);
                protocolDecoderOutput.write(message);
                ioBuffer.position(currentPosition);
                ioBuffer.limit(limit);
                return true;
            }
        }
        ioBuffer.position(startPosition);
        return false;

    }
}
複製代碼
public class FrameEncoder implements ProtocolEncoder {
    private final static Charset charset = Charset.forName("UTF-8");
    @Override
    public void encode(IoSession ioSession, Object message, ProtocolEncoderOutput protocolEncoderOutput) throws Exception {
        IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);
        buff.putString(message.toString(), charset.newEncoder());
        // put 當前系統默認換行符 WINDOWS:\r\n, Linux:\n
        buff.putString(LineDelimiter.WINDOWS.getValue(), charset.newEncoder());
        // 爲下一次讀取數據作準備
        buff.flip();
        protocolEncoderOutput.write(buff);
    }

    @Override
    public void dispose(IoSession ioSession) throws Exception {

    }
}
複製代碼

心跳包處理,Android設備加心跳並不能完成保持長鏈接的狀態,畢竟保活你懂噠!網絡

public class HeartBeatMessageFactory implements KeepAliveMessageFactory {

    @Override
    public boolean isRequest(IoSession ioSession, Object message) {
        //若是是客戶端主動向服務器發起的心跳包, return true, 該框架會發送 getRequest() 方法返回的心跳包內容.

        if(message instanceof String && message.equals(Constants.PING_MESSAGE)){
            return true;
        }
        return false;

    }

    @Override
    public boolean isResponse(IoSession ioSession, Object message) {
        //若是是服務器發送過來的心跳包, return true後會在 getResponse() 方法中處理心跳包.

        if(message instanceof String && message.equals(Constants.PONG_MESSAGE)){
            return true;
        }
        return false;
    }
    
    @Override
    public Object getRequest(IoSession ioSession) {
        //自定義向服務器發送的心跳包內容.
        return Constants.PING_MESSAGE;
    }

    @Override
    public Object getResponse(IoSession ioSession, Object message) {
        //自定義解析服務器發送過來的心跳包.
        return Constants.PONG_MESSAGE;
    }
}
複製代碼
相關文章
相關標籤/搜索