flutter通訊機制-EventChannel

微信公衆號:Android部落格java

我的網站:chengang.plus/android

流程圖以下: shell

在這裏插入圖片描述

一、使用方式

當原平生臺須要向dart發送消息時,須要用到EventChannel。數組

1.1 Android端註冊

Android平臺的註冊方式:bash

class MainActivity : FlutterActivity(){
    val DATA_RESULT_CHANNEL = "com.yourname.yourname/typeData"
    
    override fun onCreate(savedInstanceState: Bundle?) {
        EventChannel(flutterView, DATA_RESULT_CHANNEL).setStreamHandler(object : EventChannel.StreamHandler {
            override fun onListen(arguments: Any?, events: EventChannel.EventSink?) {
                listenEvents = events!!
            }

            override fun onCancel(arguments: Any?) {
            }
        })
    }
    
    fun receiveMessage(data: Object) {
        if (data == null) {
            listenEvents.error("1", "data == null", null)
        } else {
            listenEvents.success(data: Object))
        }
    }
}
複製代碼

1.2 flutter端消費消息

flutter端的接收方式是:微信

class InteractUtil {
    static const EventChannel eventChannel =
      const EventChannel("com.yourname.yourname/typeData");
  List<InteractListener> listenerList;

  factory InteractUtil() => _getInstance();

  static InteractUtil get instance => _getInstance();
  static InteractUtil _instance;

  InteractUtil._internal() {
    listenerList = [];
    eventChannel.receiveBroadcastStream().listen(_onEvent, onError: _onError);
  }

  static InteractUtil _getInstance() {
    if (_instance == null) {
      _instance = new InteractUtil._internal();
    }
    return _instance;
  }

  void addListener(InteractListener listener) {
    if (listener == null) {
      return;
    }
    listenerList.add(listener);
  }

  void _onEvent(Object event) {
    print("_onEvent is invoke$event");
    for (InteractListener listener in listenerList) {
      listener.onEvent(event);
    }
  }

  void _onError(Object error) {
    for (InteractListener listener in listenerList) {
      listener.onError(error);
    }
  }
}

abstract class InteractListener {
  void onEvent(Object event);

  void onError(Object error);
}
複製代碼

二、Android端

2.1 EventChannel構造

看看EventChannel的構造函數:markdown

shell\platform\android\io\flutter\plugin\common\EventChannel.java閉包

public EventChannel(BinaryMessenger messenger, String name) {
    this(messenger, name, StandardMethodCodec.INSTANCE);
}

public EventChannel(BinaryMessenger messenger, String name, MethodCodec codec) {
    this.messenger = messenger;
    this.name = name;
    this.codec = codec;
}
複製代碼

與MethodChannel相似,這裏在構造函數的參數裏面構造了StandardMethodCodec和StandardMessageCodec。前者作方法名稱和參數的編解碼,後者作消息數據的編解碼。異步

這裏的messenger是FlutterView。async

2.2 設置消息處理

經過setStreamHandler設置消息處理,並接收回調。

shell\platform\android\io\flutter\plugin\common\EventChannel.java

public void setStreamHandler(final StreamHandler handler) {
    messenger.setMessageHandler(name, handler == null ? null : new IncomingStreamRequestHandler(handler));
}
複製代碼

這裏的handler就是Android MainActivity中定義的EventChannel.StreamHandler。

2.2.1 IncomingStreamRequestHandler

IncomingStreamRequestHandler仍是在EventChannel中,看看他的定義:

private final class IncomingStreamRequestHandler implements BinaryMessageHandler {
    private final StreamHandler handler;
    private final AtomicReference<EventSink> activeSink = new AtomicReference<>(null);
    
    IncomingStreamRequestHandler(StreamHandler handler) {
      this.handler = handler;
    }
    
    @Override
    public void onMessage(ByteBuffer message, final BinaryReply reply) {
      final MethodCall call = codec.decodeMethodCall(message);
      if (call.method.equals("listen")) {
        onListen(call.arguments, reply);
      } else if (call.method.equals("cancel")) {
        onCancel(call.arguments, reply);
      } else {
        reply.reply(null);
      }
    }
    
    private void onListen(Object arguments, BinaryReply callback) {
        final EventSink eventSink = new EventSinkImplementation();
        handler.onCancel(null);
        handler.onListen(arguments, eventSink);
        callback.reply(codec.encodeSuccessEnvelope(null));
    }
    
    private void onCancel(Object arguments, BinaryReply callback) {
        final EventSink oldSink = activeSink.getAndSet(null);
        handler.onCancel(arguments);
        callback.reply(codec.encodeSuccessEnvelope(null));
    }
}
複製代碼

重載實現了onMessage方法,並在這個方法中根據方法名稱的不一樣,分別調用onListen和onCancel方法。

onMessage是由dart端註冊以後回調過來的,這個流程在dart端追蹤。

根據dart端調用的方法,對應的調用到kotlin代碼中的onListen或onCancel方法,這裏以onListen爲例跟蹤代碼。

2.3 onListen接收消息處理對象

在onListen方法中,初始化了EventSinkImplementation對象,同時將這個對象回調給Android的註冊回調函數onListen,後續Android端的數據發送就依靠這個對象了。

2.3.1 EventSinkImplementation

看看他的定義:

private final class EventSinkImplementation implements EventSink {
  final AtomicBoolean hasEnded = new AtomicBoolean(false);

  @Override
  @UiThread
  public void success(Object event) {
    if (hasEnded.get() || activeSink.get() != this) {
      return;
    }
    EventChannel.this.messenger.send(name, codec.encodeSuccessEnvelope(event));
  }

