swoole_table 實現原理剖析

Swoole項目從 2012 年推出到如今已經有 5 年的歷史,如今愈來愈多的互聯網企業使用Swoole來開發各種後臺應用。受限於 PHP 的ZendVM實現,PHP 程序沒法使用多線程進行編程開發。應用程序中實現並行處理只能使用多進程模式。php

作過多進程開發的 PHPer 都知道進程的內存隔離性。在程序中聲明的global全局數組,實際上並非數據共享的,在一個進程內修改數組的值,在另一個進程中是無效的。編程

$array = array();

function process1() {
    global $array;
    $array['test'] = 'hello world';
}

function process2() {
    global $array;
    //這裏讀取不到test的值
    var_dump($array['test']);
}

這個進程隔離性給程序的開發帶來的不少煩惱。好比實現一個聊天室程序,用戶A在進程1中處理,用戶B在進程2中處理,AB若是在同一個group,這個group在多線程環境中直接用set表示,AB加到對應groupset中便可。但多進程環境中,用 PHP 的array沒法實現。通常能夠有2個思路解決問題:數組

  • 進程間通訊,能夠使用管道,向另一個進程發送請求,獲取數據的值
  • 藉助存儲實現,如RedisMySQL文件

這2個方案雖然能夠實現,但都存在明顯的缺點。方案一實現較爲複雜,開發困難。方案二實現簡單,但存在額外的IO消耗,不是純內存操做,有性能瓶頸。基於/dev/shm實現內存文件讀寫的方案,是一個不錯的方案,但須要注意鎖的操做,讀寫時須要額外的系統調用開銷。服務器

想要解決這個問題,必須實現一個基於共享內存的數據結構。在 PHP 中也有一些擴展模塊能夠使用。如APCuYacshm_put_var/shm_get_varswoole

  • Yac:性能高,但因爲底層實現的限制,沒法保證一致性。只能做爲Cache來使用
  • APCu:支持Key-Value式數據的讀寫,缺點是實現簡單粗暴,鎖的粒度太粗。高併發時存在大量鎖的爭搶,性能較差
  • shm 系列函數:這個方案雖然能實現共享內存操做,但實際上底層實現很是簡陋。一方面底層根本沒有加鎖,若是你要在併發環境中使用,須要自行實現鎖的操做。另外,底層其實是一個鏈表結構,數據較多時,查詢性能很是差

swoole_table 介紹

爲了解決多進程程序中數據共享的難題,Swoole擴展提供了swoole_table數據結構。Table的實現很是精巧,使用最方便,同時性能也是最好的。數據結構

$table = new swoole_table(1024);
$table->column('id', swoole_table::TYPE_INT, 4);
$table->column('name', swoole_table::TYPE_STRING, 64);
$table->column('num', swoole_table::TYPE_FLOAT);
$table->create();

$table->set('tianfenghan@qq.com', array('id' => 145, 'name' => 'rango', 'num' => 3.1415));
$table->set('350749960@qq.com', array('id' => 358, 'name' => "Rango1234", 'num' => 3.1415));
$table->set('hello@qq.com', array('id' => 189, 'name' => 'rango3', 'num' => 3.1415));

$ret1 = $table->get('350749960@qq.com');
$ret2 = $table->get('tianfenghan@qq.com');

$table->del('350749960@qq.com');

Table實現了一個二維Map結構,有點像 PHP 的二維數組,簡單易用。在最新的1.9.19中還能夠使用ArrayAccess接口以array的方式操做Table多線程

$table = new swoole_table(1024);
$table->column('id', swoole_table::TYPE_INT);
$table->column('name', swoole_table::TYPE_STRING, 64);
$table->column('num', swoole_table::TYPE_FLOAT);
$table->create();

$table['apple'] = array('id' => 145, 'name' => 'iPhone', 'num' => 3.1415);
$table['google'] = array('id' => 358, 'name' => "AlphaGo", 'num' => 3.1415);

$table['microsoft']['name'] = "Windows";
$table['microsoft']['num'] = '1997.03';

