微信開源mars源碼分析2—上層samples分析(續)

原本是想直接深刻到mars的核心層去看的,可是發現其實上面的samples部分還有好些沒有分析到,所以回來繼續分析。
ConversationActivity這個類中實際上還作了不少的工做,在onCreate中:java

final MainService mainService = new MainService();
    MarsServiceProxy.setOnPushMessageListener(BaseConstants.CGIHISTORY_CMDID, mainService);
    MarsServiceProxy.setOnPushMessageListener(BaseConstants.CONNSTATUS_CMDID, mainService);
    MarsServiceProxy.setOnPushMessageListener(BaseConstants.FLOW_CMDID, mainService);
    MarsServiceProxy.setOnPushMessageListener(BaseConstants.PUSHMSG_CMDID, mainService);
    MarsServiceProxy.setOnPushMessageListener(BaseConstants.SDTRESULT_CMDID, mainService);

這裏出現了一個MainService,咱們來看看:
/mars-master/samples/android/marsSampleChat/app/src/main/java/com/tencent/mars/sample/core/MainService.javaandroid

public class MainService implements PushMessageHandler {
        
            public static String TAG = "Mars.Sample.MainService";
        
            private Thread recvThread;
        
            private LinkedBlockingQueue<PushMessage> pushMessages = new LinkedBlockingQueue<>();
        
            private BusinessHandler[] handlers = new BusinessHandler[]{
                    new MessageHandler(),
                    new StatisticHandler()
            };
        
            public MainService() {
                this.start();
            }
        
            public void start() {
                if (recvThread == null) {
                    recvThread = new Thread(pushReceiver, "PUSH-RECEIVER");
        
                    recvThread.start();
                }
            }
        
            private final Runnable pushReceiver = new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            PushMessage pushMessage = pushMessages.take();
                            if (pushMessage != null) {
                                for (BusinessHandler handler : handlers) {
                                    if (handler.handleRecvMessage(pushMessage)) {
                                        break;
                                    }
                                }
                            }
        
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            try {
                                Thread.sleep(500);
                            } catch (InterruptedException e1) {
                                //
                            }
                        }
                    }
                }
            };
        
            @Override
            public void process(PushMessage message) {
                pushMessages.offer(message);
            }
        }
1.啓動了一個接受者線程pushReceiver;
2.pushReceiver從LinkedBlockingQueue<PushMessage>的pushMessages中不斷獲取message,而後通知到handlers的每一個成員中,handlers是這樣定義的:
private BusinessHandler[] handlers = new BusinessHandler[]{
                new MessageHandler(),
                new StatisticHandler()
        };
繼續往下看MessageHandler:
/mars-master/samples/android/marsSampleChat/app/src/main/java/com/tencent/mars/sample/core/MessageHandler.java
public class MessageHandler extends BusinessHandler{
    
        public static String TAG = MessageHandler.class.getSimpleName();
    
        @Override
        public boolean handleRecvMessage(PushMessage pushMessage) {
    
            switch (pushMessage.cmdId) {
                case Constants.PUSHCMD:
                {
                    try {
                        Messagepush.MessagePush message = Messagepush.MessagePush.parseFrom(pushMessage.buffer);
                        Intent intent = new Intent();
                        intent.setAction(Constants.PUSHACTION);
                        intent.putExtra("msgfrom", message.from);
                        intent.putExtra("msgcontent", message.content);
                        intent.putExtra("msgtopic", message.topic);
                        SampleApplicaton.getContext().sendBroadcast(intent);
                    } catch (InvalidProtocolBufferNanoException e) {
                        Log.e(TAG, "%s", e.toString());
                    }
                }
                    return true;
                default:
                    break;
            }
    
            return false;
        }
    }

若是是Constants.PUSHCMD類型的message,那麼就發送一個廣播。這個廣播實際上會由Mars核心部分接收到,可是傳遞的intent參數其實沒有意義,核心部分只是根據每次的pushcmd進行網絡狀態的檢查而已,這些後話咱們在分析核心的時候再說。
另一個StatisticHandler,是專用於統計的,咱們暫時不去關注。
讓咱們回到ConversationActivity,在MainService的new以後,會調用MarsServiceProxy.setOnPushMessageListener屢次,設置監聽,咱們來看看MarsServiceProxy裏面如何運轉:網絡