  @Override
  @UiThread
  public void error(String errorCode, String errorMessage, Object errorDetails) {
    if (hasEnded.get() || activeSink.get() != this) {
      return;
    }
    EventChannel.this.messenger.send(
        name, codec.encodeErrorEnvelope(errorCode, errorMessage, errorDetails));
  }

  @Override
  @UiThread
  public void endOfStream() {
    if (hasEnded.getAndSet(true) || activeSink.get() != this) {
      return;
    }
    EventChannel.this.messenger.send(name, null);
  }
}
複製代碼

根據結果,kotlin中能夠選擇執行success或error或endOfStream函數,將對應的數據發送到dart端。

2.3.2 success發送成功數據

以success爲例,先通過codec對象編碼,codec是StandardMethodCodec類型,看看encodeSuccessEnvelope方法是怎麼編碼的:

shell\platform\android\io\flutter\plugin\common\StandardMethodCodec.java

@Override
public ByteBuffer encodeSuccessEnvelope(Object result) {
    final ExposedByteArrayOutputStream stream = new ExposedByteArrayOutputStream();
    stream.write(0);
    messageCodec.writeValue(stream, result);
    final ByteBuffer buffer = ByteBuffer.allocateDirect(stream.size());
    buffer.put(stream.buffer(), 0, stream.size());
    return buffer;
}
複製代碼

ExposedByteArrayOutputStream繼承自ByteArrayOutputStream,是一個ByteArray輸出流,能夠寫入byte數據。

先寫入成功標誌位0,再寫入數據。這裏的messageCodec是StandardMessageCodec類型,看看怎麼寫數據的:

shell\platform\android\io\flutter\plugin\common\StandardMessageCodec.java

protected void writeValue(ByteArrayOutputStream stream, Object value) {
    if (value == null || value.equals(null)) {
      stream.write(NULL);
    } else if (value == Boolean.TRUE) {
      stream.write(TRUE);
    } else if (value == Boolean.FALSE) {
      stream.write(FALSE);
    } else if (value instanceof Number) {
      if (value instanceof Integer || value instanceof Short || value instanceof Byte) {
        stream.write(INT);
        writeInt(stream, ((Number) value).intValue());
      } else if (value instanceof Long) {
        stream.write(LONG);
        writeLong(stream, (long) value);
      } else if (value instanceof Float || value instanceof Double) {
        stream.write(DOUBLE);
        writeAlignment(stream, 8);
        writeDouble(stream, ((Number) value).doubleValue());
      } else if (value instanceof BigInteger) {
        stream.write(BIGINT);
        writeBytes(stream, ((BigInteger) value).toString(16).getBytes(UTF8));
      } else {
        throw new IllegalArgumentException("Unsupported Number type: " + value.getClass());
      }
    } else if (value instanceof String) {
      stream.write(STRING);
      writeBytes(stream, ((String) value).getBytes(UTF8));
    } else if (value instanceof byte[]) {
      stream.write(BYTE_ARRAY);
      writeBytes(stream, (byte[]) value);
    } else if (value instanceof int[]) {
      stream.write(INT_ARRAY);
      final int[] array = (int[]) value;
      writeSize(stream, array.length);
      writeAlignment(stream, 4);
      for (final int n : array) {
        writeInt(stream, n);
      }
    } else if (value instanceof long[]) {
      stream.write(LONG_ARRAY);
      final long[] array = (long[]) value;
      writeSize(stream, array.length);
      writeAlignment(stream, 8);
      for (final long n : array) {
        writeLong(stream, n);
      }
    } else if (value instanceof double[]) {
      stream.write(DOUBLE_ARRAY);
      final double[] array = (double[]) value;
      writeSize(stream, array.length);
      writeAlignment(stream, 8);
      for (final double d : array) {
        writeDouble(stream, d);
      }
    } else if (value instanceof List) {
      stream.write(LIST);
      final List<?> list = (List) value;
      writeSize(stream, list.size());
      for (final Object o : list) {
        writeValue(stream, o);
      }
    } else if (value instanceof Map) {
      stream.write(MAP);
      final Map<?, ?> map = (Map) value;
      writeSize(stream, map.size());
      for (final Entry<?, ?> entry : map.entrySet()) {
        writeValue(stream, entry.getKey());
        writeValue(stream, entry.getValue());
      }
    } else {
      throw new IllegalArgumentException("Unsupported value: " + value);
    }
}
複製代碼

編碼方式就是先寫數據長度,再寫入具體數據。支持的數據類型以下:

private static final byte NULL = 0;
private static final byte TRUE = 1;
private static final byte FALSE = 2;
private static final byte INT = 3;
private static final byte LONG = 4;
private static final byte BIGINT = 5;
private static final byte DOUBLE = 6;
private static final byte STRING = 7;
private static final byte BYTE_ARRAY = 8;
private static final byte INT_ARRAY = 9;
private static final byte LONG_ARRAY = 10;
private static final byte DOUBLE_ARRAY = 11;
private static final byte LIST = 12;
private static final byte MAP = 13;
複製代碼

BigInteger 不可變的任意精度的整數。全部操做中,都以二進制補碼形式表示 BigInteger。

支持bool,int,long,BigInteger,double,String,ByteArray,IntArray,LongArray,DoubleArray,List,Map。集合類型中的數據類型也必須是基本數據類型或其數組,以及String類型。

