1、解決問題和適用範圍html
主要是用來等待一個條件,這個條件可能須要另外一個線程來知足這個條件。這個和咱們日常適用的pthread_mutex_lock的最大不一樣在於後者保護的通常是一個代碼段(也就是關鍵區),或者一個變量,可是因爲通常來講這個變量的訪問是在一個關鍵區中,因此能夠認爲是一個關鍵區。網絡
可是對於條件變量,是須要的是一個事件,只有事件知足的時候纔會執行後面的操做,此時就出現一個問題:若是不知足咱們應該怎麼辦?若是若是使用簡單信號量,可能另外一方觸發了這個條件,而後經過unlock來喚醒一個線程,可是此時通過屢次喚醒其實這邊根本沒有等待,那麼信號就可能丟失。若是這邊一直的空等,那麼對於CPU的利用率又很是大,因此就引起了條件等待的概念。數據結構
2、網絡上的例子代碼多線程
一下代碼來自http://download.oracle.com/docs/cd/E19455-01/806-5257/6je9h032r/index.html
void producer(buffer_t *b, char item) { pthread_mutex_lock(&b->mutex); while (b->occupied >= BSIZE) pthread_cond_wait(&b->less, &b->mutex); assert(b->occupied < BSIZE); b->buf[b->nextin++] = item; b->nextin %= BSIZE; b->occupied++; /* now: either b->occupied < BSIZE and b->nextin is the index of the next empty slot in the buffer, or b->occupied == BSIZE and b->nextin is the index of the next (occupied) slot that will be emptied by a consumer (such as b->nextin == b->nextout) */ pthread_cond_signal(&b->more); pthread_mutex_unlock(&b->mutex); }
char consumer(buffer_t *b) { char item; pthread_mutex_lock(&b->mutex); while(b->occupied <= 0) pthread_cond_wait(&b->more, &b->mutex); assert(b->occupied > 0); item = b->buf[b->nextout++]; b->nextout %= BSIZE; b->occupied--; /* now: either b->occupied > 0 and b->nextout is the index of the next occupied slot in the buffer, or b->occupied == 0 and b->nextout is the index of the next (empty) slot that will be filled by a producer (such as b->nextout == b->nextin) */ pthread_cond_signal(&b->less); pthread_mutex_unlock(&b->mutex); return(item); }
3、Glibc的實現
一、數據結構
/* Data structure for conditional variable handling. The structure of
the attribute type is not exposed on purpose. */
typedef union
{
struct
{
int __lock;保護多線程中cond結構自己的變量操做不會併發,例如對於total_seq進而wakup_seq的使用和遞增操做。
unsigned int __futex;另外一個線程和這個線程之間在條件點上同步的方式,也就是若是須要和其它線程同步的話,使用這個互斥鎖替換pthread_cond_wait傳入的互斥鎖進行同步。
__extension__ unsigned long long int __total_seq;這個表示在這個條件變量上有多少個線程在等待這個信號。
__extension__ unsigned long long int __wakeup_seq;已經在這個條件變量上執行了多少次喚醒操做。
__extension__ unsigned long long int __woken_seq;這個條件變量中已經被真正喚醒的線程數目。
void *__mutex;保存pthread_cond_wait傳入的互斥鎖,須要保證pthread_cond_wait和pthread_cond_signal傳入的值都是相同值。
unsigned int __nwaiters;表示這個cond結構如今還有多少個線程在使用,當有人在使用的時候,pthread_cond_destroy須要等待全部的操做完成
unsigned int __broadcast_seq; 廣播動做發生了多少次,也就是執行了多少次broadcast
} __data;
char __size[__SIZEOF_PTHREAD_COND_T];
__extension__ long long int __align;
} pthread_cond_t;
二、lll_futex_wait的意義
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);
對於第一個wait,須要傳入一個咱們用戶態判斷時使用的futex值,也就是這裏的第二個參數futex_val,這樣內核會判斷進行真正的wait掛起的時候這個地址的是否是仍是這個值,若是不是這個wait失敗。可是進行wakup的時候不須要傳入判斷值,多是假設此時已經得到互斥鎖,因此不會有其它線程來競爭了吧。
這個要和pthread_mutex_lock使用的0、一、2三值區分開來,由於這些都是C庫規定的語義,內核對他們沒有任何特殊要求和語義判斷,因此用戶態能夠隨意的改變這個變量的值。
三、pthread_cond_wait的操做
int
__pthread_cond_wait (cond, mutex)
pthread_cond_t *cond;
pthread_mutex_t *mutex;
{
struct _pthread_cleanup_buffer buffer;
struct _condvar_cleanup_buffer cbuffer;
int err;
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are along. */
lll_lock (cond->__data.__lock, pshared);即將對cond結構的成員進行操做和判斷,因此首先得到結構自己保護互斥鎖。
/* Now we can release the mutex. */
err = __pthread_mutex_unlock_usercnt (mutex, 0);釋放用戶傳入的互斥鎖,此時另一個執行pthread_cond_signal的線程能夠經過pthread_mutex_lock執行可能的signal判斷,可是咱們尚未釋放數據操做互斥鎖,因此另外一方執行pthread_cond_signal的時候依然可能會等待。
if (__builtin_expect (err, 0))
{
lll_unlock (cond->__data.__lock, pshared);
return err;
}
/* We have one new user of the condvar. */
++cond->__data.__total_seq;增長系統中全部須要執行的喚醒次數。
++cond->__data.__futex;增長futex,主要是爲了保證用戶態數據一致性。
cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;增長cond結構的使用次數。
/* Remember the mutex we are using here. If there is already a
different address store this is a bad user bug. Do not store
anything for pshared condvars. */
if (cond->__data.__mutex != (void *) ~0l)
cond->__data.__mutex = mutex;
/* Prepare structure passed to cancellation handler. */
cbuffer.cond = cond;
cbuffer.mutex = mutex;
/* Before we block we enable cancellation. Therefore we have to
install a cancellation handler. */
__pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);註冊撤銷點。
/* The current values of the wakeup counter. The "woken" counter
must exceed this value. */
unsigned long long int val;
unsigned long long int seq;
val = seq = cond->__data.__wakeup_seq;
/* Remember the broadcast counter. */
cbuffer.bc_seq = cond->__data.__broadcast_seq;
do
{
unsigned int futex_val = cond->__data.__futex;
/* Prepare to wait. Release the condvar futex. */
lll_unlock (cond->__data.__lock, pshared);此處真正釋放cond操做互斥鎖,咱們已經再也不對其中的變量進行操做。
/* Enable asynchronous cancellation. Required by the standard. */
cbuffer.oldtype = __pthread_enable_asynccancel ();
/* Wait until woken by signal or broadcast. */
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);等待在futex變量上,因爲咱們剛纔保存了futex的原始值,因此若是在上面咱們釋放了data.lock以後另外一個線程修改了這個變量的值,那麼這裏的lll_futex_wait將會返回失敗,因此會繼續進行下一輪的while循環,直到連個執行相同,說明咱們作的判斷時正確的。
/* Disable asynchronous cancellation. */若是執行到這裏,說明咱們已經被signal喚醒。
__pthread_disable_asynccancel (cbuffer.oldtype);
/* We are going to look at shared data again, so get the lock. */
lll_lock (cond->__data.__lock, pshared);訪問變量,須要得到互斥鎖。
/* If a broadcast happened, we are done. */
if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
goto bc_out;
/* Check whether we are eligible for wakeup. */
val = cond->__data.__wakeup_seq;
}
while (val == seq || cond->__data.__woken_seq == val); 當val!=seq&&cond->data.wokenup!=val的時候能夠進行喚醒,也就是另外一個放修改了已經執行了喚醒的次數而且已經被喚醒的線程還有名額的時候。
/* Another thread woken up. */
++cond->__data.__woken_seq;增長系統中已經被喚醒的線程的數目。
bc_out: broadcast跳轉到這裏。
cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;
/* If pthread_cond_destroy was called on this varaible already,
notify the pthread_cond_destroy caller all waiters have left
and it can be successfully destroyed. */
if (cond->__data.__total_seq == -1ULL
&& cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);
/* We are done with the condvar. */
lll_unlock (cond->__data.__lock, pshared);
/* The cancellation handling is back to normal, remove the handler. */
__pthread_cleanup_pop (&buffer, 0);
/* Get the mutex before returning. */
return __pthread_mutex_cond_lock (mutex);再次得到mutex互斥鎖,可能會睡眠,由於咱們的這個釋放是對上層透明的,而在進入函數的時候咱們已經釋放了這個互斥鎖,因此此時還要進行一次得到操做,從而配對。
}
四、pthread_cond_signal的操做
int
__pthread_cond_signal (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);
/* Are there any waiters to be woken? */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)若是待喚醒次數比已經喚醒的次數多,那麼此時就進行一個喚醒操做。
{
/* Yes. Mark one of them as woken. */
++cond->__data.__wakeup_seq;
++cond->__data.__futex;改變futex的值,這個值的具體意義並不重要,只是爲了告訴另外一方,這個值已經變化,若是另外一方使用的是原始值,那麼對futex的wait操做將會失敗。
/* Wake one. */
if (! __builtin_expect (lll_futex_wake_unlock (&cond->__data.__futex, 1,
1, &cond->__data.__lock,
pshared), 0))
return 0;
lll_futex_wake (&cond->__data.__futex, 1, pshared);
}
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
return 0;
}
五、__pthread_cond_broadcast
int
__pthread_cond_broadcast (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);
/* Are there any waiters to be woken? */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)判斷是否有等待喚醒的線程。
{
/* Yes. Mark them all as woken. */
cond->__data.__wakeup_seq = cond->__data.__total_seq;
cond->__data.__woken_seq = cond->__data.__total_seq;
cond->__data.__futex = (unsigned int) cond->__data.__total_seq * 2;
int futex_val = cond->__data.__futex;
/* Signal that a broadcast happened. */
++cond->__data.__broadcast_seq;
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
/* Do not use requeue for pshared condvars. */
if (cond->__data.__mutex == (void *) ~0l)
goto wake_all;
/* Wake everybody. */
pthread_mutex_t *mut = (pthread_mutex_t *) cond->__data.__mutex;
/* XXX: Kernel so far doesn't support requeue to PI futex. */
/* XXX: Kernel so far can only requeue to the same type of futex,
in this case private (we don't requeue for pshared condvars). */
if (__builtin_expect (mut->__data.__kind
& (PTHREAD_MUTEX_PRIO_INHERIT_NP
| PTHREAD_MUTEX_PSHARED_BIT), 0))
goto wake_all;
/* lll_futex_requeue returns 0 for success and non-zero
for errors. */
if (__builtin_expect (lll_futex_requeue (&cond->__data.__futex, 1,
INT_MAX, &mut->__data.__lock,
futex_val, LLL_PRIVATE), 0))把futex上的轉移到data.lock中並喚醒,若是失敗則直接喚醒而不轉移。
{
/* The requeue functionality is not available. */
wake_all:
lll_futex_wake (&cond->__data.__futex, INT_MAX, pshared);這裏的INT_MAX就是告訴內核喚醒全部在這個變量上等待的線程。
}
/* That's all. */
return 0;
}
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
return 0;}