Unix pthread

線程概念

每一個線程都包括線程ID、一組寄存器值、棧、調度優先級、策略、信號屏蔽字、errno變量、線程私有數據。使用_POSIX_THREADS來測試是否支持這個功能,使用_SC_THREADS運行時肯定,都須要添加#include <pthread.h>,對於pthread庫的函數成功返回0,錯誤返回錯誤編號。shell

線程標識

使用thread_t來標識一個線程,在不一樣系統中實現不同,須要使用函數來比較。bash

int pthread_equal(pthread_t t1, pthread_t t2);
// 相等返回非0值
複製代碼

獲取自身線程pthread_t數據結構

pthread_t pthread_self(void);
複製代碼

經過線程pthread_t來分配任務如圖所示:函數

建立線程

int pthread_create(pthread_t *restrict tidp,
                   cosnt pthread_attr_t *restrict attr,
                   void *(start_rtn)(void *), void *restrict arg);
// 失敗時返回錯誤碼
複製代碼

attr用於定製線程的屬性,爲NULL時是默認屬性。新建立的線程從start_rtn開始運行,將參數放到結構體中,經過void *restrict arg傳遞。測試

例子:獲取線程id

#include "../include/apue.h"
#include <pthread.h>

pthread_t ntid;

void printids(const char *s) {
  pid_t pid;
  pthread_t tid;

  pid = getpid();
  tid = pthread_self();
  printf("%s pid:%lu, tid:%lu\n", s, pid, tid);
}

void *thread_func(void * arg) {
  printids("new~~~");
  sleep(1);
  return (void*)0;
}

int main(void) {
  int err;
  err = pthread_create(&ntid, NULL, thread_func, NULL); // 第二個是pthread線程參數, 第四個是函數參數
  if (err != 0) {
    err_exit(err, "can't create thread");
  }
  printids("main~~~");
  // sleep(1);
  exit(0);
}
複製代碼

若是新線程睡眠1s,而後主線程退出就不會輸出新線程了。若是主線程睡眠1s,則兩個線程的進程號相同。在Linux裏輸出以下:ui

main~~~ pid:8081, tid:139790962181888
new~~~ pid:8081, tid:139790953903872
複製代碼

線程終止

在任意線程中調用exit、_Exit、_exit都會將整個進程終止。線程終止的方法有三種:spa

  1. 從線程啓動函數中返回
  2. 線程被同一進程中的其餘線程取消
  3. 線程調用pthread_exit
void pthread_exit(void * rval_ptr);
// rval_ptr指向返回值
複製代碼

也就是把須要返回的狀態傳進去,用於和等待的線程通訊。可使用pthread_join來等待指定線程完成線程

#include <pthread.h>
int pthread_join(pthread_t thread, void **rval_ptr);
複製代碼

若是被等待的線程返回,rval_ptr包含返回碼,若是線程被取消rval_ptr指定內存單元設置爲PTHREAD_CANCELEDrest

例子:獲取已終止線程退出碼

#include "../include/apue.h"
#include <pthread.h>

void printids(const char *s) {
  pid_t pid;
  pthread_t tid;

  pid = getpid();
  tid = pthread_self();
  printf("%s pid:%lu, tid:%lu\n", s, (long unsigned)pid, (long unsigned)tid);
}

void *return_thread(void * arg) {
  printids("thread returning~~~");
  return (void*)0;
}
void *exit_thread(void * arg) {
  printids("thread exiting~~~");
  pthread_exit((void*) 2); // 參數能夠返回結構體,可是這個結構體必須返回後還能使用(不是在棧上分配)
}

int main(void) {
  int err;
  pthread_t tid1, tid2;
  void * rVal;
  err = pthread_create(&tid1, NULL, return_thread, NULL); // 第二個是pthread線程參數, 第四個是函數參數
  if (err != 0) {
    err_exit(err, "can't create thread");
  }
  err = pthread_create(&tid2, NULL, exit_thread, NULL); // 第二個是pthread線程參數, 第四個是函數參數
  if (err != 0) {
    err_exit(err, "can't create thread");
  }
  pthread_join(tid1, &rVal);
  printf("return_thread return:%ld\n", (long)rVal);

  pthread_join(tid2, &rVal);
  printf("exit_thread return:%ld\n", (long)rVal);
  printids("main~~~");
  // sleep(1);
  exit(0);
}
複製代碼

