C++性能優化(十三)——無鎖隊列c++
隊列是一種很是重要的數據結構,其特性是先進先出(FIFO),符合流水線業務流程。在進程間通訊、網絡通訊間常常採用隊列作緩存,緩解數據處理壓力。根據操做隊列的場景分爲:單生產者——單消費者、多生產者——單消費者、單生產者——多消費者、多生產者——多消費者四大模型。根據隊列中數據分爲:隊列中的數據是定長的、隊列中的數據是變長的。
(1)單生產者——單消費者
(2)多生產者——單消費者
(3)單生產者——多消費者
(4)多生產者——多消費者
(5)數據定長隊列
(6)數據變長隊列git
生產環境中普遍使用生產者和消費者模型,要求生產者在生產的同時,消費者能夠進行消費,一般使用互斥鎖保證數據同步。但線程互斥鎖的開銷仍然比較大,所以在要求高性能、低延時場景中,推薦使用無鎖隊列。程序員
CAS即Compare and Swap,是全部CPU指令都支持CAS的原子操做(X86中CMPXCHG彙編指令),用於實現實現各類無鎖(lock free)數據結構。
CAS操做的C語言實現以下:github
bool compare_and_swap ( int *memory_location, int expected_value, int new_value) { if (*memory_location == expected_value) { *memory_location = new_value; return true; } return false; }
CAS用於檢查一個內存位置是否包含預期值,若是包含,則把新值復賦值到內存位置。成功返回true,失敗返回false。
(1)GGC對CAS支持
GCC4.1+版本中支持CAS原子操做。編程
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...); type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
(2)Windows對CAS支持
Windows中使用Windows API支持CAS。數組
LONG InterlockedCompareExchange( LONG volatile *Destination, LONG ExChange, LONG Comperand );
(3)C++11對CAS支持
C++11 STL中atomic函數支持CAS並能夠跨平臺。緩存
template< class T > bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired ); template< class T > bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );
其它原子操做以下:
Fetch-And-Add:通常用來對變量作+1的原子操做
Test-and-set:寫值到某個內存位置並傳回其舊值安全
boost提供了三種無鎖方案,分別適用不一樣使用場景。
boost::lockfree::queue是支持多個生產者和多個消費者線程的無鎖隊列。
boost::lockfree::stack是支持多個生產者和多個消費者線程的無鎖棧。
boost::lockfree::spsc_queue是僅支持單個生產者和單個消費者線程的無鎖隊列,比boost::lockfree::queue性能更好。
Boost無鎖數據結構的API經過輕量級原子鎖實現lock-free,不是真正意義的無鎖。
Boost提供的queue能夠設置初始容量,添加新元素時若是容量不夠,則總容量自動增加;但對於無鎖數據結構,添加新元素時若是容量不夠,總容量不會自動增加。性能優化
ConcurrentQueue是基於C++實現的工業級無鎖隊列方案。GitHub:https://github.com/cameron314/concurrentqueue
ReaderWriterQueue是基於C++實現的單生產者單消費者場景的無鎖隊列方案。GitHub:https://github.com/cameron314/readerwriterqueue
網絡
Disruptor是英國外匯交易公司LMAX基於JAVA開發的一個高性能隊列。GitHub:https://github.com/LMAX-Exchange/disruptor
RingBuffer是生產者和消費者模型中經常使用的數據結構,生產者將數據追加到數組尾端,當達到數組的尾部時,生產者繞回到數組的頭部;消費者從數組頭端取走數據,當到達數組的尾部時,消費者繞回到數組頭部。
若是隻有一個生產者和一個消費者,環形緩衝區能夠無鎖訪問,環形緩衝區的寫入index只容許生產者訪問並修改,只要生產者在更新index前將新的值保存到緩衝區中,則消費者將始終看到一致的數據結構;讀取index也只容許消費者訪問並修改,消費者只要在取走數據後更新讀index,則生產者將始終看到一致的數據結構。
空隊列時,front與rear相等;當有元素進隊,則rear後移;有元素出隊,則front後移。
空隊列時,rear等於front;滿隊列時,隊列尾部空一個位置,所以判斷循環隊列滿時使用(rear-front+maxn)%maxn。
入隊操做:
data[rear] = x; rear = (rear+1)%maxn;
出隊操做:
x = data[front]; rear = (front+1)%maxn;
對於單生產者和單消費者場景,因爲read_index和write_index都只會有一個線程寫,所以不須要加鎖也不須要原子操做,直接修改便可,但讀寫數據時須要考慮遇到數組尾部的狀況。
線程對write_index和read_index的讀寫操做以下:
(1)寫操做。先判斷隊列時否爲滿,若是隊列未滿,則先寫數據,寫完數據後再修改write_index。
(2)讀操做。先判斷隊列是否爲空,若是隊列不爲空,則先讀數據,讀完再修改read_index。
多生產者和單消費者場景中,因爲多個生產者都會修改write_index,因此在不加鎖的狀況下必須使用原子操做。
RingBuffer.hpp文件:
#pragma once template <class T> class RingBuffer { public: RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0) { m_data = new T[size]; } ~RingBuffer() { delete [] m_data; m_data = NULL; } inline bool isEmpty() const { return m_front == m_rear; } inline bool isFull() const { return m_front == (m_rear + 1) % m_size; } bool push(const T& value) { if(isFull()) { return false; } m_data[m_rear] = value; m_rear = (m_rear + 1) % m_size; return true; } bool push(const T* value) { if(isFull()) { return false; } m_data[m_rear] = *value; m_rear = (m_rear + 1) % m_size; return true; } inline bool pop(T& value) { if(isEmpty()) { return false; } value = m_data[m_front]; m_front = (m_front + 1) % m_size; return true; } inline unsigned int front()const { return m_front; } inline unsigned int rear()const { return m_rear; } inline unsigned int size()const { return m_size; } private: unsigned int m_size;// 隊列長度 int m_front;// 隊列頭部索引 int m_rear;// 隊列尾部索引 T* m_data;// 數據緩衝區 };
RingBufferTest.cpp測試代碼:
#include <stdio.h> #include <thread> #include <unistd.h> #include <sys/time.h> #include "RingBuffer.hpp" class Test { public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); } private: int id; int value; char data[128]; }; double getdetlatimeofday(struct timeval *begin, struct timeval *end) { return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000); } RingBuffer<Test> queue(1 << 12);2u000 #define N (10 * (1 << 20)) void produce() { struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i % 1024, i))) { i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i); } void consume() { sleep(1); Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { // test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i); } int main(int argc, char const *argv[]) { std::thread producer1(produce); std::thread consumer(consume); producer1.join(); consumer.join(); return 0; }
編譯:g++ --std=c++11 RingBufferTest.cpp -o test -pthread
單生產者單消費者場景下,消息吞吐量爲350萬條/秒左右。
LockFreeQueue.hpp:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <fcntl.h> #include <stdbool.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/time.h> #include <sys/mman.h> #define SHM_NAME_LEN 128 #define MIN(a, b) ((a) > (b) ? (b) : (a)) #define IS_POT(x) ((x) && !((x) & ((x)-1))) #define MEMORY_BARRIER __sync_synchronize() template <class T> class LockFreeQueue { protected: typedef struct { int m_lock; inline void spinlock_init() { m_lock = 0; } inline void spinlock_lock() { while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {} } inline void spinlock_unlock() { __sync_lock_release(&m_lock); } } spinlock_t; public: // size:隊列大小 // name:共享內存key的路徑名稱,默認爲NULL,使用數組做爲底層緩衝區。 LockFreeQueue(unsigned int size, const char* name = NULL) { memset(shm_name, 0, sizeof(shm_name)); createQueue(name, size); } ~LockFreeQueue() { if(shm_name[0] == 0) { delete [] m_buffer; m_buffer = NULL; } else { if (munmap(m_buffer, m_size * sizeof(T)) == -1) { perror("munmap"); } if (shm_unlink(shm_name) == -1) { perror("shm_unlink"); } } } bool isFull()const { #ifdef USE_POT return m_head == (m_tail + 1) & (m_size - 1); #else return m_head == (m_tail + 1) % m_size; #endif } bool isEmpty()const { return m_head == m_tail; } unsigned int front()const { return m_head; } unsigned int tail()const { return m_tail; } bool push(const T& value) { #ifdef USE_LOCK m_spinLock.spinlock_lock(); #endif if(isFull()) { #ifdef USE_LOCK m_spinLock.spinlock_unlock(); #endif return false; } memcpy(m_buffer + m_tail, &value, sizeof(T)); #ifdef USE_MB MEMORY_BARRIER; #endif #ifdef USE_POT m_tail = (m_tail + 1) & (m_size - 1); #else m_tail = (m_tail + 1) % m_size; #endif #ifdef USE_LOCK m_spinLock.spinlock_unlock(); #endif return true; } bool pop(T& value) { #ifdef USE_LOCK m_spinLock.spinlock_lock(); #endif if (isEmpty()) { #ifdef USE_LOCK m_spinLock.spinlock_unlock(); #endif return false; } memcpy(&value, m_buffer + m_head, sizeof(T)); #ifdef USE_MB MEMORY_BARRIER; #endif #ifdef USE_POT m_head = (m_head + 1) & (m_size - 1); #else m_head = (m_head + 1) % m_size; #endif #ifdef USE_LOCK m_spinLock.spinlock_unlock(); #endif return true; } protected: virtual void createQueue(const char* name, unsigned int size) { #ifdef USE_POT if (!IS_POT(size)) { size = roundup_pow_of_two(size); } #endif m_size = size; m_head = m_tail = 0; if(name == NULL) { m_buffer = new T[m_size]; } else { int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666); if (shm_fd < 0) { perror("shm_open"); } if (ftruncate(shm_fd, m_size * sizeof(T)) < 0) { perror("ftruncate"); close(shm_fd); } void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); if (addr == MAP_FAILED) { perror("mmap"); close(shm_fd); } if (close(shm_fd) == -1) { perror("close"); exit(1); } m_buffer = static_cast<T*>(addr); memcpy(shm_name, name, SHM_NAME_LEN - 1); } #ifdef USE_LOCK spinlock_init(m_lock); #endif } inline unsigned int roundup_pow_of_two(size_t size) { size |= size >> 1; size |= size >> 2; size |= size >> 4; size |= size >> 8; size |= size >> 16; size |= size >> 32; return size + 1; } protected: char shm_name[SHM_NAME_LEN]; volatile unsigned int m_head; volatile unsigned int m_tail; unsigned int m_size; #ifdef USE_LOCK spinlock_t m_spinLock; #endif T* m_buffer; };
#define USE_LOCK
開啓spinlock鎖,多生產者多消費者場景#define USE_MB
開啓Memory Barrier#define USE_POT
開啓隊列大小的2的冪對齊
LockFreeQueueTest.cpp測試文件:
#include "LockFreeQueue.hpp" #include <thread> //#define USE_LOCK class Test { public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); } private: int id; int value; char data[128]; }; double getdetlatimeofday(struct timeval *begin, struct timeval *end) { return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000); } LockFreeQueue<Test> queue(1 << 10, "/shm"); #define N ((1 << 20)) void produce() { struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i >> 10, i))) i++; } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i); } void consume() { Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { //test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i); } int main(int argc, char const *argv[]) { std::thread producer1(produce); //std::thread producer2(produce); std::thread consumer(consume); producer1.join(); //producer2.join(); consumer.join(); return 0; }
多線程場景下,須要定義USE_LOCK宏,開啓鎖保護。
編譯:g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread
kfifo是Linux內核的一個FIFO數據結構,採用環形循環隊列的數據結構來實現,提供一個無邊界的字節流服務,而且使用並行無鎖編程技術,即單生產者單消費者場景下兩個線程能夠併發操做,不須要任何加鎖行爲就能夠保證kfifo線程安全。
kfifo數據結構定義以下:
struct kfifo { unsigned char *buffer; unsigned int size; unsigned int in; unsigned int out; spinlock_t *lock; }; // 建立隊列 struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock) { struct kfifo *fifo; // 判斷是否爲2的冪 BUG_ON(!is_power_of_2(size)); fifo = kmalloc(sizeof(struct kfifo), gfp_mask); if (!fifo) return ERR_PTR(-ENOMEM); fifo->buffer = buffer; fifo->size = size; fifo->in = fifo->out = 0; fifo->lock = lock; return fifo; } // 分配空間 struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock) { unsigned char *buffer; struct kfifo *ret; // 判斷是否爲2的冪 if (!is_power_of_2(size)) { BUG_ON(size > 0x80000000); // 向上擴展成2的冪 size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret; } void kfifo_free(struct kfifo *fifo) { kfree(fifo->buffer); kfree(fifo); } // 入隊操做 static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len) { unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_put(fifo, buffer, len); spin_unlock_irqrestore(fifo->lock, flags); return ret; } // 出隊操做 static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_get(fifo, buffer, len); //當fifo->in == fifo->out時,buufer爲空 if (fifo->in == fifo->out) fifo->in = fifo->out = 0; spin_unlock_irqrestore(fifo->lock, flags); return ret; } // 入隊操做 unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len) { unsigned int l; //buffer中空的長度 len = min(len, fifo->size - fifo->in + fifo->out); // 內存屏障:smp_mb(),smp_rmb(), smp_wmb()來保證對方觀察到的內存操做順序 smp_mb(); // 將數據追加到隊列尾部 l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); memcpy(fifo->buffer, buffer + l, len - l); smp_wmb(); //每次累加,到達最大值後溢出,自動轉爲0 fifo->in += len; return len; } // 出隊操做 unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned int l; //有數據的緩衝區的長度 len = min(len, fifo->in - fifo->out); smp_rmb(); l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); memcpy(buffer + l, fifo->buffer, len - l); smp_mb(); fifo->out += len; //每次累加,到達最大值後溢出,自動轉爲0 return len; } static inline void __kfifo_reset(struct kfifo *fifo) { fifo->in = fifo->out = 0; } static inline void kfifo_reset(struct kfifo *fifo) { unsigned long flags; spin_lock_irqsave(fifo->lock, flags); __kfifo_reset(fifo); spin_unlock_irqrestore(fifo->lock, flags); } static inline unsigned int __kfifo_len(struct kfifo *fifo) { return fifo->in - fifo->out; } static inline unsigned int kfifo_len(struct kfifo *fifo) { unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_len(fifo); spin_unlock_irqrestore(fifo->lock, flags); return ret; }
(1)保證buffer size爲2的冪kfifo->size
值在調用者傳遞參數size的基礎上向2的冪擴展,目的是使kfifo->size
取模運算能夠轉化爲位與運算(提升運行效率)。kfifo->in % kfifo->size
轉化爲 kfifo->in & (kfifo->size – 1)
保證size是2的冪能夠經過位運算的方式求餘,在頻繁操做隊列的狀況下能夠大大提升效率。
(2)使用spin_lock_irqsave與spin_unlock_irqrestore 實現同步。
Linux內核中有spin_lock、spin_lock_irq和spin_lock_irqsave保證同步。
static inline void __raw_spin_lock(raw_spinlock_t *lock) { preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); } static inline void __raw_spin_lock_irq(raw_spinlock_t *lock) { local_irq_disable(); preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); }
spin_lock比spin_lock_irq速度快,但並非線程安全的。spin_lock_irq增長調用local_irq_disable函數,即禁止本地中斷,是線程安全的,既禁止本地中斷,又禁止內核搶佔。
spin_lock_irqsave是基於spin_lock_irq實現的一個輔助接口,在進入和離開臨界區後,不會改變中斷的開啓、關閉狀態。
若是自旋鎖在中斷處理函數中被用到,在獲取自旋鎖前須要關閉本地中斷,spin_lock_irqsave實現以下:
A、保存本地中斷狀態;
B、關閉本地中斷;
C、獲取自旋鎖。
解鎖時經過 spin_unlock_irqrestore完成釋放鎖、恢復本地中斷到原來狀態等工做。
(3)線性代碼結構
代碼中沒有任何if-else分支來判斷是否有足夠的空間存放數據,kfifo每次入隊或出隊只是簡單的 +len 判斷剩餘空間,並無對kfifo->size 進行取模運算,因此kfifo->in和kfifo->out老是一直增大,直到unsigned in超過最大值時繞回到0這一塊兒始端,但始終知足:kfifo->in - kfifo->out <= kfifo->size
。(4)使用Memory Barriermb():適用於多處理器和單處理器的內存屏障。rmb():適用於多處理器和單處理器的讀內存屏障。wmb():適用於多處理器和單處理器的寫內存屏障。smp_mb():適用於多處理器的內存屏障。smp_rmb():適用於多處理器的讀內存屏障。smp_wmb():適用於多處理器的寫內存屏障。 Memory Barrier使用場景以下:A、實現同步原語(synchronization primitives)B、實現無鎖數據結構(lock-free data structures)C、驅動程序程序在運行時內存實際訪問順序和程序代碼編寫的訪問順序不必定一致,即內存亂序訪問。內存亂序訪問行爲出現是爲了提高程序運行時的性能。內存亂序訪問主要發生在兩個階段:A、編譯時,編譯器優化致使內存亂序訪問(指令重排)。B、運行時,多CPU間交互引發內存亂序訪問。Memory Barrier可以讓CPU或編譯器在內存訪問上有序。Memory barrier前的內存訪問操做一定先於其後的完成。Memory Barrier包括兩類:A、編譯器Memory Barrier。B、CPU Memory Barrier。一般,編譯器和CPU引發內存亂序訪問不會帶來問題,但若是程序邏輯的正確性依賴於內存訪問順序,內存亂序訪問會帶來邏輯上的錯誤。在編譯時,編譯器對代碼作出優化時可能改變實際執行指令的順序(如GCC的O2或O3都會改變實際執行指令的順序)。在運行時,CPU雖然會亂序執行指令,但在單個CPU上,硬件可以保證程序執行時全部的內存訪問操做都是按程序代碼編寫的順序執行的,Memory Barrier沒有必要使用(不考慮編譯器優化)。爲了更快執行指令,CPU採起流水線的執行方式,編譯器在編譯代碼時爲了使指令更適合CPU的流水線執行方式以及多CPU執行,本來指令就會出現亂序的狀況。在亂序執行時,CPU真正執行指令的順序由可用的輸入數據決定,而非程序員編寫的順序。