基於XMPP協議的aSmack源碼分析

      在研究如何實現Pushing功能期間,收集了不少關於Pushing的資料,其中有一個androidnp開源項目用的人比較多,可是因爲長時間沒有什麼人去維護,據說bug的概率挺多的,爲了之後本身的產品穩定些,因此就打算本身研究一下asmack的源碼,本身作一個插件,androidnp移動端的源碼中包含了一個叫作asmack的jar。html

Reader和Writerandroid

     在asmack中有兩個很是重要的對象PacketReader和PacketWriter,那麼從類名上看Packet + (Reader/Wirter),而TCP/IP傳輸的數據,叫作Packet(包),asmack使用的是XMPP協議,XMPP簡單講就是使用TCP/IP協議 + XML流協議的組合。因此這個了對象的做用從字面上看應該是,寫包與讀包,做用爲從服務端讀寫數據。設計模式

     PacketWriter中必定含有一個Writer對象,這個Writer是一個輸出流,一樣的PacketReader對象中有一個Reader,而這個Reader是一個輸入流,Writer和Reader對象就是一個簡單的讀寫器,他們是從socket對象中獲取出來後,通過裝飾變成如今這個樣子。數組

1 reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
2 writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"));

沒有什麼神奇的地方,主要看PacketWriter/Reader,這兩個對象分別把對應的Writer和Reader引用到本身的內部進行操做,下面就先看一個PacketWriter。服務器

    /**
     * Creates a new packet writer with the specified connection.
     *
     * @param connection the connection.
     */
    protected PacketWriter(XMPPConnection connection) {
        this.queue = new ArrayBlockingQueue<Packet>(500, true);
        this.connection = connection;
        init();
    }

     還有就是PacketWriter初始化的時候將XMPPConnection對象傳了進來,由於在init方法中使用到了XMPPConnection對象的writer成員,我想說的是,爲何不直接傳遞writer成員?而是將整個對象XMPPConnection傳了過來?其實這就是設計模式的好處,咱們若是每次都傳遞的是本身的成員,那麼若是後期有改動,實現一個新的XMPPConnection與PacketWriter關聯,那麼老的代碼維護起來是很巨大的,若是這裏XMPPConnection和他的同事類PacketWriter都有相對應的接口,(XMPPConnection的接口是Connection)那就更完美了,而這裏用到的模式應該是中介者,不是絕對意義的中介者,因爲造成中介者的條件比較高,因此實際開發中可能是變形使用。PacketWriter對象在XMPPConnection中的connect方法中被初始化,它的最大做用是在其自身的內部建立了兩個消息循環,其中一個用30s的heartbeats向服務器發送空白字符,保持長鏈接。而第二個循環則時刻從隊列中主動取消息併發往服務器,而向外部提供的sendPacket方法則是向queue中添加消息,前面提到的循環機制都是在線程中工做,而消息的隊列用的是ArrayBlockingQueue,這個無邊界阻塞隊列能夠存聽任何對象,這裏存放的是Packet對象。併發

 1 public void sendPacket(Packet packet) {
 2         if (!done) {
 3             try {
 4                 queue.put(packet);
 5             }
 6             catch (InterruptedException ie) {
 7                 ie.printStackTrace();
 8                 return;
 9             }
10             synchronized (queue) {
11                 queue.notifyAll();
12             }
13         }
14     }
while (!done && (writerThread == thisThread)) {
                Packet packet = nextPacket();
                if (packet != null) {
                    synchronized (writer) {
                        writer.write(packet.toXML());
                        writer.flush();
                        // Keep track of the last time a stanza was sent to the server
                        lastActive = System.currentTimeMillis();
                    }
                }
            }

      消息循環則是一個經過各類成員變量控制的while loop,第一行的nextPacket方法是向queue中獲取Packet消息,而且經過weiter將包發出去,這樣生產/消費的模型就搭建好了,這裏須要注意的是,我刪減了不少影響閱讀的代碼,並無所有貼上。關於heartbeats循環其實也是一個在線程中運行的while loop,也是經過一些成員控制。wirter向服務端寫了寫什麼?看下面的這個方法app

