Go 每日一庫之 rxgo

簡介

ReactiveX,簡稱爲 Rx,是一個異步編程的 API。與 callback(回調)、promise(JS 提供這種方式)和 deferred(Python 的 twisted 網絡編程庫就是使用這種方式)這些異步編程方式有所不一樣,Rx 是基於事件流的。這裏的事件能夠是系統中產生或變化的任何東西,在代碼中咱們通常用對象表示。在 Rx 中,事件流被稱爲 Observable(可觀察的)。事件流須要被 Observer(觀察者)處理纔有意義。想象一下,咱們平常做爲一個 Observer,一個重要的工做就是觀察 BUG 的事件流。每次發現一個 BUG,咱們都須要去解決它。react

Rx 僅僅只是一個 API 規範的定義。Rx 有多種編程語言實現,RxJava/RxJS/Rx.NET/RxClojure/RxSwift。RxGo 是 Rx 的 Go 語言實現。藉助於 Go 語言簡潔的語法和強大的併發支持(goroutine、channel),Rx 與 Go 語言的結合很是完美。git

pipelines (官方博客:https://blog.golang.org/pipelines)是 Go 基礎的併發編程模型。其中包含,fan-in——多個 goroutine 產生數據,一個goroutine 處理數據,fan-out——一個 goroutine 產生數據,多個 goroutine 處理數據,fan-inout——多個 goroutine 產生數據,多個 goroutine 處理數據。它們都是經過 channel 鏈接。RxGo 的實現就是基於 pipelines 的理念,而且提供了方便易用的包裝和強大的擴展。github

快速使用

本文代碼使用 Go Modules。golang

建立目錄並初始化:編程

$ mkdir rxgo && cd rxgo
$ go mod init github.com/darjun/go-daily-lib/rxgo

安裝rxgo庫:json

$ go get -u github.com/reactivex/rxgo/v2

編碼:設計模式

package main

import (
  "fmt"

  "github.com/reactivex/rxgo/v2"
)

func main() {
  observable := rxgo.Just(1, 2, 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {
    fmt.Println(item.V)
  }
}

使用 RxGo 的通常流程以下:promise

  • 使用相關的 Operator 建立 ObservableOperator 就是用來建立 Observable 的。這些術語都比較難貼切地翻譯,並且英文也很好懂,就不強行翻譯了;
  • 中間各個階段可使用過濾操做篩選出咱們想要的數據,使用轉換操做對數據進行轉換;
  • 調用 ObservableObserve()方法,該方法返回一個<- chan rxgo.Item。而後for range遍歷便可。

GitHub 上一張圖很形象地描繪了這個過程:緩存

  • 首先使用Just建立一個僅有若干固定數據的 Observable
  • 使用Map()方法執行轉換(將圓形轉爲方形);
  • 使用Filter()方法執行過濾(過濾掉黃色的方形)。

看懂了這張圖片,就能瞭解 RxGo 工做的基本流程了。微信

上面是簡單的示例,沒有過濾、轉換操做的使用。

運行:

$ go run main.go 
1
2
3
4
5

關於上面的示例,須要注意:

Just使用柯里化(currying)讓它能夠在第一個參數中接受多個數據,在第二個參數中接受多個選項定製行爲。柯里化是函數化編程的思想,簡單來講就是經過在函數中返回函數,以此來減小每一個函數的參數個數。例如:

func add(value int) func (int) int {
  return func (a int) int {
    return value + a
  }
}

fmt.Prinlnt(add(5)(10)) // 15

因爲 Go 不支持多個可變參數,Just經過柯里化迂迴地實現了這個功能:

// rxgo/factory.go
func Just(items ...interface{}) func(opts ...Option) Observable {
  return func(opts ...Option) Observable {
    return &ObservableImpl{
      iterable: newJustIterable(items...)(opts...),
    }
  }
}

實際上rxgo.Item還能夠包含錯誤。因此在使用時,咱們應該作一層判斷:

func main() {
  observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {
    if item.Error() {
      fmt.Println("error:", item.E)
    } else {
      fmt.Println(item.V)
    }
  }
}

運行:

$ go run main.go 
1
2
error: unknown
3
4
5

咱們使用item.Error()檢查是否出現錯誤。而後使用item.V訪問數據,item.E訪問錯誤。

除了使用for range以外,咱們還能夠調用 ObservableForEach()方法來實現遍歷。ForEach()接受 3 個回調函數:

  • NextFunc:類型爲func (v interface {}),處理數據;
  • ErrFunc:類型爲func (err error),處理錯誤;
  • CompletedFunc:類型爲func ()Observable 完成時調用。

有點Promise那味了。使用ForEach(),能夠將上面的示例改寫爲:

func main() {
  observable := rxgo.Just(1, 2, 3, 4, 5)()
  <-observable.ForEach(func(v interface{}) {
    fmt.Println("received:", v)
  }, func(err error) {
    fmt.Println("error:", err)
  }, func() {
    fmt.Println("completed")
  })
}

運行:

$ go run main.go 
received: 1
received: 2
received: 3
received: 4
received: 5
completed

ForEach()其實是異步執行的,它返回一個接收通知的 channel。當 Observable 數據發送完畢時,該 channel 會關閉。因此若是要等待ForEach()執行完成,咱們須要使用<-。上面的示例中若是去掉<-,可能就沒有輸出了,由於主 goroutine 結束了,整個程序就退出了。

建立 Observable

上面使用最簡單的方式建立 Observable:直接調用Just()方法傳入一系列數據。下面再介紹幾種建立 Observable 的方式。

Create

傳入一個[]rxgo.Producer的切片,其中rxgo.Producer的類型爲func(ctx context.Context, next chan<- Item)。咱們能夠在代碼中調用rxgo.Of(value)生成數據,rxgo.Error(err)生成錯誤,而後發送到next通道中:

func main() {
  observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
    next <- rxgo.Of(1)
    next <- rxgo.Of(2)
    next <- rxgo.Of(3)
    next <- rxgo.Error(errors.New("unknown"))
    next <- rxgo.Of(4)
    next <- rxgo.Of(5)
  }})

  ch := observable.Observe()
  for item := range ch {
    if item.Error() {
      fmt.Println("error:", item.E)
    } else {
      fmt.Println(item.V)
    }
  }
}

