對於多進程、多線程的應用程序來講,保證數據正確的同步與更新離不開鎖和信號。swoole 中的鎖與信號基本採用 pthread 系列函數實現。UNIX 中的鎖類型有很多種:互斥鎖、自旋鎖、文件鎖、讀寫鎖、原子鎖,本節會講解 swoole 中各種鎖的定義與使用。
參考:APUE 學習筆記——線程與鎖
swoole 中不管哪一種鎖,其數據結構都是 swLock,這個數據結構內部有一個聯合體 object,這個聯合體可以是互斥鎖、讀寫鎖、自旋鎖、文件鎖、信號量、原子鎖;type 指代這個鎖的類型,具體可選項是 SW_LOCKS 這個枚舉類型。lock_rd 和 trylock_rd 兩個函數是專門爲了 swFileLock 和 swRWLock
設計的,其餘鎖沒有這兩個函數。
/*
 * Generic lock handle shared by every lock implementation in this file.
 *
 * `type` holds one of the SW_LOCKS values and `object` provides the storage
 * for the concrete lock. The function pointers are assigned by the matching
 * swXxx_create() routine; in this file only swRWLock_create() and
 * swFileLock_create() assign lock_rd and trylock_rd.
 */
typedef struct _swLock
{
    int type; /* one of enum SW_LOCKS */
    union
    {
        swMutex mutex;
#ifdef HAVE_RWLOCK
        swRWLock rwlock;
#endif
#ifdef HAVE_SPINLOCK
        swSpinLock spinlock;
#endif
        swFileLock filelock;
        swSem sem;
        swAtomicLock atomlock;
    } object;
    int (*lock_rd)(struct _swLock *);    /* acquire for reading (shared) */
    int (*lock)(struct _swLock *);       /* acquire exclusively */
    int (*unlock)(struct _swLock *);
    int (*trylock_rd)(struct _swLock *); /* non-blocking lock_rd */
    int (*trylock)(struct _swLock *);    /* non-blocking lock */
    int (*free)(struct _swLock *);       /* release the lock's resources */
} swLock;

/*
 * Lock type tags stored in swLock.type. Each enumerator is also defined as a
 * macro expanding to itself so that its presence can be tested with #ifdef.
 */
enum SW_LOCKS
{
    SW_RWLOCK = 1,
#define SW_RWLOCK SW_RWLOCK
    SW_FILELOCK = 2,
#define SW_FILELOCK SW_FILELOCK
    SW_MUTEX = 3,
#define SW_MUTEX SW_MUTEX
    SW_SEM = 4,
#define SW_SEM SW_SEM
    SW_SPINLOCK = 5,
#define SW_SPINLOCK SW_SPINLOCK
    SW_ATOMLOCK = 6,
#define SW_ATOMLOCK SW_ATOMLOCK
};
互斥鎖是最常用的進程/線程鎖,swMutex 的基礎是 pthread_mutex 系列函數,所以該數據結構只有兩個成員變量:_lock、attr:
/* Mutex payload of swLock: a pthread mutex plus the attribute object used to initialise it. */
typedef struct _swMutex
{
    pthread_mutex_t _lock;
    pthread_mutexattr_t attr;
} swMutex;
互斥鎖的創建就是 pthread_mutex 互斥鎖的初始化:首先初始化互斥鎖的屬性 pthread_mutexattr_t attr,設定互斥鎖是否要進程共享,之後設置各個關於鎖的函數:
/**
 * Initialise `lock` as a pthread-mutex-backed swLock.
 *
 * @param lock            storage to initialise; it is zeroed first.
 * @param use_in_process  pass 1 to mark the mutex PTHREAD_PROCESS_SHARED;
 *                        any other value keeps the default attribute.
 * @return SW_OK on success, SW_ERR if the mutex could not be initialised.
 */