void openStream() throws IOException {
        StringBuilder stream = new StringBuilder();
        stream.append("<stream:stream");
        stream.append(" to=\"").append(connection.getServiceName()).append("\"");
        stream.append(" xmlns=\"jabber:client\"");
        stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
        stream.append(" version=\"1.0\">");
        writer.write(stream.toString());
        writer.flush();
    }

XML,沒錯,這也是符合XMPP協議規範的一種表現吧,至於更多XMPP協議的好處,因爲本人的經驗有限,就很少作點評,但願後續會對其深刻了解。socket

下面看一個PacketReader這個類都包含了什麼職責。工具

PacketReaderoop

PacketReader全部的核心邏輯都在一個線程中完成的,PacketReader的工做很專一,一樣的在一個while loop中 不停的解析、刷新reader對象、同時做爲事件源發送解析事後的各類Packet,解析這裏用的是Android獨特的Pull解析,Pull解析的特色事件驅動,在這裏被徹底的利用了起來,隨着不一樣的標籤,PacketReader都會作出不一樣的處理,處理完這些數據用不一樣Pocket對象封裝,最後,分發出去,由監聽者作最後的業務處理。

readerThread = new Thread() {
    public void run() {
        parsePackets(this);
    }
};

因爲解析過程的代碼量過於多,我寫到什麼地方就分解什麼地方,你們有時間最好本身看源碼。

1、初始化/重置解析器

private void resetParser() {
    try {
        //用的是Pull解析
        parser = XmlPullParserFactory.newInstance().newPullParser();
        parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
        parser.setInput(connection.reader);
    }
    catch (XmlPullParserException xppe) {
        xppe.printStackTrace();
    }
}

上面這個resetParser方法還會在解析的過程當中碰到不一樣的業務需求會不斷的被調用,有用和業務邏輯比較緊密,沒什麼技術含量,關鍵是要看解析的方式和同時做爲事件源發送解析事後的各類Packet,這兩部分的設計,是很是的迷人的。

2、解析

