Swoole 源碼分析——鎖與信號量模塊

前言

對於多進程多線程的應用程序來講,保證數據正確的同步與更新離不開鎖和信號,swoole 中的鎖與信號基本採用 pthread 系列函數實現。UNIX 中的鎖類型有不少種:互斥鎖、自旋鎖、文件鎖、讀寫鎖、原子鎖,本節就會講解 swoole 中各類鎖的定義與使用。laravel

APUE 學習筆記——線程與鎖swoole

APUE 學習筆記——高級 IO與文件鎖數據結構

數據結構

  • swoole 中不管哪一種鎖,其數據結構都是 swLock,這個數據結構內部有一個聯合體 object,這個聯合體能夠是 互斥鎖、自旋鎖、文件鎖、讀寫鎖、原子鎖,type 能夠指代這個鎖的類型,具體可選項是 SW_LOCKS 這個枚舉類型
  • 該結構體還定義了幾個函數指針,這幾個函數相似於各個鎖須要實現的接口,值得注意的是 lock_rdtrylock_rd兩個函數是專門爲了 swFileLockswRWLock 設計的,其餘鎖沒有這兩個函數。
typedef struct _swLock
{
    int type;
    union
    {
        swMutex mutex;
#ifdef HAVE_RWLOCK
        swRWLock rwlock;
#endif
#ifdef HAVE_SPINLOCK
        swSpinLock spinlock;
#endif
        swFileLock filelock;
        swSem sem;
        swAtomicLock atomlock;
    } object;

    int (*lock_rd)(struct _swLock *);
    int (*lock)(struct _swLock *);
    int (*unlock)(struct _swLock *);
    int (*trylock_rd)(struct _swLock *);
    int (*trylock)(struct _swLock *);
    int (*free)(struct _swLock *);
} swLock;

enum SW_LOCKS
{
    SW_RWLOCK = 1,
#define SW_RWLOCK SW_RWLOCK
    SW_FILELOCK = 2,
#define SW_FILELOCK SW_FILELOCK
    SW_MUTEX = 3,
#define SW_MUTEX SW_MUTEX
    SW_SEM = 4,
#define SW_SEM SW_SEM
    SW_SPINLOCK = 5,
#define SW_SPINLOCK SW_SPINLOCK
    SW_ATOMLOCK = 6,
#define SW_ATOMLOCK SW_ATOMLOCK
};

互斥鎖

互斥鎖是最經常使用的進程/線程鎖,swMutex 的基礎是 pthread_mutex 系列函數, 所以該數據結構只有兩個成員變量:_lockattr多線程

typedef struct _swMutex
{
    pthread_mutex_t _lock;
    pthread_mutexattr_t attr;
} swMutex;

互斥鎖的建立

互斥鎖的建立就是 pthread_mutex 互斥鎖的初始化,首先初始化互斥鎖的屬性 pthread_mutexattr_t attr,設定互斥鎖是否要進程共享,以後設置各個關於鎖的函數:函數

int swMutex_create(swLock *lock, int use_in_process)
{
    int ret;
    bzero(lock, sizeof(swLock));
    lock->type = SW_MUTEX;
    pthread_mutexattr_init(&lock->object.mutex.attr);
    if (use_in_process == 1)
    {
        pthread_mutexattr_setpshared(&lock->object.mutex.attr, PTHREAD_PROCESS_SHARED);
    }
    if ((ret = pthread_mutex_init(&lock->object.mutex._lock, &lock->object.mutex.attr)) < 0)
    {
        return SW_ERR;
    }
    lock->lock = swMutex_lock;
    lock->unlock = swMutex_unlock;
    lock->trylock = swMutex_trylock;
    lock->free = swMutex_free;
    return SW_OK;
}

互斥鎖函數

互斥鎖的函數就是調用相應的 pthread_mutex 系列函數:學習

static int swMutex_lock(swLock *lock)
{
    return pthread_mutex_lock(&lock->object.mutex._lock);
}

static int swMutex_unlock(swLock *lock)
{
    return pthread_mutex_unlock(&lock->object.mutex._lock);
}

static int swMutex_trylock(swLock *lock)
{
    return pthread_mutex_trylock(&lock->object.mutex._lock);
}

static int swMutex_free(swLock *lock)
{
    pthread_mutexattr_destroy(&lock->object.mutex.attr);
    return pthread_mutex_destroy(&lock->object.mutex._lock);
}