public static void setOnPushMessageListener(int cmdId, PushMessageHandler pushMessageHandler) {
    if (pushMessageHandler == null) {
        inst.pushMessageHandlerHashMap.remove(cmdId);
    } else {
        inst.pushMessageHandlerHashMap.put(cmdId, pushMessageHandler);
    }
}

添加了監聽到一個支持高併發的hashmap中:併發

private ConcurrentHashMap<Integer, PushMessageHandler> pushMessageHandlerHashMap = new ConcurrentHashMap<>();
private MarsPushMessageFilter filter = new MarsPushMessageFilter.Stub() {

    @Override
    public boolean onRecv(int cmdId, byte[] buffer) throws RemoteException {
        PushMessageHandler handler = pushMessageHandlerHashMap.get(cmdId);
        if (handler != null) {
            Log.i(TAG, "processing push message, cmdid = %d", cmdId);
            PushMessage message = new PushMessage(cmdId, buffer);
            handler.process(message);
            return true;

        } else {
            Log.i(TAG, "no push message listener set for cmdid = %d, just ignored", cmdId);
        }

        return false;
    }
};

能夠看到,這裏建立了一個filter,這個filter在接收到一個cmd後,根據id查找到PushMessageHandler,而後調用hander.process,傳遞參數PushMessage,這個PushMessage實際上又是由cmdid和buffer組成的。這裏的cmdid理解爲一個指令的類型id便可。再到MainService中的process,其實就是加入PushMessage的一個隊列中等待處理。
回來看filter在什麼時候調用的吧。
首先在MarsServiceProxy的onServiceConnected:app

@Override
public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
    Log.d(TAG, "remote mars service connected");

    try {
        service = MarsService.Stub.asInterface(iBinder);
        service.registerPushMessageFilter(filter);
        service.setAccountInfo(accountInfo.uin, accountInfo.userName);

    } catch (Exception e) {
        service = null;
    }
}

看到了吧,將這個filter註冊到了服務中。來吧,MarsServiceNative的registerPushMessageFilter:ide

@Override
public void registerPushMessageFilter(MarsPushMessageFilter filter) throws RemoteException {
    stub.registerPushMessageFilter(filter);
}

到了MarsServiceStub裏:高併發

@Override
public void registerPushMessageFilter(MarsPushMessageFilter filter) throws RemoteException {
    filters.remove(filter);
    filters.add(filter);
}

加入了filters的隊列中,而後在onPush中有所調用:ui

@Override
public void onPush(int cmdid, byte[] data) {
    for (MarsPushMessageFilter filter : filters) {
        try {
            if (filter.onRecv(cmdid, data)) {
                break;
            }

        } catch (RemoteException e) {
            //
        }
    }
}

這裏調用了每一個filter的onRecv方法,這樣就和上面的串起來了吧。那麼什麼時候調用的這個onPush呢,答案在覈心mars部分的StnLogic裏面,這裏規定了一個ICallBack,裏面有onPush,會在適當的時候調用。具體的內容我會在後面的mars核心層分析的時候指出。this

總結一下:
MainService在一開始啓動,而且啓動專門的線程處理接收到的pushMessage,處理的過程就是調用以前已經準備好的一個handler的隊列的每一個項目的handleRecvMessage,其中的MessageHandler會發送廣播通知mars有pushcmd來了。
另外一方面,MainService提供process調用向message隊列中加入新項目。這個加入的過程調用在MarsServiceProxy中進行,內部生成了一個filter過濾器,並將其註冊到服務MarsServiceNative中,MarsServiceNative其實也是走的MarsServiceStub,在他裏面維護的filter的隊列,而且MarsServiceStub仍是底層核心Mars的回調監聽者,在onPush到來的時候,依次調用filter隊列中的各個項目的onRecv方法來實現通知。在filter的onRecv方法中就實現了handler.process(message),通知到了handler。線程

如今對於這部分應該比較明晰了吧。準備下一篇進行Mars的核心層分析吧。

相關文章
相關標籤/搜索