鴻蒙開發實戰系列之二:事件總線EventBus/RxBus

鴻蒙開發實戰系列之一:鴻蒙開發實戰系列之一:圓角java

前言

上一篇跟你們分享瞭如何在鴻蒙系統中實現圓角,這一期咱們來跟你們分享一下如何實現發佈/訂閱的事件總線,也就是咱們在Android開發中經常使用的EventBus、RxBus等框架。react

開始以前,咱們先回顧下Android裏面的事件發佈/訂閱是個什麼東西? 像EventBus、RxBus是咱們在Android應用開發中常選用的發佈/訂閱事件框架,用來代替傳統的Intent,Handler,Broadcast,在Activity,Fragment,Service線程之間傳遞數據,執行方法。android

它有不少優勢:簡化應用組件間的通訊;解耦事件的發送者和接收者;避免複雜和容易出錯的依賴和生命週期的問題;速度快,專門爲高性能優化過等等。git

主要工做原理:事件源將產生的消息發送到事件總線的特定通道之上,而後監聽器在事先會訂閱事務總線之中不一樣的通道以區分消息的響應,而後當消息被髮送到事務總線的特定通道之中時,所對應的監聽器會監聽到消息,而後監聽器根據程序中設置的響應函數進行執行。github

那在鴻蒙系統中,確定是沒有現成的EventBus或者RxBus給咱們直接用的,那麼咱們如何在鴻蒙系統中的Slice,Ability,Service,線程之間傳遞消息,執行方法呢? 磨刀不誤砍柴工啊,兄dei,要是不先把這個工具庫寫出來,這麼幾周的時間怎麼完成一個鴻蒙App呢,產品手中的30m大刀可不是鬧着玩的啊。性能優化

因爲鴻蒙只能使用Java代碼,因此咱們看上了RxJava這個小兄弟,何不像在Android中同樣,在rxjava的基礎上封裝一個RxBus呢?跟Android大哥看齊不香麼?app

好的,說幹就幹,咱們來手lu一個鴻蒙版的RxBus。框架

實現鴻蒙Rxbus

一、先引入Rxjava庫

implementation 'io.reactivex.rxjava3:rxjava:3.0.4'

二、建立鴻蒙線程調度HarmonySchedulers

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Executor;

public class HarmonySchedulers implements Executor {

    private static HarmonySchedulers instance;
    private final Scheduler mMainScheduler;
    private TaskDispatcher uiTaskDispatcher;

    private HarmonySchedulers() {
        mMainScheduler = Schedulers.from(this);
    }

    public static synchronized Scheduler mainThread() {
        if (instance == null) {
            instance = new HarmonySchedulers();
        }
        return instance.mMainScheduler;
    }

    @Override
    public void execute(@NonNull Runnable command) {
         if (uiTaskDispatcher == null) {
            uiTaskDispatcher = getMainAbility().getUITaskDispatcher();//注意,這裏要用Ability來獲取UI線程的任務發射器,Ability本身想辦法獲取
        }
        uiTaskDispatcher.delayDispatch(runnable, delayTime);
    }
}

三、建立RxBus類,實現訂閱、註冊、取消註冊等功能

@SuppressWarnings("unused")
public class RxBus {
    public static final String LOG_BUS = "RXBUS_LOG";
    private static volatile RxBus defaultInstance;

    private Map<Class, List<Disposable>> subscriptionsByEventType = new HashMap<>();

    private Map<Object, List<Class>> eventTypesBySubscriber = new HashMap<>();

    private Map<Class, List<SubscriberMethod>> subscriberMethodByEventType = new HashMap<>();

    private final Subject<Object> bus;

    private RxBus() {
        this.bus = PublishSubject.create().toSerialized();
    }

    public static RxBus get() {
        RxBus rxBus = defaultInstance;
        if (defaultInstance == null) {
            synchronized (RxBus.class) {
                rxBus = defaultInstance;
                if (defaultInstance == null) {
                    rxBus = new RxBus();
                    defaultInstance = rxBus;
                }
            }
        }
        return rxBus;
    }

    /**
     * 根據傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
     *
     * @param eventType 事件類型
     * @return return
     */
    private <T> Flowable<T> toObservable(Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(eventType);
    }

    /**
     * 根據傳遞的code和 eventType 類型返回特定類型(eventType)的 被觀察者
     *
     * @param code      事件code
     * @param eventType 事件類型
     */
    private <T> Flowable<T> toObservable(final int code, final Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(Message.class)
                .filter(new Predicate<Message>() {
                    @Override
                    public boolean test(Message o) throws Exception {
                        return o.getCode() == code && eventType.isInstance(o.getObject());
                    }
                }).map(new Function<Message, Object>() {
                    @Override
                    public Object apply(Message o) throws Exception {
                        return o.getObject();
                    }
                }).cast(eventType);
    }