寫int數據以前,先4字節對齊;寫long或float或double類型以前,先8字節對齊。

String類型轉換成byte[]數據再寫入。

全部數據寫入stream以後,經過allocateDirect方法爲ByteBuffer分配stream大小的內存空間,並將stream中的數據寫入ByteBuffer中。

2.4 send發送數據

2.4.1 FlutterView

上一步生成的ByteBuffer數據在這裏被send,messenger對象實際上是FlutterView,看看send方法:

shell\platform\android\io\flutter\view\FlutterView.java

@Override
@UiThread
public void send(String channel, ByteBuffer message) {
    send(channel, message, null);
}

@Override
@UiThread
public void send(String channel, ByteBuffer message, BinaryReply callback) {
    if (!isAttached()) {
      Log.d(TAG, "FlutterView.send called on a detached view, channel=" + channel);
      return;
    }
    mNativeView.send(channel, message, callback);
}
複製代碼

2.4.2 FlutterNativeView

mNativeView是FlutterNativeView類型,看看他裏面的方法:

@Override
@UiThread
public void send(String channel, ByteBuffer message) {
    dartExecutor.getBinaryMessenger().send(channel, message);
}
複製代碼

2.4.3 DefaultBinaryMessenger

這裏的getBinaryMessenger方法返回的是dartMessenger對象,對應DefaultBinaryMessenger類。是在DartExecutor構造函數裏面初始化的:

DartExecutor

public DartExecutor(@NonNull FlutterJNI flutterJNI, @NonNull AssetManager assetManager) {
    this.flutterJNI = flutterJNI;
    this.assetManager = assetManager;
    this.dartMessenger = new DartMessenger(flutterJNI);
    dartMessenger.setMessageHandler("flutter/isolate", isolateChannelMessageHandler);
    this.binaryMessenger = new DefaultBinaryMessenger(dartMessenger);
}
複製代碼

看看DefaultBinaryMessenger裏面的send方法:

DefaultBinaryMessenger

@Override
@UiThread
public void send(@NonNull String channel, @Nullable ByteBuffer message) {
  messenger.send(channel, message, null);
}
複製代碼

2.4.4 DartMessenger

messenger實際上是dartMessenger,對應DartMessenger類,看看裏面的send方法:

DartMessenger

@Override
@UiThread
public void send(@NonNull String channel, @NonNull ByteBuffer message) {
    send(channel, message, null);
}

@Override
public void send( @NonNull String channel, @Nullable ByteBuffer message, @Nullable BinaryMessenger.BinaryReply callback) {
    int replyId = 0;
    if (message == null) {
      flutterJNI.dispatchEmptyPlatformMessage(channel, replyId);
    } else {
      flutterJNI.dispatchPlatformMessage(channel, message, message.position(), replyId);
    }
}
複製代碼

2.4.5 FlutterJNI

這裏的message不爲空,對應調用dispatchPlatformMessage方法。是在FlutterJNI中調用到native層,看看這個方法:

@UiThread
public void dispatchPlatformMessage( @NonNull String channel, @Nullable ByteBuffer message, int position, int responseId) {
    if (isAttached()) {
      nativeDispatchPlatformMessage(nativePlatformViewId, channel, message, position, responseId);
    } else {
    }
}

// Send a data-carrying platform message to Dart.
private native void nativeDispatchPlatformMessage( long nativePlatformViewId, @NonNull String channel, @Nullable ByteBuffer message, int position, int responseId);
複製代碼

nativeDispatchPlatformMessage調用到了native層,是在shell\platform\android\platform_view_android_jni.cc文件中,看看註冊的地方:

platform_view_android_jni.cc

bool RegisterApi(JNIEnv* env) {
    static const JNINativeMethod flutter_jni_methods[] = {
        {
          .name = "nativeDispatchPlatformMessage",
          .signature = "(JLjava/lang/String;Ljava/nio/ByteBuffer;II)V",
          .fnPtr = reinterpret_cast<void*>(&DispatchPlatformMessage),
        },
    }
}
複製代碼

看看DispatchPlatformMessage方法的調用棧:

platform_view_android_jni.cc

static void DispatchPlatformMessage(JNIEnv* env, jobject jcaller, jlong shell_holder, jstring channel, jobject message, jint position, jint responseId) {
  ANDROID_SHELL_HOLDER->GetPlatformView()->DispatchPlatformMessage(
      env,                                         //
      fml::jni::JavaStringToString(env, channel),  //
      message,                                     //
      position,                                    //
      responseId                                   //
  );
}
複製代碼

shell\platform\android\platform_view_android.cc

void PlatformViewAndroid::DispatchPlatformMessage(JNIEnv* env,
                                                  std::string name,
                                                  jobject java_message_data,
                                                  jint java_message_position,
                                                  jint response_id) {
  uint8_t* message_data =
      static_cast<uint8_t*>(env->GetDirectBufferAddress(java_message_data));
  std::vector<uint8_t> message =
      std::vector<uint8_t>(message_data, message_data + java_message_position);

  fml::RefPtr<flutter::PlatformMessageResponse> response;
  if (response_id) {
    response = fml::MakeRefCounted<PlatformMessageResponseAndroid>(
        response_id, java_object_, task_runners_.GetPlatformTaskRunner());
  }

  PlatformView::DispatchPlatformMessage(
      fml::MakeRefCounted<flutter::PlatformMessage>(
          std::move(name), std::move(message), std::move(response)));
}
複製代碼

這一步將name,message封裝到了PlatformMessage對象中。

shell\common\platform_view.cc

