併發編程(二):分析Boost對 互斥量和條件變量的封裝及實現生產者消費者問題

請閱讀上篇文章《併發編程實戰: POSIX 使用互斥量和條件變量實現生產者/消費者問題》。固然不閱讀亦不影響本篇文章的閱讀大笑linux

Boost的互斥量,條件變量作了很好的封裝,所以比「原生的」POSIX mutex,condition variables好用。而後咱們會經過分析boost相關源碼看一下boost linux是如何對pthread_mutex_t和pthread_cond_t進行的封裝。編程

首先看一下condition_variable_any的具體實現,代碼路徑:/boost/thread/pthread/condition_variable.hpp
安全

class condition_variable_any
{
    pthread_mutex_t internal_mutex;
    pthread_cond_t cond;

    condition_variable_any(condition_variable_any&);
    condition_variable_any& operator=(condition_variable_any&);

public:
    condition_variable_any()
    {
        int const res=pthread_mutex_init(&internal_mutex,NULL);
        if(res)
        {
            boost::throw_exception(thread_resource_error());
        }
        int const res2=pthread_cond_init(&cond,NULL);
        if(res2)
        {
            BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));
            boost::throw_exception(thread_resource_error());
        }
    }
    ~condition_variable_any()
    {
        BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));
        BOOST_VERIFY(!pthread_cond_destroy(&cond));
    }
 
condition_variable_any的構造函數是對於內部使用的mutex和cond的初始化,對應的,析構函數則是這些資源的回收。

BOOST_VERIFY的實現:多線程

#undef BOOST_VERIFY  
#if defined(BOOST_DISABLE_ASSERTS) || ( !defined(BOOST_ENABLE_ASSERT_HANDLER) && defined(NDEBUG) )  
// 在任何狀況下,expr必定會被求值。  
#define BOOST_VERIFY(expr) ((void)(expr))  
#else  
#define BOOST_VERIFY(expr) BOOST_ASSERT(expr)  
#endif  
所以不一樣於assert在Release版的被優化掉不一樣,咱們能夠放心的使用BOOST_VERITY,所以它的表達式確定會被求值,而不用擔憂assert的side effect。
接下來看一下condition_variable_any的核心實現:wait

   template<typename lock_type>
    void wait(lock_type& m)
    {
        int res=0;
        {
            thread_cv_detail::lock_on_exit<lock_type> guard;
            detail::interruption_checker check_for_interruption(&internal_mutex,&cond);
            guard.activate(m);
            res=pthread_cond_wait(&cond,&internal_mutex);
            this_thread::interruption_point();
        }
        if(res)
        {
            boost::throw_exception(condition_error());
        }
    }
首先看一下lock_on_exit:

namespace thread_cv_detail
{
    template<typename MutexType>
    struct lock_on_exit
    {
        MutexType* m;

        lock_on_exit():
            m(0)
        {}

        void activate(MutexType& m_)
        {
            m_.unlock();
            m=&m_;
        }
        ~lock_on_exit()
        {
            if(m)
            {
                m->lock();
            }
       }
    };
}
代碼很簡單,實現了在調用activate時將傳入的lock解鎖,在該變量生命期結束時將guard的lock加鎖。

接下來的detail::interruption_checker check_for_interruption(&internal_mutex,&cond);是什麼意思呢?From /boost/thread/pthread/thread_data.hpp
併發

class interruption_checker
{
    thread_data_base* const thread_info;
    pthread_mutex_t* m;
    bool set;

    void check_for_interruption()
    {
        if(thread_info->interrupt_requested)
        {
            thread_info->interrupt_requested=false;
            throw thread_interrupted();
        }
    }

