Rust入坑指南:齊頭並進(下)


前文中咱們聊了Rust如何管理線程以及如何利用Rust中的鎖進行編程。今天咱們繼續學習併發編程,web

原子類型

許多編程語言都會提供原子類型,Rust也不例外,在前文中咱們聊了Rust中鎖的使用,有了鎖,就要當心死鎖的問題,Rust雖然聲稱是安全併發,可是仍然沒法幫助咱們解決死鎖的問題。原子類型就是編程語言爲咱們提供的無鎖併發編程的最佳手段。熟悉Java的同窗應該知道,Java的編譯器並不能保證代碼的執行順序,編譯器會對咱們的代碼的執行順序進行優化,這一操做成爲指令重排。而Rust的多線程內存模型不會進行指令重排,它能夠保證指令的執行順序。編程

一般來說原子類型會提供如下操做:安全

  • Load:從原子類型讀取值微信

  • Store:爲一個原子類型寫入值多線程

  • CAS(Compare-And-Swap):比較並交換併發

  • Swap:交換異步

  • Fetch-add(sub/and/or):表示一系列的原子的加減或邏輯運算編程語言

Ok,這些基礎的概念聊完之後,咱們就來看看Rust爲咱們提供了哪些原子類型。Rust的原子類型定義在標準庫std::sync::atomic中,目前它提供了12種原子類型。編輯器

原子類型

下面這段代碼是Rust演示瞭如何用原子類型實現一個自旋鎖。函數

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let spinlock = Arc::new(AtomicUsize::new(1));
    let spinlock_clone = spinlock.clone();
    let thread = thread::spawn(move|| {
        spinlock_clone.store(0, Ordering::SeqCst);
    });
    while spinlock.load(Ordering::SeqCst) != 0 {}
    if let Err(panic) = thread.join() {
        println!("Thread had an error: {:?}", panic);
    }
}

咱們利用AtomicUsize的store方法將它的值設置爲0,而後用load方法獲取到它的值,若是不是0,則程序一直空轉。在store和load方法中,咱們都用到了一個參數:Ordering::SeqCst,在聲明中能看出來它也是屬於atomic包。

咱們在文檔中發現它是一個枚舉。其定義爲

pub enum Ordering {
    Relaxed,
    Release,
    Acquire,
    AcqRel,
    SeqCst,
}

它的做用是將內存順序的控制權交給開發者,咱們能夠本身定義底層的內存排序。下面咱們一塊兒來看一下這5種排序分別表明什麼意思

  • Relaxed:表示「沒有順序」,也就是開發者不會干預線程順序,線程只進行原子操做

  • Release:對於使用Release的store操做,在它以前全部使用Acquire的load操做都是可見的

  • Acquire:對於使用Acquire的load操做,在它以前的全部使用Release的store操做也都是可見的

  • AcqRel:它表明讀時使用Acquire順序的load操做,寫時使用Release順序的store操做

  • SeqCst:使用了SeqCst的原子操做都必須先存儲,再加載。

通常狀況下建議使用SeqCst,而不推薦使用Relaxed。

線程間通訊

Go語言文檔中有這樣一句話:不要使用共享內存來通訊,應該使用通訊實現共享內存。

Rust標準庫選擇了CSP併發模型,也就是依賴channel來進行線程間的通訊。它的定義是在標準庫std::sync::mpsc中,裏面定義了三種類型的CSP進程:

  • Sender:發送異步消息

  • SyncSender:發送同步消息

  • Receiver:用於接收消息

咱們經過一個栗子來看一下channel是如何建立並收發消息的。

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

首先,咱們先是使用了channel()函數來建立一個channel,它會返回一個(Sender, Receiver)元組。它的緩衝區是無界的。此外,咱們還可使用sync_channel()來建立channel,它返回的則是(SyncSender, Receiver)元組,這樣的channel發送消息是同步的,而且能夠設置緩衝區大小。

接着,在子線程中,咱們定義了一個字符串變量,並使用send()函數向channel中發送消息。這裏send返回的是一個Result類型,因此使用unwrap來傳播錯誤。

在main函數最後,咱們又用recv()函數來接收消息。

這裏須要注意的是,send()函數會轉移全部權,因此,若是你在發送消息以後再使用val變量時,程序就會報錯。

如今咱們已經掌握了使用Channel進行線程間通訊的方法了,這裏還有一段代碼,感興趣的同窗能夠本身執行一下這段代碼看是否可以順利執行。若是不能,應該怎麼修改這段代碼呢?

use std::thread;
use std::sync::mpsc;
fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..5 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(i).unwrap();
        });
    }

    for rx in rx.iter() {
        println!("{:?}", j);
    }
}

線程池

在實際工做中,若是每次都要建立新的線程,每次建立、銷燬線程的開銷就會變得很是可觀,甚至會成爲系統性能的瓶頸。對於這種問題,咱們一般使用線程池來解決。

Rust的標準庫中沒有現成的線程池給咱們使用,不過仍是有一些第三方庫來支持的。這裏我使用的是threadpool(https://crates.io/crates/threadpool)。

首先須要在Cargo.toml中增長依賴threadpool = "1.7.1"。而後就可使用use threadpool::ThreadPool;將ThreadPool引入咱們的程序中了。

use threadpool::ThreadPool;
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for _ in 0..n_jobs {
        let tx = tx.clone();
        pool.execute(move|| {
            tx.send(1).expect("channel will be there waiting for the pool");
        });
    }

    assert_eq!(rx.iter().take(n_jobs).fold(0|a, b| a + b), 8);
}

這裏咱們使用ThreadPool::new()來建立一個線程池,初始化4個工做線程。使用時用execute()方法就能夠拿出一個線程來進行具體的工做。

總結

今天咱們介紹了Rust併發編程的三種特性:原子類型、線程間通訊和線程池的使用。

原子類型是咱們進行無鎖併發的重要手段,線程間通訊和線程池也都是工做中所必須使用的。固然併發編程的知識遠不止於此,你們有興趣的能夠自行學習也能夠與我交流討論。

本文分享自微信公衆號 - 代碼潔癖患者(Jackeyzhe2018)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索