void PlatformView::DispatchPlatformMessage(
    fml::RefPtr<PlatformMessage> message) {
  delegate_.OnPlatformViewDispatchPlatformMessage(std::move(message));
}
複製代碼

shell\common\shell.cc

// |PlatformView::Delegate|
void Shell::OnPlatformViewDispatchPlatformMessage(
    fml::RefPtr<PlatformMessage> message) {
  FML_DCHECK(is_setup_);
  FML_DCHECK(task_runners_.GetPlatformTaskRunner()->RunsTasksOnCurrentThread());

  task_runners_.GetUITaskRunner()->PostTask(
      [engine = engine_->GetWeakPtr(), message = std::move(message)] {
        if (engine) {
          engine->DispatchPlatformMessage(std::move(message));
        }
      });
}
複製代碼

shell\common\engine.cc

void Engine::DispatchPlatformMessage(fml::RefPtr<PlatformMessage> message) {
  if (message->channel() == kLifecycleChannel) {
    if (HandleLifecyclePlatformMessage(message.get()))
      return;
  } else if (message->channel() == kLocalizationChannel) {
    if (HandleLocalizationPlatformMessage(message.get()))
      return;
  } else if (message->channel() == kSettingsChannel) {
    HandleSettingsPlatformMessage(message.get());
    return;
  }

  if (runtime_controller_->IsRootIsolateRunning() &&
      runtime_controller_->DispatchPlatformMessage(std::move(message))) {
    return;
  }

  // If there's no runtime_, we may still need to set the initial route.
  if (message->channel() == kNavigationChannel) {
    HandleNavigationPlatformMessage(std::move(message));
    return;
  }

  FML_DLOG(WARNING) << "Dropping platform message on channel: "
                    << message->channel();
}
複製代碼

在這裏執行到runtime_controller_->DispatchPlatformMessage中,看看這個方法:

runtime\runtime_controller.cc

bool RuntimeController::DispatchPlatformMessage(
    fml::RefPtr<PlatformMessage> message) {
  if (auto* window = GetWindowIfAvailable()) {
    TRACE_EVENT1("flutter", "RuntimeController::DispatchPlatformMessage",
                 "mode", "basic");
    window->DispatchPlatformMessage(std::move(message));
    return true;
  }
  return false;
}
複製代碼

lib\ui\window\window.cc

void Window::DispatchPlatformMessage(fml::RefPtr<PlatformMessage> message) {
  std::shared_ptr<tonic::DartState> dart_state = library_.dart_state().lock();
  if (!dart_state) {
    FML_DLOG(WARNING)
        << "Dropping platform message for lack of DartState on channel: "
        << message->channel();
    return;
  }
  tonic::DartState::Scope scope(dart_state);
  Dart_Handle data_handle =
      (message->hasData()) ? ToByteData(message->data()) : Dart_Null();
  if (Dart_IsError(data_handle)) {
    FML_DLOG(WARNING)
        << "Dropping platform message because of a Dart error on channel: "
        << message->channel();
    return;
  }

  int response_id = 0;
  if (auto response = message->response()) {
    response_id = next_response_id_++;
    pending_responses_[response_id] = response;
  }

  tonic::LogIfError(
      tonic::DartInvokeField(library_.value(), "_dispatchPlatformMessage",
                             {tonic::ToDart(message->channel()), data_handle,
                              tonic::ToDart(response_id)}));
}
複製代碼

將message中的數據轉換成Dart_Handle,並最終執行_dispatchPlatformMessage方法,同時傳遞channel name和數據。

三、flutter端

3.1 方法映射

_dispatchPlatformMessage對應的是hooks.dart文件中的_invoke3方法。

lib\ui\hooks.dart

void _dispatchPlatformMessage(String name, ByteData data, int responseId) {
  if (name == ChannelBuffers.kControlChannelName) {
    try {
      channelBuffers.handleMessage(data);
    } catch (ex) {
      _printDebug('Message to "$name" caused exception $ex');
    } finally {
      window._respondToPlatformMessage(responseId, null);
    }
  } else if (window.onPlatformMessage != null) {
    _invoke3<String, ByteData, PlatformMessageResponseCallback>(
      window.onPlatformMessage,
      window._onPlatformMessageZone,
      name,
      data,
      (ByteData responseData) {
        window._respondToPlatformMessage(responseId, responseData);
      },
    );
  } else {
    channelBuffers.push(name, data, (ByteData responseData) {
      window._respondToPlatformMessage(responseId, responseData);
    });
  }
}
複製代碼

這裏調用到了onPlatformMessage方法,攜帶的參數就是_dispatchPlatformMessage的name,data參數。

3.2 ServicesBinding初始化

這個onPlatformMessage是哪裏定義的呢?記得在ServicesBinding的initInstances方法中,有定義這個方法:

packages\flutter\lib\src\services\binding.dart\ServicesBinding

@override
void initInstances() {
    super.initInstances();
    _instance = this;
    _defaultBinaryMessenger = createBinaryMessenger();
    window
      ..onPlatformMessage = defaultBinaryMessenger.handlePlatformMessage;
    initLicenses();
    SystemChannels.system.setMessageHandler(handleSystemMessage);
}
複製代碼

defaultBinaryMessenger就是_DefaultBinaryMessenger類型,而在onPlatformMessage被調用的時候,就執行到了裏面的handlePlatformMessage方法。

3.3 _DefaultBinaryMessenger消息發送對象

看看方法體:

packages\flutter\lib\src\services\binding.dart\_DefaultBinaryMessenger