程序輸出結果以下:code

thread returning~~~ pid:5832, tid:139867556669184
thread exiting~~~ pid:5832, tid:139867548276480
return_thread return:0
exit_thread return:2
main~~~ pid:5832, tid:139867564947200
複製代碼

取消其餘線程

int pthread_cancel(pthread_t tid);
複製代碼

pthrea_cancel不等待線程終止,而是提出請求。

線程清理函數

線程安排本身的退出函數,多個清理函數會註冊到棧中,按找棧裏順序執行。

void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);
複製代碼

觸發時機:

  • 調用pthread_exit
  • 響應取消請求時
  • 使用非零execute參數調用pthread_cleanup_pop,使用pthread_cleanup_pop(0)不會調用清理函數,只是刪除清理函數。

例子:線程清理

輸出:

thread 1 start up
thread 2 start up
thread1 return:1
clean up in thread2 second handler
clean up in thread2 first handler
thread2 return:2
main~~~ pid:10122, tid:140514088802048
複製代碼

只有第二個線程的清理函數被調用,這是由於系統正常終止是不會調用清理函數,即return結束

線程與進程對比

進程原語 線程原語 描述
fork pthread_create 建立新的控制流
exit pthread_exit 從現有控制流退出
waitpid pthread_join 從控制流中獲得退出狀態
atexit pthread_cleanup_push 註冊在退出時調用的函數
getpid pthread_self 獲取控制流的ID
abort pthread_cancel 請求控制流的非正常退出

分離線程

int pthread_detach(pthread_t tid);
複製代碼

線程同步

當線程B在線程A的讀寫間隔中讀取數據就會出現不一致的值:

在存儲操做須要多個總線週期時:

互斥變量

互斥變量本質是一把鎖。對互斥量加鎖後,任何試圖再次對互斥量加鎖的線程都會被阻塞。釋放互斥量後,其餘阻塞的線程變爲可運行狀態,第一個變爲可運行狀態的線程對互斥量加鎖,其餘變量依然變爲阻塞。
互斥變量使用pthrea_mutex_t數據表示,使用前必須初始化,能夠設置爲pthread_mutex_t t = PTHREAD_MUTEX_INITIALIZER;用於靜態初始化互斥量。

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);// 使用函數初始化
int pthread_mutex_destroy(pthread_mutex_t *mutex); // 使用malloc動態生成的,須要desotry函數銷燬
複製代碼

加鎖與解鎖操做:

int pthread_mutex_lock(pthread_muex_t *mutex);
int pthread_mutex_trylock(pthread_muex_t *mutex); // 線程不但願被阻塞,就使用trylock,成功返回0, 失敗返回EBUSY
int pthread_mutex_unlock(pthread_muex_t *mutex);
複製代碼

例子:互斥鎖

#include "../include/apue.h"
#include <pthread.h>

struct foo {
  int f_count;
  pthread_mutex_t f_lock;
  int f_id;
};

struct foo * foo_alloc(int id) {
  struct foo *fp;

  if ((fp=malloc(sizeof(struct foo))) != NULL) {
    fp->f_count = 1;
    fp->f_id = id;
    if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
      free(fp);
      return (NULL);
    }
  }
  return fp;
}

void foo_hold(struct foo *fp) {
  pthread_mutex_lock(&fp->f_lock);
  fp->f_count++;
  pthread_mutex_unlock(&fp->f_lock);
}

void foo_release(struct foo *fp) {
  pthread_mutex_lock(&fp->f_lock);
  if (--fp->f_count == 0) {
    pthread_mutex_unlock(&fp->f_lock);
    pthread_mutex_destroy(&fp->f_lock);
    free(fp);
  } else {
    printf("id:%d count:%d\n", fp->f_id, fp->f_count);
    pthread_mutex_unlock(&fp->f_lock);
  }
}
void * thread(void* arg) {
  struct foo* f = (struct foo*)arg;
  foo_hold(f);
}
int main(int argc, char const *argv[]) {
  pthread_t tid;
  struct foo* f = foo_alloc(12);
  pthread_create(&tid, NULL, thread, (void*)f);
  pthread_create(&tid, NULL, thread, (void*)f);

  pthread_join(tid, NULL);
  foo_release(f);
  foo_release(f);
  return 0;
}
複製代碼