固然,分紅兩個rxgo.Producer也是同樣的效果:

observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
  next <- rxgo.Of(1)
  next <- rxgo.Of(2)
  next <- rxgo.Of(3)
  next <- rxgo.Error(errors.New("unknown"))
  }, func(ctx context.Context, next chan<- rxgo.Item) {
  next <- rxgo.Of(4)
  next <- rxgo.Of(5)
}})

FromChannel

FromChannel能夠直接從一個已存在的<-chan rxgo.Item對象中建立 Observable

func main() {
    ch := make(chan rxgo.Item)
    go func() {
        for i := 1; i <= 5; i++ {
            ch <- rxgo.Of(i)
        }
        close(ch)
    }()

    observable := rxgo.FromChannel(ch)
    for item := range observable.Observe() {
        fmt.Println(item.V)
    }
}

注意:

通道須要手動調用close()關閉,上面Create()方法內部rxgo自動幫咱們執行了這個步驟。

Interval

Interval以傳入的時間間隔生成一個無窮的數字序列,從 0 開始:

func main() {
  observable := rxgo.Interval(rxgo.WithDuration(5 * time.Second))
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

上面的程序啓動後,第 5s 輸出 0,第 10s 輸出 1,...,並且不會中止。

咱們能夠用time.Ticker實現相同的功能:

func main() {
  t := time.NewTicker(5 * time.Second)

  var count int
  for range t.C {
    fmt.Println(count)
    count++
  }
}

Range

Range能夠生成一個範圍內的數字:

func main() {
  observable := rxgo.Range(0, 3)
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

上面代碼依次輸出 0,1,2,3。

Repeat

在已存在的 Observable 對象上調用Repeat,能夠實現每隔指定時間,重複一次該序列,一共重複指定次數:

func main() {
  observable := rxgo.Just(1, 2, 3)().Repeat(
    3, rxgo.WithDuration(1*time.Second),
  )
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

運行上面的代碼,當即輸出 1,2,3,而後等待 1s,又輸出一次 1,2,3,而後又等待 1s,最後又輸出一次 1,2,3。

Start

能夠給Start方法傳入[]rxgo.Supplier做爲參數,它能夠包含任意數量的rxgo.Supplier類型。rxgo.Supplier的底層類型爲:

// rxgo/types.go
var Supplier func(ctx context.Context) rxgo.Item

Observable 內部會依次調用這些rxgo.Supplier生成rxgo.Item

func Supplier1(ctx context.Context) rxgo.Item {
  return rxgo.Of(1)
}

func Supplier2(ctx context.Context) rxgo.Item {
  return rxgo.Of(2)
}

func Supplier3(ctx context.Context) rxgo.Item {
  return rxgo.Of(3)
}

func main() {
  observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3})
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

Observable 分類

根據數據在何處生成,Observable 被分爲 HotCold 兩種類型(類比熱啓動和冷啓動)。數據在其它地方生成的被成爲 Hot Observable。相反,在 Observable 內部生成數據的就是 Cold Observable

使用上面介紹的方法建立的實際上都是 Hot Observable

func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

上面建立的是 Hot Observable。可是有個問題,第一次Observe()消耗了全部的數據,第二個就沒有數據輸出了。

Cold Observable 就不會有這個問題,由於它建立的流是獨立於每一個觀察者的。即每次調用Observe()都建立一個新的 channel。咱們使用Defer()方法建立 Cold Observable,它的參數與Create()方法同樣。

func main() {
  observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
  }})

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