int swMutex_lockwait(swLock *lock, int timeout_msec)
{
    struct timespec timeo;
    timeo.tv_sec = timeout_msec / 1000;
    timeo.tv_nsec = (timeout_msec - timeo.tv_sec * 1000) * 1000 * 1000;
    return pthread_mutex_timedlock(&lock->object.mutex._lock, &timeo);
}

讀寫鎖

對於讀多寫少的狀況,讀寫鎖能夠顯著的提升程序效率,swRWLock 的基礎是 pthread_rwlock 系列函數:ui

typedef struct _swRWLock
{
    pthread_rwlock_t _lock;
    pthread_rwlockattr_t attr;

} swRWLock;

讀寫鎖的建立

讀寫鎖的建立過程和互斥鎖相似:atom

int swRWLock_create(swLock *lock, int use_in_process)
{
    int ret;
    bzero(lock, sizeof(swLock));
    lock->type = SW_RWLOCK;
    pthread_rwlockattr_init(&lock->object.rwlock.attr);
    if (use_in_process == 1)
    {
        pthread_rwlockattr_setpshared(&lock->object.rwlock.attr, PTHREAD_PROCESS_SHARED);
    }
    if ((ret = pthread_rwlock_init(&lock->object.rwlock._lock, &lock->object.rwlock.attr)) < 0)
    {
        return SW_ERR;
    }
    lock->lock_rd = swRWLock_lock_rd;
    lock->lock = swRWLock_lock_rw;
    lock->unlock = swRWLock_unlock;
    lock->trylock = swRWLock_trylock_rw;
    lock->trylock_rd = swRWLock_trylock_rd;
    lock->free = swRWLock_free;
    return SW_OK;
}

讀寫鎖函數

static int swRWLock_lock_rd(swLock *lock)
{
    return pthread_rwlock_rdlock(&lock->object.rwlock._lock);
}

static int swRWLock_lock_rw(swLock *lock)
{
    return pthread_rwlock_wrlock(&lock->object.rwlock._lock);
}

static int swRWLock_unlock(swLock *lock)
{
    return pthread_rwlock_unlock(&lock->object.rwlock._lock);
}

static int swRWLock_trylock_rd(swLock *lock)
{
    return pthread_rwlock_tryrdlock(&lock->object.rwlock._lock);
}

static int swRWLock_trylock_rw(swLock *lock)
{
    return pthread_rwlock_trywrlock(&lock->object.rwlock._lock);
}

static int swRWLock_free(swLock *lock)
{
    return pthread_rwlock_destroy(&lock->object.rwlock._lock);
}

文件鎖

文件鎖是對多進程、多線程同一時間寫相同文件這一場景設定的鎖,底層函數是 fcntl線程

typedef struct _swFileLock
{
    struct flock lock_t;
    int fd;
} swFileLock;

文件鎖的建立

int swFileLock_create(swLock *lock, int fd)
{
    bzero(lock, sizeof(swLock));
    lock->type = SW_FILELOCK;
    lock->object.filelock.fd = fd;
    lock->lock_rd = swFileLock_lock_rd;
    lock->lock = swFileLock_lock_rw;
    lock->trylock_rd = swFileLock_trylock_rd;
    lock->trylock = swFileLock_trylock_rw;
    lock->unlock = swFileLock_unlock;
    lock->free = swFileLock_free;
    return 0;
}

文件鎖函數

static int swFileLock_lock_rd(swLock *lock)
{
    lock->object.filelock.lock_t.l_type = F_RDLCK;
    return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock);
}

static int swFileLock_lock_rw(swLock *lock)
{
    lock->object.filelock.lock_t.l_type = F_WRLCK;
    return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock);
}

static int swFileLock_unlock(swLock *lock)
{
    lock->object.filelock.lock_t.l_type = F_UNLCK;
    return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock);
}

static int swFileLock_trylock_rw(swLock *lock)
{
    lock->object.filelock.lock_t.l_type = F_WRLCK;
    return fcntl(lock->object.filelock.fd, F_SETLK, &lock->object.filelock);
}

static int swFileLock_trylock_rd(swLock *lock)
{
    lock->object.filelock.lock_t.l_type = F_RDLCK;
    return fcntl(lock->object.filelock.fd, F_SETLK, &lock->object.filelock);
}

static int swFileLock_free(swLock *lock)
{
    return close(lock->object.filelock.fd);
}

自旋鎖

