Go 中的 channel 與 Java BlockingQueue 的本質區別

前言

最近在實現兩個需求,因爲二者之間並無依賴關係,因此想利用隊列進行解耦;但在 Go 的標準庫中並無現成可用而且併發安全的數據結構;但 Go 提供了一個更加優雅的解決方案,那就是 channeljava

channel 應用

GoJava 的一個很大的區別就是併發模型不一樣,Go 採用的是 CSP(Communicating sequential processes) 模型;用 Go 官方的說法:git

Do not communicate by sharing memory; instead, share memory by communicating.github

翻譯過來就是:不用使用共享內存來通訊,而是用通訊來共享內存。編程

而這裏所提到的通訊,在 Go 裏就是指代的 channelapi

只講概念並不能快速的理解與應用,因此接下來會結合幾個實際案例更方便理解。安全

futrue task

Go 官方沒有提供相似於 JavaFutureTask 支持:markdown

public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Task task = new Task();
        FutureTask<String> futureTask = new FutureTask<>(task);
        executorService.submit(futureTask);
        String s = futureTask.get();
        System.out.println(s);
        executorService.shutdown();
    }
}

class Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        // 模擬http
        System.out.println("http request");
        Thread.sleep(1000);

        return "request success";
    }
}
複製代碼

但咱們可使用 channel 配合 goroutine 實現相似的功能:數據結構

func main() {
	ch := Request("https://github.com")
	select {
	case r := <-ch:
		fmt.Println(r)
	}
}
func Request(url string) <-chan string {
	ch := make(chan string)
	go func() {
		// 模擬http請求
		time.Sleep(time.Second)
		ch <- fmt.Sprintf("url=%s, res=%s", url, "ok")
	}()
	return ch
}
複製代碼

goroutine 發起請求後直接將這個 channel 返回,調用方會在請求響應以前一直阻塞,直到 goroutine 拿到了響應結果。併發

goroutine 互相通訊

