咱們開發項目的時候,爲了方便組件之間的通訊,使代碼更加簡潔,耦合性更低,須要引入事件總線。事件總線的庫咱們一般會選擇EventBus或者基於Rxjava的RxBus,如今隨着jetpack裏LiveData的推出,也出現了基於LiveData實現的事件總線庫。 那麼,除了這些,還有沒有其餘的實現事件總線的方法呢?在使用協程的過程當中,發現協程中的Channel用到了生產者消費者模式,那麼能夠使用Channel實現事件總線嗎?接下來試一下。html
Channel是屬於kotlin協程,要使用須要在項目中引入協程庫java
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3"
複製代碼
Channel相似Java裏的BlockingQueue,producer生產事件發送到Channel,consumer從Channel裏取出事件進行消費。android
kotlin官網文檔提供了Channel的用法git
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
複製代碼
建立Channel,使用Channel.send 發送事件 ,使用Channel.receive 接收事件,要注意的是Channel事件的接收和發送都須要在協程裏調用。github
class ChannelBus private constructor() {
private var channel: Channel<Events> = Channel(Channel.BUFFERED)
companion object {
val instance: ChannelBus by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
ChannelBus()
}
}
}
複製代碼
context
表示協程執行時的上下文;event
是一個掛起的方法,用來消費事件;jobList
用來保存消費事件的job。data class ChannelConsumer(
val context: CoroutineContext,
val event: suspend (event: Events) -> Unit,
var jobList: MutableList<Job> = mutableListOf()
)
複製代碼
receive
方法中,key表示存儲該消費者時的鍵,onEvent是一個掛起函數,表示當咱們接收到事件時的處理方法,是一個lambda函數,而context爲協程上下文,表示這個lambda函數在哪一個線程執行,這地方默認在Dispatchers.Main
主線程執行。咱們將傳入的參數構形成一個ChannelConsumer
對象而後保存在Map中。private val consumerMap = ConcurrentHashMap<String, ChannelConsumer>()
fun receive( key: String, context: CoroutineContext = Dispatchers.Main, onEvent: suspend (event: Events) -> Unit
) {
consumerMap[key] = ChannelConsumer(context, onEvent)
}
複製代碼
Channel
發送出去。fun send(event: Events) {
GlobalScope.launch {
channel.send(event)
}
}
複製代碼
Channel
爲空時會掛起,當有新事件時,事件會從Channel
中取出,咱們在這裏進行分發。咱們遍歷Map,獲得每一個ChannelConsumer
,因而就能夠處理事件e
,這裏直接經過launch
方法啓動協程,協程的上下文 it.value.context
就是receive
方法傳入的context
,it.value.event(e)
就是receive
方法傳入的lambda函數,e
是 send
方法傳入的event
,launch
方法返回一個job
,咱們把這個job添加到ChannelConsumer
的jobList
裏。init {
GlobalScope.launch {
for (e in channel) {
consumerMap.entries.forEach {
it.value.jobList.add(launch(it.value.context) {
it.value.event(e)
})
}
}
}
}
複製代碼
ChannelConsumer
,而後循環jobList
並取消每個job,避免內存泄漏,最後移除消費者。fun remove(key: String) {
consumerMap[key]?.jobList?.forEach {
it.cancel()
}
consumerMap.remove(key)
}
複製代碼
因此咱們在項目裏能夠這麼用api
override fun onCreate(savedInstanceState: Bundle?) {
......
ChannelBus.instance.receive("key",Dispatchers.Main,{
activity_main_text.text = it.name
})
......
}
複製代碼
能夠簡寫成以下方式網絡
override fun onCreate(savedInstanceState: Bundle?) {
......
ChannelBus.instance.receive("key") {
activity_main_text.text = it.name
}
......
}
複製代碼
由於傳入的是suspend函數,因此若是要進行耗時操做,能夠直接執行,只須要把context參數傳入IO線程Dispatchers.IO
就好了,而後使用withContext函數切回主線程,再進行UI操做。async
override fun onCreate(savedInstanceState: Bundle?) {
......
ChannelBus.instance.receive("key", Dispatchers.IO) {
val s = httpRequest() //IO線程,耗時操做
withContext(Dispatchers.Main) { //切回UI線程
activity_sticky_text.text = s //更改UI
}
}
}
//網絡請求
private fun httpRequest(): String {
val url = URL("https://api.github.com/users/LGD2009")
val urlConnection = url.openConnection() as HttpURLConnection
urlConnection.let {
it.connectTimeout = 5000
it.requestMethod = "GET"
}
urlConnection.connect()
if (urlConnection.responseCode != 200) {
return "請求url失敗"
} else {
val inputStream: InputStream = urlConnection.inputStream
return inputStream.bufferedReader().use { it.readText() }
}
}
複製代碼
ChannelBus.instance.send(Events.EVENT_1)
複製代碼
override fun onDestroy() {
......
ChannelBus.instance.remove("key")
}
複製代碼
上面的方法中,註冊消費者以後每次都須要手動取消,那麼可不能夠自動取消呢?這裏就須要用到Lifecycle了。 咱們使 ChannelBus 繼承 LifecycleObserver
,並重載receive
方法和remove
方法。 重載receive
方法,其中key
換成LifecycleOwner
,而後調用lifecycleOwner.lifecycle.addObserver(this)
將當前的ChannelBus做爲觀察者添加進去。ide
class ChannelBus private constructor() : LifecycleObserver {
private val lifecycleOwnerMap = ConcurrentHashMap<LifecycleOwner, ChannelConsumer>()
......
fun receive( lifecycleOwner: LifecycleOwner, context: CoroutineContext = Dispatchers.Main, onEvent: suspend (event: Events) -> Unit
) {
lifecycleOwner.lifecycle.addObserver(this)
lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent)
}
}
複製代碼
而後重載remove
方法,key
換成了LifecycleOwner
,添加註解。由於如今Activity和Fragment都繼承了LifecycleOwner
,當Activity和Fragment運行destroy
銷燬時,當前觀察者就會觀察到並調用這個方法。函數
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun remove(lifecycleOwner: LifecycleOwner) {
lifecycleOwnerMap[lifecycleOwner]?.jobList?.forEach {
it.cancel()
}
lifecycleOwnerMap.remove(lifecycleOwner)
}
複製代碼
因此,咱們在Activity或Fragment裏註冊消費者時只需
override fun onCreate(savedInstanceState: Bundle?) {
......
ChannelBus.instance.receive(this) {
activity_main_text.text = it.name
}
......
}
複製代碼
當Activity或Fragment銷燬時會自動取消註冊。
有時候咱們可能須要在消費者訂閱的時候能收到以前發送的某些事件,這時候就須要用到粘性事件。簡單的實現思路是保存事件,在註冊消費者時候發送事件。 建立List保存粘性事件,並添加移除粘性事件的方法。
private val stickyEventsList = mutableListOf<Events>()
fun removeStickEvent(event: Events) {
stickyEventsList.remove(event)
}
複製代碼
改造send方法,增長一個 Boolean 類型的參數,用來指明是不是粘性的,固然,默認值是false。若是是true,則把事件存入List。
fun send(event: Events, isSticky: Boolean = false) {
GlobalScope.launch {
if (isSticky) {
stickyEventsList.add(event)
}
channel.send(event)
}
}
複製代碼
添加接收粘性事件的消費者方法。
fun receiveSticky( lifecycleOwner: LifecycleOwner, context: CoroutineContext = Dispatchers.Main, onEvent: suspend (event: Events) -> Unit
) {
lifecycleOwner.lifecycle.addObserver(this)
lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent)
stickyEventsList.forEach { e ->
lifecycleOwnerMap[lifecycleOwner]?.jobList?.add(GlobalScope.launch(context) {
onEvent(e)
})
}
}
複製代碼
上面的文章中,同一個事件只能取一次,爲了發送到多個消費者,因此使用Map保存,而後依次發送。而BroadcastChannel則能夠有多個接收端。
因此,若是用BroadcastChannel來實現則更爲簡單。 建立BroadcastChannel對象
@ExperimentalCoroutinesApi
class BroadcastChannelBus private constructor() : LifecycleObserver {
private val broadcastChannel: BroadcastChannel<Events> = BroadcastChannel(Channel.BUFFERED)
private val lifecycleOwnerMap = ConcurrentHashMap<LifecycleOwner, ChannelConsumer>()
companion object {
val instance: BroadcastChannelBus by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
BroadcastChannelBus()
}
}
複製代碼
數據類jobList改爲job,再增長一個receiveChannel
data class ChannelConsumer(
val context: CoroutineContext,
val event: suspend (event: Events) -> Unit,
val job: Job?,
val receiveChannel: ReceiveChannel<Events>
)
複製代碼
發送方法不須要改變,receive方法須要更改。 經過val receiveChannel = broadcastChannel.openSubscription()
訂閱,job和receiveChannel保存到數據類。
fun receive( lifecycleOwner: LifecycleOwner, context: CoroutineContext = Dispatchers.Main, onEvent: suspend (event: Events) -> Unit
) {
lifecycleOwner.lifecycle.addObserver(this)
val receiveChannel = broadcastChannel.openSubscription()
val job = GlobalScope.launch(context) {
for (e in receiveChannel) {
onEvent(e)
}
}
lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent,job,receiveChannel)
}
複製代碼
因此,最後取消訂閱時,關閉receiveChannel並取消任務。
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun remove(lifecycleOwner: LifecycleOwner) {
lifecycleOwnerMap[lifecycleOwner]?.receiveChannel?.cancel()
lifecycleOwnerMap[lifecycleOwner]?.job?.cancel()
lifecycleOwnerMap.remove(lifecycleOwner)
}
複製代碼
不過,須要注意的是這個api如今是實驗性功能,在後續版本的更新中可能會改變。
這篇文章主要是拋磚引玉,使用Channel換一種實現事件總線的思路。實現功能只有一個文件,Demo已上傳到github,項目名ChannelBus,你們能夠在使用時根據具體的需求進行修改。