do {
    if (eventType == XmlPullParser.START_TAG) {
        if (parser.getName().equals("message")) {
            processPacket(PacketParserUtils.parseMessage(parser));
    }
    else if (parser.getName().equals("iq")) {
        processPacket(PacketParserUtils.parseIQ(parser, connection));
    }
    else if (parser.getName().equals("presence")) {
        processPacket(PacketParserUtils.parsePresence(parser));
    }

PacketParserUtils是一個工具類,各個靜態方法傳入的仍是Parser對象,內部一樣的使用Pull的方式進行解析,可是因爲Pull是驅動解析,不會無端的浪費資源只會加載感興趣的內容,試想一下,若是這裏用Dom解析……PacketParserUtils的這些靜態解析方法返回的實例對象也不同,從方法名能夠看出有IQ、message、presence等,他們的父類爲Packet,這些對象又被執行processPacket方法的時候傳入

private void processPacket(Packet packet) {
    if (packet == null) {
        return;
    }

    // Loop through all collectors and notify the appropriate ones.
    for (PacketCollector collector: connection.getPacketCollectors()) {
        collector.processPacket(packet);
    }

    // Deliver the incoming packet to listeners.
    listenerExecutor.submit(new ListenerNotification(packet));
}

processPacket方法內部有一個循環來轉調collector.processPacket(packet);方法,前提是connection.getPacketCollectors()內部有貨,到目前位置都沒有涉及到PacketCollector這個接口的內容,他的做用實際上是一個觀察者模式中的執行者的做用,也就是傳說中的監聽器,凡是註冊了它的對象,均可以經過processPacket這個抽象方法,監聽packet的變化。但是到如今任何對象都沒有註冊它,因此這個Loop尚未做用,由於目前咱們還處在鏈接的步驟(還沒繞出來)。

listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是個Runnable
/**
 * A runnable to notify all listeners of a packet.
 */
private class ListenerNotification implements Runnable {

    private Packet packet;

    public ListenerNotification(Packet packet) {
        this.packet = packet;
    }

    public void run() {
        for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) {
            listenerWrapper.notifyListener(packet);
        }
    }
}

咱們上面看到listenerExecutor是一個線程池,在線程池中執行了一個凡是註冊了ListenerWrapper的對象,都將接收到packet,一樣的,到目前爲止沒有對象註冊,(在RegisterTask過程當中ListenerWrapper被註冊)

else if (eventType == XmlPullParser.END_TAG) {
    if (parser.getName().equals("stream")) {
        // Disconnect the connection
        connection.disconnect();
    }
}

當文檔讀取結束是將斷開鏈接

void cleanup() {
    connection.recvListeners.clear();
    connection.collectors.clear();
}

看到了嗎,只是將監聽器接口集合清空而已,並無斷開鏈接,或者取消消息循環

PacketReader對象的startup方法比較複雜,大致上執行了讀取流,並將解析好的Packet對象發送給觀察者,由觀察者繼續後續操做,目前觀察者尚未出現,還有就是使用了線程池和令牌來操做執行線程,並且維護了一個connectionID成員,這個成員的做用還須要再看,這就很少說了。
關於Packet對象,packet對象有不少子類,上面舉例了3個,其實還有不少,都是在parser時封裝的
AuthMechanism\Challenge\Failure\IQ\Message\Presence\Response\Success
還有就是Pull解析的優勢體現了出來,能夠一個parser對象包含了不少信息,但可能沒到一個時刻咱們須要的信息只是一小部分,這樣用Pull解析的驅動式就大大減小了冗餘的過程,PacketReader對象使用了2個監聽器集合對象,PacketCollector、listenerWrapper,仍是那句話,還沒看到觀察者,因此還不知道什麼狀況下須要註冊這兩個監聽。
到目前位置packetReader.startup()方法終於告一個段落了。

 

register過程分析

    RegisterTask這個task在運行中,添加了一個監聽,上面說道的PacketReader中有一個消息機制,在不停的解析服務器返回的結果,而後將解析事後的包分發給各個監聽器(觀察者),而register中就註冊了一個監聽器,比較有意思的是,監聽器被註冊時還加了一個過濾器,這個過濾器的目的是監聽器只接收本身感興趣的內容,這個設計真的很贊。這樣就沒必要在數據源頭PacketReader中對數據進行過濾了,只要後期擴展本身Packet和本身的過濾器,就能達到排除本身不關心的信息的功能。

    Registration registration = new Registration();
    PacketFilter packetFilter = new AndFilter(new PacketIDFilter(registration.getPacketID()), new PacketTypeFilter(IQ.class));

其中Registration的類型其實一個IQ的子類,IQ是Packet的子類。
AndFilter是PacketFilter的子類,PacketFilter的種類型有不少,也能夠本身擴展,AndFilter就是其中一個、PacketTypeFilter也是、PacketIDFilter也是,
其中PacketTypeFilter的構造方法傳入一個IQ.class,其實就是經過這個類文件來過濾packet,這個PacketTypeFilter就是要設置關心的Packet,這裏面它告訴監聽器,只接收類型爲IQ的Packet,這些Filter中都有一個關鍵方法,accept(Packet packet).這個accept方法每一個Filter的實現方式都不同,咱們可能夠擴展本身的Filter而且重寫這個方法,最有意思的是AndFilter這個類,他的構造方法傳入的是一個動態數組,類型爲PacketFilter,你能夠傳入你須要的過濾器,將他們當成組合條件使用來過濾Packet,這個就是典型的裝飾設計模式和職責鏈模式的組合使用。

註冊監聽器

 1 PacketListener packetListener = new PacketListener() {
 2     //這一部分就是監聽器接收到Packet後執行的後續操做
 3     public void processPacket(Packet packet) {
 4         Log.d("RegisterTask.PacketListener", "processPacket().....");
 5         Log.d("RegisterTask.PacketListener", "packet=" + packet.toXML());
 6 
 7         if (packet instanceof IQ) {
 8             IQ response = (IQ) packet;
 9             if (response.getType() == IQ.Type.ERROR) {
10                 if (!response.getError().toString().contains("409")) {
11                     Log.e(LOGTAG,
12                             "Unknown error while registering XMPP account! "
13                                     + response.getError()
14                                             .getCondition());
15                 }
16             } else if (response.getType() == IQ.Type.RESULT) {
17                 xmppManager.setUsername(newUsername);
18                 xmppManager.setPassword(newPassword);
19                 Log.d(LOGTAG, "username=" + newUsername);
20                 Log.d(LOGTAG, "password=" + newPassword);
21 
22                 Editor editor = sharedPrefs.edit();
23                 editor.putString(Constants.XMPP_USERNAME,
24                         newUsername);
25                 editor.putString(Constants.XMPP_PASSWORD,
26                         newPassword);
27                 editor.commit();
28                 Log
29                         .i(LOGTAG,
30                                 "Account registered successfully");
31                 //執行task
32                 xmppManager.runTask();
33             }
34         }
35     }
36 };

addPacketListener方法傳入一個監聽器和過濾器,看一下內部

/**
 * Registers a packet listener with this connection. A packet filter determines
 * which packets will be delivered to the listener. If the same packet listener
 * is added again with a different filter, only the new filter will be used.
 * 
 * @param packetListener the packet listener to notify of new received packets.
 * @param packetFilter   the packet filter to use.
 */
public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
    if (packetListener == null) {
        throw new NullPointerException("Packet listener is null.");
    }
    ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
    recvListeners.put(packetListener, wrapper);
}