    /**
     * 註冊
     *
     * @param subscriber 訂閱者
     */
    public void register(Object subscriber) {
        Class<?> subClass = subscriber.getClass();
        Method[] methods = subClass.getDeclaredMethods();
        for (Method method : methods) {
            if (method.isAnnotationPresent(Subscribe.class)) {
                //得到參數類型
                Class[] parameterType = method.getParameterTypes();
                //參數不爲空 且參數個數爲1
                if (parameterType != null && parameterType.length == 1) {

                    Class eventType = parameterType[0];

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();

                    SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode);
                    addSubscriberToMap(eventType, subscriberMethod);

                    addSubscriber(subscriberMethod);
                } else if (parameterType == null || parameterType.length == 0) {

                    Class eventType = BusData.class;

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();

                    SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode);
                    addSubscriberToMap(eventType, subscriberMethod);

                    addSubscriber(subscriberMethod);

                }
            }
        }
    }
    
    /**
     * 將event的類型以訂閱中subscriber爲key保存到map裏
     *
     * @param subscriber 訂閱者
     * @param eventType  event類型
     */
    private void addEventTypeToMap(Object subscriber, Class eventType) {
        List<Class> eventTypes = eventTypesBySubscriber.get(subscriber);
        if (eventTypes == null) {
            eventTypes = new ArrayList<>();
            eventTypesBySubscriber.put(subscriber, eventTypes);
        }

        if (!eventTypes.contains(eventType)) {
            eventTypes.add(eventType);
        }
    }

    /**
     * 將註解方法信息以event類型爲key保存到map中
     *
     * @param eventType        event類型
     * @param subscriberMethod 註解方法信息
     */
    private void addSubscriberToMap(Class eventType, SubscriberMethod subscriberMethod) {
        List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
        if (subscriberMethods == null) {
            subscriberMethods = new ArrayList<>();
            subscriberMethodByEventType.put(eventType, subscriberMethods);
        }

        if (!subscriberMethods.contains(subscriberMethod)) {
            subscriberMethods.add(subscriberMethod);
        }
    }

    /**
     * 將訂閱事件以event類型爲key保存到map,用於取消訂閱時用
     *
     * @param eventType  event類型
     * @param disposable 訂閱事件
     */
    private void addSubscriptionToMap(Class eventType, Disposable disposable) {
        List<Disposable> disposables = subscriptionsByEventType.get(eventType);
        if (disposables == null) {
            disposables = new ArrayList<>();
            subscriptionsByEventType.put(eventType, disposables);
        }

        if (!disposables.contains(disposable)) {
            disposables.add(disposable);
        }
    }

    /**
     * 用RxJava添加訂閱者
     *
     * @param subscriberMethod d
     */
    @SuppressWarnings("unchecked")
    private void addSubscriber(final SubscriberMethod subscriberMethod) {
        Flowable flowable;
        if (subscriberMethod.code == -1) {
            flowable = toObservable(subscriberMethod.eventType);
        } else {
            flowable = toObservable(subscriberMethod.code, subscriberMethod.eventType);
        }
        Disposable subscription = postToObservable(flowable, subscriberMethod)
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        callEvent(subscriberMethod, o);
                    }
                });
        addSubscriptionToMap(subscriberMethod.subscriber.getClass(), subscription);
    }

    /**
     * 用於處理訂閱事件在那個線程中執行
     *
     * @param observable       d
     * @param subscriberMethod d
     * @return Observable
     */
    private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
        Scheduler scheduler;
        switch (subscriberMethod.threadMode) {
            case MAIN:
                scheduler = HarmonySchedulers.mainThread();
                break;

            case NEW_THREAD:
                scheduler = Schedulers.newThread();
                break;

            case CURRENT_THREAD:
                scheduler = Schedulers.trampoline();
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
        }
        return observable.observeOn(scheduler);
    }

    /**
     * 回調到訂閱者的方法中
     *
     * @param method code
     * @param object obj
     */
    private void callEvent(SubscriberMethod method, Object object) {
        Class eventClass = object.getClass();
        List<SubscriberMethod> methods = subscriberMethodByEventType.get(eventClass);
        if (methods != null && methods.size() > 0) {
            for (SubscriberMethod subscriberMethod : methods) {
                Subscribe sub = subscriberMethod.method.getAnnotation(Subscribe.class);
                int c = sub.code();
                if (c == method.code && method.subscriber.equals(subscriberMethod.subscriber) && method.method.equals(subscriberMethod.method)) {
                    subscriberMethod.invoke(object);
                }

            }
        }
    }

    /**
     * 取消註冊
     *
     * @param subscriber object
     */
    public void unRegister(Object subscriber) {
        List<Class> subscribedTypes = eventTypesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unSubscribeByEventType(subscriber.getClass());
                unSubscribeMethodByEventType(subscriber, eventType);
            }
            eventTypesBySubscriber.remove(subscriber);
        }
    }

    /**
     * subscriptions unsubscribe
     *
     * @param eventType eventType
     */
    private void unSubscribeByEventType(Class eventType) {
        List<Disposable> disposables = subscriptionsByEventType.get(eventType);
        if (disposables != null) {
            Iterator<Disposable> iterator = disposables.iterator();
            while (iterator.hasNext()) {
                Disposable disposable = iterator.next();
                if (disposable != null && !disposable.isDisposed()) {
                    disposable.dispose();
                    iterator.remove();
                }
            }
        }
    }

    /**
     * 移除subscriber對應的subscriberMethods
     *
     * @param subscriber subscriber
     * @param eventType  eventType
     */
    private void unSubscribeMethodByEventType(Object subscriber, Class eventType) {
        List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
        if (subscriberMethods != null) {
            Iterator<SubscriberMethod> iterator = subscriberMethods.iterator();
            while (iterator.hasNext()) {
                SubscriberMethod subscriberMethod = iterator.next();
                if (subscriberMethod.subscriber.equals(subscriber)) {
                    iterator.remove();
                }
            }
        }
    }

    public void send(int code, Object o) {
        bus.onNext(new Message(code, o));
    }

    public void send(Object o) {
        bus.onNext(o);
    }

    public void send(int code) {
        bus.onNext(new Message(code, new BusData()));
    }

    private class Message {
        private int code;
        private Object object;

        public Message() {
        }

        private Message(int code, Object o) {
            this.code = code;
            this.object = o;
        }

        private int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        private Object getObject() {
            return object;
        }

        public void setObject(Object object) {
            this.object = object;
        }
    }
}

