Camera開發系列之六-使用mina框架實現視頻推流

章節

Camera開發系列之一-顯示攝像頭實時畫面java

Camera開發系列之二-相機預覽數據回調git

Camera開發系列之三-相機數據硬編碼爲h264github

Camera開發系列之四-使用MediaMuxer封裝編碼後的音視頻到mp4容器網絡

Camera開發系列之五-使用MediaExtractor製做一個簡易播放器session

Camera開發系列之六-使用mina框架實現視頻推流框架

Camera開發系列之七-使用GLSurfaceviw繪製Camera預覽畫面 socket

mina是什麼

MINA框架是對java的NIO包的一個封裝,簡化了NIO程序開發的難度,封裝了不少底層的細節,讓開發者把精力集中到業務邏輯上來。說的簡單一點,它就是一個簡單但功能齊全的網絡應用框架,它的做者是如今大名鼎鼎的服務端Netty框架的做者。若是對mina框架不夠了解,ide

請先看這篇文章:mina框架簡單介紹post

請先看這篇文章:mina框架簡單介紹編碼

請先看這篇文章:mina框架簡單介紹

爲何要用mina框架

視頻分爲本地視頻和視頻流,本地視頻即爲已經下載好到本地的視頻,常見格式爲MP4,WMV,AVI等格式,視頻流多見於直播中,其中常見格式爲RTSP流媒體,RTMP流,m3u8 流媒體,MMS 流。

而這些直播協議底層無外乎都是經過socket實現的,想要了解它們的原理,能夠本身使用socket實現推流,來拋磚引玉。可是使用socket實現不太現實,工做量很大,而mina是封裝了socket的輕量級框架,用在這上面再合適不過了。

那麼,怎麼實現camera推流呢,讓咱們將問題拆分一下:

  1. 指定數據收發協議,客戶端和服務端須要制定一套規則來進行溝通
  2. 服務端和客戶端創建鏈接
  3. 服務端將視頻數據編碼,而後發送給客戶端
  4. 客戶端收到消息,將視頻數據解碼播放
  5. 斷線重連問題

而後看一下最終實現的效果,由於是1080p的,並且網絡帶寬不太好,有一些延遲:

數據傳輸協議-TLV協議

數據傳輸協議使用的是TLV協議,即Type-Length-Value,它是一種簡單實用的數據傳輸方案。Type指數據類型,Length指發送的數據長度,Value指數據自己。其中Value又能夠再嵌套TLV,拓展性很是高。

可能就這樣說數據傳輸協議有點抽象,後面會在代碼中體現出來。數據類型定義出來遼,能夠開始編寫發送接收數據了。

先定義一個視頻數據類:

public class VideoStreamModel {
    private int type; //視頻格式
    private int width; //視頻寬
    private int height; //視頻高
    private long seq_no0; //幀數 下同
    private long seq_no1;
    private byte[] video; //視頻數據
    //···省略get/set方法
}
複製代碼

有了數據類,服務端就能夠經過mina的IoSession來發送視頻消息啦。IoSession能夠理解爲客戶端和服務端通訊的橋樑。想要具體瞭解能夠看以前的文章。

服務端發送數據

在解碼h264數據的回調中,將數據經過IoSession發送給客戶端:

public void onH264DataFrame(byte[] h264, int width, int height) {
                //硬編碼以後的h264數據
                byte[] h264Data = Arrays.copyOf(h264, h264.length);
                VideoStreamModel model = new VideoStreamModel();
                mSeq_no0++;
                mSeq_no1++;
                model.setType(2);
                model.setWidth(width);
                model.setHeight(height);
                model.setSeq_no0(mSeq_no0);
                model.setSeq_no1(mSeq_no1);
                model.setVideo(h264Data);
                sendTo(model);
}
private IoSession session;
public String sendTo(VideoStreamModel streamModel) {
        if (null == streamModel){
            return "data is null!";
        }
        byte[] data = streamModel.getVideo();
        IoBuffer buffer = IoBuffer.allocate(8 + 24 + data.length);
        buffer.order(ByteOrder.LITTLE_ENDIAN);
        buffer.putInt(MESSAGE_ID_STRAME);
        buffer.putInt(24 + data.length);
        //下面的數據佔用24個字節
        buffer.putInt(streamModel.getType());
        buffer.putInt(streamModel.getWidth());
        buffer.putInt(streamModel.getHeight());
        buffer.putUnsignedInt(streamModel.getSeq_no0());
        buffer.putUnsignedInt(streamModel.getSeq_no1());
        buffer.putInt(data.length);

        buffer.put(data);
        buffer.flip();
        session.write(buffer);
        Log.i(TAG,"buffer的limit:"+buffer.limit()+" len: " + (24 + data.length)
        +"messageID: "+Constants.createType(Constants.MESSAGE_ID_STRAME));
        return null;
    }