能夠看到,監聽器和過濾器被 ListenerWrapper 再次封裝,後續的recvListeners這個集合將ListenerWrapper收入囊中,好整個註冊過程完畢,就等待接收信息了,那麼發送信息的地方在什麼地方呢?分析connect過程時,上面的PacketReader中已經開始循環發送了,代碼以下

listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是個Runnable

/**
 * A runnable to notify all listeners of a packet.
 */
private class ListenerNotification implements Runnable {

    private Packet packet;

    public ListenerNotification(Packet packet) {
        this.packet = packet;
    }

    public void run() {
        for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) {
            listenerWrapper.notifyListener(packet);
        }
    }
}

而listenerWrapper的notifyListener(packet)內部,使用了傳入的過濾器對Packet進行了過濾

/**
 * Notify and process the packet listener if the filter matches the packet.
 * 
 * @param packet the packet which was sent or received.
 */
public void notifyListener(Packet packet) {
    if (packetFilter == null || packetFilter.accept(packet)) {
        packetListener.processPacket(packet);
    }

而具體的過濾機制仍是轉調了傳入的過濾器自己的過濾方式accept,很是的靈活。過濾完的Packet將被髮送出去

這個方法connection.sendPacket(registration);將一個Registration對象發了出去,

public void sendPacket(Packet packet) {
    if (!isConnected()) {
        throw new IllegalStateException("Not connected to server.");
    }
    if (packet == null) {
        throw new NullPointerException("Packet is null.");
    }
    packetWriter.sendPacket(packet);
}

內部轉調的是 packetWriter.sendPacket(packet);之前提到過PacketWirter中有兩個循環機制,其中一個就是在不停的訪問隊列來獲取Packet,而這個sendPacket方法就是將消息寫入隊列中供消費者使用。

/**
 * Sends the specified packet to the server.
 *
 * @param packet the packet to send.
 */
public void sendPacket(Packet packet) {
    if (!done) {
        // Invoke interceptors for the new packet that is about to be sent. Interceptors
        // may modify the content of the packet.
        //內部執行了一個發送數據源的動做,也是爲某些監聽器對象服務的interceptorWrapper.notifyListener(packet);
        connection.firePacketInterceptors(packet);

        try {
            //將一個Packet對象放入到阻塞隊列中,在上面的witerPacket方法中的wile循環中發送出去
            queue.put(packet);
        }
        catch (InterruptedException ie) {
            ie.printStackTrace();
            return;
        }
        synchronized (queue) {
            queue.notifyAll();
        }

        // Process packet writer listeners. Note that we're using the sending
        // thread so it's expected that listeners are fast.
        connection.firePacketSendingListeners(packet);
    }
}   

其實,註冊的過程就是在註冊監聽,這樣在有消息發出時,才能夠根據業務需求對消息進行接收和處理。

http://www.cnblogs.com/rioder/archive/2013/01/23/2873176.html

相關文章
相關標籤/搜索