自旋鎖相似於互斥鎖,不一樣的是自旋鎖在加鎖失敗的時候,並不會沉入內核,而是空轉,這樣的鎖效率更高,可是會空耗 CPU
資源:設計

typedef struct _swSpinLock
{
    pthread_spinlock_t lock_t;
} swSpinLock;

自旋鎖的建立

int swSpinLock_create(swLock *lock, int use_in_process)
{
    int ret;
    bzero(lock, sizeof(swLock));
    lock->type = SW_SPINLOCK;
    if ((ret = pthread_spin_init(&lock->object.spinlock.lock_t, use_in_process)) < 0)
    {
        return -1;
    }
    lock->lock = swSpinLock_lock;
    lock->unlock = swSpinLock_unlock;
    lock->trylock = swSpinLock_trylock;
    lock->free = swSpinLock_free;
    return 0;
}

自旋鎖函數

static int swSpinLock_lock(swLock *lock)
{
    return pthread_spin_lock(&lock->object.spinlock.lock_t);
}

static int swSpinLock_unlock(swLock *lock)
{
    return pthread_spin_unlock(&lock->object.spinlock.lock_t);
}

static int swSpinLock_trylock(swLock *lock)
{
    return pthread_spin_trylock(&lock->object.spinlock.lock_t);
}

static int swSpinLock_free(swLock *lock)
{
    return pthread_spin_destroy(&lock->object.spinlock.lock_t);
}

原子鎖

不一樣於以上幾種鎖,swoole 的原子鎖並非 pthread 系列的鎖,而是自定義實現的。

typedef volatile uint32_t                 sw_atomic_uint32_t;
typedef sw_atomic_uint32_t                sw_atomic_t;

typedef struct _swAtomicLock
{
    sw_atomic_t lock_t;
    uint32_t spin;
} swAtomicLock;

原子鎖的建立

int swAtomicLock_create(swLock *lock, int spin)
{
    bzero(lock, sizeof(swLock));
    lock->type = SW_ATOMLOCK;
    lock->object.atomlock.spin = spin;
    lock->lock = swAtomicLock_lock;
    lock->unlock = swAtomicLock_unlock;
    lock->trylock = swAtomicLock_trylock;
    return SW_OK;
}

原子鎖的加鎖

static int swAtomicLock_lock(swLock *lock)
{
    sw_spinlock(&lock->object.atomlock.lock_t);
    return SW_OK;
}

原子鎖的加鎖邏輯函數 sw_spinlock 很是複雜,具體步驟以下:

  • 若是原子鎖沒有被鎖,那麼調用原子函數 sw_atomic_cmp_set(__sync_bool_compare_and_swap ) 進行加鎖
  • 若原子鎖已經被加鎖,若是是單核,那麼就調用 sched_yield 函數讓出執行權,由於這說明自旋鎖已經被其餘進程加鎖,可是卻被強佔睡眠,咱們須要讓出控制權讓那個惟一的 cpu 把那個進程跑下去,注意這時絕對不能進行自選,不然就是死鎖。
  • 若是是多核,就要不斷空轉的嘗試加鎖,防止睡眠,加鎖的嘗試間隔時間會指數增長,例如第一次 1 個時鐘週期,第二次 2 時鐘週期,第三次 4 時鐘週期...
  • 間隔時間內執行的函數 sw_atomic_cpu_pause 使用的是內嵌的彙編代碼,目的在讓 cpu 空轉,禁止線程或進程被其餘線程強佔致使睡眠,恢復上下文浪費時間。
  • 若是超過了 SW_SPINLOCK_LOOP_N 次數,尚未可以獲取的到鎖,那麼也要讓出控制權,這時頗有可能被鎖保護的代碼有阻塞行爲
#define sw_atomic_cmp_set(lock, old, set) __sync_bool_compare_and_swap(lock, old, set)
#define sw_atomic_cpu_pause()             __asm__ __volatile__ ("pause")
#define swYield()              sched_yield() //or usleep(1)

static sw_inline void sw_spinlock(sw_atomic_t *lock)
{
    uint32_t i, n;
    while (1)
    {
        if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
        {
            return;
        }
        if (SW_CPU_NUM > 1)
        {
            for (n = 1; n < SW_SPINLOCK_LOOP_N; n <<= 1)
            {
                for (i = 0; i < n; i++)
                {
                    sw_atomic_cpu_pause();
                }

                if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
                {
                    return;
                }
            }
        }
        swYield();
    }
}

