Go Channel 應用模式

目錄 [−]

  1. Lock/TryLock 模式
    1. Hacked Lock/TryLock 模式
    2. TryLock By Channel
    3. TryLock with Timeout
  2. Or Channel 模式
    1. Goroutine方式
    2. Reflect方式
    3. 遞歸方式
  3. Or-Done-Channel模式
  4. 扇入模式
    1. Goroutine方式
    2. Reflect
    3. 遞歸方式
  5. Tee模式
    1. Goroutine方式
    2. Reflect方式
  6. 分佈模式
    1. Goroutine方式
    2. Reflect方式
  7. eapache
    1. Distribute
    2. Tee
    3. Multiplex
    4. Pipe
  8. 集合操做
    1. skip
      1. skipN
      2. skipFn
      3. skipWhile
    2. take
      1. takeN
      2. takeFn
      3. takeWhile
    3. flat
    4. map
    5. reduce
  9. 總結
  10. 參考資料

Channel是Go中的一種類型,和goroutine一塊兒爲Go提供了併發技術, 它在開發中獲得了普遍的應用。Go鼓勵人們經過Channel在goroutine之間傳遞數據的引用(就像把數據的owner從一個goroutine傳遞給另一個goroutine), Effective Go總結了這麼一句話:html

Do not communicate by sharing memory; instead, share memory by communicating.git

在 Go內存模型指出了channel做爲併發控制的一個特性:github

A send on a channel happens before the corresponding receive from that channel completes. (Golang Spec)golang

除了正常的在goroutine之間安全地傳遞共享數據, Channel還能夠玩出不少的花樣(模式), 本文列舉了一些channel的應用模式。apache

促成本文誕生的因素主要包括:編程

  1. eapache的channels庫
  2. concurrency in go 這本書
  3. Francesc Campoy的 justforfun系列中關於merge channel的實現
  4. 我在出版Scala集合手冊這本書中對Scala集合的啓發

下面就讓咱們以實例的方式看看這麼模式吧。數組

 

Lock/TryLock 模式

咱們知道, Go的標準庫syncMutex,能夠用來做爲鎖,可是Mutex卻沒有實現TryLock方法。緩存

咱們對於TryLock的定義是當前goroutine嘗試得到鎖, 若是成功,則得到了鎖,返回true, 不然返回false。咱們可使用這個方法避免在獲取鎖的時候當前goroutine被阻塞住。安全

原本,這是一個經常使用的功能,在一些其它編程語言中都有實現,爲何Go中沒有實現的?issue#6123有詳細的討論,在我看來,Go核心組成員自己對這個特性沒有積極性,而且認爲經過channel能夠實現相同的方式。併發

1/ Hacked Lock/TryLock 模式

其實,對於標準庫的sync.Mutex要增長這個功能很簡單,下面的方式就是經過hack的方式爲Mutex實現了TryLock的功能。

const mutexLocked = 1 << iota
type Mutex struct {
	mu sync.Mutex
}
func (m *Mutex) Lock() {
	m.mu.Lock()
}
func (m *Mutex) Unlock() {
	m.mu.Unlock()
}
func (m *Mutex) TryLock() bool {
	return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.mu)), 0, mutexLocked)
}
func (m *Mutex) IsLocked() bool {
	return atomic.LoadInt32((*int32)(unsafe.Pointer(&m.mu))) == mutexLocked
}

上面的代碼還額外增長了一個IsLocked方法,不過這個方法通常不經常使用,由於查詢和加鎖這兩個方法執行的時候不是一個原子的操做,素以這個方法通常在調試和打日誌的時候可能有用。若是你看一下Mutex實現的源代碼,就很容易理解上面的這段代碼了,由於mutex實現鎖主要利用CAS對它的一個int32字段作操做。

2/ TryLock By Channel

既然標準庫中不許備在Mutex上增長這個方法,而是推薦使用channel來實現,那麼就讓咱們看看如何使用 channel來實現。

