在研究如何實現Pushing功能期間,收集了不少關於Pushing的資料,其中有一個androidnp開源項目用的人比較多,可是因爲長時間沒有什麼人去維護,據說bug的概率挺多的,爲了之後本身的產品穩定些,因此就打算本身研究一下asmack的源碼,本身作一個插件,androidnp移動端的源碼中包含了一個叫作asmack的jar。html
Reader和Writerandroid
在asmack中有兩個很是重要的對象PacketReader和PacketWriter,那麼從類名上看Packet + (Reader/Wirter),而TCP/IP傳輸的數據,叫作Packet(包),asmack使用的是XMPP協議,XMPP簡單講就是使用TCP/IP協議 + XML流協議的組合。因此這個了對象的做用從字面上看應該是,寫包與讀包,做用爲從服務端讀寫數據。設計模式
PacketWriter中必定含有一個Writer對象,這個Writer是一個輸出流,一樣的PacketReader對象中有一個Reader,而這個Reader是一個輸入流,Writer和Reader對象就是一個簡單的讀寫器,他們是從socket對象中獲取出來後,通過裝飾變成如今這個樣子。數組
1 reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")); 2 writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"));
沒有什麼神奇的地方,主要看PacketWriter/Reader,這兩個對象分別把對應的Writer和Reader引用到本身的內部進行操做,下面就先看一個PacketWriter。服務器
/** * Creates a new packet writer with the specified connection. * * @param connection the connection. */ protected PacketWriter(XMPPConnection connection) { this.queue = new ArrayBlockingQueue<Packet>(500, true); this.connection = connection; init(); }
還有就是PacketWriter初始化的時候將XMPPConnection對象傳了進來,由於在init方法中使用到了XMPPConnection對象的writer成員,我想說的是,爲何不直接傳遞writer成員?而是將整個對象XMPPConnection傳了過來?其實這就是設計模式的好處,咱們若是每次都傳遞的是本身的成員,那麼若是後期有改動,實現一個新的XMPPConnection與PacketWriter關聯,那麼老的代碼維護起來是很巨大的,若是這裏XMPPConnection和他的同事類PacketWriter都有相對應的接口,(XMPPConnection的接口是Connection)那就更完美了,而這裏用到的模式應該是中介者,不是絕對意義的中介者,因爲造成中介者的條件比較高,因此實際開發中可能是變形使用。PacketWriter對象在XMPPConnection中的connect方法中被初始化,它的最大做用是在其自身的內部建立了兩個消息循環,其中一個用30s的heartbeats向服務器發送空白字符,保持長鏈接。而第二個循環則時刻從隊列中主動取消息併發往服務器,而向外部提供的sendPacket方法則是向queue中添加消息,前面提到的循環機制都是在線程中工做,而消息的隊列用的是ArrayBlockingQueue,這個無邊界阻塞隊列能夠存聽任何對象,這裏存放的是Packet對象。併發
1 public void sendPacket(Packet packet) { 2 if (!done) { 3 try { 4 queue.put(packet); 5 } 6 catch (InterruptedException ie) { 7 ie.printStackTrace(); 8 return; 9 } 10 synchronized (queue) { 11 queue.notifyAll(); 12 } 13 } 14 }
while (!done && (writerThread == thisThread)) { Packet packet = nextPacket(); if (packet != null) { synchronized (writer) { writer.write(packet.toXML()); writer.flush(); // Keep track of the last time a stanza was sent to the server lastActive = System.currentTimeMillis(); } } }
消息循環則是一個經過各類成員變量控制的while loop,第一行的nextPacket方法是向queue中獲取Packet消息,而且經過weiter將包發出去,這樣生產/消費的模型就搭建好了,這裏須要注意的是,我刪減了不少影響閱讀的代碼,並無所有貼上。關於heartbeats循環其實也是一個在線程中運行的while loop,也是經過一些成員控制。wirter向服務端寫了寫什麼?看下面的這個方法app
void openStream() throws IOException { StringBuilder stream = new StringBuilder(); stream.append("<stream:stream"); stream.append(" to=\"").append(connection.getServiceName()).append("\""); stream.append(" xmlns=\"jabber:client\""); stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\""); stream.append(" version=\"1.0\">"); writer.write(stream.toString()); writer.flush(); }
XML,沒錯,這也是符合XMPP協議規範的一種表現吧,至於更多XMPP協議的好處,因爲本人的經驗有限,就很少作點評,但願後續會對其深刻了解。socket
下面看一個PacketReader這個類都包含了什麼職責。工具
PacketReaderoop
PacketReader全部的核心邏輯都在一個線程中完成的,PacketReader的工做很專一,一樣的在一個while loop中 不停的解析、刷新reader對象、同時做爲事件源發送解析事後的各類Packet,解析這裏用的是Android獨特的Pull解析,Pull解析的特色事件驅動,在這裏被徹底的利用了起來,隨着不一樣的標籤,PacketReader都會作出不一樣的處理,處理完這些數據用不一樣Pocket對象封裝,最後,分發出去,由監聽者作最後的業務處理。
readerThread = new Thread() {
public void run() {
parsePackets(this);
}
};
因爲解析過程的代碼量過於多,我寫到什麼地方就分解什麼地方,你們有時間最好本身看源碼。
1、初始化/重置解析器
private void resetParser() {
try {
//用的是Pull解析
parser = XmlPullParserFactory.newInstance().newPullParser();
parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
parser.setInput(connection.reader);
}
catch (XmlPullParserException xppe) {
xppe.printStackTrace();
}
}
上面這個resetParser方法還會在解析的過程當中碰到不一樣的業務需求會不斷的被調用,有用和業務邏輯比較緊密,沒什麼技術含量,關鍵是要看解析的方式和同時做爲事件源發送解析事後的各類Packet,這兩部分的設計,是很是的迷人的。
2、解析
do {
if (eventType == XmlPullParser.START_TAG) {
if (parser.getName().equals("message")) {
processPacket(PacketParserUtils.parseMessage(parser));
}
else if (parser.getName().equals("iq")) {
processPacket(PacketParserUtils.parseIQ(parser, connection));
}
else if (parser.getName().equals("presence")) {
processPacket(PacketParserUtils.parsePresence(parser));
}
PacketParserUtils是一個工具類,各個靜態方法傳入的仍是Parser對象,內部一樣的使用Pull的方式進行解析,可是因爲Pull是驅動解析,不會無端的浪費資源只會加載感興趣的內容,試想一下,若是這裏用Dom解析……PacketParserUtils的這些靜態解析方法返回的實例對象也不同,從方法名能夠看出有IQ、message、presence等,他們的父類爲Packet,這些對象又被執行processPacket方法的時候傳入
private void processPacket(Packet packet) {
if (packet == null) {
return;
}
// Loop through all collectors and notify the appropriate ones.
for (PacketCollector collector: connection.getPacketCollectors()) {
collector.processPacket(packet);
}
// Deliver the incoming packet to listeners.
listenerExecutor.submit(new ListenerNotification(packet));
}
processPacket方法內部有一個循環來轉調collector.processPacket(packet);方法,前提是connection.getPacketCollectors()內部有貨,到目前位置都沒有涉及到PacketCollector這個接口的內容,他的做用實際上是一個觀察者模式中的執行者的做用,也就是傳說中的監聽器,凡是註冊了它的對象,均可以經過processPacket這個抽象方法,監聽packet的變化。但是到如今任何對象都沒有註冊它,因此這個Loop尚未做用,由於目前咱們還處在鏈接的步驟(還沒繞出來)。
listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是個Runnable
/**
* A runnable to notify all listeners of a packet.
*/
private class ListenerNotification implements Runnable {
private Packet packet;
public ListenerNotification(Packet packet) {
this.packet = packet;
}
public void run() {
for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) {
listenerWrapper.notifyListener(packet);
}
}
}
咱們上面看到listenerExecutor是一個線程池,在線程池中執行了一個凡是註冊了ListenerWrapper的對象,都將接收到packet,一樣的,到目前爲止沒有對象註冊,(在RegisterTask過程當中ListenerWrapper被註冊)
else if (eventType == XmlPullParser.END_TAG) {
if (parser.getName().equals("stream")) {
// Disconnect the connection
connection.disconnect();
}
}
當文檔讀取結束是將斷開鏈接
void cleanup() {
connection.recvListeners.clear();
connection.collectors.clear();
}
看到了嗎,只是將監聽器接口集合清空而已,並無斷開鏈接,或者取消消息循環
PacketReader對象的startup方法比較複雜,大致上執行了讀取流,並將解析好的Packet對象發送給觀察者,由觀察者繼續後續操做,目前觀察者尚未出現,還有就是使用了線程池和令牌來操做執行線程,並且維護了一個connectionID成員,這個成員的做用還須要再看,這就很少說了。
關於Packet對象,packet對象有不少子類,上面舉例了3個,其實還有不少,都是在parser時封裝的
AuthMechanism\Challenge\Failure\IQ\Message\Presence\Response\Success
還有就是Pull解析的優勢體現了出來,能夠一個parser對象包含了不少信息,但可能沒到一個時刻咱們須要的信息只是一小部分,這樣用Pull解析的驅動式就大大減小了冗餘的過程,PacketReader對象使用了2個監聽器集合對象,PacketCollector、listenerWrapper,仍是那句話,還沒看到觀察者,因此還不知道什麼狀況下須要註冊這兩個監聽。
到目前位置packetReader.startup()方法終於告一個段落了。
register過程分析
RegisterTask這個task在運行中,添加了一個監聽,上面說道的PacketReader中有一個消息機制,在不停的解析服務器返回的結果,而後將解析事後的包分發給各個監聽器(觀察者),而register中就註冊了一個監聽器,比較有意思的是,監聽器被註冊時還加了一個過濾器,這個過濾器的目的是監聽器只接收本身感興趣的內容,這個設計真的很贊。這樣就沒必要在數據源頭PacketReader中對數據進行過濾了,只要後期擴展本身Packet和本身的過濾器,就能達到排除本身不關心的信息的功能。
Registration registration = new Registration();
PacketFilter packetFilter = new AndFilter(new PacketIDFilter(registration.getPacketID()), new PacketTypeFilter(IQ.class));
其中Registration的類型其實一個IQ的子類,IQ是Packet的子類。
AndFilter是PacketFilter的子類,PacketFilter的種類型有不少,也能夠本身擴展,AndFilter就是其中一個、PacketTypeFilter也是、PacketIDFilter也是,
其中PacketTypeFilter的構造方法傳入一個IQ.class,其實就是經過這個類文件來過濾packet,這個PacketTypeFilter就是要設置關心的Packet,這裏面它告訴監聽器,只接收類型爲IQ的Packet,這些Filter中都有一個關鍵方法,accept(Packet packet).這個accept方法每一個Filter的實現方式都不同,咱們可能夠擴展本身的Filter而且重寫這個方法,最有意思的是AndFilter這個類,他的構造方法傳入的是一個動態數組,類型爲PacketFilter,你能夠傳入你須要的過濾器,將他們當成組合條件使用來過濾Packet,這個就是典型的裝飾設計模式和職責鏈模式的組合使用。
註冊監聽器
1 PacketListener packetListener = new PacketListener() { 2 //這一部分就是監聽器接收到Packet後執行的後續操做 3 public void processPacket(Packet packet) { 4 Log.d("RegisterTask.PacketListener", "processPacket()....."); 5 Log.d("RegisterTask.PacketListener", "packet=" + packet.toXML()); 6 7 if (packet instanceof IQ) { 8 IQ response = (IQ) packet; 9 if (response.getType() == IQ.Type.ERROR) { 10 if (!response.getError().toString().contains("409")) { 11 Log.e(LOGTAG, 12 "Unknown error while registering XMPP account! " 13 + response.getError() 14 .getCondition()); 15 } 16 } else if (response.getType() == IQ.Type.RESULT) { 17 xmppManager.setUsername(newUsername); 18 xmppManager.setPassword(newPassword); 19 Log.d(LOGTAG, "username=" + newUsername); 20 Log.d(LOGTAG, "password=" + newPassword); 21 22 Editor editor = sharedPrefs.edit(); 23 editor.putString(Constants.XMPP_USERNAME, 24 newUsername); 25 editor.putString(Constants.XMPP_PASSWORD, 26 newPassword); 27 editor.commit(); 28 Log 29 .i(LOGTAG, 30 "Account registered successfully"); 31 //執行task 32 xmppManager.runTask(); 33 } 34 } 35 } 36 };
addPacketListener方法傳入一個監聽器和過濾器,看一下內部
/** * Registers a packet listener with this connection. A packet filter determines * which packets will be delivered to the listener. If the same packet listener * is added again with a different filter, only the new filter will be used. * * @param packetListener the packet listener to notify of new received packets. * @param packetFilter the packet filter to use. */ public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) { if (packetListener == null) { throw new NullPointerException("Packet listener is null."); } ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); recvListeners.put(packetListener, wrapper); }
能夠看到,監聽器和過濾器被 ListenerWrapper 再次封裝,後續的recvListeners這個集合將ListenerWrapper收入囊中,好整個註冊過程完畢,就等待接收信息了,那麼發送信息的地方在什麼地方呢?分析connect過程時,上面的PacketReader中已經開始循環發送了,代碼以下
listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是個Runnable
/** * A runnable to notify all listeners of a packet. */ private class ListenerNotification implements Runnable { private Packet packet; public ListenerNotification(Packet packet) { this.packet = packet; } public void run() { for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) { listenerWrapper.notifyListener(packet); } } }
而listenerWrapper的notifyListener(packet)內部,使用了傳入的過濾器對Packet進行了過濾
/** * Notify and process the packet listener if the filter matches the packet. * * @param packet the packet which was sent or received. */ public void notifyListener(Packet packet) { if (packetFilter == null || packetFilter.accept(packet)) { packetListener.processPacket(packet); }
而具體的過濾機制仍是轉調了傳入的過濾器自己的過濾方式accept,很是的靈活。過濾完的Packet將被髮送出去
這個方法connection.sendPacket(registration);將一個Registration對象發了出去,
public void sendPacket(Packet packet) { if (!isConnected()) { throw new IllegalStateException("Not connected to server."); } if (packet == null) { throw new NullPointerException("Packet is null."); } packetWriter.sendPacket(packet); }
內部轉調的是 packetWriter.sendPacket(packet);之前提到過PacketWirter中有兩個循環機制,其中一個就是在不停的訪問隊列來獲取Packet,而這個sendPacket方法就是將消息寫入隊列中供消費者使用。
/** * Sends the specified packet to the server. * * @param packet the packet to send. */ public void sendPacket(Packet packet) { if (!done) { // Invoke interceptors for the new packet that is about to be sent. Interceptors // may modify the content of the packet. //內部執行了一個發送數據源的動做,也是爲某些監聽器對象服務的interceptorWrapper.notifyListener(packet); connection.firePacketInterceptors(packet); try { //將一個Packet對象放入到阻塞隊列中,在上面的witerPacket方法中的wile循環中發送出去 queue.put(packet); } catch (InterruptedException ie) { ie.printStackTrace(); return; } synchronized (queue) { queue.notifyAll(); } // Process packet writer listeners. Note that we're using the sending // thread so it's expected that listeners are fast. connection.firePacketSendingListeners(packet); } }
其實,註冊的過程就是在註冊監聽,這樣在有消息發出時,才能夠根據業務需求對消息進行接收和處理。
http://www.cnblogs.com/rioder/archive/2013/01/23/2873176.html