複製代碼

上面的IoBuffer.allocate(8 + 24 + data.length)意思是開闢內存空間,空間大小爲8 + 24 + data.length,這個值是怎麼計算出來的呢?

能夠看到,上面有一個常量MESSAGE_ID_STRAME,這個是我自定義的int類型常量,用於標識數據的類型,也就是TLV中的Type。因爲是int類型,因此佔用4個字節。

第二個放入的是24 + data.length,這個是數據的長度,即Length。一樣是int類型,佔用4個字節

最後是數據自己,即VideoStreamModel這個數據類再加上了視頻數據的大小,佔用的字節大小爲:4x6 + data.length。

這樣一個TLV數據發送協議就寫好啦,是否是很簡單呢。

客戶端接收數據

數據發送協議寫好了,那客戶端怎麼收到這些數據並處理呢? 在客戶端的IoHandlerAdapter的messageReceived方法中,能夠對數據進行處理:

public void messageReceived(IoSession session, Object message) throws Exception {
            //super.messageReceived(session, message);
            IoBuffer buffer = (IoBuffer) message;
            buffer.order(ByteOrder.LITTLE_ENDIAN);
            int type = buffer.getInt();
            int messageID = (type & 0x3FF);
            int len = buffer.getInt();
            Log.i(TAG,"messageID:"+messageID+" type: "+type);
            switch (messageID) {
                case MESSAGE_ID_STRAME:
                    if(buffer.remaining() < 4){
                        return;
                    }
                    int type1 = buffer.getInt();
                    if(buffer.remaining() < 24){
                        return;
                    }
                    int width = buffer.getInt();
                    int height = buffer.getInt();
                    long seq_no0 = buffer.getUnsignedInt();
                    long seq_no1 = buffer.getUnsignedInt();
                   
                    int bufferSize = buffer.getInt(); //視頻幀大小
                    if(buffer.remaining() < bufferSize) {
                        return;
                    }
                    // 作個最大斷定,不能太大了!
                    if(bufferSize > 1024 * 1024) {
                        return;
                    }
                    byte[] h264Segment = new byte[bufferSize];
                    buffer.get(h264Segment);
                    H264Decoder.getInstance().handleH264(h264Segment);
                    break;
            }
        }
複製代碼

上面的代碼其實就是將服務端發送過來的數據進行解析,而後播放。這裏只須要注意IoBuffer的使用,視頻若是沒法播放,通常都是發送數據或者接收數據的時候對數據長度處理不當形成的。

粘包、拆包處理

你覺得處理好上面的那些問題就能夠正常使用了麼?天真!熟悉TCP的同窗可能知道TCP有一個滑動窗口的概念,那就能很好的理解數據的拆包粘包現象了,若是不知道也沒關心,它大概就是將數據分開發送的一個東西。

爲啥要將數據分開發送呢,好比傳輸一個10m大小的文件,不分包一次性發送這麼大量的數據,首先TCP/IP協議中,TCP的下一層IP層就不一樣意,由於IP數據包的負載是有限的,大概爲1480個字節,其次是不分包的方式是不合理的,會存在極大的資源浪費。

分包以後,伴隨的問題就出來了:

socket的緩衝區大小是固定的,若是數據包比較大,只會把這個包前面部分的數據發送出去,畢竟我們都是紳士,不能強人所難,硬要人家把這個數據包都傳過去吧,因此能夠看到第一個數據包發生了拆包。

緊接着的是第二個socket來了,它負責把第一個數據包的後面部分帶上,而且還有空閒的空間,勤儉節約是中華名族的傳統美德,既然你還有空間,那就把後面數據包的數據也帶上一些吧,這時候兩個數據包的數據都在一個socket中,也就是發生了粘包。

知道了粘包拆包是怎麼產生的,解決方法也就很簡單了,只須要事先知道數據的起始位置,以及每一個數據包的大小,也就是TLV中的length,就能夠組裝value了。

