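WeChat official account: Android部落格
Personal website: chengang.plus/android
The flow chart is as follows:
When the native platform needs to send messages to Dart, it uses EventChannel.
Registration on the Android side: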
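import android.os.Bundle
import io.flutter.app.FlutterActivity
import io.flutter.plugin.common.EventChannel

class MainActivity : FlutterActivity() {
  val DATA_RESULT_CHANNEL = "com.yourname.yourname/typeData"

  // Sink handed over once Dart starts listening; null before that and after cancel.
  private var listenEvents: EventChannel.EventSink? = null

  override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    EventChannel(flutterView, DATA_RESULT_CHANNEL).setStreamHandler(object : EventChannel.StreamHandler {
      override fun onListen(arguments: Any?, events: EventChannel.EventSink?) {
        listenEvents = events
      }

      override fun onCancel(arguments: Any?) {
        listenEvents = null
      }
    })
  }

  // Forwards data to Dart; does nothing while no Dart listener is attached.
  fun receiveMessage(data: Any?) {
    if (data == null) {
      listenEvents?.error("1", "data == null", null)
    } else {
      listenEvents?.success(data)
    }
  }
}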
Receiving on the Flutter side:
import 'package:flutter/services.dart';

class InteractUtil {
  static const EventChannel eventChannel = const EventChannel("com.yourname.yourname/typeData");

  List<InteractListener> listenerList;

  factory InteractUtil() => _getInstance();

  static InteractUtil get instance => _getInstance();

  static InteractUtil _instance;

  InteractUtil._internal() {
    listenerList = [];
    // Subscribing is what triggers the "listen" call to the platform side.
    eventChannel.receiveBroadcastStream().listen(_onEvent, onError: _onError);
  }

  static InteractUtil _getInstance() {
    if (_instance == null) {
      _instance = new InteractUtil._internal();
    }
    return _instance;
  }

  void addListener(InteractListener listener) {
    if (listener == null) {
      return;
    }
    listenerList.add(listener);
  }

  // Fans each platform event out to every registered listener.
  void _onEvent(Object event) {
    print("_onEvent invoked: $event");
    for (InteractListener listener in listenerList) {
      listener.onEvent(event);
    }
  }

  void _onError(Object error) {
    for (InteractListener listener in listenerList) {
      listener.onError(error);
    }
  }
}

abstract class InteractListener {
  void onEvent(Object event);

  void onError(Object error);
}
Look at the EventChannel constructors:
shell\platform\android\io\flutter\plugin\common\EventChannel.java
public EventChannel(BinaryMessenger messenger, String name) {
  this(messenger, name, StandardMethodCodec.INSTANCE);
}

public EventChannel(BinaryMessenger messenger, String name, MethodCodec codec) {
  this.messenger = messenger;
  this.name = name;
  this.codec = codec;
}
As with MethodChannel, the two-argument constructor defaults the codec to StandardMethodCodec.INSTANCE, which in turn uses a StandardMessageCodec. The former encodes and decodes method names and arguments, the latter encodes and decodes the message payload.
The messenger here is the FlutterView.
setStreamHandler installs the message handler and receives the callbacks.
shell\platform\android\io\flutter\plugin\common\EventChannel.java
public void setStreamHandler(final StreamHandler handler) {
  messenger.setMessageHandler(name, handler == null ? null : new IncomingStreamRequestHandler(handler));
}
The handler here is the EventChannel.StreamHandler defined in the Android MainActivity.
IncomingStreamRequestHandler is also defined inside EventChannel. Its definition:
private final class IncomingStreamRequestHandler implements BinaryMessageHandler {
  private final StreamHandler handler;
  private final AtomicReference<EventSink> activeSink = new AtomicReference<>(null);

  IncomingStreamRequestHandler(StreamHandler handler) {
    this.handler = handler;
  }

  @Override
  public void onMessage(ByteBuffer message, final BinaryReply reply) {
    final MethodCall call = codec.decodeMethodCall(message);
    if (call.method.equals("listen")) {
      onListen(call.arguments, reply);
    } else if (call.method.equals("cancel")) {
      onCancel(call.arguments, reply);
    } else {
      reply.reply(null);
    }
  }

  // Abridged: exception handling and logging are left out.
  private void onListen(Object arguments, BinaryReply callback) {
    final EventSink eventSink = new EventSinkImplementation();
    // The new sink becomes the active one; an old sink means "listen" was repeated.
    final EventSink oldSink = activeSink.getAndSet(eventSink);
    if (oldSink != null) {
      handler.onCancel(null);
    }
    handler.onListen(arguments, eventSink);
    callback.reply(codec.encodeSuccessEnvelope(null));
  }

  private void onCancel(Object arguments, BinaryReply callback) {
    final EventSink oldSink = activeSink.getAndSet(null);
    handler.onCancel(arguments);
    callback.reply(codec.encodeSuccessEnvelope(null));
  }
}
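It overrides onMessage and, depending on the method name, calls either onListen or onCancel.
onMessage is invoked after the Dart side registers a listener; that flow is traced on the Dart side later.
Depending on the method Dart calls, the onListen or onCancel method in the Kotlin code is invoked. Here we follow onListen.
onListen creates an EventSinkImplementation object and passes it to the onListen callback registered on the Android side. From then on, Android sends data through this object.
Its definition: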
private final class EventSinkImplementation implements EventSink {
  final AtomicBoolean hasEnded = new AtomicBoolean(false);

  @Override
  @UiThread
  public void success(Object event) {
    // Ignore calls on a sink that has ended or is no longer the active one.
    if (hasEnded.get() || activeSink.get() != this) {
      return;
    }
    EventChannel.this.messenger.send(name, codec.encodeSuccessEnvelope(event));
  }

  @Override
  @UiThread
  public void error(String errorCode, String errorMessage, Object errorDetails) {
    if (hasEnded.get() || activeSink.get() != this) {
      return;
    }
    EventChannel.this.messenger.send(
        name, codec.encodeErrorEnvelope(errorCode, errorMessage, errorDetails));
  }

  @Override
  @UiThread
  public void endOfStream() {
    if (hasEnded.getAndSet(true) || activeSink.get() != this) {
      return;
    }
    // A null message tells Dart that the stream is finished.
    EventChannel.this.messenger.send(name, null);
  }
}
Depending on the outcome, the Kotlin code calls success, error, or endOfStream to send the corresponding data to the Dart side.
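The guard `hasEnded.get() || activeSink.get() != this` is easier to follow in isolation. The following is a minimal sketch, not Flutter's code: `SinkGuardSketch`, `Sink`, `listen`, `cancel`, and the `sent` list (standing in for `messenger.send`) are hypothetical names, and it assumes that each listen request makes the newly created sink the active one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for EventChannel's sink bookkeeping; not Flutter's API.
class SinkGuardSketch {
    final AtomicReference<Sink> activeSink = new AtomicReference<>(null);
    final List<String> sent = new ArrayList<>(); // stands in for messenger.send

    final class Sink {
        final AtomicBoolean hasEnded = new AtomicBoolean(false);

        void success(String event) {
            // Same guard as in the excerpt: ended or replaced sinks stay silent.
            if (hasEnded.get() || activeSink.get() != this) {
                return;
            }
            sent.add(event);
        }

        void endOfStream() {
            if (hasEnded.getAndSet(true) || activeSink.get() != this) {
                return;
            }
            sent.add("<end>");
        }
    }

    // Models a "listen" request: the new sink replaces the previous one.
    Sink listen() {
        Sink sink = new Sink();
        activeSink.set(sink);
        return sink;
    }

    // Models a "cancel" request: afterwards no sink is active.
    void cancel() {
        activeSink.set(null);
    }
}
```

With this model, events sent through a sink that was replaced by a later listen request, or through a sink after endOfStream, are silently dropped.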
Take success as an example. The event is first encoded by the codec object, which is a StandardMethodCodec. This is how encodeSuccessEnvelope encodes it:
shell\platform\android\io\flutter\plugin\common\StandardMethodCodec.java
@Override
public ByteBuffer encodeSuccessEnvelope(Object result) {
  final ExposedByteArrayOutputStream stream = new ExposedByteArrayOutputStream();
  stream.write(0);
  messageCodec.writeValue(stream, result);
  final ByteBuffer buffer = ByteBuffer.allocateDirect(stream.size());
  buffer.put(stream.buffer(), 0, stream.size());
  return buffer;
}
ExposedByteArrayOutputStream extends ByteArrayOutputStream, a byte-array output stream that byte data can be written to.
The success flag 0 is written first, then the data. messageCodec here is a StandardMessageCodec. This is how it writes the data:
shell\platform\android\io\flutter\plugin\common\StandardMessageCodec.java
protected void writeValue(ByteArrayOutputStream stream, Object value) {
  if (value == null || value.equals(null)) {
    stream.write(NULL);
  } else if (value == Boolean.TRUE) {
    stream.write(TRUE);
  } else if (value == Boolean.FALSE) {
    stream.write(FALSE);
  } else if (value instanceof Number) {
    if (value instanceof Integer || value instanceof Short || value instanceof Byte) {
      stream.write(INT);
      writeInt(stream, ((Number) value).intValue());
    } else if (value instanceof Long) {
      stream.write(LONG);
      writeLong(stream, (long) value);
    } else if (value instanceof Float || value instanceof Double) {
      stream.write(DOUBLE);
      writeAlignment(stream, 8);
      writeDouble(stream, ((Number) value).doubleValue());
    } else if (value instanceof BigInteger) {
      stream.write(BIGINT);
      writeBytes(stream, ((BigInteger) value).toString(16).getBytes(UTF8));
    } else {
      throw new IllegalArgumentException("Unsupported Number type: " + value.getClass());
    }
  } else if (value instanceof String) {
    stream.write(STRING);
    writeBytes(stream, ((String) value).getBytes(UTF8));
  } else if (value instanceof byte[]) {
    stream.write(BYTE_ARRAY);
    writeBytes(stream, (byte[]) value);
  } else if (value instanceof int[]) {
    stream.write(INT_ARRAY);
    final int[] array = (int[]) value;
    writeSize(stream, array.length);
    writeAlignment(stream, 4);
    for (final int n : array) {
      writeInt(stream, n);
    }
  } else if (value instanceof long[]) {
    stream.write(LONG_ARRAY);
    final long[] array = (long[]) value;
    writeSize(stream, array.length);
    writeAlignment(stream, 8);
    for (final long n : array) {
      writeLong(stream, n);
    }
  } else if (value instanceof double[]) {
    stream.write(DOUBLE_ARRAY);
    final double[] array = (double[]) value;
    writeSize(stream, array.length);
    writeAlignment(stream, 8);
    for (final double d : array) {
      writeDouble(stream, d);
    }
  } else if (value instanceof List) {
    stream.write(LIST);
    final List<?> list = (List) value;
    writeSize(stream, list.size());
    for (final Object o : list) {
      writeValue(stream, o);
    }
  } else if (value instanceof Map) {
    stream.write(MAP);
    final Map<?, ?> map = (Map) value;
    writeSize(stream, map.size());
    for (final Entry<?, ?> entry : map.entrySet()) {
      writeValue(stream, entry.getKey());
      writeValue(stream, entry.getValue());
    }
  } else {
    throw new IllegalArgumentException("Unsupported value: " + value);
  }
}
The encoding writes a one-byte type tag first, then the size for variable-length types, and then the data itself. The supported type tags are:
private static final byte NULL = 0;
private static final byte TRUE = 1;
private static final byte FALSE = 2;
private static final byte INT = 3;
private static final byte LONG = 4;
private static final byte BIGINT = 5;
private static final byte DOUBLE = 6;
private static final byte STRING = 7;
private static final byte BYTE_ARRAY = 8;
private static final byte INT_ARRAY = 9;
private static final byte LONG_ARRAY = 10;
private static final byte DOUBLE_ARRAY = 11;
private static final byte LIST = 12;
private static final byte MAP = 13;
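BigInteger is an immutable arbitrary-precision integer. All operations behave as if BigInteger were represented in two's-complement notation.
Supported types are bool, int, long, BigInteger, double, String, ByteArray, IntArray, LongArray, DoubleArray, List, and Map. Elements of a List or Map are written recursively with writeValue, so they must themselves be supported types (nested collections included).
In the code above, a single int or long is written without alignment. The elements of an int array are aligned to 4 bytes, while a double (a float is widened to double) and the elements of long and double arrays are aligned to 8 bytes.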
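To make the alignment rule concrete, here is a small sketch of what a success envelope carrying one double could look like. It is an illustration under assumptions, not the codec's source: `AlignmentSketch` and `encodeSuccessDouble` are hypothetical names, `writeAlignment` is assumed to pad with zero bytes up to the next multiple of the alignment, and little-endian byte order is assumed for the 8-byte value.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration of 8-byte alignment before a double payload.
class AlignmentSketch {
    static final byte DOUBLE = 6; // type tag from the constant table

    // Assumed behavior: pad with zeros until the size is a multiple of alignment.
    static void writeAlignment(ByteArrayOutputStream stream, int alignment) {
        int mod = stream.size() % alignment;
        if (mod != 0) {
            for (int i = 0; i < alignment - mod; i++) {
                stream.write(0);
            }
        }
    }

    static byte[] encodeSuccessDouble(double value) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        stream.write(0);           // envelope status: 0 means success
        stream.write(DOUBLE);      // type tag
        writeAlignment(stream, 8); // 2 bytes written so far, so 6 padding bytes follow
        byte[] raw = ByteBuffer.allocate(8)
                .order(ByteOrder.LITTLE_ENDIAN) // assumption: little-endian payload
                .putDouble(value)
                .array();
        stream.write(raw, 0, raw.length);
        return stream.toByteArray();
    }
}
```

Under these assumptions the envelope is 16 bytes long: status, tag, six padding bytes, then the 8-byte value starting at offset 8.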
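A String is converted to a byte[] before it is written.
After all data has been written to the stream, allocateDirect allocates a ByteBuffer of the stream's size, and the stream's contents are copied into that ByteBuffer.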
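As a worked example of the resulting byte layout, the sketch below hand-encodes a success envelope for a short String. `EnvelopeSketch` and `encodeSuccessString` are hypothetical names, and the one-byte size prefix for small payloads is an assumption, since writeSize is not shown in the excerpts.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative only: hand-rolls the bytes for one short String payload.
class EnvelopeSketch {
    static final byte STRING = 7; // type tag from the constant table

    static byte[] encodeSuccessString(String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        if (utf8.length >= 254) {
            // Assumption: sizes below 254 fit in a single size byte.
            throw new IllegalArgumentException("sketch handles only 1-byte sizes");
        }
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        stream.write(0);                    // envelope status: 0 means success
        stream.write(STRING);               // type tag
        stream.write(utf8.length);          // payload size
        stream.write(utf8, 0, utf8.length); // UTF-8 payload
        return stream.toByteArray();
    }
}
```

For the input "hi" this yields five bytes: the status 0, the tag 7, the size 2, and the two UTF-8 bytes of the text.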
The ByteBuffer produced in the previous step is now sent. The messenger object is the FlutterView. Its send method:
shell\platform\android\io\flutter\view\FlutterView.java
@Override
@UiThread
public void send(String channel, ByteBuffer message) {
  send(channel, message, null);
}

@Override
@UiThread
public void send(String channel, ByteBuffer message, BinaryReply callback) {
  if (!isAttached()) {
    Log.d(TAG, "FlutterView.send called on a detached view, channel=" + channel);
    return;
  }
  mNativeView.send(channel, message, callback);
}
mNativeView is a FlutterNativeView. Its send method:
@Override
@UiThread
public void send(String channel, ByteBuffer message) {
  dartExecutor.getBinaryMessenger().send(channel, message);
}
getBinaryMessenger returns the binaryMessenger object, a DefaultBinaryMessenger that wraps dartMessenger. Both are created in the DartExecutor constructor:
DartExecutor
public DartExecutor(@NonNull FlutterJNI flutterJNI, @NonNull AssetManager assetManager) {
  this.flutterJNI = flutterJNI;
  this.assetManager = assetManager;
  this.dartMessenger = new DartMessenger(flutterJNI);
  dartMessenger.setMessageHandler("flutter/isolate", isolateChannelMessageHandler);
  this.binaryMessenger = new DefaultBinaryMessenger(dartMessenger);
}
The send method of DefaultBinaryMessenger:
DefaultBinaryMessenger
@Override
@UiThread
public void send(@NonNull String channel, @Nullable ByteBuffer message) {
  messenger.send(channel, message, null);
}
messenger is the dartMessenger, an instance of DartMessenger. Its send method:
DartMessenger
@Override
@UiThread
public void send(@NonNull String channel, @NonNull ByteBuffer message) {
  send(channel, message, null);
}

@Override
public void send(
    @NonNull String channel,
    @Nullable ByteBuffer message,
    @Nullable BinaryMessenger.BinaryReply callback) {
  int replyId = 0;
  if (message == null) {
    flutterJNI.dispatchEmptyPlatformMessage(channel, replyId);
  } else {
    flutterJNI.dispatchPlatformMessage(channel, message, message.position(), replyId);
  }
}
The message is not null here, so dispatchPlatformMessage is called. It goes through FlutterJNI into the native layer:
@UiThread
public void dispatchPlatformMessage(
    @NonNull String channel, @Nullable ByteBuffer message, int position, int responseId) {
  if (isAttached()) {
    nativeDispatchPlatformMessage(nativePlatformViewId, channel, message, position, responseId);
  } else {
    // Not attached to the native side: the message is not sent.
  }
}

// Send a data-carrying platform message to Dart.
private native void nativeDispatchPlatformMessage(
    long nativePlatformViewId,
    @NonNull String channel,
    @Nullable ByteBuffer message,
    int position,
    int responseId);
nativeDispatchPlatformMessage calls into the native layer, in the file shell\platform\android\platform_view_android_jni.cc. This is where it is registered:
platform_view_android_jni.cc
bool RegisterApi(JNIEnv* env) {
  static const JNINativeMethod flutter_jni_methods[] = {
      {
          .name = "nativeDispatchPlatformMessage",
          .signature = "(JLjava/lang/String;Ljava/nio/ByteBuffer;II)V",
          .fnPtr = reinterpret_cast<void*>(&DispatchPlatformMessage),
      },
  }
}
The call chain of DispatchPlatformMessage:
platform_view_android_jni.cc
static void DispatchPlatformMessage(JNIEnv* env,
                                    jobject jcaller,
                                    jlong shell_holder,
                                    jstring channel,
                                    jobject message,
                                    jint position,
                                    jint responseId) {
  ANDROID_SHELL_HOLDER->GetPlatformView()->DispatchPlatformMessage(
      env,                                         //
      fml::jni::JavaStringToString(env, channel),  //
      message,                                     //
      position,                                    //
      responseId                                   //
  );
}
shell\platform\android\platform_view_android.cc
void PlatformViewAndroid::DispatchPlatformMessage(JNIEnv* env,
                                                  std::string name,
                                                  jobject java_message_data,
                                                  jint java_message_position,
                                                  jint response_id) {
  uint8_t* message_data =
      static_cast<uint8_t*>(env->GetDirectBufferAddress(java_message_data));
  std::vector<uint8_t> message =
      std::vector<uint8_t>(message_data, message_data + java_message_position);

  fml::RefPtr<flutter::PlatformMessageResponse> response;
  if (response_id) {
    response = fml::MakeRefCounted<PlatformMessageResponseAndroid>(
        response_id, java_object_, task_runners_.GetPlatformTaskRunner());
  }

  PlatformView::DispatchPlatformMessage(
      fml::MakeRefCounted<flutter::PlatformMessage>(
          std::move(name), std::move(message), std::move(response)));
}
This step wraps name and message in a PlatformMessage object.
shell\common\platform_view.cc
void PlatformView::DispatchPlatformMessage(
    fml::RefPtr<PlatformMessage> message) {
  delegate_.OnPlatformViewDispatchPlatformMessage(std::move(message));
}
shell\common\shell.cc
// |PlatformView::Delegate|
void Shell::OnPlatformViewDispatchPlatformMessage(
    fml::RefPtr<PlatformMessage> message) {
  FML_DCHECK(is_setup_);
  FML_DCHECK(task_runners_.GetPlatformTaskRunner()->RunsTasksOnCurrentThread());

  task_runners_.GetUITaskRunner()->PostTask(
      [engine = engine_->GetWeakPtr(), message = std::move(message)] {
        if (engine) {
          engine->DispatchPlatformMessage(std::move(message));
        }
      });
}
shell\common\engine.cc
void Engine::DispatchPlatformMessage(fml::RefPtr<PlatformMessage> message) {
  if (message->channel() == kLifecycleChannel) {
    if (HandleLifecyclePlatformMessage(message.get()))
      return;
  } else if (message->channel() == kLocalizationChannel) {
    if (HandleLocalizationPlatformMessage(message.get()))
      return;
  } else if (message->channel() == kSettingsChannel) {
    HandleSettingsPlatformMessage(message.get());
    return;
  }

  if (runtime_controller_->IsRootIsolateRunning() &&
      runtime_controller_->DispatchPlatformMessage(std::move(message))) {
    return;
  }

  // If there's no runtime_, we may still need to set the initial route.
  if (message->channel() == kNavigationChannel) {
    HandleNavigationPlatformMessage(std::move(message));
    return;
  }

  FML_DLOG(WARNING) << "Dropping platform message on channel: "
                    << message->channel();
}
Execution continues into runtime_controller_->DispatchPlatformMessage:
runtime\runtime_controller.cc
bool RuntimeController::DispatchPlatformMessage(
    fml::RefPtr<PlatformMessage> message) {
  if (auto* window = GetWindowIfAvailable()) {
    TRACE_EVENT1("flutter", "RuntimeController::DispatchPlatformMessage",
                 "mode", "basic");
    window->DispatchPlatformMessage(std::move(message));
    return true;
  }
  return false;
}
lib\ui\window\window.cc
void Window::DispatchPlatformMessage(fml::RefPtr<PlatformMessage> message) {
  std::shared_ptr<tonic::DartState> dart_state = library_.dart_state().lock();
  if (!dart_state) {
    FML_DLOG(WARNING)
        << "Dropping platform message for lack of DartState on channel: "
        << message->channel();
    return;
  }
  tonic::DartState::Scope scope(dart_state);
  Dart_Handle data_handle =
      (message->hasData()) ? ToByteData(message->data()) : Dart_Null();
  if (Dart_IsError(data_handle)) {
    FML_DLOG(WARNING)
        << "Dropping platform message because of a Dart error on channel: "
        << message->channel();
    return;
  }

  int response_id = 0;
  if (auto response = message->response()) {
    response_id = next_response_id_++;
    pending_responses_[response_id] = response;
  }

  tonic::LogIfError(
      tonic::DartInvokeField(library_.value(), "_dispatchPlatformMessage",
                             {tonic::ToDart(message->channel()), data_handle,
                              tonic::ToDart(response_id)}));
}
The data in message is converted to a Dart_Handle, and finally _dispatchPlatformMessage is invoked with the channel name and the data.
_dispatchPlatformMessage is a function in hooks.dart; it uses _invoke3 to call window.onPlatformMessage.
lib\ui\hooks.dart
void _dispatchPlatformMessage(String name, ByteData data, int responseId) {
  if (name == ChannelBuffers.kControlChannelName) {
    try {
      channelBuffers.handleMessage(data);
    } catch (ex) {
      _printDebug('Message to "$name" caused exception $ex');
    } finally {
      window._respondToPlatformMessage(responseId, null);
    }
  } else if (window.onPlatformMessage != null) {
    _invoke3<String, ByteData, PlatformMessageResponseCallback>(
      window.onPlatformMessage,
      window._onPlatformMessageZone,
      name,
      data,
      (ByteData responseData) {
        window._respondToPlatformMessage(responseId, responseData);
      },
    );
  } else {
    channelBuffers.push(name, data, (ByteData responseData) {
      window._respondToPlatformMessage(responseId, responseData);
    });
  }
}
This calls onPlatformMessage, passing along the name and data arguments of _dispatchPlatformMessage.
Where is onPlatformMessage defined? It is assigned in the initInstances method of ServicesBinding:
packages\flutter\lib\src\services\binding.dart\ServicesBinding
@override
void initInstances() {
  super.initInstances();
  _instance = this;
  _defaultBinaryMessenger = createBinaryMessenger();
  window
    ..onPlatformMessage = defaultBinaryMessenger.handlePlatformMessage;
  initLicenses();
  SystemChannels.system.setMessageHandler(handleSystemMessage);
}
defaultBinaryMessenger is a _DefaultBinaryMessenger, so when onPlatformMessage is called, its handlePlatformMessage method runs.
The method body:
packages\flutter\lib\src\services\binding.dart\_DefaultBinaryMessenger
@override
Future<void> handlePlatformMessage(
  String channel,
  ByteData data,
  ui.PlatformMessageResponseCallback callback,
) async {
  ByteData response;
  try {
    final MessageHandler handler = _handlers[channel];
    if (handler != null) {
      response = await handler(data);
    } else {
      // No handler yet: buffer the message and keep the callback for later.
      ui.channelBuffers.push(channel, data, callback);
      callback = null;
    }
  } catch (exception, stack) {
    // Error reporting omitted in this excerpt.
  } finally {
    if (callback != null) {
      callback(response);
    }
  }
}
If no handler has been registered for the channel yet, execution reaches ui.channelBuffers.push(channel, data, callback);. This is how the data is pushed:
pkg\sky_engine\lib\ui\channel_buffers.dart
bool push(String channel, ByteData data, PlatformMessageResponseCallback callback) {
  _RingBuffer<_StoredMessage> queue = _messages[channel];
  if (queue == null) {
    queue = _makeRingBuffer(kDefaultBufferSize);
    _messages[channel] = queue;
  }
  final bool didOverflow = queue.push(_StoredMessage(data, callback));
  if (didOverflow) {
    // Overflow handling omitted in this excerpt.
  }
  return didOverflow;
}
There is a message queue per channel: when a message arrives, it is stored in the queue that belongs to its channel.
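The per-channel buffering can be sketched as a bounded queue per channel name. This is a simplified model, not the engine's implementation: `ChannelBufferSketch` is a hypothetical class, messages are plain strings, and dropping the oldest message on overflow is an assumption about the ring buffer's policy.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-channel message buffering with a fixed capacity.
class ChannelBufferSketch {
    private final int capacity;
    private final Map<String, ArrayDeque<String>> messages = new HashMap<>();

    ChannelBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    // Stores a message; returns true when an older message was dropped to make room.
    boolean push(String channel, String data) {
        ArrayDeque<String> queue = messages.computeIfAbsent(channel, k -> new ArrayDeque<>());
        boolean didOverflow = false;
        if (queue.size() >= capacity) {
            queue.pollFirst(); // assumption: the oldest message is discarded
            didOverflow = true;
        }
        queue.addLast(data);
        return didOverflow;
    }

    // Removes and returns the oldest buffered message, or null if there is none.
    String pop(String channel) {
        ArrayDeque<String> queue = messages.get(channel);
        return (queue == null) ? null : queue.pollFirst();
    }

    boolean isEmpty(String channel) {
        ArrayDeque<String> queue = messages.get(channel);
        return queue == null || queue.isEmpty();
    }
}
```

Keeping a separate queue per channel means a busy channel cannot evict messages that are waiting for a listener on another channel.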
When Flutter registers, it passes two callbacks, _onEvent and _onError.
eventChannel.receiveBroadcastStream().listen(_onEvent, onError: _onError);
The EventChannel constructor:
packages\flutter\lib\src\services\platform_channel.dart
const EventChannel(this.name, [this.codec = const StandardMethodCodec(), BinaryMessenger binaryMessenger])
    : assert(name != null),
      assert(codec != null),
      _binaryMessenger = binaryMessenger;
codec is a StandardMethodCodec, which encodes and decodes methods and their arguments. binaryMessenger is the _DefaultBinaryMessenger described earlier, defined and initialized in ServicesBinding.
receiveBroadcastStream is a method of the EventChannel class.
packages\flutter\lib\src\services\platform_channel.dart
Stream<dynamic> receiveBroadcastStream([ dynamic arguments ]) {
  final MethodChannel methodChannel = MethodChannel(name, codec);
  StreamController<dynamic> controller;
  controller = StreamController<dynamic>.broadcast(onListen: () async {
    // Body omitted in this excerpt.
  }, onCancel: () async {
    // Body omitted in this excerpt.
  });
  return controller.stream;
}
First, a MethodChannel is constructed from the name passed when the EventChannel was created and from its codec.
Next, the StreamController.broadcast factory constructor creates a broadcast controller with the onListen and onCancel callbacks.
pkg\sky_engine\lib\async\stream_controller.dart\StreamController
factory StreamController.broadcast(
    {void onListen(), void onCancel(), bool sync: false}) {
  return sync
      ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
      : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
The previous step passes no sync argument, so it defaults to false and an _AsyncBroadcastStreamController is returned.
Its constructor:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
  _AsyncBroadcastStreamController(void onListen(), void onCancel())
      : super(onListen, onCancel);
  // ...
}
It extends _BroadcastStreamController. The super constructor:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
_BroadcastStreamController(this.onListen, this.onCancel)
    : _state = _STATE_INITIAL;
The two callbacks are stored in the controller's own onListen and onCancel fields.
At the end, receiveBroadcastStream returns controller.stream. This stream getter is defined in _BroadcastStreamController:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
Stream<T> get stream => new _BroadcastStream<T>(this);
Its constructor:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
class _BroadcastStream<T> extends _ControllerStream<T> {
  _BroadcastStream(_StreamControllerLifecycle<T> controller)
      : super(controller);

  bool get isBroadcast => true;
}
The controller here is actually a _BroadcastStreamController, because _BroadcastStreamController implements the _StreamControllerBase interface, which in turn extends the _StreamControllerLifecycle interface.
_BroadcastStream extends _ControllerStream. Its constructor:
pkg\sky_engine\lib\async\stream_controller.dart
class _ControllerStream<T> extends _StreamImpl<T> {
  _ControllerStream(this._controller);
}
_ControllerStream extends the _StreamImpl class, which is defined in:
pkg\sky_engine\lib\async\stream_impl.dart
abstract class _StreamImpl<T> extends Stream<T> {
}
The stream holds a reference to its StreamController. That is enough of the inheritance chain for now.
In our own Dart code, listen is called right after receiveBroadcastStream.
listen is defined in the _StreamImpl class:
pkg\sky_engine\lib\async\stream_impl.dart\_StreamImpl
StreamSubscription<T> listen(void onData(T data),
    {Function onError, void onDone(), bool cancelOnError}) {
  cancelOnError = identical(true, cancelOnError);
  StreamSubscription<T> subscription =
      _createSubscription(onData, onError, onDone, cancelOnError);
  _onListen(subscription);
  return subscription;
}

// -------------------------------------------------------------------
/** Create a subscription object. Called by [subcribe]. */
StreamSubscription<T> _createSubscription(void onData(T data),
    Function onError, void onDone(), bool cancelOnError) {
  return new _BufferingStreamSubscription<T>(
      onData, onError, onDone, cancelOnError);
}
The onData parameter corresponds to the _onEvent method passed at registration, and the onError parameter corresponds to _onError.
However, _ControllerStream, a subclass of _StreamImpl, also defines _createSubscription:
pkg\sky_engine\lib\async\stream_controller.dart
class _ControllerStream<T> extends _StreamImpl<T> {
  StreamSubscription<T> _createSubscription(void onData(T data),
          Function onError, void onDone(), bool cancelOnError) =>
      _controller._subscribe(onData, onError, onDone, cancelOnError);
}
Which one is called? A small demo on https://dartpad.dev/ settles it:
abstract class Test1 {
  void listen() {
    print("class init");
    _test1();
    print("class init1");
  }

  void _test1() {
    print("_test1 1");
  }
}

class Demo1 extends Test1 {
  // Overrides the base implementation; listen() dispatches to this one.
  void _test1() {
    print("_test1 2");
  }
}

class Demo2 extends Demo1 {
}

void main() {
  Demo2().listen();
}
The output is:
class init
_test1 2
class init1
So the _createSubscription of _ControllerStream is the one that gets called. _controller is the _AsyncBroadcastStreamController, and the _subscribe method is defined in its parent class _BroadcastStreamController:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
StreamSubscription<T> _subscribe(void onData(T data), Function onError,
    void onDone(), bool cancelOnError) {
  if (isClosed) {
    onDone ??= _nullDoneHandler;
    return new _DoneStreamSubscription<T>(onDone);
  }
  StreamSubscription<T> subscription = new _BroadcastSubscription<T>(
      this, onData, onError, onDone, cancelOnError);
  _addListener(subscription);
  if (identical(_firstSubscription, _lastSubscription)) {
    // Only one listener, so it must be the first listener.
    _runGuarded(onListen);
  }
  return subscription;
}
The inheritance chain of _BroadcastSubscription is: _BroadcastSubscription, _ControllerSubscription, _BufferingStreamSubscription.
The _addListener method:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
void _addListener(_BroadcastSubscription<T> subscription) {
  assert(identical(subscription._next, subscription));
  subscription._eventState = (_state & _STATE_EVENT_ID);
  // Insert in linked list as last subscription.
  _BroadcastSubscription<T> oldLast = _lastSubscription;
  _lastSubscription = subscription;
  subscription._next = null;
  subscription._previous = oldLast;
  if (oldLast == null) {
    _firstSubscription = subscription;
  } else {
    oldLast._next = subscription;
  }
}
When the first listener is added, _firstSubscription and _lastSubscription are identical, so _runGuarded runs and the onListen callback is invoked. That callback is the onListen closure passed to the broadcast constructor.
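The "only the first subscriber triggers onListen" behavior can be modeled in a few lines. The sketch below is a simplification with hypothetical names (`BroadcastSketch`, `listen`, `add`): it uses a plain list instead of the linked list of subscriptions and delivers events synchronously, unlike the asynchronous controller traced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, synchronous model of a broadcast controller.
class BroadcastSketch<T> {
    private final Runnable onListen;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    BroadcastSketch(Runnable onListen) {
        this.onListen = onListen;
    }

    void listen(Consumer<T> onData) {
        subscribers.add(onData);
        if (subscribers.size() == 1) {
            // Only the first listener starts the underlying source.
            onListen.run();
        }
    }

    // Delivers the event to every current subscriber.
    void add(T event) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}
```

In the EventChannel case this is why the "listen" request reaches the platform side once, no matter how many Dart listeners subscribe to the same broadcast stream afterwards.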
Now back to where the callback was registered: the onListen callback inside EventChannel's receiveBroadcastStream runs:
packages\flutter\lib\src\services\platform_channel.dart
onListen: () async {
  binaryMessenger.setMessageHandler(name, (ByteData reply) async {
    if (reply == null) {
      // A null message marks the end of the stream.
      controller.close();
    } else {
      try {
        controller.add(codec.decodeEnvelope(reply));
      } on PlatformException catch (e) {
        controller.addError(e);
      }
    }
    return null;
  });
  try {
    await methodChannel.invokeMethod<void>('listen', arguments);
  } catch (exception, stack) {
    // Error reporting omitted in this excerpt.
  }
}
binaryMessenger is a _DefaultBinaryMessenger. Its setMessageHandler method:
packages\flutter\lib\src\services\binding.dart
@override
void setMessageHandler(String channel, MessageHandler handler) {
  if (handler == null)
    _handlers.remove(channel);
  else
    _handlers[channel] = handler;
  ui.channelBuffers.drain(channel, (ByteData data, ui.PlatformMessageResponseCallback callback) async {
    await handlePlatformMessage(channel, data, callback);
  });
}
The MessageHandler is the closure passed as the second argument to setMessageHandler inside onListen. It is first stored in the _handlers map, and then drain is called:
pkg\sky_engine\lib\ui\channel_buffers.dart
Future<void> drain(String channel, DrainChannelCallback callback) async {
  while (!_isEmpty(channel)) {
    final _StoredMessage message = _pop(channel);
    await callback(message.data, message.callback);
  }
}
The _isEmpty method:
bool _isEmpty(String channel) {
  final _RingBuffer<_StoredMessage> queue = _messages[channel];
  return (queue == null) ? true : queue.isEmpty;
}
It looks up the message queue for the channel. If there is no queue it returns true, otherwise it returns whether the queue is empty.
If the queue holds messages, callback is invoked for each one with the message data and message.callback.
That callback calls the handlePlatformMessage method of _DefaultBinaryMessenger (packages\flutter\lib\src\services\binding.dart), whose body was shown earlier.
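This time handler is not null, so the closure passed as the second argument to binaryMessenger.setMessageHandler is invoked:
packages\flutter\lib\src\services\platform_channel.dart
binaryMessenger.setMessageHandler(name, (ByteData reply) async {
  controller.add(codec.decodeEnvelope(reply));
});
The received data is decoded first, in StandardMethodCodec:
packages\flutter\lib\src\services\message_codecs.dart
dynamic decodeEnvelope(ByteData envelope) {
  final ReadBuffer buffer = ReadBuffer(envelope);
  // First byte is zero in the success case.
  if (buffer.getUint8() == 0)
    return messageCodec.readValue(buffer);
  // Error-envelope handling omitted in this excerpt.
}
The data is read and returned. Its type is dynamic, so both sides simply need to agree on the types being sent and received.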
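To illustrate the decoding direction, here is a sketch that reads a success envelope holding a String or null, written in Java for consistency with the platform-side excerpts. `DecodeSketch` and `decodeStringEnvelope` are hypothetical names, only the NULL and STRING tags are handled, and the one-byte size prefix for short payloads is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical decoder for a success envelope carrying a short String or null.
class DecodeSketch {
    static final byte NULL = 0;
    static final byte STRING = 7;

    static String decodeStringEnvelope(ByteBuffer envelope) {
        // First byte: 0 marks a success envelope, anything else an error envelope.
        if (envelope.get() != 0) {
            throw new IllegalStateException("error envelope");
        }
        byte type = envelope.get();
        if (type == NULL) {
            return null;
        }
        if (type != STRING) {
            throw new IllegalArgumentException("sketch supports only NULL and STRING");
        }
        int size = envelope.get() & 0xff; // assumption: 1-byte size for short payloads
        byte[] utf8 = new byte[size];
        envelope.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```

The first byte decides between the success and error paths, which mirrors the `buffer.getUint8() == 0` check in decodeEnvelope.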
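As for timing: once the Dart side calls listen, the onListen closure is invoked, which uses a MethodChannel to invoke a method named listen. That call ends up in the onMessage method of IncomingStreamRequestHandler on the Android side (see the earlier article on Flutter's communication mechanism, MethodChannel, for the detailed flow). This is how the EventChannel.EventSink variable on the Android side gets initialized.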
Then the controller's add method runs:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart\_BroadcastStreamController
void add(T data) {
  if (!_mayAddEvent) throw _addEventError();
  _sendData(data);
}
add is defined in _BroadcastStreamController, while _sendData is implemented in _AsyncBroadcastStreamController:
void _sendData(T data) {
  for (_BroadcastSubscription<T> subscription = _firstSubscription;
      subscription != null;
      subscription = subscription._next) {
    subscription._addPending(new _DelayedData<T>(data));
  }
}
Next, _addPending is called, in the _BufferingStreamSubscription class:
pkg\sky_engine\lib\async\stream_impl.dart
void _addPending(_DelayedEvent event) {
  _StreamImplEvents<T> pending = _pending;
  if (_pending == null) {
    pending = _pending = new _StreamImplEvents<T>();
  }
  pending.add(event);
  if (!_hasPending) {
    _state |= _STATE_HAS_PENDING;
    if (!_isPaused) {
      _pending.schedule(this);
    }
  }
}
The schedule method of _StreamImplEvents:
pkg\sky_engine\lib\async\stream_impl.dart\_PendingEvents
void schedule(_EventDispatch<T> dispatch) {
  if (isScheduled) return;
  assert(!isEmpty);
  if (_eventScheduled) {
    assert(_state == _STATE_CANCELED);
    _state = _STATE_SCHEDULED;
    return;
  }
  scheduleMicrotask(() {
    int oldState = _state;
    _state = _STATE_UNSCHEDULED;
    if (oldState == _STATE_CANCELED) return;
    handleNext(dispatch);
  });
  _state = _STATE_SCHEDULED;
}
handleNext is implemented in _StreamImplEvents, which extends _PendingEvents:
pkg\sky_engine\lib\async\stream_impl.dart
void handleNext(_EventDispatch<T> dispatch) {
  assert(!isScheduled);
  _DelayedEvent event = firstPendingEvent;
  firstPendingEvent = event.next;
  if (firstPendingEvent == null) {
    lastPendingEvent = null;
  }
  event.perform(dispatch);
}
This calls perform, which is the method in the _DelayedData class:
pkg\sky_engine\lib\async\stream_impl.dart
void perform(_EventDispatch<T> dispatch) {
  dispatch._sendData(value);
}
When _addPending in _BufferingStreamSubscription calls schedule, the dispatch object it passes is the _BufferingStreamSubscription itself, so _sendData is the one in _BufferingStreamSubscription:
pkg\sky_engine\lib\async\stream_impl.dart
void _sendData(T data) {
  bool wasInputPaused = _isInputPaused;
  _state |= _STATE_IN_CALLBACK;
  _zone.runUnaryGuarded(_onData, data);
  _state &= ~_STATE_IN_CALLBACK;
  _checkState(wasInputPaused);
}
_zone.runUnaryGuarded(_onData, data) finally calls the _onEvent method defined in our own Dart code, so the data can be handled in the void _onEvent(Object event) {} callback.
The schedule method of _PendingEvents calls scheduleMicrotask. This is what happens inside:
pkg\sky_engine\lib\async\schedule_microtask.dart
void scheduleMicrotask(void callback()) {
  _Zone currentZone = Zone.current;
  if (identical(_rootZone, currentZone)) {
    // No need to bind the callback. We know that the root's scheduleMicrotask
    // will be invoked in the root zone.
    _rootScheduleMicrotask(null, null, _rootZone, callback);
    return;
  }
  _ZoneFunction implementation = currentZone._scheduleMicrotask;
  if (identical(_rootZone, implementation.zone) &&
      _rootZone.inSameErrorZone(currentZone)) {
    _rootScheduleMicrotask(
        null, null, currentZone, currentZone.registerCallback(callback));
    return;
  }
  Zone.current.scheduleMicrotask(Zone.current.bindCallbackGuarded(callback));
}
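A Zone represents an environment that remains stable across asynchronous calls. Code always executes in the context of a zone, available as Zone.current. The initial main function runs in the Zone.root zone. Code can run in a different zone either by using runZoned to create a new zone, or by using Zone.run to run code in the context of an existing zone, for example one created with Zone.fork.
Asynchronous callbacks always run in the context of the zone in which they were scheduled. This takes two steps:
1. Register the callback with registerCallback, registerUnaryCallback, or registerBinaryCallback. This lets the zone record that the callback exists and possibly modify it, for example by returning a different callback. The zone that performs the registration is the zone in which the callback will later run.
2. At some later point, the callback is run in that zone.
For convenience, zones provide bindCallback (and bindUnaryCallback or bindBinaryCallback) to express this pattern: the zone in which the callback is bound is the zone in which the wrapped callback later runs asynchronously.
Similarly, zones provide bindCallbackGuarded (and bindUnaryCallbackGuarded or bindBinaryCallbackGuarded) for callbacks that should be invoked through Zone.runGuarded.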
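The bind-then-run idea can be shown with a toy model. The sketch below is not Dart's Zone API: `ZoneSketch` is a hypothetical class in which a "zone" is just a name held in a static field, `run` switches that field for the duration of a call, and `bindCallback` captures the zone that is current at bind time.

```java
// Hypothetical toy model of zone binding; a zone is represented by its name.
class ZoneSketch {
    static String current = "root";

    // Runs body with the given zone as current, restoring the previous zone afterwards.
    static void run(String zone, Runnable body) {
        String previous = current;
        current = zone;
        try {
            body.run();
        } finally {
            current = previous;
        }
    }

    // Captures the zone that is current now; the returned callback later runs inside it.
    static Runnable bindCallback(Runnable callback) {
        final String boundZone = current;
        return () -> run(boundZone, callback);
    }
}
```

A callback bound while a child zone is current will observe that child zone when it is invoked later from the root zone, which is the property the two registration steps above are meant to guarantee.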
Zone.current here is the _RootZone, the same zone the main function runs in, so _rootScheduleMicrotask is called:
pkg\sky_engine\lib\async\zone.dart
void _rootScheduleMicrotask(
    Zone self, ZoneDelegate parent, Zone zone, void f()) {
  if (!identical(_rootZone, zone)) {
    bool hasErrorHandler = !_rootZone.inSameErrorZone(zone);
    if (hasErrorHandler) {
      f = zone.bindCallbackGuarded(f);
    } else {
      f = zone.bindCallback(f);
    }
    // Use root zone as event zone if the function is already bound.
    zone = _rootZone;
  }
  _scheduleAsyncCallback(f);
}
_rootZone and zone are identical, so _scheduleAsyncCallback is called to run the callback f asynchronously:
pkg\sky_engine\lib\async\schedule_microtask.dart
void _scheduleAsyncCallback(_AsyncCallback callback) {
  _AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
  if (_nextCallback == null) {
    _nextCallback = _lastCallback = newEntry;
    if (!_isInCallbackLoop) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  } else {
    _lastCallback.next = newEntry;
    _lastCallback = newEntry;
  }
}
Next, _startMicrotaskLoop runs:
pkg\sky_engine\lib\async\schedule_microtask.dart
void _startMicrotaskLoop() {
  _isInCallbackLoop = true;
  try {
    // Moved to separate function because try-finally prevents
    // good optimization.
    _microtaskLoop();
  } finally {
    _lastPriorityCallback = null;
    _isInCallbackLoop = false;
    if (_nextCallback != null) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  }
}
In the finally block, _startMicrotaskLoop is scheduled again if _nextCallback is still non-null. Since _microtaskLoop only returns normally once the queue is empty, this happens when a callback throws and the loop exits early. The callbacks are deferred to the microtask queue instead of running synchronously inside the caller, although they still run on the same isolate. The loop itself is _microtaskLoop:
void _microtaskLoop() {
  while (_nextCallback != null) {
    _lastPriorityCallback = null;
    _AsyncCallbackEntry entry = _nextCallback;
    _nextCallback = entry.next;
    if (_nextCallback == null) _lastCallback = null;
    (entry.callback)();
  }
}
_nextCallback is the _AsyncCallbackEntry that wraps an asynchronous callback. Before the callback is run, _nextCallback is advanced to the next entry.
callback is the callback wrapped in _scheduleAsyncCallback, that is, f in _rootScheduleMicrotask, which is the closure passed to scheduleMicrotask in _PendingEvents.schedule. It finally calls back into our _onEvent method.
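A stripped-down model of this loop may help. The following sketch uses hypothetical names (`MicrotaskSketch`, `runLoop`) and an `ArrayDeque` in place of the hand-written linked list of entries; it shows only the ordering guarantee: callbacks run later, in FIFO order, and callbacks scheduled while the loop is running are picked up in the same pass.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a microtask queue drained by a single loop.
class MicrotaskSketch {
    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    // Defers the callback; nothing runs until runLoop is called.
    void scheduleMicrotask(Runnable callback) {
        queue.addLast(callback);
    }

    // Runs callbacks in FIFO order until the queue is empty,
    // including callbacks scheduled while the loop is running.
    void runLoop() {
        Runnable next;
        while ((next = queue.pollFirst()) != null) {
            next.run();
        }
    }
}
```

In the stream case, the closure scheduled by `_PendingEvents.schedule` is one such deferred callback, so `_onEvent` runs after the code that called `controller.add` has returned.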
Back to the handlePlatformMessage method of _DefaultBinaryMessenger on the Flutter side, shown earlier:
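When no MessageHandler has been set through binaryMessenger.setMessageHandler, messages are stored in the queue, and as soon as a handler is registered they are dispatched to it. When the map already contains a MessageHandler for the channel, it is called directly, which leads back into the closure passed to setMessageHandler and repeats the flow from 3.7.2 onward.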
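The handle-or-buffer behavior summarized above can be sketched as follows. `BufferedMessengerSketch` is a hypothetical class that models messages as strings and handlers as `Consumer<String>`; unlike the excerpts, it is synchronous, has no reply callbacks, and skips draining when the handler is removed.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, synchronous model of "deliver if a handler exists, otherwise buffer".
class BufferedMessengerSketch {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Map<String, ArrayDeque<String>> pending = new HashMap<>();

    void handlePlatformMessage(String channel, String data) {
        Consumer<String> handler = handlers.get(channel);
        if (handler != null) {
            handler.accept(data);
        } else {
            // No handler yet: keep the message until one is registered.
            pending.computeIfAbsent(channel, k -> new ArrayDeque<>()).addLast(data);
        }
    }

    void setMessageHandler(String channel, Consumer<String> handler) {
        if (handler == null) {
            // Simplification: removing a handler does not drain anything.
            handlers.remove(channel);
            return;
        }
        handlers.put(channel, handler);
        // Drain: replay buffered messages to the new handler in arrival order.
        ArrayDeque<String> queue = pending.get(channel);
        while (queue != null && !queue.isEmpty()) {
            handlePlatformMessage(channel, queue.pollFirst());
        }
    }
}
```

Messages that arrive before registration are replayed in arrival order, and later messages go straight to the handler.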