微信已經開源了mars,可是市面上相關的文章較少,即便有也是多在於使用xlog等這些,那麼此次我但願可以從stn這個直接用於im底層通信的部分進行個分析。
爲了能分析的全面些,咱們從samples開始。
首先明確下,微信用了google的開源協議protobuf庫,來代替json和xml。至於爲什麼使用這個,緣由還在於效率和傳輸量上,效率上他可以比json提高將近10倍,並且基於二進制而非文本,傳輸的大小更加有優點,具體的再也不累述,有興趣的能夠本身查查。
咱們從samples開始看看經過http是怎麼得到列表數據的,直接看/mars-master/samples/android/marsSampleChat/app/src/main/java/com/tencent/mars/sample/ConversationActivity.java,這個是個初始的列表界面,須要看的就是這個:java
/** * pull conversation list from server */ private void updateConversationTopics() { if (taskGetConvList != null) { MarsServiceProxy.cancel(taskGetConvList); } mTextView.setVisibility(View.INVISIBLE); progressBar.setVisibility(View.VISIBLE); swipeRefreshLayout.setRefreshing(true); taskGetConvList = new NanoMarsTaskWrapper<Main.ConversationListRequest, Main.ConversationListResponse>( new Main.ConversationListRequest(), new Main.ConversationListResponse() ) { private List<Conversation> dataList = new LinkedList<>(); @Override public void onPreEncode(Main.ConversationListRequest req) { req.type = conversationFilterType; req.accessToken = ""; // TODO: Log.d("xxx", "onPreEncode: " + req.toString()); } @Override public void onPostDecode(Main.ConversationListResponse response) { Log.d("xxx", "onPostDecode: " + response.toString()); } @Override public void onTaskEnd(int errType, int errCode) { Log.d("xxx", "onTaskEnd: " + errType + " " + errCode); runOnUiThread(new Runnable() { @Override public void run() { if (response != null) { for (Main.Conversation conv : response.list) { dataList.add(new Conversation(conv.name, conv.topic, conv.notice)); Log.d("xxx", conv.toString()); } } if (!dataList.isEmpty()) { progressBar.setVisibility(View.INVISIBLE); conversationListAdapter.list.clear(); conversationListAdapter.list.addAll(dataList); conversationListAdapter.notifyDataSetChanged(); swipeRefreshLayout.setRefreshing(false); } else { Log.i(TAG, "getconvlist: empty response list"); progressBar.setVisibility(View.INVISIBLE); mTextView.setVisibility(View.VISIBLE); } } }); } }; MarsServiceProxy.send(taskGetConvList.setHttpRequest(CONVERSATION_HOST, "/mars/getconvlist")); }
new了一個NanoMarsTaskWrapper對象,並Override了幾個方法:onPreEncode、onPostDecode、onTaskEnd。分別是編碼傳輸前回調,收到結果解碼後回調,任務結束後回調;android
設置NanoMarsTaskWrapper對象的http url地址;json
經過MarsServiceProxy的send方法,執行發送;api
經過這些,咱們能夠大致瞭解到,經過一個內置的任務體系,來進行傳輸的派發調用的;經過服務來驅使整個體系運轉,並保證獨立性;緩存
其實在目錄中已經能夠看到了,samples分爲2個部分,一個是app,另外一個是wrapper,wrapper是jar。
好吧,咱們從wrapper入手看下基本結構。
首先是manifest:安全
<manifest xmlns:android="http://schemas.android.com/apk/res/android" package="com.tencent.mars.sample.wrapper"> <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /> <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> <uses-permission android:name="android.permission.INTERNET" /> <uses-permission android:name="android.permission.WAKE_LOCK" /> <application> <service android:name=".service.MarsServiceNative" android:process=":marsservice" /> <receiver android:name="com.tencent.mars.BaseEvent$ConnectionReceiver" android:process=":marsservice"/> </application> </manifest>
能夠看到,獨立進程的服務在這裏約定了。廣播接受者在這裏約定了,與服務在同一進程中。
上面app中使用的MarsServiceProxy是個什麼東西呢?微信
public class MarsServiceProxy implements ServiceConnection { ...... private MarsServiceProxy() { worker = new Worker(); worker.start(); } public static void init(Context context, Looper looper, String packageName) { if (inst != null) { // TODO: Already initialized return; } gContext = context.getApplicationContext(); gPackageName = (packageName == null ? context.getPackageName() : packageName); gClassName = SERVICE_DEFUALT_CLASSNAME; inst = new MarsServiceProxy(); } ...... }
實際上是從ServiceConnection繼承下來的服務鏈接對象,可是他不只僅是個鏈接對象。咱們看到,他是個單例,在app的SampleApplicaton的onCreate中進行的初始化:app
// NOTE: MarsServiceProxy is for client/caller // Initialize MarsServiceProxy for local client, can be moved to other place MarsServiceProxy.init(this, getMainLooper(), null);
app中調用的是send這個靜態方法:框架
public static void send(MarsTaskWrapper marsTaskWrapper) { inst.queue.offer(marsTaskWrapper); }
其實這個方法在操做的是隊列LinkedBlockingQueue<MarsTaskWrapper>。看到了吧,這個MarsServiceProxy實際上是個api代理,內部有緩存的任務隊列,實際上send就是向這個線程安全的隊列中加入一項任務MarsTaskWrapper。
暫時放一下,咱們關注下他的服務功能。在構造的時候,new了一個Worker,並start了。這個worker就是一個線程:tcp
private static class Worker extends Thread { @Override public void run() { while (true) { inst.continueProcessTaskWrappers(); try { Thread.sleep(50); } catch (InterruptedException e) { // } } } }
也就是說,在這個類建立的時候,同時建立了一個工做線程,不斷的以間隔50ms循環調用continueProcessTaskWrappers。再看continueProcessTaskWrappers:
private void continueProcessTaskWrappers() { try { if (service == null) { Log.d(TAG, "try to bind remote mars service, packageName: %s, className: %s", gPackageName, gClassName); Intent i = new Intent().setClassName(gPackageName, gClassName); gContext.startService(i); if (!gContext.bindService(i, inst, Service.BIND_AUTO_CREATE)) { Log.e(TAG, "remote mars service bind failed"); } // Waiting for service connected return; } MarsTaskWrapper taskWrapper = queue.take(); if (taskWrapper == null) { // Stop, no more task return; } try { Log.d(TAG, "sending task = %s", taskWrapper); final String cgiPath = taskWrapper.getProperties().getString(MarsTaskProperty.OPTIONS_CGI_PATH); final Integer globalCmdID = GLOBAL_CMD_ID_MAP.get(cgiPath); if (globalCmdID != null) { taskWrapper.getProperties().putInt(MarsTaskProperty.OPTIONS_CMD_ID, globalCmdID); Log.i(TAG, "overwrite cmdID with global cmdID Map: %s -> %d", cgiPath, globalCmdID); } service.send(taskWrapper, taskWrapper.getProperties()); } catch (Exception e) { // RemoteExceptionHandler e.printStackTrace(); } } catch (Exception e) { } }
1.檢查服務是否啓動,沒有則啓動並返回等待下一個50ms再繼續;
2.從隊列中獲取一個任務,並給他分配一個cmdID,而後調用MarsService的send方法執行真正的發送事件。
其實從上面看,這個服務代理就是作了這些事情,更深刻的事情實際上是交給了具體的服務進程來作的。這裏就是個代理api。
好的,咱們往下看具體的服務。
首先MarsService是個aidl的定義,不過咱們從上面的這個線程循環裏就能夠看到,啓動的服務是根據Intent i = new Intent().setClassName(gPackageName, gClassName);啓動的,這個gClassName = SERVICE_DEFUALT_CLASSNAME;就是public static final String SERVICE_DEFUALT_CLASSNAME = "com.tencent.mars.sample.wrapper.service.MarsServiceNative";看到了吧,就是MarsServiceNative。
如今起進入到服務裏面。
public class MarsServiceNative extends Service implements MarsService { private static final String TAG = "Mars.Sample.MarsServiceNative"; private MarsServiceStub stub; ...... }
這裏保存了一個MarsServiceStub,後面的send都是調用他來實現的,如今暫時先放下send,看下onCreate:
@Override public void onCreate() { super.onCreate(); final MarsServiceProfile profile = gFactory.createMarsServiceProfile(); stub = new MarsServiceStub(this, profile); // set callback AppLogic.setCallBack(stub); StnLogic.setCallBack(stub); SdtLogic.setCallBack(stub); // Initialize the Mars PlatformComm Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper())); // Initialize the Mars StnLogic.setLonglinkSvrAddr(profile.longLinkHost(), profile.longLinkPorts()); StnLogic.setShortlinkSvrAddr(profile.shortLinkPort()); StnLogic.setClientVersion(profile.productID()); Mars.onCreate(true); StnLogic.makesureLongLinkConnected(); // Log.d(TAG, "mars service native created"); }
1.建立配置信息類MarsServiceProfile;
2.new出MarsServiceStub來;
3.設置各類回調;
4.初始化Mars;
5.Mars.onCreate(true);
6.StnLogic.makesureLongLinkConnected();確認長鏈接。
這裏開始用到了Mars了,這個纔是核心,而且不在這個工程中。核心的部分咱們先放下,下一篇再深刻分析。
回到MarsServiceStub,看他的send方法:
@Override public void send(final MarsTaskWrapper taskWrapper, Bundle taskProperties) throws RemoteException { final StnLogic.Task _task = new StnLogic.Task(StnLogic.Task.EShort, 0, "", null); // Set host & cgi path final String host = taskProperties.getString(MarsTaskProperty.OPTIONS_HOST); final String cgiPath = taskProperties.getString(MarsTaskProperty.OPTIONS_CGI_PATH); _task.shortLinkHostList = new ArrayList<>(); _task.shortLinkHostList.add(host); _task.cgi = cgiPath; final boolean shortSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, true); final boolean longSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, false); if (shortSupport && longSupport) { _task.channelSelect = StnLogic.Task.EBoth; } else if (shortSupport) { _task.channelSelect = StnLogic.Task.EShort; } else if (longSupport) { _task.channelSelect = StnLogic.Task.ELong; } else { Log.e(TAG, "invalid channel strategy"); throw new RemoteException("Invalid Channel Strategy"); } // Set cmdID if necessary int cmdID = taskProperties.getInt(MarsTaskProperty.OPTIONS_CMD_ID, -1); if (cmdID != -1) { _task.cmdID = cmdID; } TASK_ID_TO_WRAPPER.put(_task.taskID, taskWrapper); WRAPPER_TO_TASK_ID.put(taskWrapper, _task.taskID); // Send Log.i(TAG, "now start task with id %d", _task.taskID); StnLogic.startTask(_task); if (StnLogic.hasTask(_task.taskID)) { Log.i(TAG, "stn task started with id %d", _task.taskID); } else { Log.e(TAG, "stn task start failed with id %d", _task.taskID); } }
1.new一個StnLogic.Task;
2.設置task的參數,根據入口的Bundle;
3.2個map保存taskID與task的關係;
4.StnLogic.startTask(_task);啓動任務執行;
這裏的內容又深刻到了Mars核內心,能夠看到,關鍵的處理都是在Mars核心部分完成的,這裏的內容甭管是服務仍是什麼都是在作參數的傳遞及關係的維護等工做。
好吧,咱們倒帶回來,回到MarsServiceStub,他實現了StnLogic.ICallBack這個interface。定義在mars裏:
public interface ICallBack { /** * SDK要求上層作認證操做(可能新發起一個AUTH CGI) * @return */ boolean makesureAuthed(); /** * SDK要求上層作域名解析.上層能夠實現傳統DNS解析,或者本身實現的域名/IP映射 * @param host * @return */ String[] onNewDns(final String host); /** * 收到SVR PUSH下來的消息 * @param cmdid * @param data */ void onPush(final int cmdid, final byte[] data); /** * SDK要求上層對TASK組包 * @param taskID 任務標識 * @param userContext * @param reqBuffer 組包的BUFFER * @param errCode 組包的錯誤碼 * @return */ boolean req2Buf(final int taskID, Object userContext, ByteArrayOutputStream reqBuffer, int[] errCode, int channelSelect); /** * SDK要求上層對TASK解包 * @param taskID 任務標識 * @param userContext * @param respBuffer 要解包的BUFFER * @param errCode 解包的錯誤碼 * @return int */ int buf2Resp(final int taskID, Object userContext, final byte[] respBuffer, int[] errCode, int channelSelect); /** * 任務結束回調 * @param taskID 任務標識 * @param userContext * @param errType 錯誤類型 * @param errCode 錯誤碼 * @return */ int onTaskEnd(final int taskID, Object userContext, final int errType, final int errCode); /** * 流量統計 * @param send * @param recv */ void trafficData(final int send, final int recv); /** * 鏈接狀態通知 * @param status 綜合狀態,即長連+短連的狀態 * @param longlinkstatus 僅長連的狀態 */ void reportConnectInfo(int status, int longlinkstatus); /** * SDK要求上層生成長連接數據校驗包,在長連接鏈接上以後使用,用於驗證SVR身份 * @param identifyReqBuf 校驗包數據內容 * @param hashCodeBuffer 校驗包的HASH * @param reqRespCmdID 數據校驗的CMD ID * @return ECHECK_NOW(須要校驗), ECHECK_NEVER(不校驗), ECHECK_NEXT(下一次再詢問) */ int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID); /** * SDK要求上層解鏈接校驗回包. * @param buffer SVR回覆的鏈接校驗包 * @param hashCodeBuffer CLIENT請求的鏈接校驗包的HASH值 * @return */ boolean onLongLinkIdentifyResp(final byte[] buffer, final byte[] hashCodeBuffer); /** * 請求作sync */ void requestDoSync(); String[] requestNetCheckShortLinkHosts(); /** * 是否登陸 * @return true 登陸 false 未登陸 */ boolean isLogoned(); void reportTaskProfile(String taskString); }
能夠看到都是回調,經過mars的回調,MarsServiceStub接收到了taskend,並執行了:
@Override public int onTaskEnd(int taskID, Object userContext, int errType, int errCode) { final MarsTaskWrapper wrapper = TASK_ID_TO_WRAPPER.remove(taskID); if (wrapper == null) { Log.w(TAG, "stn task onTaskEnd callback may fail, null wrapper, taskID=%d", taskID); return 0; // TODO: ??? } try { wrapper.onTaskEnd(errType, errCode); } catch (RemoteException e) { e.printStackTrace(); } finally { WRAPPER_TO_TASK_ID.remove(wrapper); // onTaskEnd will be called only once for each task } return 0; }
從map中移除task,而後執行了task本身的onTaskEnd。這樣咱們正最初的updateConversationTopics裏就能夠看到後續的更新ui的代碼。
下面咱們要回到updateConversationTopics附近,看看NanoMarsTaskWrapper:
public abstract class NanoMarsTaskWrapper<T extends MessageNano, R extends MessageNano> extends AbstractTaskWrapper { private static final String TAG = "Mars.Sample.NanoMarsTaskWrapper"; protected T request; protected R response; public NanoMarsTaskWrapper(T req, R resp) { super(); this.request = req; this.response = resp; } @Override public byte[] req2buf() { try { onPreEncode(request); final byte[] flatArray = new byte[request.getSerializedSize()]; final CodedOutputByteBufferNano output = CodedOutputByteBufferNano.newInstance(flatArray); request.writeTo(output); Log.d(TAG, "encoded request to buffer, [%s]", MemoryDump.dumpHex(flatArray)); return flatArray; } catch (Exception e) { e.printStackTrace(); } return new byte[0]; } @Override public int buf2resp(byte[] buf) { try { Log.d(TAG, "decode response buffer, [%s]", MemoryDump.dumpHex(buf)); response = MessageNano.mergeFrom(response, buf); onPostDecode(response); return StnLogic.RESP_FAIL_HANDLE_NORMAL; } catch (Exception e) { Log.e(TAG, "%s", e); } return StnLogic.RESP_FAIL_HANDLE_TASK_END; } public abstract void onPreEncode(T request); public abstract void onPostDecode(R response);
}
1.從AbstractTaskWrapper繼承下來;
2.保存了request和response,都是MessageNano類型的(google的protobuf內的message數據類);
3.實現了2個接口,分別用來做爲request轉換爲buf何buf轉換成爲response。其實就是對象轉成byte[],byte轉成對象;
3.在req2buf轉換的過程當中,調用了request的writeTo方法;
4.在buf2resp中,調用了MessageNano.mergeFrom,實際上最終也是調用了response的mergeFrom,見下:
/** * Parse {@code data} as a message of this type and merge it with the * message being built. */ public static final <T extends MessageNano> T mergeFrom(T msg, final byte[] data) throws InvalidProtocolBufferNanoException { return mergeFrom(msg, data, 0, data.length); }
根據上面的4點能夠看到這是個實現序列化及反序列化的過程。google的開源protobuf咱們不去關注,可是須要了解的是他是經過以proto爲後綴名的配置文件來達到編譯時便可生成類的相關代碼的程度。
那麼這個AbstractTaskWrapper的基類的做用又是什麼呢?
public abstract class AbstractTaskWrapper extends MarsTaskWrapper.Stub { private Bundle properties = new Bundle(); public AbstractTaskWrapper() { // Reflects task properties final TaskProperty taskProperty = this.getClass().getAnnotation(TaskProperty.class); if (taskProperty != null) { setHttpRequest(taskProperty.host(), taskProperty.path()); setShortChannelSupport(taskProperty.shortChannelSupport()); setLongChannelSupport(taskProperty.longChannelSupport()); setCmdID(taskProperty.cmdID()); } } @Override public Bundle getProperties() { return properties; } @Override public abstract void onTaskEnd(int errType, int errCode); public AbstractTaskWrapper setHttpRequest(String host, String path) { properties.putString(MarsTaskProperty.OPTIONS_HOST, ("".equals(host) ? null : host)); properties.putString(MarsTaskProperty.OPTIONS_CGI_PATH, path); return this; } public AbstractTaskWrapper setShortChannelSupport(boolean support) { properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, support); return this; } public AbstractTaskWrapper setLongChannelSupport(boolean support) { properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, support); return this; } public AbstractTaskWrapper setCmdID(int cmdID) { properties.putInt(MarsTaskProperty.OPTIONS_CMD_ID, cmdID); return this; } @Override public String toString() { return "AbsMarsTask: " + BundleFormat.toString(properties); }
}
很簡單,就是提供了一些接口來設置傳輸協議類型,長短鏈接、http等。
綜合來講,這個demo使用了獨立的服務框架來進行傳輸的保證;使用了任務體系來承載每次傳輸及響應;大量的回調來監控運轉過程當中的各項關鍵點;封裝了獨立的jar wrapper,便於上層的更改及使用;獨立的配置類引入支持http和tcp長短鏈接的使用;protobuf的引入極大提高序列化及反序列化的效率,並下降傳輸的數據大小;
這篇暫時就到這裏吧,後面咱們會深刻分析下mars的核心部分。