輸出:

$ go run main.go
0
1
2
0
1
2

可鏈接的 Observable

可鏈接的(Connectable)Observable 對普通的 Observable 進行了一層組裝。調用它的Observe()方法時並不會馬上產生數據。使用它,咱們能夠等全部的觀察者都準備就緒了(即調用了Observe()方法)以後,再調用其Connect()方法開始生成數據。咱們經過兩個示例比較使用普通的 Observable 和可鏈接的 Observable 有何不一樣。

普通的:

func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("Second observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
}

上例中咱們使用DoOnNext()方法來註冊觀察者。因爲DoOnNext()方法是異步執行的,因此爲了等待結果輸出,在最後增長了一行time.Sleep。運行:

$ go run main.go
First observer: 1
First observer: 2
First observer: 3
before subscribe second observer

由輸出能夠看出,註冊第一個觀察者以後就開始產生數據了。

咱們經過在建立 Observable 的方法中指定rxgo.WithPublishStrategy()選項就能夠建立可鏈接的 Observable

func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("Second observer: %d\n", i)
  })

  observable.Connect(context.Background())
  time.Sleep(3 * time.Second)
}

運行輸出:

$ go run main.go
before subscribe second observer
Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3

上面是等兩個觀察者都註冊以後,而且手動調用了 Observable 的Connect()方法才產生數據。並且可鏈接的 Observable 有一個特性:它是冷啓動的!!!,即每一個觀察者都會收到一份相同的拷貝。

轉換 Observable

rxgo 提供了不少轉換函數,能夠修改通過它的rxgo.Item,而後再發送給下一個階段。

Map

Map()方法簡單修改它收到的rxgo.Item而後發送到下一個階段(轉換或過濾)。Map()接受一個類型爲func (context.Context, interface{}) (interface{}, error)的函數。第二個參數就是rxgo.Item中的數據,返回轉換後的數據。若是出錯,則返回錯誤。

func main() {
  observable := rxgo.Just(1, 2, 3)()

  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    return i.(int)*2 + 1, nil
  }).Map(func(_ context.Context, i interface{}) (interface{}, error) {
    return i.(int)*3 + 2, nil
  })

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

上例中每一個數字通過兩個Map,第一個Map執行2 * i + 1,第二個Map執行3 * i + 2。即對於每一個數字來講,最終進行的變換爲3 * (2 * i + 1) + 2。運行:

$ go run main.go
11
17
23

Marshal