var_dump($table['apple']);
var_dump($table['microsoft']);

$table['google']['num'] = 500.90;
var_dump($table['google']);

Table的優點

  • 性能極高,所有是純內存操做,沒有任何系統調用和IO的開銷。在酷睿I5機器上測試,Table單進程單線程每秒可完成寫操做300萬次,讀操做每秒可完成150萬次。在24核服務器上,理論上每秒可實現數千萬次讀寫操做。
  • 使用數據行鎖,底層使用了數據行鎖自旋鎖。多進程併發執行時,讀寫不一樣的key不存在鎖的爭搶問題。只有同一CPU時間讀寫同一個Key才須要進行加鎖操做。並且Table自己鎖的粒度很是小,getset操做內部只有少許內存讀寫的指令,能夠在數百納秒內完成操做。

Table的侷限性

  • Key最大長度不得超過64字節
  • 必須在建立前規劃好容量,一旦寫滿後,再set新的數據會出現內存分配致使失敗,沒法實現動態擴容

所以使用Table時儘量地設置較大的內存尺寸,這樣雖然會帶來必定的內存浪費,但實際上現代服務器內存很是廉價,這個侷限性在實際項目中的問題並不大。併發

swoole_table 實現原理

Table底層基於共享內存實現,所佔內存取決於表格的尺寸size、衝突率(默認20%)、column的設置(如上面的示例中每行須要8 + 64 + 8字節)、64字節KEY的存儲空間、管理結構的內存消耗。app

Table 的內存申請

size_t row_num = table->size * (1 + table->conflict_proportion);
size_t row_memory_size = sizeof(swTableRow) + table->item_size;
size_t memory_size = row_num * row_memory_size;

memory_size += sizeof(swMemoryPool) + sizeof(swFixedPool) + ((row_num - table->size) * sizeof(swFixedPool_slice));

memory_size += table->size * sizeof(swTableRow *);
void *memory = sw_shm_malloc(memory_size);

swoole_table自己是一個HashTable結構,Key會計算爲hash值,來散列到每一行。HashTable結構會遇到Hash衝突問題,兩個徹底不一樣的Key可能計算的hash值是同一個,這時須要使用鏈表來解決Hash衝突Swoole底層會建立一個浮動的內存池swFixedPool結構來管理這些衝突Key的內存。默認會建立size * 20%數量的浮動內存池。在1.9.19中能夠自行定義衝突率。函數

$table = new swoole_table(65536, 0.9);

假如你的場景中Hash衝突較多,能夠調高衝突率,以申請一塊較大的浮動內存池。

static swTableRow* swTable_hash(swTable *table, char *key, int keylen)
{
#ifdef SW_TABLE_USE_PHP_HASH
    uint64_t hashv = swoole_hash_php(key, keylen);
#else
    uint64_t hashv = swoole_hash_austin(key, keylen);
#endif
    uint64_t index = hashv & table->mask;
    assert(index < table->size);
    return table->rows[index];
}

swTableRow* swTableRow_set(swTable *table, char *key, int keylen, swTableRow **rowlock)
{
    if (keylen > SW_TABLE_KEY_SIZE)
    {
        keylen = SW_TABLE_KEY_SIZE;
    }

    swTableRow *row = swTable_hash(table, key, keylen);
    *rowlock = row;
    swTableRow_lock(row);

#ifdef SW_TABLE_DEBUG
    int _conflict_level = 0;
#endif

    if (row->active)
    {
        for (;;)
        {
            if (strncmp(row->key, key, keylen) == 0)
            {
                break;
            }
            else if (row->next == NULL)
            {
                table->lock.lock(&table->lock);
                swTableRow *new_row = table->pool->alloc(table->pool, 0);

#ifdef SW_TABLE_DEBUG
                conflict_count ++;
                if (_conflict_level > conflict_max_level)
                {
                    conflict_max_level = _conflict_level;
                }

#endif
                table->lock.unlock(&table->lock);

                if (!new_row)
                {
                    return NULL;
                }
                //add row_num
                bzero(new_row, sizeof(swTableRow));
                sw_atomic_fetch_add(&(table->row_num), 1);
                row->next = new_row;
                row = new_row;
                break;
            }
            else
            {
                row = row->next;
#ifdef SW_TABLE_DEBUG
                _conflict_level++;
#endif
            }
        }
    }
    else
    {
#ifdef SW_TABLE_DEBUG
        insert_count ++;
#endif
        sw_atomic_fetch_add(&(table->row_num), 1);
    }

    memcpy(row->key, key, keylen);
    row->active = 1;
    return row;
}
  • 使用swTable_hash計算hash值,散列到對應的行
  • Key發生衝突時,須要調用table->pool->alloc從浮動內存池中分配內存
  • 浮動內存池內存不足時,alloc失敗,這時沒法寫入數據到Table