@override
Future<void> handlePlatformMessage(
String channel,
ByteData data,
ui.PlatformMessageResponseCallback callback,
) async {
    ByteData response;
    try {
      final MessageHandler handler = _handlers[channel];
      if (handler != null) {
        response = await handler(data);
      } else {
        ui.channelBuffers.push(channel, data, callback);
        callback = null;
      }
    } catch (exception, stack) {
    } finally {
      if (callback != null) {
        callback(response);
      }
    }
}
複製代碼

這裏會執行到ui.channelBuffers.push(channel, data, callback);,看看是怎麼講數據push過去的:

pkg\sky_engine\lib\ui\channel_buffers.dart

bool push(String channel, ByteData data, PlatformMessageResponseCallback callback) {
    _RingBuffer<_StoredMessage> queue = _messages[channel];
    if (queue == null) {
      queue = _makeRingBuffer(kDefaultBufferSize);
      _messages[channel] = queue;
    }
    final bool didOverflow = queue.push(_StoredMessage(data, callback));
    if (didOverflow) {
    }
    return didOverflow;
}
複製代碼

能夠看到這裏有一個消息隊列,有消息過來就將channel對應的消息存起來放到隊列中。

3.4 flutter註冊消息接收

flutter註冊的時候,會註冊兩個方法,一個是_onEvent,一個是_onError。

eventChannel.receiveBroadcastStream().listen(_onEvent, onError: _onError);
複製代碼

3.5 EventChannel構造

看看他的構造方法:

packages\flutter\lib\src\services\platform_channel.dart

const EventChannel(this.name, [this.codec = const StandardMethodCodec(), BinaryMessenger binaryMessenger])
  : assert(name != null),
    assert(codec != null),
    _binaryMessenger = binaryMessenger;
複製代碼

codec是StandardMethodCodec類型,提供方法及其參數的編解碼,binaryMessenger對象是上一章節中說的_DefaultBinaryMessenger類型,在ServicesBinding中定義及初始化。

3.5.1 receiveBroadcastStream獲取Stream對象

是EventChannel類的方法。

packages\flutter\lib\src\services\platform_channel.dart

Stream<dynamic> receiveBroadcastStream([ dynamic arguments ]) {
    final MethodChannel methodChannel = MethodChannel(name, codec);
    StreamController<dynamic> controller;
    controller = StreamController<dynamic>.broadcast(onListen: () async {
    }, onCancel: () async {
    });
    return controller.stream;
}
複製代碼

這裏先是使用EventChannel初始化時傳遞的name和生成的codec參數,構造了一個MethodChannel對象。

3.5.2 StreamController控制數據流

接下來調用StreamController的broadcast方法,監聽接收到的消息。

pkg\sky_engine\lib\async\stream_controller.dart\StreamController