type Mutex struct {
	ch chan struct{}
}
func NewMutex() *Mutex {
	mu := &Mutex{make(chan struct{}, 1)}
	mu.ch <- struct{}{}
	return mu
}
func (m *Mutex) Lock() {
	<-m.ch
}
func (m *Mutex) Unlock() {
	select {
	case m.ch <- struct{}{}:
	default:
		panic("unlock of unlocked mutex")
	}
}
func (m *Mutex) TryLock() bool {
	select {
	case <-m.ch:
		return true
	default:
	}
	return false
}
func (m *Mutex) IsLocked() bool {
	return len(m.ch) > 0
}

你還能夠將緩存的大小從1改成n,用來處理n個鎖(資源)。主要是利用channel邊界狀況下的阻塞特性實現的。

3/ TryLock with Timeout

有時候,咱們在獲取一把鎖的時候,因爲有競爭的關係,在鎖被別的goroutine擁有的時候,當前goroutine沒有辦法當即得到鎖,只能阻塞等待。標準庫並無提供等待超時的功能,咱們嘗試實現它。

type Mutex struct {
	ch chan struct{}
}
func NewMutex() *Mutex {
	mu := &Mutex{make(chan struct{}, 1)}
	mu.ch <- struct{}{}
	return mu
}
func (m *Mutex) Lock() {
	<-m.ch
}
func (m *Mutex) Unlock() {
	select {
	case m.ch <- struct{}{}:
	default:
		panic("unlock of unlocked mutex")
	}
}
func (m *Mutex) TryLock(timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	select {
	case <-m.ch:
		timer.Stop()
		return true
	case <-time.After(timeout):
	}
	return false
}
func (m *Mutex) IsLocked() bool {
	return len(m.ch) > 0
}


Or Channel 模式

你也能夠把它用Context來改造,不是利用超時,而是利用Context來取消/超時得到鎖的操做,這個做業留給讀者來實現。

當你等待多個信號的時候,若是收到任意一個信號, 就執行業務邏輯,忽略其它的還未收到的信號。

舉個例子, 咱們往提供相同服務的n個節點發送請求,只要任意一個服務節點返回結果,咱們就能夠執行下面的業務邏輯,其它n-1的節點的請求能夠被取消或者忽略。當n=2的時候,這就是back request模式。 這樣能夠用資源來換取latency的提高。

須要注意的是,當收到任意一個信號的時候,其它信號都被忽略。若是用channel來實現,只要從任意一個channel中接收到一個數據,那麼全部的channel均可以被關閉了(依照你的實現,可是輸出的channel確定會被關閉)。

有三種實現的方式: goroutine、reflect和遞歸。

1/ Goroutine方式

func or(chans ...<-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		var once sync.Once
		for _, c := range chans {
			go func(c <-chan interface{}) {
				select {
				case <-c:
					once.Do(func() { close(out) })
				case <-out:
				}
			}(c)
		}
	}()
	return out
}

爲了不併發關閉輸出channel的問題,關閉操做只執行一次。or函數能夠處理n個channel,它爲每一個channel啓動一個goroutine,只要任意一個goroutine從channel讀取到數據,輸出的channel就被關閉掉了。

2/ Reflect方式

Go的反射庫針對select語句有專門的數據(reflect.SelectCase)和函數(reflect.Select)處理。
因此咱們能夠利用反射「隨機」地從一組可選的channel中接收數據,並關閉輸出channel。

這種方式看起來更簡潔。

func or(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}
	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		var cases []reflect.SelectCase
		for _, c := range channels {
			cases = append(cases, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(c),
			})
		}
		reflect.Select(cases)
	}()
	return orDone
}

3/ 遞歸方式

遞歸方式一貫是比較開腦洞的實現,下面的方式就是分而治之的方式,逐步合併channel,最終返回一個channel。

func or(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}
	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			m := len(channels) / 2
			select {
			case <-or(channels[:m]...):
			case <-or(channels[m:]...):
			}
		}
	}()
	return orDone
}

Or-Done-Channel模式

在後面的扇入(合併)模式中,咱們仍是會使用相一樣的遞歸模式來合併多個輸入channel,根據 justforfun 的測試結果,這種遞歸的方式要比goroutine、Reflect更有效。

這種模式是咱們常用的一種模式,經過一個信號channel(done)來控制(取消)輸入channel的處理。

一旦從done channel中讀取到一個信號,或者done channel被關閉, 輸入channel的處理則被取消。

這個模式提供一個簡便的方法,把done channel 和 輸入 channel 融合成一個輸出channel。

