github.com/ethereum/go…包實現了一個事件發佈訂閱的庫,使用接口主要是event.Feed 類型,之前還有event.TypeMux 類型,看代碼註釋,說過期了,目前主要使用Feed 類型。git
package main
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/event"
)
func main() {
type someEvent struct{ I int }
var feed event.Feed
var wg sync.WaitGroup
ch := make(chan someEvent)
sub := feed.Subscribe(ch)
wg.Add(1)
go func() {
defer wg.Done()
for event := range ch {
fmt.Printf("Received: %#v\n", event.I)
}
sub.Unsubscribe()
fmt.Println("done")
}()
feed.Send(someEvent{5})
feed.Send(someEvent{10})
feed.Send(someEvent{7})
feed.Send(someEvent{14})
close(ch)
wg.Wait()
}
複製代碼
經過調用event.Feed 類型的Subscrible
方法訂閱事件通知,須要使用者提早指定接收事件的channel,Subscribe返回Subscription對象,是一個接口類型:github
type Subscription interface {
Err() <-chan error // returns the error channel
Unsubscribe() // cancels sending of events, closing the error channel
}
複製代碼
Err()
返回獲取error 的channel,調用Unsubscribe()
取消事件訂閱。事件的發佈者調用 Send()
方法,發送事件。 能夠使用同一個channel實例,屢次調用Feed 的Subscrible()
方法:golang
package main
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/event"
)
func main() {
var (
feed event.Feed
recv sync.WaitGroup
sender sync.WaitGroup
)
ch := make(chan int)
feed.Subscribe(ch)
feed.Subscribe(ch)
feed.Subscribe(ch)
expectSends := func(value, n int) {
defer sender.Done()
if nsent := feed.Send(value); nsent != n {
fmt.Printf("send delivered %d times, want %d\n", nsent, n)
}
}
expectRecv := func(wantValue, n int) {
defer recv.Done()
for v := range ch {
if v != wantValue {
fmt.Printf("received %d, want %d\n", v, wantValue)
} else {
fmt.Printf("recv v = %d\n", v)
}
}
}
sender.Add(3)
for i := 0; i < 3; i++ {
go expectSends(1, 3)
}
go func() {
sender.Wait()
close(ch)
}()
recv.Add(1)
go expectRecv(1, 3)
recv.Wait()
}
複製代碼
這個例子中, 有三個訂閱者, 有三個發送者, 每一個發送者發送三次1, 同一個channel ch 裏面被推送了9個1. ethereum event 庫還提供了一些高級別的方便接口, 好比event.NewSubscription
函數,接收一個函數類型,做爲數據的生產者, producer自己在後臺一個單獨的goroutine內執行, 後臺goroutine往用戶的channel 發送數據:app
package main
import (
"fmt"
"github.com/ethereum/go-ethereum/event"
)
func main() {
ch := make(chan int)
sub := event.NewSubscription(func(quit <-chan struct{}) error {
for i := 0; i < 10; i++ {
select {
case ch <- i:
case <-quit:
fmt.Println("unsubscribed")
return nil
}
}
return nil
})
for i := range ch {
fmt.Println(i)
if i == 4 {
sub.Unsubscribe()
break
}
}
}
複製代碼
庫也提供了event.SubscriptionScope類型用於追蹤多個訂閱者,提供集中的取消訂閱功能:函數
package main
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/event"
)
// This example demonstrates how SubscriptionScope can be used to control the lifetime of
// subscriptions.
//
// Our example program consists of two servers, each of which performs a calculation when
// requested. The servers also allow subscribing to results of all computations.
type divServer struct{ results event.Feed }
type mulServer struct{ results event.Feed }
func (s *divServer) do(a, b int) int {
r := a / b
s.results.Send(r)
return r
}
func (s *mulServer) do(a, b int) int {
r := a * b
s.results.Send(r)
return r
}
// The servers are contained in an App. The app controls the servers and exposes them
// through its API.
type App struct {
divServer
mulServer
scope event.SubscriptionScope
}
func (s *App) Calc(op byte, a, b int) int {
switch op {
case '/':
return s.divServer.do(a, b)
case '*':
return s.mulServer.do(a, b)
default:
panic("invalid op")
}
}
// The app's SubscribeResults method starts sending calculation results to the given
// channel. Subscriptions created through this method are tied to the lifetime of the App
// because they are registered in the scope.
func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription {
switch op {
case '/':
return s.scope.Track(s.divServer.results.Subscribe(ch))
case '*':
return s.scope.Track(s.mulServer.results.Subscribe(ch))
default:
panic("invalid op")
}
}
// Stop stops the App, closing all subscriptions created through SubscribeResults.
func (s *App) Stop() {
s.scope.Close()
}
func main() {
var (
app App
wg sync.WaitGroup
divs = make(chan int)
muls = make(chan int)
)
divsub := app.SubscribeResults('/', divs)
mulsub := app.SubscribeResults('*', muls)
wg.Add(1)
go func() {
defer wg.Done()
defer fmt.Println("subscriber exited")
for {
select {
case result := <-divs:
fmt.Println("division happened:", result)
case result := <-muls:
fmt.Println("multiplication happened:", result)
case divErr := <-divsub.Err():
fmt.Println("divsub.Err() :", divErr)
return
case mulErr := <-mulsub.Err():
fmt.Println("mulsub.Err() :", mulErr)
return
}
}
}()
app.Calc('/', 22, 11)
app.Calc('*', 3, 4)
app.Stop()
wg.Wait()
}
複製代碼
SubscriptionScope的Close() 方法接收Track方法的返回值 , Track 方法負責追蹤訂閱者。ui
本文由 Copernicus團隊 喻建
寫做,轉載無需受權。this