/** * 偶數線程 */
    public static class OuNum implements Runnable {
        private TwoThreadWaitNotifySimple number;

        public OuNum(TwoThreadWaitNotifySimple number) {
            this.number = number;
        }

        @Override
        public void run() {
            for (int i = 0; i < 11; i++) {
                synchronized (TwoThreadWaitNotifySimple.class) {
                    if (number.flag) {
                        if (i % 2 == 0) {
                            System.out.println(Thread.currentThread().getName() + "+-+偶數" + i);

                            number.flag = false;
                            TwoThreadWaitNotifySimple.class.notify();
                        }

                    } else {
                        try {
                            TwoThreadWaitNotifySimple.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }


    /** * 奇數線程 */
    public static class JiNum implements Runnable {
        private TwoThreadWaitNotifySimple number;

        public JiNum(TwoThreadWaitNotifySimple number) {
            this.number = number;
        }

        @Override
        public void run() {
            for (int i = 0; i < 11; i++) {
                synchronized (TwoThreadWaitNotifySimple.class) {
                    if (!number.flag) {
                        if (i % 2 == 1) {
                            System.out.println(Thread.currentThread().getName() + "+-+奇數" + i);

                            number.flag = true;
                            TwoThreadWaitNotifySimple.class.notify();
                        }

                    } else {
                        try {
                            TwoThreadWaitNotifySimple.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
複製代碼

這裏截取了」兩個線程交替打印奇偶數「的部分代碼。編程語言

Java 提供了 object.wait()/object.notify() 這樣的等待通知機制,能夠實現兩個線程間通訊。

go 經過 channel 也能實現相同效果:

func main() {
	ch := make(chan struct{})
	go func() {
		for i := 1; i < 11; i++ {
			ch <- struct{}{}
			//奇數
			if i%2 == 1 {
				fmt.Println("奇數:", i)
			}
		}
	}()

	go func() {
		for i := 1; i < 11; i++ {
			<-ch
			if i%2 == 0 {
				fmt.Println("偶數:", i)
			}
		}
	}()

	time.Sleep(10 * time.Second)
}
複製代碼

本質上他們都是利用了線程(goroutine)阻塞而後喚醒的特性,只是 Java 是經過 wait/notify 機制;

而 go 提供的 channel 也有相似的特性:

  1. channel 發送數據時(ch<-struct{}{})會被阻塞,直到 channel 被消費(<-ch)。

以上針對於無緩衝 channel

channel 自己是由 go 原生保證併發安全的,不用額外的同步措施,能夠放心使用。

廣播通知

不只是兩個 goroutine 之間通訊,一樣也能廣播通知,相似於以下 Java 代碼:

public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    synchronized (NotifyAll.class){
                        NotifyAll.class.wait();
                    }
                    System.out.println(Thread.currentThread().getName() + "done....");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        Thread.sleep(3000);
        synchronized (NotifyAll.class){
            NotifyAll.class.notifyAll();
        }
    }
複製代碼

主線程將全部等待的子線程所有喚醒,這個本質上也是經過 wait/notify 機制實現的,區別只是通知了全部等待的線程。

換作是 go 的實現:

func main() {
	notify := make(chan struct{})
	for i := 0; i < 10; i++ {
		go func(i int) {
			for {
				select {
				case <-notify:
					fmt.Println("done.......",i)
					return
				case <-time.After(1 * time.Second):
					fmt.Println("wait notify",i)

				}
			}
		}(i)
	}
	time.Sleep(1 * time.Second)
	close(notify)
	time.Sleep(3 * time.Second)
}
複製代碼

當關閉一個 channel 後,會使得全部獲取 channelgoroutine 直接返回,不會阻塞,正是利用這一特性實現了廣播通知全部 goroutine 的目的。

注意,同一個 channel 不能反覆關閉,否則會出現panic。

channel 解耦

以上例子都是基於無緩衝的 channel,一般用於 goroutine 之間的同步;同時 channel 也具有緩衝的特性:

ch :=make(chan T, 100)
複製代碼

能夠直接將其理解爲隊列,正是由於具備緩衝能力,因此咱們能夠將業務之間進行解耦,生產方只管往 channel 中丟數據,消費者只管將數據取出後作本身的業務。

同時也具備阻塞隊列的特性:

  • channel 寫滿時生產者將會被阻塞。
  • channel 爲空時消費者也會阻塞。

從上文的例子中能夠看出,實現相同的功能 go 的寫法會更加簡單直接,相對的 Java 就會複雜許多(固然這也和這裏使用的偏底層 api 有關)。

Java 中的 BlockingQueue

這些特性都與 Java 中的 BlockingQueue 很是相似,他們具備如下的相同點:

  • 能夠經過二者來進行 goroutine/thread 通訊。
  • 具有隊列的特徵,能夠解耦業務。
  • 支持併發安全。

一樣的他們又有很大的區別,從表現上看:

  • channel 支持 select 語法,對 channel 的管理更加簡潔直觀。
  • channel 支持關閉,不能向已關閉的 channel 發送消息。
  • channel 支持定義方向,在編譯器的幫助下能夠在語義上對行爲的描述更加準確。

固然還有本質上的區別就是 channel 是 go 推薦的 CSP 模型的核心,具備編譯器的支持,能夠有很輕量的成本實現併發通訊。

BlockingQueue 對於 Java 來講只是一個實現了併發安全的數據結構,即使不使用它也有其餘的通訊方式;只是他們都具備阻塞隊列的特徵,全部在初步接觸 channel 時容易產生混淆。

相同點 channel 特有
阻塞策略 支持select
設置大小 支持關閉
併發安全 自定義方向
普通數據結構 編譯器支持

總結

有過一門編程語言的使用經歷在學習其餘語言是確實是要方便許多,好比以前寫過 Java 再看 Go 時就會發現許多相似之處,只是實現不一樣。

拿這裏的併發通訊來講,本質上是由於併發模型上的不一樣;

Go 更推薦使用通訊來共享內存,而 Java 大部分場景都是使用共享內存來通訊(這樣就得加鎖來同步)。

帶着疑問來學習確實會事半功倍。

最近和網友討論後再補充一下,其實 Go channel 的底層實現也是經過對共享內存的加鎖來實現的,這點任何語言都不可避免。

既然都是共享內存那和咱們本身使用共享內存有什麼區別呢?主要仍是 channel 的抽象層級更高,咱們使用這類高抽象層級的方式編寫代碼會更易理解和維護。

但在一些特殊場景,須要追求極致的性能,下降加鎖顆粒度時用共享內存會更加合適,因此 Go 官方也提供有 sync.Map/Mutex 這樣的庫;只是在併發場景下更推薦使用 channel 來解決問題。

相關文章
相關標籤/搜索