Go語言:徹底深刻理解數據並行和函數式 Map 1

你們好,我是一名 Go 語言工程師。我平日裏也在教課。前幾天給了一節關於 Go 語言數據並行的講座。我在這裏整理成文。但願你們喜歡。git


從 Map、Reduce 等函數式編程講起

在開發中,咱們常常會將一個某種類型 T 的序列轉化爲類型 T2 的序列。最原始的方法就是使用 for loop。不過,for loop 不只拗口,並且若是要作到良好的異步或者併發處理,就要寫不少重複的代碼。github

因此,在各個編程語言中,標準庫都提供了接口穩定的map, reduce, filter等函數。即便連 JavaScript 社區也開發了 RxJS 這樣的函數式編程類庫。編程

然而,Go 語言在標準可層面並無這樣的類庫。做爲上層業務邏輯開發者,這是不太方便的。因此社區內也有人嘗試寫出好的 Map、Reduce、Filter 等類庫。一個最大的動力和需求就源於 Go 語言極度擅長併發編程。而 Map、Filter、Fold這三個函數是徹底能夠數據並行的(Reduce 不太行,咱們以後會講)。在業界用 Go 語言來作 Data Pipeline 其實也是很好的方案。數據結構

不過,寫出一個好的 Map、Reduce 類庫是須要考慮不少設計問題的。本文旨在給你們詳盡而且深刻地講解這個問題。全部地代碼均可以在 github.com/CreatCodeBu… 的數據並行(data concurrency)目錄下找到。併發


如何寫一個 Map

讓咱們從最基本的 for loop 開始app

func Map(data []int, mapper func(int) int) []int {
	results := make([]int, len(data))
	for i, ele := range data {
		results[i] = mapper(ele)
	}
	return results
}
複製代碼

這是一個最簡單的 Map 實現。將一個 []int Map 到另一個 []int 中。用法就是異步

func TestMap(t *testing.T) {
	results := Map([]int{1, 2, 3}, func(x int) int { return x + 1 })
	require.Equal(t, []int{2, 3, 4}, results)
}
複製代碼

如你所見,1, 2, 3 變成了 2, 3, 4, 由於咱們的 lambda 爲 x + 1編程語言

然而,這裏這個實現有兩個問題。函數式編程

第一,咱們定死了原始類型和目標類型。若是咱們想從 int Map 到 string,就要新寫一個函數。

Go 沒有泛型,這是形成這個緣由的其中一點。不過,這只是開發時中的問題。好比 C++ 有基於代碼生成模板的泛型,那麼實際上是在編譯時生成更多的、屬於不一樣類型的代碼。因此,無論你是手寫、仍是編譯器生成,在運行時代碼都是同樣多的。固然,若是像 Java 那樣,經過運行時類型檢查來實現泛型、就是另一回事了。函數

因此,我認爲這頂多叫作麻煩,而不是問題。

第二,更大的問題來自運行時

這個實現最大的問題,就是定死了原始數據序列和目標數據序列的內存模型。爲何必需要是 Slice 這種數據結構呢?Slice 所帶來的一個反作用就是,數據最終會用 Array 存起來。而 Array 是連續的內存。然而,Map 這個函數從邏輯上根本沒有要求數據要在物理上連續。Map 甚至都沒有要求數據在邏輯上是連續的。

我爲甚麼不能從一個 Slice 映射到一個隊列呢?爲何不能從一個 channel 映射到一個文件流呢?Map 的本質就是抽象的序列(數據流)之間的映射。因此,咱們的實現應該表現出這一點,而且同時不要帶來內存連續這樣的反作用。我不是說內存連續很差,而是說沒必要要。優秀的設計反應事物的本質。優秀的實現沒有沒必要要的東西。

一個更好的 Map

先解決第二個問題

// producer 是一個數據生產者。Next 會迭代並返回序列中的下一個元素。
// 返回 io.EOF 表示窮盡了序列。其餘錯誤值表示 producer 自己遇到了錯誤。
type producer interface {
	Next() (string, error)
}

// consumer 是數據消費者。Send 會讀入新的數據。
type consumer interface {
	Send(int64)
}

// 返回錯誤若是 string 不能表示 int。好比 "xxx" 不是一個正確的 int 表示形式。
type mapper func(string) (int64, error) func BetterMap(p producer, c consumer, mapper mapper) error {
	for {
		next, err := p.Next()
		if err != nil {
			if err == io.EOF {
				break
			} else {
				return err // 生產者自己遇到錯誤,終止 Map。
			}
		}
		datum, err := mapper(next)
		if err != nil {
			return err // mapper 出了問題,終止 Map。
		}
		c.Send(datum)
	}
	return nil
}

type StringProducer struct {
	index int
	data  []string
}

func (ip *StringProducer) Next() (string, error) {
	if ip.index < len(ip.data) {
		defer func() { ip.index++ }()
		return ip.data[ip.index], nil
	}
	return "", io.EOF
}

type OutputConsumer struct{}

func (c OutputConsumer) Send(ele int64) {
	fmt.Println(ele)
}
複製代碼

咱們完成了一個很大的提高,將數據的生產者和消費者的具體實現交給了 Map 的調用者,而不是 Map 本身來定義。Map 只定義 2 個你們都贊成的接口。

用起來也很是方便

func ExampleBetterMap() {
	BetterMap(&StringProducer{data: []string{"1", "10", "11"}}, OutputConsumer{}, func(str string) (int64, error) {
		// 這裏的 lambda 將字符串以二進制形式轉爲整數
		return strconv.ParseInt(str, 2, 64)
	})
	// Output: 1
	// 2
	// 3
}
複製代碼

如你所見,咱們能夠隨意地使用咱們本身的實現。consumer甚至能夠將結果 IO 出去,而不是存在內存裏。這樣 Map 函數就沒有影響程序的內存效率。調用者代碼和 Map 本身的權責分明瞭。一樣的道理,producer也能夠將數據從其餘源流讀進來,而不是一次性地所有存在本身內部。

再來解決第一個問題

不過,咱們仍然須要解決第一個問題:就是針對不一樣類型的 Map。

咱們會在下文中講解。

掃碼關注哲的代碼實驗室

相關文章
相關標籤/搜索