能夠看到結果中第一次釋放foo時count爲2,每次運行都是。

id:12 count:2
id:12 count:1
複製代碼

避免死鎖

當線程對同一互斥量加鎖兩次時就會死鎖。經過仔細控制互斥量加鎖順序來避免死鎖發生。另外一種方法: 若是已經佔有某些鎖,則使用pthread_mutex_trylock,若是成功則繼續,若是失敗則釋放鎖,作好清理工做,等待一段時間後再試試。


當程序師徒獲取一個已加鎖的互斥量時,pthread_mutex_timedlock互斥量原語綁定線程阻塞的時間。到達超時時間後pthread_mutex_timedlock不會對互斥量加鎖而是返回錯誤碼ETIMEDOUT

#include <pthread.h>
#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict tsptr);
複製代碼

#include <time.h>中的timespec使用秒和納秒描述時間。

struct timespec {
    __time_t tv_sec;		/* Seconds. */
    __syscall_slong_t tv_nsec;	/* Nanoseconds. */
 };
複製代碼

#include <time.h>中的tm表示年月日星期等。

struct tm {
  int tm_sec;			/* Seconds. [0-60] (1 leap second) */
  int tm_min;			/* Minutes. [0-59] */
  int tm_hour;			/* Hours. [0-23] */
  int tm_mday;			/* Day. [1-31] */
  int tm_mon;			/* Month. [0-11] */
  int tm_year;			/* Year - 1900. */
  int tm_wday;			/* Day of week. [0-6] */
  int tm_yday;			/* Days in year.[0-365] */
  int tm_isdst;			/* DST. [-1/0/1]*/
}
複製代碼

例子:pthread_mutex_timedlock阻塞時間

#include "../include/apue.h"
#include <pthread.h>

void printTime() {
  char buf[64];
  struct timespec tout;
  struct tm* tmp;
  clock_gettime(CLOCK_REALTIME, &tout);
  tmp = localtime(&tout.tv_sec);
  strftime(buf, sizeof(buf), "%r", tmp);
  printf("current time is %s\n", buf);

}
int main(int argc, char const *argv[])
{
  int err;
  struct timespec tout;
  struct tm *tmp;
  pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

  printTime();

  pthread_mutex_lock(&lock);
  printf("mutex is lock\n");

  clock_gettime(CLOCK_REALTIME, &tout);
  tout.tv_sec += 10;
  pthread_mutex_timedlock(&lock, &tout);

  printTime();
  return 0;
}
複製代碼

由於已經得到了lock的鎖,再次用pthread_mutex_lock鎖住會致使死鎖,使用pthread_mutex_timedlock只會阻塞指定時間。 輸出結果以下,只阻塞了10秒

current time is 06:55:12 PM
mutex is lock
current time is 06:55:22 PM
複製代碼

讀寫鎖(共享互斥鎖)

讀寫鎖有三種狀態:讀模式下加鎖狀態、寫模式下加鎖狀態、不加鎖狀態。一次只有一個線程佔有寫模式的讀寫鎖,可是多個線程能夠同時佔有讀模式的讀寫鎖

  1. 當寫鎖已加鎖時,試圖對其加鎖會使線程阻塞
  2. 在讀鎖已加鎖時,試圖對其加讀鎖的線程得到訪問權,若對其加寫鎖會使線程阻塞,直到因此線程釋放讀鎖
  3. 讀寫鎖適用於對數據結構讀取次數遠大於寫入的狀況

讀寫鎖必須在使用前初始化、使用後釋放內存前銷燬

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
// 初始化函數,若是讀寫鎖默認屬性則傳入null給attr

int pthread_rwlock_destroy(pthread_rwlock_t * rwlock);
// 在free前調用

pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;  // 靜態初始化(Signle UNIX Specification)

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);    //讀加鎖
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);    // 寫加鎖
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);    // 解鎖

int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); // 條件版本(Signle UNIX Specification)
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); // 條件版本(Signle UNIX Specification)
// 得到鎖是返回0,不然返回錯誤EBUSY
複製代碼

例子:讀寫鎖(共享互斥鎖)

做業隊列,使用單個讀寫鎖保護隊列,插入、刪除會嘗試給隊列加寫鎖,查找隊列時會給隊列加讀鎖。

#include "../include/apue.h"
#include <pthread.h>

struct job {
  struct job *j_next;
  struct job *j_prev;
  pthread_t j_id; // 哪個線程處理這個任務
};

struct queue {
  struct job *q_head;
  struct job *q_tail;
  pthread_rwlock_t q_lock;
};

int queue_init(struct queue *qp) {
  int err;
  qp->q_head = NULL;
  qp->q_tail = NULL;
  err = pthread_rwlock_init(&qp->q_lock);
  if (err != 0) {
    return err;
  }
  return 0;
}

// 從隊列後面插入job
void job_insert_tail(struct queue *qp, struct job *jp) {
  pthread_rwlock_wrlock(&qp->q_lock);
  jp->j_next = NULL;
  jp->j_prev = qp->q_tail;
  if (qp->q_tail != NULL) {
    qp->q_tail->j_next = jp;
  } else {
    qp->q_head = jp; // 鏈表爲空
  }
  qp->q_tail = jp;
  pthread_rwlock_unlock(&qp->q_lock);
}

// 從隊列前面插入job
void job_insert_front(struct queue *qp, struct job *jp) {
  pthread_rwlock_wrlock(&qp->q_lock);
  jp->j_next = qp->q_head;
  jp->j_prev = NULL;
  if (qp->q_head != NULL) {
    qp->q_head->j_prev = jp;
  } else {
    qp->q_tail = jp; // 鏈表爲空
  }
  qp->q_head = jp;
  pthread_rwlock_unlock(&qp->q_lock);
}

// 從隊列中刪除job
void job_remove(struct queue *qp, struct job *jp) {
  pthread_rwlock_wrlock(&qp->q_lock);
  if (jp == qp->q_head) {
    qp->q_head = jp->j_next;
    if (jp == qp->q_tail) {
      qp->q_tail = NULL;
    } else {
      jp->j_next->j_prev = jp->j_prev;
    }
  } else if (jp == qp->q_tail) {
    jp->j_prev->j_next = jp->j_next;
    qp->q_tail = jp->j_prev;
  } else {
    jp->j_prev->j_next = jp->j_next;
    jp->j_next->j_prev = jp->j_prev;
  }
  pthread_rwlock_unlock(&qp->q_lock);
}

// 經過線程id查找某個任務
struct job* job_find(struct queue *qp, pthread_t id) {
  struct job* jp;
  if (pthread_rwlock_rdlock(&qp->q_lock) != 0) {
    return NULL;
  }
  for (jp = qp->q_head; jp != NULL; jp = jp->j_next) {
    if (pthread_equal(jp->j_id, id)) {
      printf("Find you!\n");
      break;
    }
  }
  pthread_rwlock_unlock(&qp->q_lock);
  return jp;
}
複製代碼

每次只能有一個寫鎖,因此對於job結構體不須要對它加鎖。

帶有超時的讀寫鎖(Single UNIX Specification)

int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);
複製代碼

超時會返回ETIMEDOUT

條件變量

條件變量有互斥量保護,線程在改變條件狀態以前首先鎖住互斥量,鎖住後計算條件。

// 1.動態初始化
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);`
// 2.靜態初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;`

// 銷燬方式:
int pthread_cond_destroy(pthread_cond_t *cond);

// 等待條件變量變爲真
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict tsptr);
複製代碼

