【原】 twemproxy ketama一致性hash分析

轉貼請註明原帖位置:http://www.cnblogs.com/basecn/p/4288456.html html


測試Twemproxy集羣,雙主雙活git

向twemproxy集羣作寫操做時,發現key的分佈不太理想。在測試節點故障時,也發現一些和預想不太同樣的地方。github

 

一、Key的一致性Hash算法

當嘗試以a001,a002這樣有規律且的key值寫入的時候,在4節點的集羣環境中,key主要分佈在其中的2臺節點,另外兩臺分配極少。對於一些應用來講,key值可能根據必定規則生成,因此有被定向分配的可能。express

解決辦法在key中使用hash_key:{},hask_key使用8位隨機數,測試結果分佈的比較滿意。apache

測試4節點中key的分佈:服務器

1: 12917
2: 10761
3: 8596
4: 14382 

因爲ketama的算法還是使用了md5簽名(具體後面說),又特地觀察了好比有序數字生成的md5序列,結果並無出現明顯的有序或連序值。因此只能建議不使用連續的數據結尾key作一致性hash key。app

 

二、ketama算法less

twemproxy源碼下載:https://github.com/twitter/twemproxy,命令:git clone https://github.com/twitter/twemproxyide

關於ketama算法的代碼在nc_ketama.c文件中,主要是四個方法:

  • ketama_hash 計算某個主機,某個point的hash值
  • ketama_item_cmp 比較兩個連續區的值,用於在ketama_update 方法中排序
  • ketama_update 更新server-pool的分配策略
  • ketama_dispatch 找出給定hash值所在的連續區

2.1 連續區

說一下連續區(continuum),參考下圖。想象全部md5的值構成下面完整的「環」(沒有起點),那麼全部md5結果值在環上都有一個固定的位置。

按ketama的算法,在這個環上建立服務器數*160個點,這些點把環分紅了同等數量的段。

那麼,被插入數據的md5值也必定會落到環的某個區間,以此來判斷數據應被寫入哪臺服務器。

參考:理想化的Redis集羣

 

2.2 如何生成ketama_hash

再來看服務器+點的hash值是如何生成的:

alignment的值固定是4,ketama_hash是對由server名+索引組成的md5簽名,從第16位開始取值,再重組一個32位值。

static uint32_t
ketama_hash(const char *key, size_t key_length, uint32_t alignment)
{
    unsigned char results[16];

    md5_signature((unsigned char*)key, key_length, results);

    return ((uint32_t) (results[3 + alignment * 4] & 0xFF) << 24)
        | ((uint32_t) (results[2 + alignment * 4] & 0xFF) << 16)
        | ((uint32_t) (results[1 + alignment * 4] & 0xFF) << 8)
        | (results[0 + alignment * 4] & 0xFF);
}

下面是調用ketama_hash的代碼:

for (x = 0; x < pointer_per_hash; x++) {
    value = ketama_hash(host, hostlen, x);
    pool->continuum[continuum_index].index = server_index;
    pool->continuum[continuum_index++].value = value;
}

每一個服務器被分紅160個point點,由服務器名+索引組成host值,x值等於160/索引。

 

這樣計算出的服務器各點的值並非有序的,因此進行排序。

qsort(pool->continuum, pool->ncontinuum, sizeof(*pool->continuum), ketama_item_cmp);

排序後的點值是連續的,但同一服務器的點並不必定連續。這時,全部的值構成了用於一致性hash的環。

 

2.三、分配Key

由ketama_dispatch實現key值的分配。

可見方法中使用二分法找到一個值在環中的對應區域。

uint32_t
ketama_dispatch(struct continuum *continuum, uint32_t ncontinuum, uint32_t hash)
{
    struct continuum *begin, *end, *left, *right, *middle;

    ASSERT(continuum != NULL);
    ASSERT(ncontinuum != 0);

    begin = left = continuum;
    end = right = continuum + ncontinuum;

    while (left < right) {
        middle = left + (right - left) / 2;
        if (middle->value < hash) {
          left = middle + 1;
        } else {
          right = middle;
        }
    }
    if (right == end) {
        right = begin;
    }
    return right->index;
}

 

三、服務器的故障處理