func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			select {
			case <-done:
				return
			case v, ok := <-c:
				if ok == false {
					return
				}
				select {
				case valStream <- v:
				case <-done:
				}
			}
		}
	}()
	return valStream
}

扇入模式
扇入模式(FanIn)是將多個一樣類型的輸入channel合併成一個一樣類型的輸出channel,也就是channel的合併。

1/ Goroutine方式

每一個channel起一個goroutine。

func fanIn(chans ...<-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		var wg sync.WaitGroup
		wg.Add(len(chans))
		for _, c := range chans {
			go func(c <-chan interface{}) {
				for v := range c {
					out <- v
				}
				wg.Done()
			}(c)
		}
		wg.Wait()
		close(out)
	}()
	return out

2/ Reflect
利用反射庫針對select語句的處理合並輸入channel。

下面這種實現方式其實仍是有些問題的, 在輸入channel讀取比較均勻的時候比較有效,不然性能比較低下。

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		var cases []reflect.SelectCase
		for _, c := range chans {
			cases = append(cases, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(c),
			})
		}
		for len(cases) > 0 {
			i, v, ok := reflect.Select(cases)
			if !ok { //remove this case
				cases = append(cases[:i], cases[i+1:]...)
				continue
			}
			out <- v.Interface()
		}
	}()
	return out
}

3/ 遞歸方式
這種方式雖然理解起來不直觀,可是性能仍是不錯的(輸入channel不是不少的狀況下遞歸層級不會很高,不會成爲瓶頸)

func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
	switch len(chans) {
	case 0:
		c := make(chan interface{})
		close(c)
		return c
	case 1:
		return chans[0]
	case 2:
		return mergeTwo(chans[0], chans[1])
	default:
		m := len(chans) / 2
		return mergeTwo(
			fanInRec(chans[:m]...),
			fanInRec(chans[m:]...))
	}
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
	c := make(chan interface{})
	go func() {
		defer close(c)
		for a != nil || b != nil {
			select {
			case v, ok := <-a:
				if !ok {
					a = nil
					continue
				}
				c <- v
			case v, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				c <- v
			}
		}
	}()
	return c
}

Tee模式
扇出模式(FanOut)是將一個輸入channel扇出爲多個channel。

扇出行爲至少能夠分爲兩種:

  1. 從輸入channel中讀取一個數據,發送給每一個輸入channel,這種模式稱之爲Tee模式
  2. 從輸入channel中讀取一個數據,在輸出channel中選擇一個channel發送

本節只介紹第一種狀況,下一節介紹第二種狀況

1/ Goroutine方式

將讀取的值發送給每一個輸出channel, 異步模式可能會產生不少的goroutine。

func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
	go func() {
		defer func() {
			for i := 0; i < len(out); i++ {
				close(out[i])
			}
		}()
		for v := range ch {
			v := v
			for i := 0; i < len(out); i++ {
				i := i
				if async {
					go func() {
						out[i] <- v
					}()
				} else {
					out[i] <- v
				}
			}
		}
	}()
}

2/ Reflect方式
這種模式一旦一個輸出channel被阻塞,可能會致使後續的處理延遲。

func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
	go func() {
		defer func() {
			for i := 0; i < len(out); i++ {
				close(out[i])
			}
		}()
		cases := make([]reflect.SelectCase, len(out))
		for i := range cases {
			cases[i].Dir = reflect.SelectSend
		}
		for v := range ch {
			v := v
			for i := range cases {
				cases[i].Chan = reflect.ValueOf(out[i])
				cases[i].Send = reflect.ValueOf(v)
			}
			for _ = range cases { // for each channel
				chosen, _, _ := reflect.Select(cases)
				cases[chosen].Chan = reflect.ValueOf(nil)
			}
		}
	}()
}

分佈模式
分佈模式將從輸入channel中讀取的值往輸出channel中的其中一個發送。

1/ Goroutine方式

roundrobin的方式選擇輸出channel。

func fanOut(ch <-chan interface{}, out []chan interface{}) {
	go func() {
		defer func() {
			for i := 0; i < len(out); i++ {
				close(out[i])
			}
		}()
		// roundrobin
		var i = 0
		var n = len(out)
		for v := range ch {
			v := v
			out[i] <- v
			i = (i + 1) % n
		}
	}()
}