    void operator=(interruption_checker&);
public:
    explicit interruption_checker(pthread_mutex_t* cond_mutex,pthread_cond_t* cond):
        thread_info(detail::get_current_thread_data()),m(cond_mutex),
        set(thread_info && thread_info->interrupt_enabled)
    {
        if(set)
        {
            lock_guard<mutex> guard(thread_info->data_mutex);
            check_for_interruption();
            thread_info->cond_mutex=cond_mutex;
            thread_info->current_cond=cond;
            BOOST_VERIFY(!pthread_mutex_lock(m));
        }
        else
        {
            BOOST_VERIFY(!pthread_mutex_lock(m));
        }
    }
    ~interruption_checker()
    {
        if(set)
        {
            BOOST_VERIFY(!pthread_mutex_unlock(m));
            lock_guard<mutex> guard(thread_info->data_mutex);
            thread_info->cond_mutex=NULL;
            thread_info->current_cond=NULL;
        }
        else
        {
            BOOST_VERIFY(!pthread_mutex_unlock(m));
        }
    }
代碼面前,毫無隱藏。那句話就是此時若是有interrupt,那麼就interrupt吧。不然,lock傳入的mutex,也是爲了res=pthread_cond_wait(&cond,&internal_mutex);作準備。

關於線程的中斷點,能夠移步《【Boost】boost庫中thread多線程詳解5——談談線程中斷》。app

對於boost::mutex,你們可使用一樣的方法去解讀boost的實現,相對於condition variable,mutex的實現更加直觀。代碼路徑:/boost/thread/pthread/mutex.hpp。ide

namespace boost
{
    class mutex
    {
    private:
        mutex(mutex const&);
        mutex& operator=(mutex const&);
        pthread_mutex_t m;
    public:
        mutex()
        {
            int const res=pthread_mutex_init(&m,NULL);
            if(res)
            {
                boost::throw_exception(thread_resource_error());
            }
        }
        ~mutex()
        {
            BOOST_VERIFY(!pthread_mutex_destroy(&m));
        }

        void lock()
        {
            int const res=pthread_mutex_lock(&m);
            if(res)
            {
                boost::throw_exception(lock_error(res));
            }
        }

        void unlock()
        {
            BOOST_VERIFY(!pthread_mutex_unlock(&m));
        }

        bool try_lock()
        {
            int const res=pthread_mutex_trylock(&m);
            if(res && (res!=EBUSY))
            {
                boost::throw_exception(lock_error(res));
            }

            return !res;
        }

        typedef pthread_mutex_t* native_handle_type;
        native_handle_type native_handle()
        {
            return &m;
        }

        typedef unique_lock<mutex> scoped_lock;
        typedef detail::try_lock_wrapper<mutex> scoped_try_lock;
    }; 
}

boost對於pthread_mutex_t和pthread_cond_t的封裝,方便了開發者的使用的資源的安全有效管理。固然,在不一樣的公司,可能也都有相似的封裝,學習boost的源碼,無疑能夠加深咱們的理解。在某些特定的場合,咱們也能夠學習boost的封裝方法,簡化咱們的平常開發。函數

最後,奉上簡單的生產者、消費者的boost的實現,和前文《併發編程實戰: POSIX 使用互斥量和條件變量實現生產者/消費者問題》相比,咱們能夠看到boost簡化了mutex和condition variable的使用。如下代碼引自《Boost程序庫徹底開發指南》:學習

#include <boost/thread.hpp>
#include <stack>
using std::stack;
using std::cout;
class buffer
{
private:
    boost::mutex mu; // 條件變量須要配合互斥量
    boost::condition_variable_any cond_put; // 生產者寫入
    boost::condition_variable_any cond_get; // 消費者讀走

    stack<int> stk;
    int un_read;
    int capacity;

    bool is_full()
    {
        return un_read == capacity;
    }
    bool is_empty()
    {
        return 0 == un_read;
    }

public:
    buffer(size_t capacity) : un_read(0), capacity(capacity)
    {}
    void put(int x)
    {

        boost::mutex::scoped_lock lock(mu); // 這裏是讀鎖的門閂類

        while (is_full())
        {
            cout << "full waiting..." << endl;
            cond_put.wait(mu); // line:51
        }
        stk.push(x);
        ++un_read;

        cond_get.notify_one();
    }
    void get(int *x)
    {
        boost::mutex::scoped_lock lock(mu); // 這裏是讀鎖的門閂類

        while (is_empty())
        {
            cout << "empty waiting..." << endl;
            cond_get.wait(mu);
        }
        *x = stk.top();
        stk.pop();
        --un_read;

        cond_put.notify_one(); // 通知 51line能夠寫入了
    }
};

buffer buf(5); 

void producer(int n)
{
    for (int i = 0; i < n; ++i)
    {
        cout << "put : " << i << endl;
        buf.put(i);
    }
}

void consumer(int n)
{
    int x;
    for (int i = 0; i < n; ++i)
    {
        buf.get(&x);
        cout << "get : " << x << endl;
    }
}

int main()
{
    boost::thread t1(producer, 20);
    boost::thread t2(consumer, 10);
    boost::thread t3(consumer, 10);

    t1.join();
    t2.join();
    t3.join();

    return 0;
}
最後說一句,condition_variable_any == condition, from / boost/ thread/ condition.hpp

namespace boost
{
    typedef condition_variable_any condition;
}
相關文章
相關標籤/搜索