twemproxy是redis和memcached鏈接池中間件html
github項目地址:github.com/twitter/twe…node
項目簡介參考:github.com/twitter/twe…linux
文檔參考:github.com/twitter/twe…git
主流程就是啓動了一個事件循環,全部的邏輯經過事件出發調用回調函數執行github
消息會在三種角色的fd中進行流轉:client, server, proxyredis
角色 | 做用 |
---|---|
client | 表明應用程序端接入twemproxy的fd |
proxy | 表明twemproxy服務器的fd,等待應用程序端接入 |
server | 表明twemproxy連接redis/memcache的fd,proxy fd處理程序會把從client fd讀到的數據轉到server fd中 |
核心流程以下: 算法
通常linux使用epoll, mac使用kqueue,Sun Solaris使用evportapi
twemproxy爲了統一底層api調用,對上面的三種事件處理api進行了接口的統一: event/nc_event.h,核心api參考:數組
函數 | 做用 |
---|---|
event_base_create() | 建立事件循環管理fd |
event_add_in() | 將一個fd的讀事件歸入事件管理器的管理中 |
event_add_out() | 將一個fd的讀寫事件歸入事件管理器的管理中 |
event_add_conn() | 將一個fd的讀寫事件歸入事件管理器的管理中 |
event_wait() | 從事件管理器中獲取準備好讀/寫的fd列表,並調用cb進行處理 |
核心結構體event_base參考: src/event/nc_event.h服務器
struct event_base {
int ep; /* epoll descriptor */
struct epoll_event *event; /* event[] - events that were triggered int nevent; /* # event */
event_cb_t cb; /* event callback */
};
複製代碼
字段 | 做用 |
---|---|
ep | 事件管理器fd |
struct epoll_event | 這個事件管理器能夠處理的事件類型(讀、寫、hup信號) |
int nevent | 最大取多少個事件 |
event_cb_t cb | 事件處理回調函數 |
twemproxy中大量使用事件回調來驅動數據流轉,上面的event_cb_t就是一個回調函數
typedef int (*event_cb_t)(void *, uint32_t);
複製代碼
struct array {
uint32_t nelem; /* # element */
void *elem; /* element */
size_t size; /* element size */
uint32_t nalloc; /* # allocated element */
};
複製代碼
經過調整nelem和*elem指針內容實現動態數組
struct string {
uint32_t len; /* string length */
uint8_t *data; /* string data */
};
複製代碼
struct rbnode {
struct rbnode *left; /* left link */
struct rbnode *right; /* right link */
struct rbnode *parent; /* parent link */
int64_t key; /* key for ordering */
void *data; /* opaque data */
uint8_t color; /* red | black */
};
複製代碼
參考:src/hashkit/nc_ketama.c
twemproxy使用的隊列封裝支持數種不一樣類型的隊列
參考:src/nc_queue.h,這個隊列文件來自freebsd linx內核:github.com/freebsd/fre…
twemproxy使用yaml格式的配置文件,使用libyaml庫解析這個yaml格式的配置文件,參考:github.com/yaml/libyam…
twemproxy配置文件解析邏輯參考:src/nc_conf.h src/nc_conf.c
twemproxy支持memcache, redis兩種協議,抽象成接口能夠把這個協議從twemproxy主流程中解耦、這樣也利於添加新的協議支持
void memcache_parse_req(struct msg *r);
void memcache_parse_rsp(struct msg *r);
bool memcache_failure(struct msg *r);
void memcache_pre_coalesce(struct msg *r);
void memcache_post_coalesce(struct msg *r);
rstatus_t memcache_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn);
rstatus_t memcache_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
rstatus_t memcache_reply(struct msg *r);
void memcache_post_connect(struct context *ctx, struct conn *conn, struct server *server);
void memcache_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);
複製代碼
void redis_parse_req(struct msg *r);
void redis_parse_rsp(struct msg *r);
bool redis_failure(struct msg *r);
void redis_pre_coalesce(struct msg *r);
void redis_post_coalesce(struct msg *r);
rstatus_t redis_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn);
rstatus_t redis_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
rstatus_t redis_reply(struct msg *r);
void redis_post_connect(struct context *ctx, struct conn *conn, struct server *server);
void redis_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);
複製代碼
能夠看到redis和memcached協議被統一封裝成了兩組對等的接口,對應實現參考:nc_memcache.c nc_redis.c
twemproxy使用單獨的線程處理監控數據獲取請求,參考src/nc_stats.c::stats_start_aggregator
static rstatus_t stats_start_aggregator(struct stats *st) {
rstatus_t status;
// error handle...
status = stats_listen(st);
// error handle...
status = pthread_create(&st->tid, NULL, stats_loop, st);
// error handle...
return NC_OK;
}
複製代碼
這個監控線程和主線程間使用結構體共享監控數據,主線程往這個結構體中寫入實時監控數據,監控線程接到客戶端獲取監控數據的請求後,從這個結構體中讀取監控數據輸出給客戶端,核心結構體參考:src/nc_stats.h
typedef enum stats_type {
// ...
} stats_type_t;
struct stats_metric {
// ...
};
struct stats_server {
// ...
};
struct stats_pool {
// ...
};
struct stats_buffer {
// ...
};
struct stats {
// ...
};
複製代碼
使用命令curl -s 127.0.0.1:22222 | jq .
能夠查看實時的監控信息
參考:src/nc_signal.h src/nc_signal.c
static struct signal signals[] = {
{ SIGUSR1, "SIGUSR1", 0, signal_handler },
{ SIGUSR2, "SIGUSR2", 0, signal_handler },
{ SIGTTIN, "SIGTTIN", 0, signal_handler },
{ SIGTTOU, "SIGTTOU", 0, signal_handler },
{ SIGHUP, "SIGHUP", 0, signal_handler },
{ SIGINT, "SIGINT", 0, signal_handler },
{ SIGSEGV, "SIGSEGV", (int)SA_RESETHAND, signal_handler },
{ SIGPIPE, "SIGPIPE", 0, SIG_IGN },
{ 0, NULL, 0, NULL }
};
複製代碼
能夠看到twemproxy對多個信號進行回調:signal_handler
void signal_handler(int signo) {
//...
switch (signo) {
case SIGUSR1:
break;
case SIGUSR2:
break;
case SIGTTIN:
actionstr = ", up logging level";
action = log_level_up;
break;
case SIGTTOU:
actionstr = ", down logging level";
action = log_level_down;
break;
case SIGHUP:
actionstr = ", reopening log file";
action = log_reopen;
break;
case SIGINT:
done = true;
actionstr = ", exiting";
break;
case SIGSEGV:
log_stacktrace();
actionstr = ", core dumping";
raise(SIGSEGV);
break;
default:
NOT_REACHED();
}
log_safe("signal %d (%s) received%s", signo, sig->signame, actionstr);
if (action != NULL) {
action();
}
if (done) {
exit(1);
}
}
複製代碼
能夠看到使用SIGTTIN/信號增長日誌輸出級別,SIGTTOU下降日誌輸入級別, SIGHUP從新打開日誌文件,SIGINT退出程序,SIGSEGV輸出內存core dump文件
本文基於twemproxy 0.4.1寫成
儘管codis目前已經成爲redis中間件的事實選擇,可是twemproxy做爲第一款memcache&redis中間件,仍具備比較大學習價值,而且支持memcache