int swMutex_create(swLock *lock, int use_in_process)
{
    bzero(lock, sizeof(swLock));
    lock->type = SW_MUTEX;
    pthread_mutexattr_init(&lock->object.mutex.attr);
    if (use_in_process == 1)
    {
        pthread_mutexattr_setpshared(&lock->object.mutex.attr, PTHREAD_PROCESS_SHARED);
    }
    /* pthread functions report failure with a positive error number, never a
     * negative value, so the result has to be compared against zero. */
    if (pthread_mutex_init(&lock->object.mutex._lock, &lock->object.mutex.attr) != 0)
    {
        /* Do not leak the attribute object on the failure path. */
        pthread_mutexattr_destroy(&lock->object.mutex.attr);
        return SW_ERR;
    }
    lock->lock = swMutex_lock;
    lock->unlock = swMutex_unlock;
    lock->trylock = swMutex_trylock;
    lock->free = swMutex_free;
    return SW_OK;
}
互斥鎖的函數就是調用相應的 pthread_mutex 系列函數:
/* Block until the mutex is acquired; returns the pthread_mutex_lock() result. */
static int swMutex_lock(swLock *lock)
{
    return pthread_mutex_lock(&lock->object.mutex._lock);
}

/* Release the mutex; returns the pthread_mutex_unlock() result. */
static int swMutex_unlock(swLock *lock)
{
    return pthread_mutex_unlock(&lock->object.mutex._lock);
}

/* Try to acquire the mutex without blocking; returns the pthread_mutex_trylock() result. */
static int swMutex_trylock(swLock *lock)
{
    return pthread_mutex_trylock(&lock->object.mutex._lock);
}

/* Destroy the attribute object and the mutex; returns the pthread_mutex_destroy() result. */
static int swMutex_free(swLock *lock)
{
    pthread_mutexattr_destroy(&lock->object.mutex.attr);
    return pthread_mutex_destroy(&lock->object.mutex._lock);
}

/**
 * Acquire the mutex, waiting at most `timeout_msec` milliseconds.
 *
 * pthread_mutex_timedlock() takes an ABSOLUTE CLOCK_REALTIME deadline, so the
 * relative timeout is added to the current time. Passing the relative value
 * directly would name a moment in 1970 and time out immediately whenever the
 * mutex is contended.
 *
 * @param lock          mutex-backed swLock.
 * @param timeout_msec  maximum wait in milliseconds; negative values are treated as 0.
 * @return the pthread_mutex_timedlock() result (0 on success, an error number
 *         such as ETIMEDOUT otherwise), or SW_ERR if the clock cannot be read.
 */
int swMutex_lockwait(swLock *lock, int timeout_msec)
{
    struct timespec timeo;

    if (clock_gettime(CLOCK_REALTIME, &timeo) != 0)
    {
        return SW_ERR;
    }
    if (timeout_msec < 0)
    {
        timeout_msec = 0;
    }
    timeo.tv_sec += timeout_msec / 1000;
    timeo.tv_nsec += (long) (timeout_msec % 1000) * 1000 * 1000;
    /* Keep tv_nsec inside [0, 1e9) as required for a valid timespec. */
    if (timeo.tv_nsec >= 1000000000L)
    {
        timeo.tv_sec += 1;
        timeo.tv_nsec -= 1000000000L;
    }
    return pthread_mutex_timedlock(&lock->object.mutex._lock, &timeo);
}
對於讀多寫少的情況,讀寫鎖可以顯著地提高程序效率,swRWLock 的基礎是 pthread_rwlock 系列函數:
/* Read-write lock payload of swLock: a pthread rwlock plus the attribute object used to initialise it. */
typedef struct _swRWLock
{
    pthread_rwlock_t _lock;
    pthread_rwlockattr_t attr;
} swRWLock;
讀寫鎖的創建過程和互斥鎖類似:
/**
 * Initialise `lock` as a pthread-rwlock-backed swLock.
 *
 * @param lock            storage to initialise; it is zeroed first.
 * @param use_in_process  pass 1 to mark the rwlock PTHREAD_PROCESS_SHARED;
 *                        any other value keeps the default attribute.
 * @return SW_OK on success, SW_ERR if the rwlock could not be initialised.
 */