factory StreamController.broadcast(
  {void onListen(), void onCancel(), bool sync: false}) {
    return sync
        ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
        : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
複製代碼

能夠看到上一步中,沒有傳sync參數,這裏默認是false,也就是會返回一個_AsyncBroadcastStreamController對象。

3.5.3 _AsyncBroadcastStreamController異步數據流處理

構造方法以下:

pkg\sky_engine\lib\async\broadcast_stream_controller.dart

class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
  _AsyncBroadcastStreamController(void onListen(), void onCancel())
      : super(onListen, onCancel);
複製代碼

他繼承自_BroadcastStreamController類,再看看super方法:

pkg\sky_engine\lib\async\broadcast_stream_controller.dart

_BroadcastStreamController(this.onListen, this.onCancel)
  : _state = _STATE_INITIAL;
複製代碼

將兩個回調方法給到自身定義的兩個變量中,兩個變量實際上是回調方法。

在receiveBroadcastStream方法的最後會返回controller.stream,這個stream定義在_BroadcastStreamController中:

pkg\sky_engine\lib\async\broadcast_stream_controller.dart

Stream<T> get stream => new _BroadcastStream<T>(this);
複製代碼

3.5.4 _BroadcastStream構造

看看他的構造方法:

pkg\sky_engine\lib\async\broadcast_stream_controller.dart

class _BroadcastStream<T> extends _ControllerStream<T> {
  _BroadcastStream(_StreamControllerLifecycle<T> controller)
      : super(controller);

  bool get isBroadcast => true;
}
複製代碼

這裏的controller實際上是_BroadcastStreamController類型,由於_BroadcastStreamController實現了_StreamControllerBase接口,而_StreamControllerBase接口繼承了_StreamControllerLifecycle接口。

3.5.5 _ControllerStream構造

_BroadcastStream繼承自_ControllerStream,看看他的構造方法:

pkg\sky_engine\lib\async\stream_controller.dart

class _ControllerStream<T> extends _StreamImpl<T> {
    _ControllerStream(this._controller);
}
複製代碼

3.5.6 _StreamImpl構造

_ControllerStream繼承自_StreamImpl方法,其定義的地方是:

pkg\sky_engine\lib\async\stream_impl.dart

abstract class _StreamImpl<T> extends Stream<T> {
}
複製代碼

Stream中持有StreamController對象,繼承關係先到這裏。

3.6 listen監聽

在咱們本身的dart代碼中執行完receiveBroadcastStream以後,就要執行listen方法了。

listen方法定義在_StreamImpl類中:

pkg\sky_engine\lib\async\stream_impl.dart\_StreamImpl

StreamSubscription<T> listen(void onData(T data),
  {Function onError, void onDone(), bool cancelOnError}) {
    cancelOnError = identical(true, cancelOnError);
    StreamSubscription<T> subscription =
        _createSubscription(onData, onError, onDone, cancelOnError);
    _onListen(subscription);
    return subscription;
}

// -------------------------------------------------------------------
/** Create a subscription object. Called by [subcribe]. */
StreamSubscription<T> _createSubscription(void onData(T data),
  Function onError, void onDone(), bool cancelOnError) {
    return new _BufferingStreamSubscription<T>(
        onData, onError, onDone, cancelOnError);
}
複製代碼

這裏的onData方法對應註冊時的_onEvent方法,第二個參數中的onError對應註冊時的_onError方法。

可是在_StreamImpl的子類_ControllerStream中,也定義了這個_createSubscription方法:

pkg\sky_engine\lib\async\stream_controller.dart

class _ControllerStream<T> extends _StreamImpl<T> {
  StreamSubscription<T> _createSubscription(void onData(T data),
          Function onError, void onDone(), bool cancelOnError) =>
      _controller._subscribe(onData, onError, onDone, cancelOnError);
}
複製代碼

該調用哪個呢?在https://dartpad.dev/寫個demo看看:

abstract class Test1{
  void listen(){
    print("class init");
    _test1();
    print("class init1");
  }
  
  void _test1(){
    print("_test1 1");
  }
}

class Demo1 extends Test1{
  void _test1(){
    print("_test1 2");
  }
}

class Demo2 extends Demo1{
}

void main() {
  Demo2().listen();
}
複製代碼

輸出以下:

class init
_test1 2
class init1
複製代碼

應該是調用_ControllerStream的_createSubscription方法。_controller對應的是_AsyncBroadcastStreamController,實際_subscribe方法在其父類_BroadcastStreamController中定義:

pkg\sky_engine\lib\async\broadcast_stream_controller.dart

StreamSubscription<T> _subscribe(void onData(T data), Function onError,
  void onDone(), bool cancelOnError) {
    if (isClosed) {
      onDone ??= _nullDoneHandler;
      return new _DoneStreamSubscription<T>(onDone);
    }
    StreamSubscription<T> subscription = new _BroadcastSubscription<T>(
        this, onData, onError, onDone, cancelOnError);
    _addListener(subscription);
    if (identical(_firstSubscription, _lastSubscription)) {
      // Only one listener, so it must be the first listener.
      _runGuarded(onListen);
    }
    return subscription;
}
複製代碼

_BroadcastSubscription的繼承鏈條是:_BroadcastSubscription,_ControllerSubscription,_BufferingStreamSubscription.

看看_addListener方法:

pkg\sky_engine\lib\async\broadcast_stream_controller.dart

void _addListener(_BroadcastSubscription<T> subscription) {
    assert(identical(subscription._next, subscription));
    subscription._eventState = (_state & _STATE_EVENT_ID);
    // Insert in linked list as last subscription.
    _BroadcastSubscription<T> oldLast = _lastSubscription;
    _lastSubscription = subscription;
    subscription._next = null;
    subscription._previous = oldLast;
    if (oldLast == null) {
      _firstSubscription = subscription;
    } else {
      oldLast._next = subscription;
    }
}
複製代碼

第一次添加監聽,能夠獲得_firstSubscription和_lastSubscription相等,也就是執行_runGuarded方法,也就會回調到onListen方法。這個方法一開始對應的就是broadcast方法裏面的onListen閉包函數。

3.7 回到dart註冊

如今回到最開始註冊回調的地方,執行EventChannel中receiveBroadcastStream裏面的onListen回調方法:

packages\flutter\lib\src\services\platform_channel.dart

onListen: () async {
      binaryMessenger.setMessageHandler(name, (ByteData reply) async {
        if (reply == null) {
          controller.close();
        } else {
          try {
            controller.add(codec.decodeEnvelope(reply));
          } on PlatformException catch (e) {
            controller.addError(e);
          }
        }
        return null;
      });
      try {
        await methodChannel.invokeMethod<void>('listen', arguments);
      } catch (exception, stack) {
      }
    }
複製代碼

3.7.1 設置消息處理

binaryMessenger是_DefaultBinaryMessenger類型,看看裏面的setMessageHandler方法:

packages\flutter\lib\src\services\binding.dart

@override
void setMessageHandler(String channel, MessageHandler handler) {
    if (handler == null)
      _handlers.remove(channel);
    else
      _handlers[channel] = handler;
    ui.channelBuffers.drain(channel, (ByteData data, ui.PlatformMessageResponseCallback callback) async {
      await handlePlatformMessage(channel, data, callback);
    });
}
複製代碼

MessageHandler就是onListen第二個參數裏面的閉包塊。先將這個MessageHandler放到_handlers Map中。而後執行drain方法:

pkg\sky_engine\lib\ui\channel_buffers.dart

Future<void> drain(String channel, DrainChannelCallback callback) async {
    while (!_isEmpty(channel)) {
      final _StoredMessage message = _pop(channel);
      await callback(message.data, message.callback);
    }
}
複製代碼

看看_isEmpty方法:

bool _isEmpty(String channel) {
    final _RingBuffer<_StoredMessage> queue = _messages[channel];
    return (queue == null) ? true : queue.isEmpty;
}
複製代碼

先獲取channel對應的消息隊列,若是爲空返回true,不然判斷消息隊列的隊頭是否等於隊尾,相等則爲true,不然爲false。

若是消息隊列中有消息,此時就回調callback方法,傳遞數據和message.callback參數。

callback方法對應的調用到_DefaultBinaryMessenger中的handlePlatformMessage方法:

packages\flutter\lib\src\services\binding.dart

@override
Future<void> handlePlatformMessage(
String channel,
ByteData data,
ui.PlatformMessageResponseCallback callback,
) async {
    ByteData response;
    try {
      final MessageHandler handler = _handlers[channel];
      if (handler != null) {
        response = await handler(data);
      } else {
        ui.channelBuffers.push(channel, data, callback);
        callback = null;
      }
    } catch (exception, stack) {

    } finally {
      if (callback != null) {
        callback(response);
      }
    }
}
複製代碼

這裏的handler不爲空,因而調用到最開始binaryMessenger.setMessageHandler的第二個閉包函數中:

packages\flutter\lib\src\services\platform_channel.dart

binaryMessenger.setMessageHandler(name, (ByteData reply) async {
    controller.add(codec.decodeEnvelope(reply));
}
複製代碼

先解碼收到的數據,在StandardMethodCodec中解碼:

packages\flutter\lib\src\services\message_codecs.dart

dynamic decodeEnvelope(ByteData envelope) {
    if (buffer.getUint8() == 0)
        return messageCodec.readValue(buffer);
}
複製代碼

讀取數據,而後返回,這裏的數據類型爲dynamic,須要咱們本身清楚兩端發送和接收的數據類型便可。

3.7.2 invokeMethod發消息給Android

這個方法執行的時機是dart端執行listen方法以後,就會回調到onListen的閉包函數中,而後經過MethodChannel執行一個channel名字爲listen的方法,實際最終執行到了Android的IncomingStreamRequestHandler類中的onMessage方法中,具體流程能夠參考以前的文章 flutter通訊機制-MethodChannel。也就實現了Android端針對EventChannel.EventSink變量的初始化。

3.7.3 添加數據處理

而後執行controller的add方法:

pkg\sky_engine\lib\async\broadcast_stream_controller.dart\_BroadcastStreamController

void add(T data) {
    if (!_mayAddEvent) throw _addEventError();
    _sendData(data);
}
複製代碼

add方法定義在_AsyncBroadcastStreamController類中,看看_sendData方法:

void _sendData(T data) {
    for (_BroadcastSubscription<T> subscription = _firstSubscription;
        subscription != null;
        subscription = subscription._next) {
      subscription._addPending(new _DelayedData<T>(data));
    }
}
複製代碼

接下來調用_addPending方法,在_BufferingStreamSubscription類中:

pkg\sky_engine\lib\async\broadcast_stream_controller.dart

void _addPending(_DelayedEvent event) {
    _StreamImplEvents<T> pending = _pending;
    if (_pending == null) {
      pending = _pending = new _StreamImplEvents<T>();
    }
    pending.add(event);
    if (!_hasPending) {
      _state |= _STATE_HAS_PENDING;
      if (!_isPaused) {
        _pending.schedule(this);
      }
    }
}
複製代碼

3.7.4 調度消息回調

看看_StreamImplEvents的schedule方法:

pkg\sky_engine\lib\async\stream_impl.dart\_PendingEvents

void schedule(_EventDispatch<T> dispatch) {
    if (isScheduled) return;
    assert(!isEmpty);
    if (_eventScheduled) {
      assert(_state == _STATE_CANCELED);
      _state = _STATE_SCHEDULED;
      return;
    }
    scheduleMicrotask(() {
      int oldState = _state;
      _state = _STATE_UNSCHEDULED;
      if (oldState == _STATE_CANCELED) return;
      handleNext(dispatch);
    });
    _state = _STATE_SCHEDULED;
}
複製代碼

handleNext方法執行的就是_StreamImplEvents中的方法,_StreamImplEvents繼承自_PendingEvents類:

pkg\sky_engine\lib\async\stream_impl.dart

void handleNext(_EventDispatch<T> dispatch) {
    assert(!isScheduled);
    _DelayedEvent event = firstPendingEvent;
    firstPendingEvent = event.next;
    if (firstPendingEvent == null) {
      lastPendingEvent = null;
    }
    event.perform(dispatch);
}
複製代碼

到這裏執行perform方法,其對應的是_DelayedData類中的方法:

pkg\sky_engine\lib\async\stream_impl.dart

void perform(_EventDispatch<T> dispatch) {
    dispatch._sendData(value);
}
複製代碼

dispatch對象在_BufferingStreamSubscription的_addPending方法中調用schedule的時候,指代的就是_BufferingStreamSubscription自己,所以_sendData調用會在_BufferingStreamSubscription中:

pkg\sky_engine\lib\async\stream_impl.dart

void _sendData(T data) {
    bool wasInputPaused = _isInputPaused;
    _state |= _STATE_IN_CALLBACK;
    _zone.runUnaryGuarded(_onData, data);
    _state &= ~_STATE_IN_CALLBACK;
    _checkState(wasInputPaused);
}
複製代碼

_zone.runUnaryGuarded(_onData, data)最終就會調用到咱們最初dart代碼中定義的onEvent方法中了,也就是能夠在void _onEvent(Object event) {}回調方法中處理data數據了。

四、scheduleMicrotask任務調度

在_PendingEvents的schedule方法中,會執行scheduleMicrotask方法,看看這個方法裏面是怎麼執行的:

pkg\sky_engine\lib\async\schedule_microtask.dart

void scheduleMicrotask(void callback()) {
  _Zone currentZone = Zone.current;
  if (identical(_rootZone, currentZone)) {
    // No need to bind the callback. We know that the root's scheduleMicrotask
    // will be invoked in the root zone.
    _rootScheduleMicrotask(null, null, _rootZone, callback);
    return;
  }
  _ZoneFunction implementation = currentZone._scheduleMicrotask;
  if (identical(_rootZone, implementation.zone) &&
      _rootZone.inSameErrorZone(currentZone)) {
    _rootScheduleMicrotask(
        null, null, currentZone, currentZone.registerCallback(callback));
    return;
  }
  Zone.current.scheduleMicrotask(Zone.current.bindCallbackGuarded(callback));
}
複製代碼

4.1 Zone運行空間

Zone表示一個能夠穩定異步調用的環境。代碼老是在一個空間的上下文中執行,好比Zone.current。初始化main函數在Zone.root空間中執行,代碼能夠在不一樣的空間中執行,既能夠經過runZoned建立一個新的空間,也能夠經過Zone.run方法在一個已經存在的空間上下文中執行,好比經過Zone.fork建立的空間中。

異步回調方法老是在他們被調度的上下文空間中運行,兩步便可實現: 一、註冊回調方法,方法是registerCallback或registerUnaryCallback或registerBinaryCallback,這容許空間記錄這一個回調方法,後續可能也會存在修改,好比返回另一個回調方法。作註冊操做時的空間預示着後續回調也運行在這個空間中。

二、在後續某個時間點,回調方法在對應的空間中運行。

爲了方便,空間提供了bindCallback(bindUnaryCallback或bindBinaryCallback)方法來表示這種機制,最開始註冊方法所在的空間,就是其包裹的回調方法被異步執行時所在的空間。

一樣的,空間提供了bindCallbackGuarded(bindUnaryCallbackGuarded或bindBinaryCallbackGuarded)方法,應該在其中經過調用Zone.runGuarded去執行回調方法。

4.2 執行任務

這裏的Zone.current實際上是_RootZone,意味着跟main函數所在的空間相同。因而這裏調用_rootScheduleMicrotask方法:

pkg\sky_engine\lib\async\zone.dart

void _rootScheduleMicrotask(
    Zone self, ZoneDelegate parent, Zone zone, void f()) {
  if (!identical(_rootZone, zone)) {
    bool hasErrorHandler = !_rootZone.inSameErrorZone(zone);
    if (hasErrorHandler) {
      f = zone.bindCallbackGuarded(f);
    } else {
      f = zone.bindCallback(f);
    }
    // Use root zone as event zone if the function is already bound.
    zone = _rootZone;
  }
  _scheduleAsyncCallback(f);
}
複製代碼

_rootZone和zone是相等的,因而調用_scheduleAsyncCallback,將f回調函數異步調用:

pkg\sky_engine\lib\async\schedule_microtask.dart

void _scheduleAsyncCallback(_AsyncCallback callback) {
  _AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
  if (_nextCallback == null) {
    _nextCallback = _lastCallback = newEntry;
    if (!_isInCallbackLoop) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  } else {
    _lastCallback.next = newEntry;
    _lastCallback = newEntry;
  }
}
複製代碼

接下來執行_startMicrotaskLoop方法:

pkg\sky_engine\lib\async\schedule_microtask.dart

void _startMicrotaskLoop() {
  _isInCallbackLoop = true;
  try {
    // Moved to separate function because try-finally prevents
    // good optimization.
    _microtaskLoop();
  } finally {
    _lastPriorityCallback = null;
    _isInCallbackLoop = false;
    if (_nextCallback != null) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  }
}
複製代碼

能夠看到在finally的代碼塊中,又會異步的調用_startMicrotaskLoop,當_nextCallback不爲空時,就能夠一直調用_microtaskLoop方法了。這些調用並不會阻塞UI線程,由於當前是異步的,而異步執行的方法是_microtaskLoop:

void _microtaskLoop() {
  while (_nextCallback != null) {
    _lastPriorityCallback = null;
    _AsyncCallbackEntry entry = _nextCallback;
    _nextCallback = entry.next;
    if (_nextCallback == null) _lastCallback = null;
    (entry.callback)();
  }
}
複製代碼

_nextCallback就是_AsyncCallbackEntry封裝的異步callback方法,執行回調以前將_nextCallback賦值爲下一個回調方法。

callback就是咱們在_scheduleAsyncCallback方法中封裝過來的callback回調方法,這個回調方法就是_rootScheduleMicrotask中的f(),也就是上一章的scheduleMicrotask方法的第二個參數,最終回調到咱們的onEvent方法了。

五、Android通知flutter

回到flutter中_DefaultBinaryMessenger的handlePlatformMessage方法中:

@override
Future<void> handlePlatformMessage(
String channel,
ByteData data,
ui.PlatformMessageResponseCallback callback,
) async {
    ByteData response;
    try {
      final MessageHandler handler = _handlers[channel];
      if (handler != null) {
        response = await handler(data);
      } else {
        ui.channelBuffers.push(channel, data, callback);
        callback = null;
      }
    } catch (exception, stack) {
    } finally {
      if (callback != null) {
        callback(response);
      }
    }
}
複製代碼

當沒有經過binaryMessenger.setMessageHandler設置MessageHandler時,消息存在隊列中,一旦註冊以後,立刻就將消息分發給註冊者;當Map中存在channel對應的MessageHandler時,直接回調,也就是回到了setMessageHandler的閉包代碼塊中,重複執行3.7.2以後的流程。

微信公衆號:

在這裏插入圖片描述
相關文章
相關標籤/搜索