數據自旋鎖

當同一CPU時間,多個進程同時讀取某一行時,須要鎖的爭搶。

swTableRow_lock(row);
//內存操做
swTableRow_unlock(_rowlock);

swTableRow_lock 自己是一個自選鎖,這裏使用了gcc編譯器提供的__sync_bool_compare_and_swap函數進行CPU原子操做。多個進程同時讀寫某一行數據時,先獲得鎖的進程會執行內存讀寫操做,未獲得鎖的進程會進行CPU自旋等待進程釋放鎖。

static sw_inline void sw_spinlock(sw_atomic_t *lock)
{
    uint32_t i, n;
    while (1)
    {
        if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
        {
            return;
        }
        if (SW_CPU_NUM > 1)
        {
            for (n = 1; n < SW_SPINLOCK_LOOP_N; n <<= 1)
            {
                for (i = 0; i < n; i++)
                {
                    sw_atomic_cpu_pause();
                }

                if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
                {
                    return;
                }
            }
        }
        swYield();
    }
}

返回結果

使用table::get方法時,從Table共享內存中,讀取數據寫入到PHP本地內存數組中。底層會根據列信息table->columns,計算內存指針的偏移量,獲得對應字段的值。

static inline void php_swoole_table_row2array(swTable *table, swTableRow *row, zval *return_value)
{
    array_init(return_value);

    swTableColumn *col = NULL;
    swTable_string_length_t vlen = 0;
    double dval = 0;
    int64_t lval = 0;
    char *k;

    while(1)
    {
        col = swHashMap_each(table->columns, &k);
        if (col == NULL)
        {
            break;
        }
        if (col->type == SW_TABLE_STRING)
        {
            memcpy(&vlen, row->data + col->index, sizeof(swTable_string_length_t));
            sw_add_assoc_stringl_ex(return_value, col->name->str, col->name->length + 1, row->data + col->index + sizeof(swTable_string_length_t), vlen, 1);
        }
        else if (col->type == SW_TABLE_FLOAT)
        {
            memcpy(&dval, row->data + col->index, sizeof(dval));
            sw_add_assoc_double_ex(return_value, col->name->str, col->name->length + 1, dval);
        }
        else
        {
            switch (col->type)
            {
            case SW_TABLE_INT8:
                memcpy(&lval, row->data + col->index, 1);
                sw_add_assoc_long_ex(return_value, col->name->str, col->name->length + 1, (int8_t) lval);
                break;
            case SW_TABLE_INT16:
                memcpy(&lval, row->data + col->index, 2);
                sw_add_assoc_long_ex(return_value, col->name->str, col->name->length + 1, (int16_t) lval);
                break;
            case SW_TABLE_INT32:
                memcpy(&lval, row->data + col->index, 4);
                sw_add_assoc_long_ex(return_value, col->name->str, col->name->length + 1, (int32_t) lval);
                break;
            default:
                memcpy(&lval, row->data + col->index, 8);
                sw_add_assoc_long_ex(return_value, col->name->str, col->name->length + 1, lval);
                break;
            }
        }
    }
}
相關文章
相關標籤/搜索