int swRWLock_create(swLock *lock, int use_in_process)
{
    bzero(lock, sizeof(swLock));
    lock->type = SW_RWLOCK;
    pthread_rwlockattr_init(&lock->object.rwlock.attr);
    if (use_in_process == 1)
    {
        pthread_rwlockattr_setpshared(&lock->object.rwlock.attr, PTHREAD_PROCESS_SHARED);
    }
    /* pthread functions return a positive error number on failure, so test
     * against zero rather than for a negative value. */
    if (pthread_rwlock_init(&lock->object.rwlock._lock, &lock->object.rwlock.attr) != 0)
    {
        /* Do not leak the attribute object on the failure path. */
        pthread_rwlockattr_destroy(&lock->object.rwlock.attr);
        return SW_ERR;
    }
    lock->lock_rd = swRWLock_lock_rd;
    lock->lock = swRWLock_lock_rw;
    lock->unlock = swRWLock_unlock;
    lock->trylock = swRWLock_trylock_rw;
    lock->trylock_rd = swRWLock_trylock_rd;
    lock->free = swRWLock_free;
    return SW_OK;
}
/* Acquire the lock for reading (shared); returns the pthread_rwlock_rdlock() result. */
static int swRWLock_lock_rd(swLock *lock)
{
    return pthread_rwlock_rdlock(&lock->object.rwlock._lock);
}

/* Acquire the lock for writing (exclusive); returns the pthread_rwlock_wrlock() result. */
static int swRWLock_lock_rw(swLock *lock)
{
    return pthread_rwlock_wrlock(&lock->object.rwlock._lock);
}

/* Release a read or write hold; returns the pthread_rwlock_unlock() result. */
static int swRWLock_unlock(swLock *lock)
{
    return pthread_rwlock_unlock(&lock->object.rwlock._lock);
}

/* Non-blocking read acquire; returns the pthread_rwlock_tryrdlock() result. */
static int swRWLock_trylock_rd(swLock *lock)
{
    return pthread_rwlock_tryrdlock(&lock->object.rwlock._lock);
}

/* Non-blocking write acquire; returns the pthread_rwlock_trywrlock() result. */
static int swRWLock_trylock_rw(swLock *lock)
{
    return pthread_rwlock_trywrlock(&lock->object.rwlock._lock);
}

/*
 * Destroy the rwlock and the attribute object initialised by
 * swRWLock_create(), mirroring what swMutex_free() does for mutexes.
 * Returns the pthread_rwlock_destroy() result.
 */
static int swRWLock_free(swLock *lock)
{
    pthread_rwlockattr_destroy(&lock->object.rwlock.attr);
    return pthread_rwlock_destroy(&lock->object.rwlock._lock);
}
文件鎖是針對多進程、多線程同一時間寫相同文件這一場景設定的鎖,底層函數是 fcntl:
/* File lock payload of swLock: the flock record handed to fcntl() and the descriptor it applies to. */
typedef struct _swFileLock
{
    struct flock lock_t;
    int fd;
} swFileLock;
/**
 * Initialise `lock` as an fcntl()-based file lock on descriptor `fd`.
 *
 * The structure is zeroed first, so the embedded struct flock starts with
 * l_whence, l_start and l_len all equal to 0. Always returns 0.
 */
int swFileLock_create(swLock *lock, int fd)
{
    bzero(lock, sizeof(*lock));

    /* shared (read) operations */
    lock->lock_rd = swFileLock_lock_rd;
    lock->trylock_rd = swFileLock_trylock_rd;

    /* exclusive (write) operations */
    lock->lock = swFileLock_lock_rw;
    lock->trylock = swFileLock_trylock_rw;

    lock->unlock = swFileLock_unlock;
    lock->free = swFileLock_free;

    lock->object.filelock.fd = fd;
    lock->type = SW_FILELOCK;
    return 0;
}
/*
 * Set the record-lock type and issue the fcntl() command. The struct flock
 * member itself is passed to fcntl(), not the enclosing swFileLock, so the
 * call no longer relies on lock_t being the first member of the wrapper.
 */