2/ Reflect方式
利用發射隨機的選擇。

func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
	go func() {
		defer func() {
			for i := 0; i < len(out); i++ {
				close(out[i])
			}
		}()
		cases := make([]reflect.SelectCase, len(out))
		for i := range cases {
			cases[i].Dir = reflect.SelectSend
			cases[i].Chan = reflect.ValueOf(out[i])
		}
		for v := range ch {
			v := v
			for i := range cases {
				cases[i].Send = reflect.ValueOf(v)
			}
			_, _, _ = reflect.Select(cases)
		}
	}()
}

eapache
eapache/channels提供了一些channel應用模式的方法,好比上面的扇入扇出模式等。

由於go自己的channel沒法再進行擴展, eapache/channels庫定義了本身的channel接口,並提供了與channel方便的轉換。

eapache/channels 提供了四個方法:

  • Distribute: 從輸入channel讀取值,發送到其中一個輸出channel中。當輸入channel關閉後,輸出channel都被關閉
  • Tee: 從輸入channel讀取值,發送到全部的輸出channel中。當輸入channel關閉後,輸出channel都被關閉
  • Multiplex: 合併輸入channel爲一個輸出channel, 當全部的輸入都關閉後,輸出才關閉
  • Pipe: 將兩個channel串起來

同時對上面的四個函數還提供了WeakXXX的函數,輸入關閉後不會關閉輸出。

下面看看對應的函數的例子。

1/ Distribute

func testDist() {
	fmt.Println("dist:")
	a := channels.NewNativeChannel(channels.None)
	outputs := []channels.Channel{
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
	}
	channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
	//channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
	go func() {
		for i := 0; i < 5; i++ {
			a.In() <- i
		}
		a.Close()
	}()
	for i := 0; i < 6; i++ {
		var v interface{}
		var j int
		select {
		case v = <-outputs[0].Out():
			j = 0
		case v = <-outputs[1].Out():
			j = 1
		case v = <-outputs[2].Out():
			j = 2
		case v = <-outputs[3].Out():
			j = 3
		}
		fmt.Printf("channel#%d: %d\n", j, v)
	}
}

2/ Tee

func testTee() {
	fmt.Println("tee:")
	a := channels.NewNativeChannel(channels.None)
	outputs := []channels.Channel{
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
	}
	channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3])
	//channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3])
	go func() {
		for i := 0; i < 5; i++ {
			a.In() <- i
		}
		a.Close()
	}()
	for i := 0; i < 20; i++ {
		var v interface{}
		var j int
		select {
		case v = <-outputs[0].Out():
			j = 0
		case v = <-outputs[1].Out():
			j = 1
		case v = <-outputs[2].Out():
			j = 2
		case v = <-outputs[3].Out():
			j = 3
		}
		fmt.Printf("channel#%d: %d\n", j, v)
	}
}

3/ Multiplex

func testMulti() {
	fmt.Println("multi:")
	a := channels.NewNativeChannel(channels.None)
	inputs := []channels.Channel{
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
	}
	channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
	//channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
	go func() {
		for i := 0; i < 5; i++ {
			for j := range inputs {
				inputs[j].In() <- i
			}
		}
		for i := range inputs {
			inputs[i].Close()
		}
	}()
	for v := range a.Out() {
		fmt.Printf("%d ", v)
	}
}

4/ Pipe

func testPipe() {
	fmt.Println("pipe:")
	a := channels.NewNativeChannel(channels.None)
	b := channels.NewNativeChannel(channels.None)
	channels.Pipe(a, b)
	// channels.WeakPipe(a, b)
	go func() {
		for i := 0; i < 5; i++ {
			a.In() <- i
		}
		a.Close()
	}()
	for v := range b.Out() {
		fmt.Printf("%d ", v)
	}
}

集合操做

從channel的行爲來看,它看起來很像一個數據流,因此咱們能夠實現一些相似Scala 集合的操做。

Scala的集合類提供了豐富的操做(方法), 固然其它的一些編程語言或者框架也提供了相似的方法, 好比Apache Spark、Java Stream、ReactiveX等。