原子鎖的函數

static int swAtomicLock_unlock(swLock *lock)
{
    return lock->object.atomlock.lock_t = 0;
}

static int swAtomicLock_trylock(swLock *lock)
{
    sw_atomic_t *atomic = &lock->object.atomlock.lock_t;
    return (*(atomic) == 0 && sw_atomic_cmp_set(atomic, 0, 1));
}

信號量

信號量也是數據同步的一種重要方式,其數據結構爲:

typedef struct _swSem
{
    key_t key;
    int semid;
} swSem;

信號量的建立

  • 信號量的初始化首先須要調用 semget 建立一個新的信號量
  • semctl 會將信號量初始化爲 0
int swSem_create(swLock *lock, key_t key)
{
    int ret;
    lock->type = SW_SEM;
    if ((ret = semget(key, 1, IPC_CREAT | 0666)) < 0)
    {
        return SW_ERR;
    }

    if (semctl(ret, 0, SETVAL, 1) == -1)
    {
        swWarn("semctl(SETVAL) failed");
        return SW_ERR;
    }
    lock->object.sem.semid = ret;

    lock->lock = swSem_lock;
    lock->unlock = swSem_unlock;
    lock->free = swSem_free;

    return SW_OK;
}

信號量的 V 操做

static int swSem_unlock(swLock *lock)
{
    struct sembuf sem;
    sem.sem_flg = SEM_UNDO;
    sem.sem_num = 0;
    sem.sem_op = 1;
    return semop(lock->object.sem.semid, &sem, 1);
}

信號量的 P 操做

static int swSem_lock(swLock *lock)
{
    struct sembuf sem;
    sem.sem_flg = SEM_UNDO;
    sem.sem_num = 0;
    sem.sem_op = -1;
    return semop(lock->object.sem.semid, &sem, 1);
}

信號量的銷燬

  • IPC_RMID 用於銷燬信號量
static int swSem_free(swLock *lock)
{
    return semctl(lock->object.sem.semid, 0, IPC_RMID);
}

條件變量

  • 條件變量並無做爲 swLock 的一員,而是自成一體
  • 條件變量不只須要 pthread_cond_t,還須要互斥量 swLock
typedef struct _swCond
{
    swLock _lock;
    pthread_cond_t _cond;

    int (*wait)(struct _swCond *object);
    int (*timewait)(struct _swCond *object, long, long);
    int (*notify)(struct _swCond *object);
    int (*broadcast)(struct _swCond *object);
    void (*free)(struct _swCond *object);
    int (*lock)(struct _swCond *object);
    int (*unlock)(struct _swCond *object);
} swCond;

條件變量的建立

int swCond_create(swCond *cond)
{
    if (pthread_cond_init(&cond->_cond, NULL) < 0)
    {
        swWarn("pthread_cond_init fail. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }
    if (swMutex_create(&cond->_lock, 0) < 0)
    {
        return SW_ERR;
    }

    cond->notify = swCond_notify;
    cond->broadcast = swCond_broadcast;
    cond->timewait = swCond_timewait;
    cond->wait = swCond_wait;
    cond->lock = swCond_lock;
    cond->unlock = swCond_unlock;
    cond->free = swCond_free;

    return SW_OK;
}

條件變量的函數

  • 值得注意的是,條件變量的函數使用必定要結合 swCond_lockswCond_unlock 等函數
static int swCond_notify(swCond *cond)
{
    return pthread_cond_signal(&cond->_cond);
}

static int swCond_broadcast(swCond *cond)
{
    return pthread_cond_broadcast(&cond->_cond);
}

static int swCond_timewait(swCond *cond, long sec, long nsec)
{
    struct timespec timeo;

    timeo.tv_sec = sec;
    timeo.tv_nsec = nsec;

    return pthread_cond_timedwait(&cond->_cond, &cond->_lock.object.mutex._lock, &timeo);
}

static int swCond_wait(swCond *cond)
{
    return pthread_cond_wait(&cond->_cond, &cond->_lock.object.mutex._lock);
}

static int swCond_lock(swCond *cond)
{
    return cond->_lock.lock(&cond->_lock);
}

static int swCond_unlock(swCond *cond)
{
    return cond->_lock.unlock(&cond->_lock);
}

static void swCond_free(swCond *cond)
{
    pthread_cond_destroy(&cond->_cond);
    cond->_lock.free(&cond->_lock);
}
相關文章
相關標籤/搜索