static int swFileLock_fcntl(swLock *lock, int cmd, short type)
{
    lock->object.filelock.lock_t.l_type = type;
    return fcntl(lock->object.filelock.fd, cmd, &lock->object.filelock.lock_t);
}

/* Blocking shared (read) lock. */
static int swFileLock_lock_rd(swLock *lock)
{
    return swFileLock_fcntl(lock, F_SETLKW, F_RDLCK);
}

/* Blocking exclusive (write) lock. */
static int swFileLock_lock_rw(swLock *lock)
{
    return swFileLock_fcntl(lock, F_SETLKW, F_WRLCK);
}

/* Release the lock held through this descriptor. */
static int swFileLock_unlock(swLock *lock)
{
    return swFileLock_fcntl(lock, F_SETLKW, F_UNLCK);
}

/* Non-blocking exclusive lock: F_SETLK fails instead of waiting. */
static int swFileLock_trylock_rw(swLock *lock)
{
    return swFileLock_fcntl(lock, F_SETLK, F_WRLCK);
}

/* Non-blocking shared lock: F_SETLK fails instead of waiting. */
static int swFileLock_trylock_rd(swLock *lock)
{
    return swFileLock_fcntl(lock, F_SETLK, F_RDLCK);
}

/* Close the descriptor that was handed to swFileLock_create(). */
static int swFileLock_free(swLock *lock)
{
    return close(lock->object.filelock.fd);
}
自旋鎖類似於互斥鎖,不同的是自旋鎖在加鎖失敗的時候,並不會沉入內核,而是空轉,這樣的鎖效率更高,但是會空耗 CPU 資源:
/* Spin lock payload of swLock: wraps a pthread spinlock. */
typedef struct _swSpinLock
{
    pthread_spinlock_t lock_t;
} swSpinLock;
/**
 * Initialise `lock` as a pthread-spinlock-backed swLock.
 *
 * @param lock            storage to initialise; it is zeroed first.
 * @param use_in_process  pass 1 for a process-shared spinlock; any other value
 *                        gives a process-private one (same rule as
 *                        swMutex_create / swRWLock_create).
 * @return 0 on success, -1 if the spinlock could not be initialised.
 */
int swSpinLock_create(swLock *lock, int use_in_process)
{
    int pshared;

    bzero(lock, sizeof(swLock));
    lock->type = SW_SPINLOCK;
    /* Use the symbolic constants instead of assuming their numeric values. */
    pshared = (use_in_process == 1) ? PTHREAD_PROCESS_SHARED : PTHREAD_PROCESS_PRIVATE;
    /* pthread functions return a positive error number on failure, so a
     * "< 0" test would never detect an error. */
    if (pthread_spin_init(&lock->object.spinlock.lock_t, pshared) != 0)
    {
        return -1;
    }
    lock->lock = swSpinLock_lock;
    lock->unlock = swSpinLock_unlock;
    lock->trylock = swSpinLock_trylock;
    lock->free = swSpinLock_free;
    return 0;
}
/* Spin until the lock is acquired; forwards the pthread_spin_lock() result. */
static int swSpinLock_lock(swLock *lock)
{
    pthread_spinlock_t *spin = &lock->object.spinlock.lock_t;
    return pthread_spin_lock(spin);
}

/* Release the spinlock; forwards the pthread_spin_unlock() result. */
static int swSpinLock_unlock(swLock *lock)
{
    pthread_spinlock_t *spin = &lock->object.spinlock.lock_t;
    return pthread_spin_unlock(spin);
}