Marshal對通過它的數據進行一次Marshal。這個Marshal能夠是json.Marshal/proto.Marshal,甚至咱們本身寫的Marshal函數。它接受一個類型爲func(interface{}) ([]byte, error)的函數用於對數據進行處理。

type User struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

func main() {
  observable := rxgo.Just(
    User{
      Name: "dj",
      Age:  18,
    },
    User{
      Name: "jw",
      Age:  20,
    },
  )()

  observable = observable.Marshal(json.Marshal)

  for item := range observable.Observe() {
    fmt.Println(string(item.V.([]byte)))
  }
}

因爲Marshal操做返回的是[]byte類型,咱們須要進行類型轉換以後再輸出。

Unmarshal

既然有Marshal,也就有它的相反操做UnmarshalUnmarshal用於將一個[]byte類型轉換爲相應的結構體或其餘類型。與Marshal不一樣,Unmarshal須要知道轉換的目標類型,因此須要提供一個函數用於生成該類型的對象。而後將[]byte數據Unmarshal到該對象中。Unmarshal接受兩個參數,參數一是類型爲func([]byte, interface{}) error的函數,參數二是func () interface{}用於生成實際類型的對象。咱們拿上面的例子中生成的 JSON 字符串做爲數據,將它們從新UnmarshalUser對象:

type User struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

func main() {
  observable := rxgo.Just(
    `{"name":"dj","age":18}`,
    `{"name":"jw","age":20}`,
  )()

  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    return []byte(i.(string)), nil
  }).Unmarshal(json.Unmarshal, func() interface{} {
    return &User{}
  })

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

因爲Unmarshaller接受[]byte類型的參數,咱們在Unmarshal以前加了一個Map用於將string轉爲[]byte。運行:

$ go run main.go
&{dj 18}
&{jw 20}

Buffer

Buffer按照必定的規則收集接收到的數據,而後一次性發送出去(做爲切片),而不是收到一個發送一個。有 3 種類型的Buffer

  • BufferWithCount(n):每收到n個數據發送一次,最後一次可能少於n個;
  • BufferWithTime(n):發送在一個時間間隔n內收到的數據;
  • BufferWithTimeOrCount(d, n):收到n個數據,或通過d時間間隔,發送當前收到的數據。

BufferWithCount

func main() {
  observable := rxgo.Just(1, 2, 3, 4)()

  observable = observable.BufferWithCount(3)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

運行:

$ go run main.go
[1 2 3]
[4]

注意,最後一組只有一個。

BufferWithTime

func main() {
  ch := make(chan rxgo.Item, 1)

  go func() {
    i := 0
    for range time.Tick(time.Second) {
      ch <- rxgo.Of(i)
      i++
    }
  }()

  observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(3 * time.Second))

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

每 3s 發送一次:

$ go run main.go
[0 1 2]
[3 4 5]
[6 7 8]
...

BufferWithTimeOrCount

func main() {
  ch := make(chan rxgo.Item, 1)

  go func() {
    i := 0
    for range time.Tick(time.Second) {
      ch <- rxgo.Of(i)
      i++
    }
  }()

  observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(3*time.Second), 2)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

上面 3s 能夠收集 3 個數據,可是設置了收集 2 個就發送。因此,運行輸出爲:

$ go run main.go
[0 1]
[2 3]
[4 5]
...

GroupBy

GroupBy根據傳入一個 Hash 函數,爲每一個不一樣的結果分別建立新的 Observable。換句話說,GroupBy生成一個數據類型爲 ObservableObservable

func main() {
  count := 3
  observable := rxgo.Range(0, 10).GroupBy(count, func(item rxgo.Item) int {
    return item.V.(int) % count
  }, rxgo.WithBufferedChannel(10))

  for subObservable := range observable.Observe() {
    fmt.Println("New observable:")

    for item := range subObservable.V.(rxgo.Observable).Observe() {
      fmt.Printf("item: %v\n", item.V)
    }
  }
}

上面根據每一個數模 3 的餘數將整個流分爲 3 組。運行:

$ go run main.go 
New observable:
item: 0
item: 3
item: 6
item: 9
New observable:
item: 1
item: 4
item: 7
item: 10
New observable:
item: 2
item: 5
item: 8

注意rxgo.WithBufferedChannel(10)的使用,因爲咱們的數字是連續生成的,依次爲 0->1->2->...->9->10。而 Observable 默認是惰性的,即由Observe()驅動。內層的Observe()在返回一個 0 以後就等待下一個數,可是下一個數 1 不在此 Observable 中。因此會陷入死鎖。使用rxgo.WithBufferedChannel(10),設置它們之間的鏈接 channel 緩衝區大小爲 10,這樣即便咱們未取出 channel 裏面的數字,上游仍是能發送數字進來。

並行操做

默認狀況下,這些轉換操做都是串行的,即只有一個 goroutine 負責執行轉換函數。咱們也可使用rxgo.WithPool(n)選項設置運行n個 goroutine,或者rxgo.WitCPUPool()選項設置運行與邏輯 CPU 數量相等的 goroutine。

func main() {
  observable := rxgo.Range(1, 100)

  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    time.Sleep(time.Duration(rand.Int31()))
    return i.(int)*2 + 1, nil
  }, rxgo.WithCPUPool())

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