四、添加其餘附加類

BusData.java 事件數據封裝

public class BusData {
    String id;
    String status;
    public BusData() {}
    public BusData(String id, String status) {
        this.id = id;
        this.status = status;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getStatus() {
        return status;
    }
    public void setStatus(String status) {
        this.status = status;
    }
}

Subscribe.java 註解類

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
    int code() default -1;

    ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
}

SubscriberMethod.java 執行註冊方法封裝

public class SubscriberMethod {
    public Method method;
    public ThreadMode threadMode;
    public Class<?> eventType;
    public Object subscriber;
    public int code;

    public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.subscriber = subscriber;
        this.code = code;
    }
    
    /**
     * 調用方法
     * @param o 參數
     */
    public void invoke(Object o){
        try {
            Class[] parameterType = method.getParameterTypes();
            if(parameterType != null && parameterType.length == 1){
                method.invoke(subscriber, o);
            } else if(parameterType == null || parameterType.length == 0){
                method.invoke(subscriber);
            }
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
    }
}

ThreadMode.java 線程模型,用來標識事件運行線程

public enum ThreadMode {
    /**
     * current thread
     */
    CURRENT_THREAD,
    /**
     * android main thread
     */
    MAIN,
    /**
     * new thread
     */
    NEW_THREAD
}

使用Rxbus

一、定義事件參數類

public class RxbusEvent {}

二、定義事件接收者

public class RxBusDemoAbilitySlice extends AbilitySlice {
    @Override
    protected void onStart(Intent intent) {
        super.onStart(intent);
        RxBus.get().register(this);//注測rxbus
    }
    @Override
    protected void onStop() {
        super.onStop();
        RxBus.get().unRegister(this);//註銷rxbus
    }
    /**
     * 接收事件
     * @param rxbusEvent
     */
    @Subscribe(threadMode = ThreadMode.MAIN)
    public void rxBusRxbusEvent(RxbusEvent rxbusEvent) {
        if (rxbusEvent == null) {
            return;
        }
        //執行對應操做
    }    
}

三、發送事件

RxBus.get().send(new RxbusEvent());//發送事件

總結

打完收工,最後使用起來跟Android中基本同樣,本期代碼比較多,歡迎你們使用測試並反饋bug,後期代碼整理好會一併傳到github上去。好了,繼續搞鴻蒙其餘功能去了。ide

這是本系列的第二篇,後面還會爲你們帶來更多的鴻蒙乾貨,敬請期待…。函數

若是文章對您有一點啓發的話,但願您能點個贊,來個關注收藏不迷路。

相關文章
相關標籤/搜索