/* Single acquisition attempt; forwards the pthread_spin_trylock() result. */
static int swSpinLock_trylock(swLock *lock)
{
    pthread_spinlock_t *spin = &lock->object.spinlock.lock_t;
    return pthread_spin_trylock(spin);
}

/* Destroy the spinlock; forwards the pthread_spin_destroy() result. */
static int swSpinLock_free(swLock *lock)
{
    pthread_spinlock_t *spin = &lock->object.spinlock.lock_t;
    return pthread_spin_destroy(spin);
}
不同於以上幾種鎖,swoole 的原子鎖並不是 pthread 系列的鎖,而是自定義實現的。
/* Volatile 32-bit word used as the storage for atomic operations. */
typedef volatile uint32_t sw_atomic_uint32_t;
typedef sw_atomic_uint32_t sw_atomic_t;

/*
 * Atomic lock payload of swLock.
 * lock_t is the lock word: the code in this file treats 0 as unlocked and 1 as locked.
 * spin is stored by swAtomicLock_create(); the lock/unlock code shown in this
 * file does not read it.
 */
typedef struct _swAtomicLock
{
    sw_atomic_t lock_t;
    uint32_t spin;
} swAtomicLock;
/**
 * Initialise `lock` as an atomic (compare-and-swap based) lock.
 *
 * The structure is zeroed, which leaves the lock word at 0 (unlocked), and
 * `spin` is recorded in the payload. Only lock, unlock and trylock are
 * installed; lock->free and the *_rd callbacks stay NULL.
 * Always returns SW_OK.
 */
int swAtomicLock_create(swLock *lock, int spin)
{
    bzero(lock, sizeof(*lock));

    lock->lock = swAtomicLock_lock;
    lock->trylock = swAtomicLock_trylock;
    lock->unlock = swAtomicLock_unlock;

    lock->object.atomlock.spin = spin;
    lock->type = SW_ATOMLOCK;
    return SW_OK;
}
/* Acquire the atomic lock by spinning on its lock word; always returns SW_OK. */
static int swAtomicLock_lock(swLock *lock)
{
    sw_atomic_t *word = &lock->object.atomlock.lock_t;

    sw_spinlock(word);
    return SW_OK;
}
原子鎖的加鎖邏輯函數 sw_spinlock 非常複雜,具體步驟如下:
- 如果原子鎖沒有被鎖,那麼調用原子函數 sw_atomic_cmp_set(__sync_bool_compare_and_swap)進行加鎖。
- 如果原子鎖已經被鎖,並且是單核 CPU,那麼就調用 sched_yield 函數讓出執行權,因爲這說明自旋鎖已經被其他進程加鎖,但是該進程卻被搶佔睡眠,我們需要讓出控制權讓那個唯一的 cpu 把那個進程跑下去,注意這時絕對不能進行自旋,否則就是死鎖。
- 如果是多核 CPU,就不斷地嘗試加鎖,兩次嘗試之間的空轉次數按指數增長(1、2、4……次 sw_atomic_cpu_pause)。sw_atomic_cpu_pause 使用的是內嵌的彙編代碼,目的在於讓 cpu 空轉,避免線程或進程被其他線程搶佔導致睡眠、恢復上下文浪費時間。
- 如果超過了 SW_SPINLOCK_LOOP_N
次數,尚未可以獲取的到鎖,那麼也要讓出控制權,這時頗有可能被鎖保護的代碼有阻塞行爲
/* Atomic compare-and-swap: stores `set` into *lock only if it currently equals `old`; true on success. */
#define sw_atomic_cmp_set(lock, old, set) __sync_bool_compare_and_swap(lock, old, set)
/* Emits the "pause" instruction (an x86 spin-wait hint) inside busy-wait loops. */
#define sw_atomic_cpu_pause() __asm__ __volatile__ ("pause")
/* Give up the CPU so another runnable task can make progress. */
#define swYield() sched_yield() //or usleep(1)