下面列出了一些方法的實現,我相信通過一些人的挖掘,相關的方法能夠變成一個很好的類庫,可是目前咱們先看一些例子。

1/ skip

skip函數是從一個channel中跳過開一些數據,而後纔開始讀取。

1.1 skipN

skipN跳過開始的N個數據。

func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			select {
			case <-done:
				return
			case takeStream <- <-valueStream:
			}
		}
	}()
	return takeStream
}

1.2 skipFn

skipFn 提供Fn函數爲true的數據,好比跳過偶數。

func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for {
			select {
			case <-done:
				return
			case v := <-valueStream:
				if !fn(v) {
					takeStream <- v
				}
			}
		}
	}()
	return takeStream
}

1.3 skipWhile

跳過開頭函數fn爲true的數據。

func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		take := false
		for {
			select {
			case <-done:
				return
			case v := <-valueStream:
				if !take {
					take = !fn(v)
					if !take {
						continue
					}
				}
				takeStream <- v
			}
		}
	}()
	return takeStream
}

2/ take

skip的反向操做,讀取一部分數據。

 

2.1 takeN

takeN 讀取開頭N個數據。

func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			select {
			case <-done:
				return
			case takeStream <- <-valueStream:
			}
		}
	}()
	return takeStream
}

2.2 takeFn

takeFn 只篩選知足fn的數據。

func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for {
			select {
			case <-done:
				return
			case v := <-valueStream:
				if fn(v) {
					takeStream <- v
				}
			}
		}
	}()
	return takeStream
}

2.3 takeWhile
takeWhile只挑選開頭知足fn的數據。

func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for {
			select {
			case <-done:
				return
			case v := <-valueStream:
				if !fn(v) {
					return
				}
				takeStream <- v
			}
		}
	}()
	return takeStream
}

3/ flat
平展(flat)操做是一個有趣的操做。

若是輸入是一個channel,channel中的數據仍是相同類型的channel, 那麼flat將返回一個輸出channel,輸出channel中的數據是輸入的各個channel中的數據。

它與扇入不一樣,扇入的輸入channel在調用的時候就是固定的,而且以數組的方式提供,而flat的輸入是一個channel,能夠運行時隨時的加入channel。

func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			select {
			case <-done:
				return
			case v, ok := <-c:
				if ok == false {
					return
				}
				select {
				case valStream <- v:
				case <-done:
				}
			}
		}
	}()
	return valStream
}
func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			var stream <-chan interface{}
			select {
			case maybeStream, ok := <-chanStream:
				if ok == false {
					return
				}
				stream = maybeStream
			case <-done:
				return
			}
			for val := range orDone(done, stream) {
				select {
				case valStream <- val:
				case <-done:
				}
			}
		}
	}()
	return valStream
}

4/ map
map和reduce是一組經常使用的操做。

map將一個channel映射成另一個channel, channel的類型能夠不一樣。

func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
	out := make(chan interface{})
	if in == nil {
		close(out)
		return out
	}
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

好比你能夠處理一個公司員工工資的channel, 輸出一個扣稅以後的員工工資的channel。由於map是go的關鍵字,因此咱們不能命名函數類型爲map,這裏用mapChan代替。

5/ reduce

func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
	if in == nil {
		return nil
	}
	out := <-in
	for v := range in {
		out = fn(out, v)
	}
	return out
}
你能夠用`reduce`實現`sum`、`max`、`min`等聚合操做。

本文列出了channel的一些深刻應用的模式,相信經過閱讀本文,你能夠更加深刻的瞭解Go的channel類型,並在開發中靈活的應用channel。也歡迎你在評論中提出更多的 channel的應用模式。

總結

全部的代碼能夠在github上找到: smallnest/channels

參考資料

  1. https://github.com/kat-co/concurrency-in-go-src
  2. https://github.com/campoy/justforfunc/tree/master/27-merging-chans
  3. https://github.com/eapache/channels
  4. https://github.com/LK4D4/trylock
  5. https://stackoverflow.com/questions/36391421/explain-dont-communicate-by-sharing-memory-share-memory-by-communicating
  6. https://github.com/lrita/gosync
  7. https://www.ardanlabs.com/blog/2017/10/the-behavior-of-channels.html
相關文章
相關標籤/搜索