因爲是並行,因此輸出順序就不肯定了。爲了讓不肯定性更明顯一點,我在代碼中加了一行time.Sleep

過濾 Observable

Observable 中發送過來的數據並不必定都是咱們須要的,咱們要把不想要的過濾掉。

Filter

Filter()接受一個類型爲func (i interface{}) bool的參數,經過的數據使用這個函數斷言,返回true的將發送給下一個階段。不然,丟棄。

func main() {
  observable := rxgo.Range(1, 10)

  observable = observable.Filter(func(i interface{}) bool {
    return i.(int)%2 == 0
  })

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

上面過濾掉奇數,最後只剩下偶數:

$ go run main.go
2
4
6
8
10

ElementAt

ElementAt()只發送指定索引的數據,如ElementAt(2)只發送索引爲 2 的數據,即第 3 個數據。

func main() {
  observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

上面代碼輸出 2。

Debounce

Debounce()比較有意思,它收到數據後還會等待指定的時間間隔,後續間隔內沒有收到其餘數據纔會發送剛開始的數據。

func main() {
  ch := make(chan rxgo.Item)

  go func() {
    ch <- rxgo.Of(1)
    time.Sleep(2 * time.Second)
    ch <- rxgo.Of(2)
    ch <- rxgo.Of(3)
    time.Sleep(2 * time.Second)
    close(ch)
  }()

  observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

上面示例,先收到 1,而後 2s 內沒收到數據,因此發送 1。接着收到了數據 2,因爲立刻又收到了 3,因此 2 不會發送。收到 3 以後 2s 內沒有收到數據,發送了 3。因此最後輸出爲 1,3。

Distinct

Distinct()會記錄它發送的全部數據,它不會發送重複的數據。因爲數據格式多樣,Distinct()要求咱們提供一個函數,根據原數據返回一個惟一標識碼(有點相似哈希值)。基於這個標識碼去重。

func main() {
  observable := rxgo.Just(1, 2, 2, 3, 3, 4, 4)().
    Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
      return i, nil
    })
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

依次輸出 1,2,3,4,沒有重複。

Skip

Skip能夠跳過前若干個數據。

func main() {
  observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

Take

Take只取前若干個數據。

func main() {
  observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

選項

rxgo 提供的大部分方法的最後一個參數是一個可變長的選項類型。這是 Go 中特有的、經典的選項設計模式。咱們前面已經使用了:

  • rxgo.WithBufferedChannel(10):設置 channel 的緩存大小;
  • rxgo.WithPool(n)/rxgo.WithCpuPool():使用多個 goroutine 執行轉換操做;
  • rxgo.WithPublishStrategy():使用發佈策略,即建立可鏈接的 Observable

除此以外,rxgo 還提供了不少其餘選項。留待你們自行探索了。

總結

rxgo 讓基於 pipelines 的併發編程變得更容易!

你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. rxgo GitHub:https://github.com/jordan-wright/rxgo
  2. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib

個人博客:https://darjun.github.io

歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~

相關文章
相關標籤/搜索