/*
 * Spin until the lock word at `lock` has been changed from 0 to 1 by this caller.
 *
 * Each round first tries a direct compare-and-swap. If that fails and
 * SW_CPU_NUM > 1, it busy-waits with pause counts of 1, 2, 4, ... (doubling
 * while below SW_SPINLOCK_LOOP_N), retrying the compare-and-swap after each
 * burst. If the lock is still held, or SW_CPU_NUM is not greater than 1, it
 * yields the CPU and starts the next round.
 */
static sw_inline void sw_spinlock(sw_atomic_t *lock)
{
    uint32_t i, n;
    while (1)
    {
        /* Cheap read first so the CAS is only attempted when the lock looks free. */
        if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
        {
            return;
        }
        if (SW_CPU_NUM > 1)
        {
            /* Exponential back-off: n pauses, then retry, then double n. */
            for (n = 1; n < SW_SPINLOCK_LOOP_N; n <<= 1)
            {
                for (i = 0; i < n; i++)
                {
                    sw_atomic_cpu_pause();
                }
                if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
                {
                    return;
                }
            }
        }
        /* Let the current holder run instead of spinning any further. */
        swYield();
    }
}
/*
 * Release the atomic lock.
 *
 * __sync_lock_release() writes 0 to the lock word with release semantics, so
 * stores made inside the critical section cannot be reordered past the
 * unlock. A plain assignment to a volatile gives no such guarantee.
 * Returns 0, as the original assignment expression did.
 */
static int swAtomicLock_unlock(swLock *lock)
{
    __sync_lock_release(&lock->object.atomlock.lock_t);
    return 0;
}

/*
 * Single non-blocking acquisition attempt.
 *
 * Returns non-zero when the lock was acquired and 0 when it is already held.
 * Note that this is the opposite of the pthread-based trylock callbacks in
 * this file, which return 0 on success.
 */
static int swAtomicLock_trylock(swLock *lock)
{
    sw_atomic_t *atomic = &lock->object.atomlock.lock_t;
    return (*(atomic) == 0 && sw_atomic_cmp_set(atomic, 0, 1));
}
信號量也是數據同步的一種重要方式,其數據結構爲:
/* Semaphore payload of swLock: the System V IPC key and the semaphore set id returned by semget(). */
typedef struct _swSem
{
    key_t key;
    int semid;
} swSem;
信號量的初始化首先調用 semget 創建(或打開)一個信號量,之後 semctl
會將信號量初始化爲 1
/**
 * Initialise `lock` as a System V semaphore used as a binary lock.
 *
 * semget() creates (or opens) a set holding one semaphore for `key`, and
 * semctl(SETVAL) sets its value to 1, i.e. "unlocked".
 *
 * The structure is zeroed first, as every other swXxx_create() in this file
 * does, so the callbacks this lock does not provide (lock_rd, trylock_rd,
 * trylock) are NULL rather than indeterminate. The key is also recorded in
 * the payload.
 *
 * @return SW_OK on success, SW_ERR if semget() or semctl() fails.
 */
int swSem_create(swLock *lock, key_t key)
{
    int ret;

    bzero(lock, sizeof(swLock));
    lock->type = SW_SEM;
    if ((ret = semget(key, 1, IPC_CREAT | 0666)) < 0)
    {
        return SW_ERR;
    }
    if (semctl(ret, 0, SETVAL, 1) == -1)
    {
        swWarn("semctl(SETVAL) failed");
        return SW_ERR;
    }
    lock->object.sem.key = key;
    lock->object.sem.semid = ret;
    lock->lock = swSem_lock;
    lock->unlock = swSem_unlock;
    lock->free = swSem_free;
    return SW_OK;
}
/*
 * Release the semaphore by adding 1 to semaphore 0 of the set.
 * SEM_UNDO asks the kernel to revert the adjustment when the process exits.
 * Returns the semop() result.
 */