從集羣中摘除節點時,ketama的算法不會從新計算"環"。當須要寫入故障節點時,會拋出異常。

仔細想一下是合理的,由於摘除的節點持有一部分數據,通常來講是須要恢復的,這是一個前提。

咱們假設twemproxy能夠感知節點故障,並從新計算分配策略。那麼,故障後又有新的數據寫入。這時,一部分本來要寫入故障節點的數據會被分配到其它節點上。

隨後,故障節點恢復,twemproxy又從新調整了分配策略。那麼,後寫入的那部分數據就不會再被找到(這個有點像內存泄露)。


 

nc_ketama.c 完整代碼

/*
 * twemproxy - A fast and lightweight proxy for memcached protocol.
 * Copyright (C) 2011 Twitter, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#include <math.h>

#include <nc_core.h>
#include <nc_server.h>
#include <nc_hashkit.h>

#define KETAMA_CONTINUUM_ADDITION   10  /* # extra slots to build into continuum */
#define KETAMA_POINTS_PER_SERVER    160 /* 40 points per hash */
#define KETAMA_MAX_HOSTLEN          86

static uint32_t
ketama_hash(const char *key, size_t key_length, uint32_t alignment)
{
    unsigned char results[16];

    md5_signature((unsigned char*)key, key_length, results);

    return ((uint32_t) (results[3 + alignment * 4] & 0xFF) << 24)
        | ((uint32_t) (results[2 + alignment * 4] & 0xFF) << 16)
        | ((uint32_t) (results[1 + alignment * 4] & 0xFF) << 8)
        | (results[0 + alignment * 4] & 0xFF);
}

static int
ketama_item_cmp(const void *t1, const void *t2)
{
    const struct continuum *ct1 = t1, *ct2 = t2;

    if (ct1->value == ct2->value) {
        return 0;
    } else if (ct1->value > ct2->value) {
        return 1;
    } else {
        return -1;
    }
}

