本文已參與好文召集令活動,點擊查看:後端、大前端雙賽道投稿,2萬元獎池等你挑戰!前端
面試的時候常常會被問及多線程同步的問題,例如:java
「 現有 Task一、Task2 等多個並行任務,如何等待所有執行完成後,執行 Task3。」web
在 Kotlin 中咱們有多種實現方式,本文將全部這些方式作了整理,建議收藏。面試
1. Thread.join
2. Synchronized
3. ReentrantLock
4. BlockingQueue
5. CountDownLatch
6. CyclicBarrier
7. CAS
8. Future
9. CompletableFuture
10. Rxjava
11. Coroutine
12. Flow編程
咱們先定義三個Task,模擬上述場景, Task3 基於 Task一、Task2 返回的結果拼接字符串,每一個 Task 經過 sleep 模擬耗時: 後端
val task1: () -> String = {
sleep(2000)
"Hello".also { println("task1 finished: $it") }
}
val task2: () -> String = {
sleep(2000)
"World".also { println("task2 finished: $it") }
}
val task3: (String, String) -> String = { p1, p2 ->
sleep(2000)
"$p1 $p2".also { println("task3 finished: $it") }
}
複製代碼
Kotlin 兼容 Java,Java 的全部線程工具默認均可以使用。其中最簡單的線程同步方式就是使用 Thread
的 join()
:api
@Test
fun test_join() {
lateinit var s1: String
lateinit var s2: String
val t1 = Thread { s1 = task1() }
val t2 = Thread { s2 = task2() }
t1.start()
t2.start()
t1.join()
t2.join()
task3(s1, s2)
}
複製代碼
使用 synchronized
鎖進行同步安全
@Test
fun test_synchrnoized() {
lateinit var s1: String
lateinit var s2: String
Thread {
synchronized(Unit) {
s1 = task1()
}
}.start()
s2 = task2()
synchronized(Unit) {
task3(s1, s2)
}
}
複製代碼
可是若是超過三個任務,使用 synchrnoized
這種寫法就比較彆扭了,爲了同步多個並行任務的結果須要聲明n個鎖,並嵌套n個 synchronized
。markdown
ReentrantLock
是 JUC 提供的線程鎖,能夠替換 synchronized 的使用多線程
@Test
fun test_ReentrantLock() {
lateinit var s1: String
lateinit var s2: String
val lock = ReentrantLock()
Thread {
lock.lock()
s1 = task1()
lock.unlock()
}.start()
s2 = task2()
lock.lock()
task3(s1, s2)
lock.unlock()
}
複製代碼
ReentrantLock 的好處是,當有多個並行任務時是不會出現嵌套 synchrnoized
的問題,但仍然須要建立多個 lock 管理不一樣的任務,
阻塞隊列內部也是經過 Lock 實現的,因此也能夠達到同步鎖的效果
@Test
fun test_blockingQueue() {
lateinit var s1: String
lateinit var s2: String
val queue = SynchronousQueue<Unit>()
Thread {
s1 = task1()
queue.put(Unit)
}.start()
s2 = task2()
queue.take()
task3(s1, s2)
}
複製代碼
固然,阻塞隊列更可能是使用在生產/消費場景中的同步。
JUC 中的鎖大都基於 AQS
實現的,能夠分爲獨享鎖和共享鎖。ReentrantLock
就是一種獨享鎖。相比之下,共享鎖更適合本場景。 例如 CountDownLatch
,它可讓一個線程一直處於阻塞狀態,直到其餘線程的執行所有完成:
@Test
fun test_countdownlatch() {
lateinit var s1: String
lateinit var s2: String
val cd = CountDownLatch(2)
Thread() {
s1 = task1()
cd.countDown()
}.start()
Thread() {
s2 = task2()
cd.countDown()
}.start()
cd.await()
task3(s1, s2)
}
複製代碼
共享鎖的好處是沒必要爲了每一個任務都建立單獨的鎖,即便再多並行任務寫起來也很輕鬆
CyclicBarrier
是 JUC 提供的另外一種共享鎖機制,它可讓一組線程到達一個同步點後再一塊兒繼續運行,其中任意一個線程未達到同步點,其餘已到達的線程均會被阻塞。
與 CountDownLatch
的區別在於 CountDownLatch
是一次性的,而 CyclicBarrier
能夠被重置後重復使用,這也正是 Cyclic
的命名由來,能夠循環使用
@Test
fun test_CyclicBarrier() {
lateinit var s1: String
lateinit var s2: String
val cb = CyclicBarrier(3)
Thread {
s1 = task1()
cb.await()
}.start()
Thread() {
s2 = task1()
cb.await()
}.start()
cb.await()
task3(s1, s2)
}
複製代碼
AQS 內部經過自旋鎖實現同步,自旋鎖的本質是利用 CompareAndSwap
避免線程阻塞的開銷。 所以,咱們可使用基於 CAS 的原子類計數,達到實現無鎖操做的目的。
@Test
fun test_cas() {
lateinit var s1: String
lateinit var s2: String
val cas = AtomicInteger(2)
Thread {
s1 = task1()
cas.getAndDecrement()
}.start()
Thread {
s2 = task2()
cas.getAndDecrement()
}.start()
while (cas.get() != 0) {}
task3(s1, s2)
}
複製代碼
while
循環空轉看起來有些浪費資源,可是自旋鎖的本質就是這樣,因此 CAS 僅僅適用於一些cpu密集型的短任務同步。
看到 CAS 的無鎖實現,也許不少人會想到 volatile
, 是否也能實現無鎖的線程安全?
@Test
fun test_Volatile() {
lateinit var s1: String
lateinit var s2: String
Thread {
s1 = task1()
cnt--
}.start()
Thread {
s2 = task2()
cnt--
}.start()
while (cnt != 0) {
}
task3(s1, s2)
}
複製代碼
注意,這種寫法是錯誤的 volatile
能保證可見性,可是不能保證原子性,cnt--
並不是線程安全,須要加鎖操做
上面不管有鎖操做仍是無鎖操做,都須要定義兩個變量s1
、s2
記錄結果很是不方便。 Java 1.5 開始,提供了 Callable
和 Future
,能夠在任務執行結束時返回結果。
@Test
fun test_future() {
val future1 = FutureTask(Callable(task1))
val future2 = FutureTask(Callable(task2))
Executors.newCachedThreadPool().execute(future1)
Executors.newCachedThreadPool().execute(future2)
task3(future1.get(), future2.get())
}
複製代碼
經過 future.get()
,能夠同步等待結果返回,寫起來很是方便
future.get()
雖然方便,可是會阻塞線程。 Java 8 中引入了 CompletableFuture
,他實現了 Future 接口的同時實現了 CompletionStage
接口。 CompletableFuture
能夠針對多個 CompletionStage
進行邏輯組合、實現複雜的異步編程。 這些邏輯組合的方法以回調的形式避免了線程阻塞:
@Test
fun test_CompletableFuture() {
CompletableFuture.supplyAsync(task1)
.thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
task3(p1, p2)
}.join()
}
複製代碼
RxJava
提供的各類操做符以及線程切換能力一樣能夠幫助咱們實現需求: zip
操做符能夠組合兩個 Observable
的結果;subscribeOn
用來啓動異步任務
@Test
fun test_Rxjava() {
Observable.zip(
Observable.fromCallable(Callable(task1))
.subscribeOn(Schedulers.newThread()),
Observable.fromCallable(Callable(task2))
.subscribeOn(Schedulers.newThread()),
BiFunction(task3)
).test().awaitTerminalEvent()
}
複製代碼
前面講了那麼多,其實都是 Java 的工具。 Coroutine
終於算得上是 Kotlin 特有的工具了:
@Test
fun test_coroutine() {
runBlocking {
val c1 = async(Dispatchers.IO) {
task1()
}
val c2 = async(Dispatchers.IO) {
task2()
}
task3(c1.await(), c2.await())
}
}
複製代碼
寫起來特別舒服,能夠說是集前面各種工具的優勢於一身。
Flow
就是 Coroutine 版的 RxJava,具有不少 RxJava 的操做符,例如 zip
:
@Test
fun test_flow() {
val flow1 = flow<String> { emit(task1()) }
val flow2 = flow<String> { emit(task2()) }
runBlocking {
flow1.zip(flow2) { t1, t2 ->
task3(t1, t2)
}.flowOn(Dispatchers.IO)
.collect()
}
}
複製代碼
flowOn
使得 Task 在異步計算併發射結果。
上面這麼多方式,就像茴香豆的「茴」字的四種寫法,不必都掌握。做爲結論,在 Kotlin 上最好用的線程同步方案首推協程!