static int swSem_unlock(swLock *lock)
{
    struct sembuf op = {
        .sem_num = 0,
        .sem_op = 1,
        .sem_flg = SEM_UNDO,
    };

    return semop(lock->object.sem.semid, &op, 1);
}
/*
 * Acquire the semaphore by subtracting 1 from semaphore 0 of the set; the
 * call blocks while the value is 0.
 * SEM_UNDO asks the kernel to revert the adjustment when the process exits.
 * Returns the semop() result.
 */
static int swSem_lock(swLock *lock)
{
    struct sembuf op = {
        .sem_num = 0,
        .sem_op = -1,
        .sem_flg = SEM_UNDO,
    };

    return semop(lock->object.sem.semid, &op, 1);
}
semctl 的 IPC_RMID 命令
用於銷燬信號量
/* Remove the semaphore set identified by semid (IPC_RMID); returns the semctl() result. */
static int swSem_free(swLock *lock)
{
    return semctl(lock->object.sem.semid, 0, IPC_RMID);
}
條件變量並沒有作爲 swLock 的一員,而是自成一體。條件變量不僅需要 pthread_cond_t,還需要互斥量 swLock:
/*
 * Condition variable object: a pthread condition variable paired with the
 * swLock that guards it, plus the operations installed by swCond_create().
 */
typedef struct _swCond
{
    swLock _lock;         /* lock passed to the pthread wait calls */
    pthread_cond_t _cond;

    int (*wait)(struct _swCond *object);
    int (*timewait)(struct _swCond *object, long, long); /* arguments: seconds, nanoseconds */
    int (*notify)(struct _swCond *object);
    int (*broadcast)(struct _swCond *object);
    void (*free)(struct _swCond *object);
    int (*lock)(struct _swCond *object);
    int (*unlock)(struct _swCond *object);
} swCond;
/**
 * Initialise a condition variable object and its guarding mutex.
 *
 * @param cond  storage to initialise.
 * @return SW_OK on success, SW_ERR if the condition variable or the mutex
 *         could not be created.
 */
int swCond_create(swCond *cond)
{
    int err;

    /* pthread_cond_init() returns the error number directly and does not set
     * errno, so test against zero and report the returned code. */
    err = pthread_cond_init(&cond->_cond, NULL);
    if (err != 0)
    {
        swWarn("pthread_cond_init fail. Error: %s [%d]", strerror(err), err);
        return SW_ERR;
    }
    if (swMutex_create(&cond->_lock, 0) < 0)
    {
        /* Do not leak the condition variable that was just initialised. */
        pthread_cond_destroy(&cond->_cond);
        return SW_ERR;
    }
    cond->notify = swCond_notify;
    cond->broadcast = swCond_broadcast;
    cond->timewait = swCond_timewait;
    cond->wait = swCond_wait;
    cond->lock = swCond_lock;
    cond->unlock = swCond_unlock;
    cond->free = swCond_free;
    return SW_OK;
}
條件變量的各個方法就是對 pthread_cond 系列函數的封裝,另外還有操作內部互斥鎖的 swCond_lock、swCond_unlock
等函數static int swCond_notify(swCond *cond) { return pthread_cond_signal(&cond->_cond); } static int swCond_broadcast(swCond *cond) { return pthread_cond_broadcast(&cond->_cond); } static int swCond_timewait(swCond *cond, long sec, long nsec) { struct timespec timeo; timeo.tv_sec = sec; timeo.tv_nsec = nsec; return pthread_cond_timedwait(&cond->_cond, &cond->_lock.object.mutex._lock, &timeo); } static int swCond_wait(swCond *cond) { return pthread_cond_wait(&cond->_cond, &cond->_lock.object.mutex._lock); } static int swCond_lock(swCond *cond) { return cond->_lock.lock(&cond->_lock); } static int swCond_unlock(swCond *cond) { return cond->_lock.unlock(&cond->_lock); } static void swCond_free(swCond *cond) { pthread_cond_destroy(&cond->_cond); cond->_lock.free(&cond->_lock); }