rstatus_t
ketama_update(struct server_pool *pool)
{
    uint32_t nserver;             /* # server - live and dead */
    uint32_t nlive_server;        /* # live server */
    uint32_t pointer_per_server;  /* pointers per server proportional to weight */
    uint32_t pointer_per_hash;    /* pointers per hash */
    uint32_t pointer_counter;     /* # pointers on continuum */
    uint32_t pointer_index;       /* pointer index */
    uint32_t points_per_server;   /* points per server */
    uint32_t continuum_index;     /* continuum index */
    uint32_t continuum_addition;  /* extra space in the continuum */
    uint32_t server_index;        /* server index */
    uint32_t value;               /* continuum value */
    uint32_t total_weight;        /* total live server weight */
    int64_t now;                  /* current timestamp in usec */

    ASSERT(array_n(&pool->server) > 0);

    now = nc_usec_now();
    if (now < 0) {
        return NC_ERROR;
    }

    /*
     * Count live servers and total weight, and also update the next time to
     * rebuild the distribution
     */
    nserver = array_n(&pool->server);
    nlive_server = 0;
    total_weight = 0;
    pool->next_rebuild = 0LL;
    for (server_index = 0; server_index < nserver; server_index++) {
        struct server *server = array_get(&pool->server, server_index);

        if (pool->auto_eject_hosts) {
            if (server->next_retry <= now) {
                server->next_retry = 0LL;
                nlive_server++;
            } else if (pool->next_rebuild == 0LL ||
                       server->next_retry < pool->next_rebuild) {
                pool->next_rebuild = server->next_retry;
            }
        } else {
            nlive_server++;
        }

        ASSERT(server->weight > 0);

        /* count weight only for live servers */
        if (!pool->auto_eject_hosts || server->next_retry <= now) {
            total_weight += server->weight;
        }
    }

    pool->nlive_server = nlive_server;

    if (nlive_server == 0) {
        log_debug(LOG_DEBUG, "no live servers for pool %"PRIu32" '%.*s'",
                  pool->idx, pool->name.len, pool->name.data);

        return NC_OK;
    }
    log_debug(LOG_DEBUG, "%"PRIu32" of %"PRIu32" servers are live for pool "
              "%"PRIu32" '%.*s'", nlive_server, nserver, pool->idx,
              pool->name.len, pool->name.data);

    continuum_addition = KETAMA_CONTINUUM_ADDITION;
    points_per_server = KETAMA_POINTS_PER_SERVER;
    /*
     * Allocate the continuum for the pool, the first time, and every time we
     * add a new server to the pool
     */
    if (nlive_server > pool->nserver_continuum) {
        struct continuum *continuum;
        uint32_t nserver_continuum = nlive_server + continuum_addition;
        uint32_t ncontinuum = nserver_continuum * points_per_server;

        continuum = nc_realloc(pool->continuum, sizeof(*continuum) * ncontinuum);
        if (continuum == NULL) {
            return NC_ENOMEM;
        }

        pool->continuum = continuum;
        pool->nserver_continuum = nserver_continuum;
        /* pool->ncontinuum is initialized later as it could be <= ncontinuum */
    }

    /*
     * Build a continuum with the servers that are live and points from
     * these servers that are proportial to their weight
     */
    continuum_index = 0;
    pointer_counter = 0;
    for (server_index = 0; server_index < nserver; server_index++) {
        struct server *server;
        float pct;

        server = array_get(&pool->server, server_index);

        if (pool->auto_eject_hosts && server->next_retry > now) {
            continue;
        }

        pct = (float)server->weight / (float)total_weight;
        pointer_per_server = (uint32_t) ((floorf((float) (pct * KETAMA_POINTS_PER_SERVER / 4 * (float)nlive_server + 0.0000000001))) * 4);
        pointer_per_hash = 4;

        log_debug(LOG_VERB, "%.*s:%"PRIu16" weight %"PRIu32" of %"PRIu32" "
                  "pct %0.5f points per server %"PRIu32"",
                  server->name.len, server->name.data, server->port,
                  server->weight, total_weight, pct, pointer_per_server);

        for (pointer_index = 1;
             pointer_index <= pointer_per_server / pointer_per_hash;
             pointer_index++) {

            char host[KETAMA_MAX_HOSTLEN]= "";
            size_t hostlen;
            uint32_t x;

            hostlen = snprintf(host, KETAMA_MAX_HOSTLEN, "%.*s-%u",
                               server->name.len, server->name.data,
                               pointer_index - 1);

            for (x = 0; x < pointer_per_hash; x++) {
                value = ketama_hash(host, hostlen, x);
                pool->continuum[continuum_index].index = server_index;
                pool->continuum[continuum_index++].value = value;
            }
        }
        pointer_counter += pointer_per_server;
    }
    

    pool->ncontinuum = pointer_counter;
    qsort(pool->continuum, pool->ncontinuum, sizeof(*pool->continuum),
          ketama_item_cmp);

    for (pointer_index = 0;
         pointer_index < ((nlive_server * KETAMA_POINTS_PER_SERVER) - 1);
         pointer_index++) {
        if (pointer_index + 1 >= pointer_counter) {
            break;
        }
        ASSERT(pool->continuum[pointer_index].value <=
               pool->continuum[pointer_index + 1].value);
    }

    log_debug(LOG_VERB, "updated pool %"PRIu32" '%.*s' with %"PRIu32" of "
              "%"PRIu32" servers live in %"PRIu32" slots and %"PRIu32" "
              "active points in %"PRIu32" slots", pool->idx,
              pool->name.len, pool->name.data, nlive_server, nserver,
              pool->nserver_continuum, pool->ncontinuum,
              (pool->nserver_continuum + continuum_addition) * points_per_server);

    return NC_OK;
}

uint32_t
ketama_dispatch(struct continuum *continuum, uint32_t ncontinuum, uint32_t hash)
{
    struct continuum *begin, *end, *left, *right, *middle;

    ASSERT(continuum != NULL);
    ASSERT(ncontinuum != 0);

    begin = left = continuum;
    end = right = continuum + ncontinuum;

    while (left < right) {
        middle = left + (right - left) / 2;
        if (middle->value < hash) {
          left = middle + 1;
        } else {
          right = middle;
        }
    }

    if (right == end) {
        right = begin;
    }

    return right->index;
}
 
View Code
相關文章
相關標籤/搜索