使用互斥量對條件進行保護,調用者把鎖住的互斥量傳給函數,函數而後自動把調用線程放到等待條件的線程列表中,而後對互斥量解鎖。
等待時間使用的是絕對時間,不是以前的時間差而是將將來時間傳入,使用clock_gettime得到timespec表示的當前時間,也能夠經過gettimeofday得到timeval結構表示的當前時間,再轉換爲timespec,函數以下所示:

#include <sys/time.h>
#include <stdlib.h>

// 以分鐘做爲時間間隔
void maketimeout(struct timespec *tsp, long minutes) {
    struct timeval now;
    gettimeofday(&now, NULL);
    tsp->tv_sec = now.tv_sec;
    tsp->tv_nsec = now.tv_usec * 1000;
    tsp->tv_sec += minutes * 60;
}
複製代碼

pthread_cond_waitpthread_cond_timedwait調用成功返回時,線程須要從新計算條件。pthread_cond_signal能喚醒至少一個睡眠線程,pthread_cond_broadcast能喚醒因此等待而睡眠的線程。

int pthread_cond_signal(pthread_cond_t *cnd);
int pthread_cond_broadcast(pthread_cond_t *cond);
複製代碼

必須在改變條件狀態以後再給線程發送信號

例子:使用條件變量和互斥量進行線程同步

#include "../include/apue.h"
#include <pthread.h>

#define WORKS_NUMS 10

struct msg {
  struct msg *m_next;
  char message[64];
};

struct msg *workq;

pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

void process_msg(char* name) {
  struct msg *mp;
  pthread_mutex_lock(&qlock);
  while (workq == NULL)
    pthread_cond_wait(&qready, &qlock);
    // 將鎖住的互斥量傳入,pthread_cond_wait會將線程放入等待隊列中,而後解鎖qlock,
    // 此時阻塞在pthread_cond_wait,當被pthread_cond_signal或pthread_cond_broadcast
    // 喚醒後此時qlock會鎖住,若是workq==NULL,就會再次等待
  mp = workq;
  workq = workq->m_next;
  pthread_mutex_unlock(&qlock);

  printf("[process-%s]:%s\n", name, mp->message);
}

void enqueue_msg(struct msg *mp) {
  pthread_mutex_lock(&qlock); // 條件workq是由互斥量mutex保護
  mp->m_next = workq;         // 修改條件這個操做須要保持一致
  workq = mp;
  pthread_mutex_unlock(&qlock);
  pthread_cond_signal(&qready); // 修改條件後才發送信號
}

void *worker(void *arg) {
  printf("worker %s create!\n", (char *)arg);
  process_msg((char*)arg);
}

void *sender(void *arg) {
  printf("sender create!\n");
  for (int i = 0; i < WORKS_NUMS; i++)
  {
    struct msg *m = (struct msg *)malloc(sizeof(struct msg));
    sprintf(m->message, "msg-%d", i);
    enqueue_msg(m);
    sleep(1); // 發送後等待1s,讓worker處於等待狀態
  }
  return 0;
}
int main(int argc, char const *argv[]) {
  pthread_t send;
  pthread_t works[WORKS_NUMS];
  pthread_create(&send, NULL, sender, NULL);
  // sleep(1);
  for(int i= 0; i < WORKS_NUMS; i++) {
    char *name = (char*)malloc(10);
    sprintf(name, "%d", i);
    pthread_create(&works[i], NULL, worker, (void *)name);
  }
  
  pthread_join(send, NULL);
  for(int i= 0; i < WORKS_NUMS; i++) {
    pthread_join(works[i], NULL);
  }
  return 0;
}
複製代碼

輸出:

sender create!
worker 0 create!
[worker-0]:msg-0
worker 3 create!
worker 2 create!
worker 4 create!
worker 1 create!
worker 5 create!
worker 6 create!
worker 7 create!
worker 8 create!
worker 9 create!
[worker-3]:msg-1
[worker-2]:msg-2
[worker-4]:msg-3
[worker-1]:msg-4
[worker-5]:msg-5
[worker-6]:msg-6
[worker-7]:msg-7
[worker-8]:msg-8
[worker-9]:msg-9
複製代碼

由於pthread_cond_wait是在while循環中,若是不知足條件會繼續進入循環。

自旋鎖

相關文章
相關標籤/搜索