粘包,拆包都是數據發送端自動完成的,因此只須要在數據接收端進行處理,下面是自定義mina解碼器解決粘包、拆包問題,繼承CumulativeProtocolDecoder這個類,它是累積性的協議解碼器,也就是說只要有數據發送過來,這個類就會去讀取數據,而後累積到內部的 IoBuffer 緩衝區,可是具體的拆包交由子類的 doDecode()方法完成:

public final class TLVDecoder extends CumulativeProtocolDecoder {
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (in.remaining() >= 8) { // 前8字節是包頭+長度 兩個都是int 因此是8
            // 標記當前position的快照標記mark,以便後繼的reset操做能恢復position位置
            in.mark();

            in.order(ByteOrder.LITTLE_ENDIAN);
            @SuppressWarnings("unused")
            int type = in.getInt();
            int len = in.getInt();

            // 注意上面的get操做會致使下面的remaining()值發生變化
            if (in.remaining() < len) {
                // 若是消息內容不夠,則重置恢復position位置到操做前,進入下一輪, 接收新數據,以拼湊成完整數據
                in.reset();
                return false;
            } else {
                // 消息內容足夠
                in.reset(); // 重置恢復position位置到操做前
                int totalLen = (int) (8 + len); // 總長 = 包頭+包體

                byte[] packArr = new byte[totalLen];
                in.get(packArr, 0, totalLen);

                IoBuffer buffer = IoBuffer.allocate(totalLen);
                buffer.put(packArr);
                buffer.flip();
                out.write(buffer);
                buffer.free();

                if (in.remaining() > 0) { // 若是讀取一個完整包內容後還粘了包,就讓父類再調用一次,進行下一次解析
                    return true;
                }
            }
        }
        return false; // 處理成功,讓父類進行接收下個包
    }
}

複製代碼

自定義編碼器,肥腸簡單,這裏繼承的ProtocolEncoderAdapter是將java中的對象轉換爲字節流發送,是mina框架封裝的編碼器:

public final class TLVEncoder extends ProtocolEncoderAdapter {
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        out.write(message);
        out.flush();
    }
}
複製代碼

編解碼器定義好了,在編解碼器工廠中進行初始化,實現了ProtocolCodecFactory,mina中的編解碼工廠都須要實現這個接口,好比咱們以前用的TextLineCodecFactory

public final class TLVCodecFactory implements ProtocolCodecFactory {
    private TLVDecoder decoder;
    private TLVEncoder encoder;
    
    public TLVCodecFactory() {
        decoder = new TLVDecoder();
        encoder = new TLVEncoder();
    }

    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return encoder;
    }

    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return decoder;
    }
}
複製代碼

初始化mina的時候傳這個編解碼工廠就好了,

TLVCodecFactory codecFactory = new TLVCodecFactory();
ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(codecFactory);
streamConnection = new NioSocketConnector();
streamConnection.getFilterChain().addLast("tlv", codecFilter);
複製代碼

斷線重連

網絡應用不免會遇到網絡不穩定的時候,這時候鏈接有可能會斷開,斷開以後須要作自動斷線重連,給用戶一個較好的體驗,mina有一個IoServiceListener接口,用於監聽當前會話的狀態,咱們實現這個接口,就能夠在會話異常銷燬的時候,進行重連:

private final class StreamAutoReconnectHandler implements IoServiceListener {

        @Override
        public void serviceActivated(IoService ioService) throws Exception {

        }

        @Override
        public void serviceIdle(IoService ioService, IdleStatus idleStatus) throws Exception {

        }

        @Override
        public void serviceDeactivated(IoService ioService) throws Exception {

        }

        @Override
        public void sessionCreated(IoSession ioSession) throws Exception {

        }

        @Override
        public void sessionClosed(IoSession ioSession) throws Exception {

        }

        @Override
        public void sessionDestroyed(IoSession ioSession) throws Exception {
            // 不是主動斷開鏈接的,須要重連
            while(m_state == State.Connected || m_state == State.ReConnecting) {
                try {
                    ConnectFuture future = streamConnection.connect();
                    future.awaitUninterruptibly();// 等待鏈接建立成功
                    streamSession = future.getSession();// 獲取會話
                    if(m_state != State.Connected && m_state != State.ReConnecting) {
                        Log.i(TAG,"不須要斷線重連");
                        streamSession.closeNow();
                        break;
                    }
                    if (streamSession != null) {
                        m_state = State.Connected;
                        break;
                    }
                    Log.i(TAG,"斷線重連");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
複製代碼

完整代碼已上傳至github:camera開發系列

相關文章
相關標籤/搜索