Kafka快速入門(十一)——RdKafka源碼分析
1、RdKafka C源碼分析
一、Kafka OP隊列
RdKafka將與Kafka Broker的交互、內部實現的操作都封裝成Operator結構,然後放入OP處理隊列裏統一處理。Kafka OP隊列是線程間通訊的管道。
RdKafka隊列定義在rdkafka_queue.h文件中,隊列相關操做封裝在rdsysqueue.h文件中。
(1)Kafka OP隊列
typedef struct rd_kafka_q_s rd_kafka_q_t; struct rd_kafka_q_s { mtx_t rkq_lock;// 隊列操做加鎖 cnd_t rkq_cond; // 隊列中放入新元素時, 用條件變量喚醒相應等待線程 struct rd_kafka_q_s *rkq_fwdq; // Forwarded/Routed queue struct rd_kafka_op_tailq rkq_q; // 放入隊列的操做所存儲的隊列 int rkq_qlen; /* Number of entries in queue */ int64_t rkq_qsize; /* Size of all entries in queue */ int rkq_refcnt; // 引用計數 int rkq_flags; // 當前隊列的狀態 rd_kafka_t *rkq_rk;// 隊列關聯的Kafka Handler對象 struct rd_kafka_q_io *rkq_qio; //隊列中放入新元素時,向fd寫入數據喚醒等待線程 rd_kafka_q_serve_cb_t *rkq_serve; // 隊列中的操做被執行時所執行的回調函數 void *rkq_opaque; const char *rkq_name; // queue name }; // Kafka Operator隊列,對外接口 typedef struct rd_kafka_queue_s rd_kafka_queue_t; struct rd_kafka_queue_s { rd_kafka_q_t *rkqu_q;// Kafka OP 隊列 rd_kafka_t *rkqu_rk;// 隊列關聯的Kafka Handler int rkqu_is_owner; }; rd_kafka_queue_t *rd_kafka_queue_new (rd_kafka_t *rk) { rd_kafka_q_t *rkq; rd_kafka_queue_t *rkqu; rkq = rd_kafka_q_new(rk); rkqu = rd_kafka_queue_new0(rk, rkq); rd_kafka_q_destroy(rkq); return rkqu; }
建立OP隊列
rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk) { return rd_kafka_queue_new0(rk, rk->rk_rep); }
獲取RdKafka與應用程序交互使用的OP隊列
rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk) { if (!rk->rk_cgrp) return NULL; return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q); }
獲取消費者的OP隊列
rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk, const char *topic, int32_t partition) { shptr_rd_kafka_toppar_t *s_rktp; rd_kafka_toppar_t *rktp; rd_kafka_queue_t *result; if (rk->rk_type == RD_KAFKA_PRODUCER) return NULL; s_rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, /* no ua_on_miss */ 1 /* create_on_miss */); if (!s_rktp) return NULL; rktp = rd_kafka_toppar_s2i(s_rktp); result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq); rd_kafka_toppar_destroy(s_rktp); return result; }
獲取Topic的分區的OP隊列
rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, rd_ts_t timeout_us, int32_t version, rd_kafka_q_cb_type_t cb_type, rd_kafka_q_serve_cb_t *callback, void *opaque);
處理OP隊列中的一個OP操作,按version過濾出可處理的OP,沒有則等待,如果超時,函數退出。
int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt, rd_kafka_q_cb_type_t cb_type, rd_kafka_q_serve_cb_t *callback, void *opaque);
批量處理OP隊列中的OP
int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size);
處理RD_KAFKA_OP_FETCH OP操作
int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock); #define rd_kafka_q_purge(rkq) rd_kafka_q_purge0(rkq, 1/*lock*/)
清除OP隊列中的全部OP操作。
rd_kafka_queue_t *rd_kafka_queue_get_background (rd_kafka_t *rk);
獲取Kafka Handle的後臺OP隊列
二、Kafka OP操做
RaKafka OP操做封裝在rdkafka_op.h文件中。app
/* All op types handled through the op queues.
 * The comments describe the direction: producer thread -> consumer thread. */
typedef enum {
        RD_KAFKA_OP_NONE,            /* No type specified */
        RD_KAFKA_OP_FETCH,           /* Kafka thread -> Application */
        RD_KAFKA_OP_ERR,             /* Kafka thread -> Application */
        RD_KAFKA_OP_CONSUMER_ERR,    /* Kafka thread -> Application */
        RD_KAFKA_OP_DR,              /* Kafka thread -> Application:
                                      * Produce message delivery report */
        RD_KAFKA_OP_STATS,           /* Kafka thread -> Application */
        RD_KAFKA_OP_OFFSET_COMMIT,   /* any -> toppar's Broker thread */
        RD_KAFKA_OP_NODE_UPDATE,     /* any -> Broker thread: node update */
        RD_KAFKA_OP_XMIT_BUF,        /* transmit buffer: any -> broker thread */
        RD_KAFKA_OP_RECV_BUF,        /* received response buffer:
                                      * broker thr -> any */
        RD_KAFKA_OP_XMIT_RETRY,      /* retry buffer xmit: any -> broker thread */
        RD_KAFKA_OP_FETCH_START,     /* Application -> toppar's handler thread */
        RD_KAFKA_OP_FETCH_STOP,      /* Application -> toppar's handler thread */
        RD_KAFKA_OP_SEEK,            /* Application -> toppar's handler thread */
        RD_KAFKA_OP_PAUSE,           /* Application -> toppar's handler thread */
        RD_KAFKA_OP_OFFSET_FETCH,    /* Broker -> broker thread: fetch offsets
                                      * for topic */
        RD_KAFKA_OP_PARTITION_JOIN,  /* cgrp op: add toppar to cgrp,
                                      * broker op: add toppar to broker */
        RD_KAFKA_OP_PARTITION_LEAVE, /* cgrp op: remove toppar from cgrp,
                                      * broker op: remove toppar from rkb */
        RD_KAFKA_OP_REBALANCE,       /* broker thread -> app: group rebalance */
        RD_KAFKA_OP_TERMINATE,       /* For generic use */
        RD_KAFKA_OP_COORD_QUERY,     /* Query for coordinator */
        RD_KAFKA_OP_SUBSCRIBE,       /* New subscription */
        RD_KAFKA_OP_ASSIGN,          /* New assignment */
        RD_KAFKA_OP_GET_SUBSCRIPTION,/* Get current subscription.
                                      * Reuses u.subscribe */
        RD_KAFKA_OP_GET_ASSIGNMENT,  /* Get current assignment.
                                      * Reuses u.assign */
        RD_KAFKA_OP_THROTTLE,        /* Throttle info */
        RD_KAFKA_OP_NAME,            /* Request name */
        RD_KAFKA_OP_OFFSET_RESET,    /* Offset reset */
        RD_KAFKA_OP_METADATA,        /* Metadata response */
        RD_KAFKA_OP_LOG,             /* Log */
        RD_KAFKA_OP_WAKEUP,          /* Wake-up signaling */
        RD_KAFKA_OP_CREATETOPICS,    /* Admin: CreateTopics: u.admin_request */
        RD_KAFKA_OP_DELETETOPICS,    /* Admin: DeleteTopics: u.admin_request */
        RD_KAFKA_OP_CREATEPARTITIONS,/* Admin: CreatePartitions:
                                      * u.admin_request */
        RD_KAFKA_OP_ALTERCONFIGS,    /* Admin: AlterConfigs: u.admin_request */
        RD_KAFKA_OP_DESCRIBECONFIGS, /* Admin: DescribeConfigs:
                                      * u.admin_request */
        RD_KAFKA_OP_ADMIN_RESULT,    /* Admin API .._result_t */
        RD_KAFKA_OP_PURGE,           /* Purge queues */
        RD_KAFKA_OP_CONNECT,         /* Connect (to broker) */
        RD_KAFKA_OP_OAUTHBEARER_REFRESH, /* Refresh OAUTHBEARER token */
        RD_KAFKA_OP_MOCK,            /* Mock cluster command */
        RD_KAFKA_OP_BROKER_MONITOR,  /* Broker state change */
        RD_KAFKA_OP_TXN,             /* Transaction command */
        RD_KAFKA_OP__END             /* End sentinel */
} rd_kafka_op_type_t;
rd_kafka_op_type_t枚舉類型定義了RdKafka全部OP操作類型。
typedef enum { RD_KAFKA_PRIO_NORMAL = 0, // 正常優先級 RD_KAFKA_PRIO_MEDIUM, // 中級 RD_KAFKA_PRIO_HIGH, // 高級 RD_KAFKA_PRIO_FLASH // 最高優先級:當即 } rd_kafka_prio_t;
rd_kafka_prio_t枚舉類型定義了Kafka OP操做的全部優先級。
typedef enum { RD_KAFKA_OP_RES_PASS, // Not handled, pass to caller RD_KAFKA_OP_RES_HANDLED, // Op was handled (through callbacks) RD_KAFKA_OP_RES_KEEP, // Op已經被回調函數處理,但禁止被op_handle()銷燬 RD_KAFKA_OP_RES_YIELD // Callback called yield } rd_kafka_op_res_t;
rd_kafka_op_res_t枚舉類型定義了OP被處理後的返回結果類型,
若是返回RD_KAFKA_OP_RES_YIELD,handler處理函數須要肯定是否須要將OP從新入隊列仍是將OP銷燬。
typedef enum { RD_KAFKA_Q_CB_INVALID, // 非法,未使用 RD_KAFKA_Q_CB_CALLBACK, // 基於OP觸發回調函數 RD_KAFKA_Q_CB_RETURN, // 返回OP而不是觸發回調函數 RD_KAFKA_Q_CB_FORCE_RETURN, // 不管是否觸發回調函數都返回OP RD_KAFKA_Q_CB_EVENT // 返回Event OP而不是觸發回調函數 } rd_kafka_q_cb_type_t;
rd_kafka_q_cb_type_t枚舉類型定義了OP隊列中OP操做執行回調函數的全部類型。
OP隊列執行回調函數類型定義以下:
typedef rd_kafka_op_res_t (rd_kafka_q_serve_cb_t) (rd_kafka_t *rk, struct rd_kafka_q_s *rkq, struct rd_kafka_op_s *rko, rd_kafka_q_cb_type_t cb_type, void *opaque);
OP回調函數定義以下:
typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk, rd_kafka_q_t *rkq, struct rd_kafka_op_s *rko);
OP執行結果數據結構定義以下:
typedef struct rd_kafka_replyq_s { rd_kafka_q_t *q;// OP執行結果存儲隊列 int32_t version;// 版本 } rd_kafka_replyq_t;
Kafka OP數據結構定義以下:
/* A single operation (op): the unit of work passed between threads.
 * The per-type payload lives in the rko_u union. */
struct rd_kafka_op_s {
        TAILQ_ENTRY(rd_kafka_op_s) rko_link;  /* TAILQ linkage */
        rd_kafka_op_type_t rko_type;          /* Op type */
        rd_kafka_event_type_t rko_evtype;     /* Event type */
        int rko_flags;                        /* Op flags */
        int32_t rko_version;                  /* Version */
        rd_kafka_resp_err_t rko_err;          /* Error code, if any */
        int32_t rko_len;
        rd_kafka_prio_t rko_prio;             /* Op priority */
        shptr_rd_kafka_toppar_t *rko_rktp;    /* Associated topic+partition */
        rd_kafka_replyq_t rko_replyq;         /* Where to enqueue the reply */
        rd_kafka_q_serve_cb_t *rko_serve;     /* Queue serve callback */
        void *rko_serve_opaque;               /* Opaque for rko_serve */
        rd_kafka_t *rko_rk;                   /* Kafka handle */
        rd_kafka_op_cb_t *rko_op_cb;          /* Op callback */
        union {
                struct {
                        rd_kafka_buf_t *rkbuf;
                        rd_kafka_msg_t rkm;
                        int evidx;
                } fetch;
                struct {
                        rd_kafka_topic_partition_list_t *partitions;
                        int do_free; /* free .partitions on destroy() */
                } offset_fetch;
                struct {
                        rd_kafka_topic_partition_list_t *partitions;
                        void (*cb) (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                    rd_kafka_topic_partition_list_t *offsets,
                                    void *opaque);
                        void *opaque;
                        int silent_empty; /* Fail silently if there are no
                                           * offsets to commit. */
                        rd_ts_t ts_timeout;
                        char *reason;
                } offset_commit;
                struct {
                        rd_kafka_topic_partition_list_t *topics;
                } subscribe;
                struct {
                        rd_kafka_topic_partition_list_t *partitions;
                } assign;
                struct {
                        rd_kafka_topic_partition_list_t *partitions;
                } rebalance;
                struct {
                        char *str;
                } name;
                struct {
                        int64_t offset;
                        char *errstr;
                        rd_kafka_msg_t rkm;
                        int fatal;
                } err;
                struct {
                        int throttle_time;
                        int32_t nodeid;
                        char *nodename;
                } throttle;
                struct {
                        char *json;
                        size_t json_len;
                } stats;
                struct {
                        rd_kafka_buf_t *rkbuf;
                } xbuf;
                /* RD_KAFKA_OP_METADATA */
                struct {
                        rd_kafka_metadata_t *md;
                        int force; /* force request regardless of outstanding
                                    * metadata requests. */
                } metadata;
                struct {
                        shptr_rd_kafka_itopic_t *s_rkt;
                        rd_kafka_msgq_t msgq;
                        rd_kafka_msgq_t msgq2;
                        int do_purge2;
                } dr;
                struct {
                        int32_t nodeid;
                        char nodename[RD_KAFKA_NODENAME_SIZE];
                } node;
                struct {
                        int64_t offset;
                        char *reason;
                } offset_reset;
                struct {
                        int64_t offset;
                        struct rd_kafka_cgrp_s *rkcg;
                } fetch_start; /* reused for SEEK */
                struct {
                        int pause;
                        int flag;
                } pause;
                struct {
                        char fac[64];
                        int level;
                        char *str;
                } log;
                struct {
                        rd_kafka_AdminOptions_t options;
                        rd_ts_t abs_timeout;   /* Absolute timeout */
                        rd_kafka_timer_t tmr;  /* Timeout timer */
                        struct rd_kafka_enq_once_s *eonce; /* Enqueue the op
                                 * only once; used to (re)trigger the request
                                 * on broker state changes */
                        rd_list_t args; /* Type depends on request, e.g.
                                         * rd_kafka_NewTopic_t for
                                         * CreateTopics */
                        rd_kafka_buf_t *reply_buf; /* Protocol reply */
                        struct rd_kafka_admin_worker_cbs *cbs;
                        /* Worker state */
                        enum {
                                RD_KAFKA_ADMIN_STATE_INIT,
                                RD_KAFKA_ADMIN_STATE_WAIT_BROKER,
                                RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER,
                                RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST,
                                RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE,
                        } state;
                        int32_t broker_id; /* Requested broker id to
                                            * communicate with. */
                        /* Application's reply queue */
                        rd_kafka_replyq_t replyq;
                        rd_kafka_event_type_t reply_event_type;
                } admin_request;
                struct {
                        rd_kafka_op_type_t reqtype; /* Request op type */
                        char *errstr;               /* Error string */
                        rd_list_t results; /* Type depends on request type: */
                        void *opaque; /* Application's opaque as set by
                                       * rd_kafka_AdminOptions_set_opaque */
                } admin_result;
                struct {
                        int flags; /* purge_flags from rd_kafka_purge() */
                } purge;
                /* Mock cluster command */
                struct {
                        enum {
                                RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR,
                                RD_KAFKA_MOCK_CMD_TOPIC_CREATE,
                                RD_KAFKA_MOCK_CMD_PART_SET_LEADER,
                                RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,
                                RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,
                                RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN,
                                RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
                                RD_KAFKA_MOCK_CMD_COORD_SET,
                                RD_KAFKA_MOCK_CMD_APIVERSION_SET,
                        } cmd;
                        rd_kafka_resp_err_t err; /* For: TOPIC_SET_ERROR */
                        char *name; /* For: TOPIC_SET_ERROR, TOPIC_CREATE,
                                     * PART_SET_FOLLOWER,
                                     * PART_SET_FOLLOWER_WMARKS,
                                     * BROKER_SET_RACK, COORD_SET (key_type) */
                        char *str;  /* For: COORD_SET (key) */
                        int32_t partition; /* For: PART_SET_FOLLOWER,
                                            * PART_SET_FOLLOWER_WMARKS,
                                            * PART_SET_LEADER,
                                            * APIVERSION_SET (ApiKey) */
                        int32_t broker_id; /* For: PART_SET_FOLLOWER,
                                            * PART_SET_LEADER,
                                            * BROKER_SET_UPDOWN,
                                            * BROKER_SET_RACK, COORD_SET */
                        int64_t lo; /* Low offset, for: TOPIC_CREATE (part cnt),
                                     * PART_SET_FOLLOWER_WMARKS,
                                     * BROKER_SET_UPDOWN,
                                     * APIVERSION_SET (minver) */
                        int64_t hi; /* High offset, for: TOPIC_CREATE
                                     * (repl fact), PART_SET_FOLLOWER_WMARKS,
                                     * APIVERSION_SET (maxver) */
                } mock;
                struct {
                        struct rd_kafka_broker_s *rkb; /* Broker whose state
                                                        * changed */
                        void (*cb) (struct rd_kafka_broker_s *rkb); /* Callback
                                 * to trigger on the op-handler thread */
                } broker_monitor;
                struct {
                        rd_kafka_error_t *error; /* Error object */
                        char *group_id; /* Consumer group id for offset
                                         * commits */
                        int timeout_ms;      /* Operation timeout */
                        rd_ts_t abs_timeout; /* Absolute time */
                        rd_kafka_topic_partition_list_t *offsets; /* Offsets
                                 * to commit */
                } txn;
        } rko_u;
};
typedef struct rd_kafka_op_s rd_kafka_event_t;
const char *rd_kafka_op2str (rd_kafka_op_type_t type);
返回OP類型的相應字符串。
void rd_kafka_op_destroy (rd_kafka_op_t *rko);
銷燬OP對象
rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type); #define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type)
生成OP對象
rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko);
調用OP的回調函數
rd_kafka_op_res_t rd_kafka_op_handle_std (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int cb_type) { if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) return RD_KAFKA_OP_RES_PASS; else if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) { rd_kafka_op_offset_store(rk, rko); return RD_KAFKA_OP_RES_HANDLED; } else if (cb_type != RD_KAFKA_Q_CB_EVENT && rko->rko_type & RD_KAFKA_OP_CB) return rd_kafka_op_call(rk, rkq, rko); else if (rko->rko_type == RD_KAFKA_OP_RECV_BUF) rd_kafka_buf_handle_op(rko, rko->rko_err); else if (cb_type != RD_KAFKA_Q_CB_RETURN && rko->rko_type & RD_KAFKA_OP_REPLY && rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; else return RD_KAFKA_OP_RES_PASS; return RD_KAFKA_OP_RES_HANDLED; }
對OP進行標準化處理
rd_kafka_op_res_t rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type, void *opaque, rd_kafka_q_serve_cb_t *callback) { rd_kafka_op_res_t res; if (rko->rko_serve) { callback = rko->rko_serve; opaque = rko->rko_serve_opaque; rko->rko_serve = NULL; rko->rko_serve_opaque = NULL; } res = rd_kafka_op_handle_std(rk, rkq, rko, cb_type); if (res == RD_KAFKA_OP_RES_KEEP) { return res; } if (res == RD_KAFKA_OP_RES_HANDLED) { rd_kafka_op_destroy(rko); return res; } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD)) return res; if (callback) res = callback(rk, rkq, rko, cb_type, opaque); return res; }
處理OP
三、Kafka Message
rd_kafka_message_t定義在rdkafka.h文件:
typedef struct rd_kafka_message_s { rd_kafka_resp_err_t err; // 非0表示錯誤消息 rd_kafka_topic_t *rkt; // 關聯Topic int32_t partition; // 分區 void *payload; // 消息數據 size_t len; // err爲0表示消息數據長度,非0表示錯誤信息長度 void *key; // err爲0表示消息key size_t key_len; // err爲0表示消息key的長度 int64_t offset; // 位移 void *_private; // 對Consumer,爲RdKafka私有指針;對於Producer,爲dr_msg_cb } rd_kafka_message_t;
Kafka Producer生產的數據在application層調用接口後最終會將數據封裝成rd_kafka_message_t結構,Consumer從Broker消費的數據回調給application層時也會封裝成rd_kafka_message_t結構。
rd_kafka_msg_t和rd_kafka_msgq_t定義在rdkafka_msg.h文件:
typedef struct rd_kafka_msg_s { rd_kafka_message_t rkm_rkmessage; // Kafka 消息,必須時第一個字段 TAILQ_ENTRY(rd_kafka_msg_s) rkm_link;// 增長TAILQ字段 int rkm_flags; // 消息類型標識 rd_kafka_timestamp_type_t rkm_tstype; // 消息時間戳 int64_t rkm_timestamp;// V1消息格式的時間戳 rd_kafka_headers_t *rkm_headers; rd_kafka_msg_status_t rkm_status; // 消息持久化狀態 union { struct { rd_ts_t ts_timeout; // 消息超時 rd_ts_t ts_enq; // 入隊列或生產消息時間戳 rd_ts_t ts_backoff; uint64_t msgid; // 用於保序的消息ID,從1開始 uint64_t last_msgid; // int retries; // 重試次數 } producer; #define rkm_ts_timeout rkm_u.producer.ts_timeout #define rkm_ts_enq rkm_u.producer.ts_enq #define rkm_msgid rkm_u.producer.msgid struct { rd_kafkap_bytes_t binhdrs; } consumer; } rkm_u; } rd_kafka_msg_t; TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s); typedef struct rd_kafka_msgq_s { struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */ int32_t rkmq_msg_cnt; int64_t rkmq_msg_bytes; } rd_kafka_msgq_t; Kafka Message隊列 static rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) { return (rd_kafka_msg_t *)rkmessage; }
將rd_kafka_message_t類型消息轉換爲rd_kafka_msg_t類型消息
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition, int msgflags, char *payload, size_t len, const void *keydata, size_t keylen, void *msg_opaque);
建立一條新的Kafka消息並將其入隊到相應分區的消息隊列。
static void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst, rd_kafka_msgq_t *src);
將src消息隊列的全部消息合併到dst消息隊列尾部,src會被清空。
static void rd_kafka_msgq_move (rd_kafka_msgq_t *dst, rd_kafka_msgq_t *src);
將src消息隊列的全部元素移動到dst消息隊列,src會被清空static void rd_kafka_msgq_purge (rd_kafka_t *rk, rd_kafka_msgq_t *rkmq);
清空Kafka Handle的消息隊列
static rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm, int do_count);
將rkm消息從消息隊列rkmq中刪除。
static rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq);
將rkm消息從消息隊列rkmq中刪除
int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt, rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm);
將rkm消息按照消息ID排序插入rkmq消息隊列。
static void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm);
將rkm消息插入消息隊列rkmq頭部。
static int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm);
將rkm消息追加到rkmq消息隊列
int rd_kafka_msgq_age_scan (struct rd_kafka_toppar_s *rktp, rd_kafka_msgq_t *rkmq, rd_kafka_msgq_t *timedout, rd_ts_t now, rd_ts_t *abs_next_timeout);
掃描rkmq消息隊列,將超時的消息增長到timeout消息隊列,並從rkmq消息隊列將其刪除。
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm, rd_dolock_t do_lock);
對寫入rkt主題的rkm消息進行分區分配。
rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko);
從OP操做提取消息rd_kafka_message_t *rd_kafka_message_new (void);
建立空的Kafka消息
四、Kafka Topic
Kafka Topic相關封裝位於rdkafka_topic.h文件中。
struct rd_kafka_itopic_s { TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link; rd_refcnt_t rkt_refcnt; // 引入計數 rwlock_t rkt_lock; rd_kafkap_str_t *rkt_topic; // Topic名稱 shptr_rd_kafka_toppar_t *rkt_ua; // 未分配分區 shptr_rd_kafka_toppar_t **rkt_p; // 擁有TopicPartition的鏈表 int32_t rkt_partition_cnt; // 分區計數 rd_list_t rkt_desp; rd_ts_t rkt_ts_metadata; // 最近更新Meta的時間戳 mtx_t rkt_app_lock; rd_kafka_topic_t *rkt_app_rkt; // Topic對應用層的指針 int rkt_app_refcnt; enum { RD_KAFKA_TOPIC_S_UNKNOWN, RD_KAFKA_TOPIC_S_EXISTS, RD_KAFKA_TOPIC_S_NOTEXISTS, } rkt_state; // Topic狀態 int rkt_flags; // rd_kafka_t *rkt_rk; // Kafka Handle rd_avg_t rkt_avg_batchsize; rd_avg_t rkt_avg_batchcnt; shptr_rd_kafka_itopic_t *rkt_shptr_app; rd_kafka_topic_conf_t rkt_conf; // Topic配置 };
shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf, int *existing, int do_lock);
建立rd_kafka_itopic_s對象。
void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics);
獲取當前rd_kafka_t對象持有的全部topic名字,保存在一個rd_list中。
void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now);
掃描Kafka Handle持有的全部topic的全部分區,篩選出未分配分區的超時消息、須要在Broker上建立的Topic、Meta數據太舊須要被更新的Topic、Leader未知的分區。
static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt, int32_t partition_cnt);
更新topic的partition個數,若是分區數量有變化,返回1,不然返回0。
rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf);
建立Topic對象
static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt, rd_kafka_resp_err_t err);
分配未分配分區上的消息到可用分區
int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt, int32_t partition);
查詢Topic的分區是否可用,即分區是否有Leader。
五、Kafka TopicPartition
rd_kafka_topic_partition_t定義在rdkafka.h文件中。
typedef struct rd_kafka_topic_partition_s { char *topic; // Topic名稱 int32_t partition; // 分區 int64_t offset; // 位移 void *metadata; // 元數據 size_t metadata_size; void *opaque; rd_kafka_resp_err_t err; void *_private; } rd_kafka_topic_partition_t;
/* Growable array of topic+partitions. */
typedef struct rd_kafka_topic_partition_list_s {
        int cnt;                           /* Current element count */
        int size;                          /* Allocated array size */
        rd_kafka_topic_partition_t *elems; /* Element array */
} rd_kafka_topic_partition_list_t;

/* Internal topic+partition ("toppar") object. */
struct rd_kafka_toppar_s {
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink;
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink;
        CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_activelink;
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink;
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_txnlink;
        rd_kafka_itopic_t *rktp_rkt;
        shptr_rd_kafka_itopic_t *rktp_s_rkt;  /* Pointer to the topic object */
        int32_t rktp_partition;               /* Partition */
        int32_t rktp_leader_id;               /* Current leader id */
        int32_t rktp_broker_id;               /* Current broker id */
        rd_kafka_broker_t *rktp_leader;       /* Current leader broker */
        rd_kafka_broker_t *rktp_broker;       /* Current preferred broker */
        rd_kafka_broker_t *rktp_next_broker;  /* Next preferred broker */
        rd_refcnt_t rktp_refcnt;              /* Reference count */
        mtx_t rktp_lock;
        rd_kafka_q_t *rktp_msgq_wakeup_q;     /* Wake-up queue */
        rd_kafka_msgq_t rktp_msgq;
        rd_kafka_msgq_t rktp_xmit_msgq;
        int rktp_fetch;
        rd_kafka_q_t *rktp_fetchq;            /* Queue of messages fetched
                                               * from the broker */
        rd_kafka_q_t *rktp_ops;               /* Main-thread op queue */
        rd_atomic32_t rktp_msgs_inflight;
        uint64_t rktp_msgid;                  /* Current/latest message id */
        /* Idempotent/EOS producer state */
        struct {
                rd_kafka_pid_t pid;
                uint64_t acked_msgid;
                uint64_t epoch_base_msgid;
                int32_t next_ack_seq;
                int32_t next_err_seq;
                rd_bool_t wait_drain;
        } rktp_eos;
        rd_atomic32_t rktp_version;           /* Latest op version */
        int32_t rktp_op_version;              /* Op version of the current
                                               * command received from the
                                               * broker */
        int32_t rktp_fetch_version;           /* Op version of the current
                                               * fetch */
        enum {
                RD_KAFKA_TOPPAR_FETCH_NONE = 0,
                RD_KAFKA_TOPPAR_FETCH_STOPPING,
                RD_KAFKA_TOPPAR_FETCH_STOPPED,
                RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
                RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
                RD_KAFKA_TOPPAR_FETCH_ACTIVE,
        } rktp_fetch_state;
        int32_t rktp_fetch_msg_max_bytes;
        rd_ts_t rktp_ts_fetch_backoff;
        int64_t rktp_query_offset;
        int64_t rktp_next_offset;
        int64_t rktp_last_next_offset;
        int64_t rktp_app_offset;
        int64_t rktp_stored_offset;       /* Latest stored offset; may not
                                           * be committed yet */
        int64_t rktp_committing_offset;   /* Offset currently being
                                           * committed */
        int64_t rktp_committed_offset;    /* Latest committed offset */
        rd_ts_t rktp_ts_committed_offset; /* Timestamp of latest commit */
        struct offset_stats rktp_offsets;
        struct offset_stats rktp_offsets_fin;
        int64_t rktp_ls_offset;           /* Current last stable offset */
        int64_t rktp_hi_offset;           /* Current high watermark */
        int64_t rktp_lo_offset;           /* Current low watermark */
        rd_ts_t rktp_ts_offset_lag;
        char *rktp_offset_path;           /* Offset file path */
        FILE *rktp_offset_fp;             /* Offset file handle */
        rd_kafka_cgrp_t *rktp_cgrp;
        int rktp_assigned;
        rd_kafka_replyq_t rktp_replyq;
        int rktp_flags;                   /* Partition state flags */
        shptr_rd_kafka_toppar_t *rktp_s_for_desp; /* On rkt_desp list */
        shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; /* On rkcg_toppars list */
        shptr_rd_kafka_toppar_t *rktp_s_for_rkb;  /* On rkb_toppars list */
        rd_kafka_timer_t rktp_offset_query_tmr;  /* Offset query timer */
        rd_kafka_timer_t rktp_offset_commit_tmr; /* Offset commit timer */
        rd_kafka_timer_t rktp_offset_sync_tmr;   /* Offset file sync timer */
        rd_kafka_timer_t rktp_consumer_lag_tmr;  /* Consumer lag monitoring
                                                  * timer */
        rd_interval_t rktp_lease_intvl;          /* Preferred replica lease */
        rd_interval_t rktp_new_lease_intvl;      /* Interval between creating
                                                  * new preferred replica
                                                  * leases */
        rd_interval_t rktp_new_lease_log_intvl;
        rd_interval_t rktp_metadata_intvl;       /* Max frequency of metadata
                                                  * requests for preferred
                                                  * replicas */
        int rktp_wait_consumer_lag_resp;
        struct rd_kafka_toppar_err rktp_last_err;
        /* Per-partition statistics */
        struct {
                rd_atomic64_t tx_msgs;           /* Messages produced */
                rd_atomic64_t tx_msg_bytes;      /* Bytes produced */
                rd_atomic64_t rx_msgs;           /* Messages consumed */
                rd_atomic64_t rx_msg_bytes;      /* Bytes consumed */
                rd_atomic64_t producer_enq_msgs; /* Messages enqueued by the
                                                  * producer */
                rd_atomic64_t rx_ver_drops;      /* Outdated messages dropped
                                                  * by the consumer */
        } rktp_c;
};
六、Kafka Transport
RdKafka與Broker網絡通訊不須要支持高併發,所以RdKafka選擇了Poll網絡IO模型,對transport數據傳輸層進行了封裝。
RdKafka與Kafka Broker間採用TCP鏈接,所以須要根據Kafka Message協議進行拆包組包:前4個字節是payload長度;payload部分分爲header和body兩部分,接收數據時先收4字節,即payload長度,再根據payload長度收取payload內容。
rd_kafka_transport_s定義在rdkafka_transport_init.h文件:
/* Transport layer: one TCP connection to a broker. */
struct rd_kafka_transport_s {
        rd_socket_t rktrans_s;           /* Socket fd to the broker */
        rd_kafka_broker_t *rktrans_rkb;  /* Connected broker */
        /* SASL authentication state */
        struct {
                void *state;
                int complete;
                struct msghdr msg;
                struct iovec iov[2];
                char *recv_buf;
                int recv_of;
                int recv_len;
        } rktrans_sasl;
        rd_kafka_buf_t *rktrans_recv_buf; /* Receive buffer */
        rd_pollfd_t rktrans_pfd[2];       /* Poll fds: TCP socket and
                                           * wake-up fd */
        int rktrans_pfd_cnt;
        size_t rktrans_rcvbuf_size;       /* Socket receive buffer size */
        size_t rktrans_sndbuf_size;       /* Socket send buffer size */
};
typedef struct rd_kafka_transport_s rd_kafka_transport_t;

/*
 * Establish a TCP connection to the broker at sinx and return an
 * initialized transport object, or NULL on failure (errstr filled in).
 * The socket is created through the configured socket_cb; connection is
 * made through connect_cb if configured, else plain connect() in
 * non-blocking fashion (EINPROGRESS is not an error; POLLOUT signals
 * connection success/failure later).
 */
rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
                                                  const rd_sockaddr_inx_t *sinx,
                                                  char *errstr,
                                                  size_t errstr_size) {
        rd_kafka_transport_t *rktrans;
        int s = -1;
        int r;

        rkb->rkb_addr_last = sinx;

        s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,
                                           SOCK_STREAM, IPPROTO_TCP,
                                           rkb->rkb_rk->rk_conf.opaque);
        if (s == -1) {
                rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
                            rd_socket_strerror(rd_socket_errno));
                return NULL;
        }

        rktrans = rd_kafka_transport_new(rkb, s, errstr, errstr_size);
        if (!rktrans)
                goto err;

        rd_rkb_dbg(rkb, BROKER, "CONNECT", "Connecting to %s (%s) "
                   "with socket %i",
                   rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY |
                                   RD_SOCKADDR2STR_F_PORT),
                   rd_kafka_secproto_names[rkb->rkb_proto], s);

        /* Connect to broker */
        if (rkb->rkb_rk->rk_conf.connect_cb) {
                rd_kafka_broker_lock(rkb); /* for rkb_nodename */
                r = rkb->rkb_rk->rk_conf.connect_cb(
                        s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
                        rkb->rkb_nodename, rkb->rkb_rk->rk_conf.opaque);
                rd_kafka_broker_unlock(rkb);
        } else {
                if (connect(s, (struct sockaddr *)sinx,
                            RD_SOCKADDR_INX_LEN(sinx)) == RD_SOCKET_ERROR &&
                    (rd_socket_errno != EINPROGRESS))
                        r = rd_socket_errno;
                else
                        r = 0;
        }

        if (r != 0) {
                rd_rkb_dbg(rkb, BROKER, "CONNECT",
                           "couldn't connect to %s: %s (%i)",
                           rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_PORT |
                                           RD_SOCKADDR2STR_F_FAMILY),
                           rd_socket_strerror(r), r);
                rd_snprintf(errstr, errstr_size,
                            "Failed to connect to broker at %s: %s",
                            rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
                            rd_socket_strerror(r));
                goto err;
        }

        /* Set up transport handle */
        rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;
        if (rkb->rkb_wakeup_fd[0] != -1) {
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd =
                        rkb->rkb_wakeup_fd[0];
        }

        /* Poll writability to trigger on connection success/failure. */
        rd_kafka_transport_poll_set(rktrans, POLLOUT);

        return rktrans;

 err:
        if (s != -1)
                rd_kafka_transport_close0(rkb->rkb_rk, s);
        if (rktrans)
                rd_kafka_transport_close(rktrans);
        return NULL;
}
建立與Broker的TCP連接,初始化rd_kafka_transport_s對象並返回。
int rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans, int timeout_ms);
Poll並處理IO操做void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans, int events);
處理IO操做
ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans, rd_slice_t *slice, char *errstr, size_t errstr_size);
系統調用sendmsg方法的封裝
ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans, rd_slice_t *slice, char *errstr, size_t errstr_size);
系統調用send方法的封裝
ssize_t rd_kafka_transport_recv (rd_kafka_transport_t *rktrans, rd_buf_t *rbuf, char *errstr, size_t errstr_size);
系統調用recv方法的封裝
rd_kafka_transport_t *rd_kafka_transport_new (rd_kafka_broker_t *rkb, rd_socket_t s, char *errstr, size_t errstr_size);
使用已有Socket建立rd_kafka_transport_t對象。
int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout);
Poll方法封裝
ssize_t rd_kafka_transport_socket_recv (rd_kafka_transport_t *rktrans, rd_buf_t *buf, char *errstr, size_t errstr_size) { #ifndef _MSC_VER // Windows系統調用封裝 return rd_kafka_transport_socket_recvmsg(rktrans, buf, errstr, errstr_size); #endif // Linux系統調用封裝 return rd_kafka_transport_socket_recv0(rktrans, buf, errstr, errstr_size); }
七、Kafka Meta
Kafka集羣的Meta Data包括:全部Broker的信息:IP和Port;
全部Topic的信息:Topic名稱,Partition數量,每一個Partition的Leader,ISR,Replica集合等。
Kafka集羣的每一臺Broker都會緩存整個集羣的Meta Data,當Broker或某一個Topic的Meta Data信息發生變化時, Kafka集羣的Controller都會感知到做相應的狀態轉換,同時把發生變化的新Meta Data信息廣播到全部的Broker。
RdKafka對Meta Data的封裝和操做包括Meta Data獲取、定時刷新以及引用的操做,如Partition Leader遷移,Partition個數的變化,Broker上下線等等。
Meta Data分爲Broker、Topic、Partition三種,定義在rdkafka.h中。
typedef struct rd_kafka_metadata_broker { int32_t id; // Broker ID char *host; // Broker主機名稱 int port; // Broker監聽端口 } rd_kafka_metadata_broker_t; typedef struct rd_kafka_metadata_partition { int32_t id; // Partition ID rd_kafka_resp_err_t err; // Broker報告的分區錯誤 int32_t leader; // 分區Leader Broker int replica_cnt; // 副本中的Broker數量 int32_t *replicas; // 副本Broker列表 int isr_cnt; // ISR列表中的ISR Broker數量 int32_t *isrs; // ISR Broker列表 } rd_kafka_metadata_partition_t; /** * @brief Topic information */ typedef struct rd_kafka_metadata_topic { char *topic; // Topic名稱 int partition_cnt; // 分區數量 struct rd_kafka_metadata_partition *partitions; // rd_kafka_resp_err_t err; // Broker報告的Topic錯誤 } rd_kafka_metadata_topic_t; typedef struct rd_kafka_metadata { int broker_cnt; // Broker數量 struct rd_kafka_metadata_broker *brokers; // Broker Meta int topic_cnt; // Topic數量 struct rd_kafka_metadata_topic *topics; // Topic Meta int32_t orig_broker_id; // Broker ID char *orig_broker_name; // Broker名稱 } rd_kafka_metadata_t;
rd_kafka_resp_err_t rd_kafka_metadata (rd_kafka_t *rk, int all_topics, rd_kafka_topic_t *only_rkt, const struct rd_kafka_metadata **metadatap, int timeout_ms);
請求Meta Data數據,阻塞操做
struct rd_kafka_metadata * rd_kafka_metadata_copy (const struct rd_kafka_metadata *md, size_t size);
深度拷貝Meta Data
rd_kafka_resp_err_t rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb, rd_kafka_buf_t *request, rd_kafka_buf_t *rkbuf, struct rd_kafka_metadata **mdp);
處理Meta Data請求響應
size_t rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos, const rd_kafka_topic_partition_list_t *match);
從當前緩存的Meta Data中查找與match匹配的Topic,並將其加入tinfos
size_t rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos, const rd_kafka_topic_partition_list_t *match);
增長緩存Meta Data中與match匹配的全部Topic到tinfos
rd_kafka_resp_err_t rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb, const rd_list_t *topics, int force, const char *reason);
刷新指定topics的全部Meta Data
rd_kafka_resp_err_t rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb, int force, const char *reason);
刷新已知Topic的Meta Data
rd_kafka_resp_err_t rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb, const char *reason);
根據Broker刷新Meta Data
rd_kafka_resp_err_t rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb, const char *reason);
刷新集羣中全部Topic的Meta Data
rd_kafka_resp_err_t rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb, const rd_list_t *topics, const char *reason, rd_kafka_op_t *rko);
Meta Data請求。
void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk);
快速刷新分區Leader的Meta Data
八、Kafka Handle對象建立
Kafka生產者、消費者客戶端對象經過rd_kafka_new函數進行建立,rd_kafka_new源碼以下:
rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf, char *errstr, size_t errstr_size){ ... // 建立conf或指定conf if (!app_conf) conf = rd_kafka_conf_new(); else conf = app_conf; ... /* Call on_new() interceptors */ rd_kafka_interceptors_on_new(rk, &rk->rk_conf); ... // 建立隊列 rk->rk_rep = rd_kafka_q_new(rk); rk->rk_ops = rd_kafka_q_new(rk); rk->rk_ops->rkq_serve = rd_kafka_poll_cb; rk->rk_ops->rkq_opaque = rk; ... if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb) rk->rk_drmode = RD_KAFKA_DR_MODE_CB; else if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR) rk->rk_drmode = RD_KAFKA_DR_MODE_EVENT; else rk->rk_drmode = RD_KAFKA_DR_MODE_NONE; if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE) rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR; if (rk->rk_conf.rebalance_cb) rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE; if (rk->rk_conf.offset_commit_cb) rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT; if (rk->rk_conf.error_cb) rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR; rk->rk_controllerid = -1; ... if (type == RD_KAFKA_CONSUMER && RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) rk->rk_cgrp = rd_kafka_cgrp_new(rk, rk->rk_group_id, rk->rk_client_id); ... // 後臺線程和後臺事件隊列建立 if (rk->rk_conf.background_event_cb) { /* Hold off background thread until thrd_create() is done. */ rd_kafka_wrlock(rk); rk->rk_background.q = rd_kafka_q_new(rk); rk->rk_init_wait_cnt++; if ((thrd_create(&rk->rk_background.thread, rd_kafka_background_thread_main, rk)) != thrd_success) ... } /* Create handler thread */ rk->rk_init_wait_cnt++; if ((thrd_create(&rk->rk_thread, rd_kafka_thread_main, rk)) != thrd_success) { ... } // 啓動Logic Broker線程 rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL, RD_KAFKA_PROTO_PLAINTEXT, "", 0, RD_KAFKA_NODEID_UA); // 根據配置增長Broker if (rk->rk_conf.brokerlist) { if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0) rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN, "No brokers configured"); } ... }
rd_kafka_new主要工做以下;
(1)根據配置設置屬性;
(2)建立Kafka Handle對象的OP隊列;
(3)建立後臺線程和後臺事件隊列;
(4)建立RdKafka主線程,執行rd_kafka_thread_main函數,主線程名稱爲rdk:main;
(5)建立Broker內部線程;
(6)根據配置建立Broker線程(每一個Broker一個線程)。
int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) { return rd_kafka_brokers_add0(rk, brokerlist); } int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist) { ... if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) && rkb->rkb_source == RD_KAFKA_CONFIGURED) { cnt++; } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, host, port, RD_KAFKA_NODEID_UA) != NULL) cnt++; ... } rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk, rd_kafka_confsource_t source, rd_kafka_secproto_t proto, const char *name, uint16_t port, int32_t nodeid) { ... thrd_create(&rkb->rkb_thread, rd_kafka_broker_thread_main, rkb); ... } static int rd_kafka_broker_thread_main (void *arg) { rd_kafka_set_thread_name("%s", rkb->rkb_name); rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid); ... rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms); ... rd_kafka_broker_ops_serve(rkb, RD_POLL_NOWAIT); ... }
九、Producer生產消息過程
(1)rd_kafka_produce
rd_kafka_produce函數位於rdkafka_msg.c文件:
// Public produce API (rdkafka_msg.c): converts the application topic handle
// to the internal topic object and enqueues a new message.
// Asynchronous; returns 0 on success, -1 on error (see rd_kafka_last_error()).
int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque) {
        return rd_kafka_msg_new(rd_kafka_topic_a2i(rkt), partition, msgflags, payload, len, key, keylen, msg_opaque);
}
(2)rd_kafka_msg_new
rd_kafka_msg_new函數位於rdkafka_msg.c文件:
// Create a new message object and route it to a partition queue.
// (Excerpt; "..." marks elided source.)
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition, int msgflags, char *payload, size_t len, const void *key, size_t keylen, void *msg_opaque) {
        ...
        // Build the rd_kafka_msg_t message object.
        rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags, payload, len, key, keylen, msg_opaque, &err, &errnox, NULL, 0, rd_clock());
        ...
        // Assign the message to a partition (and enqueue it there).
        err = rd_kafka_msg_partitioner(rkt, rkm, 1);
        ...
}
rd_kafka_msg_new內部經過rd_kafka_msg_new0建立Kafka消息,使用rd_kafka_msg_partitioner對Kafka消息進行分區分配。
(3)rd_kafka_msg_partitioner
rd_kafka_msg_partitioner函數位於rdkafka_msg.c文件:
// Pick a partition for the message and enqueue it on that partition's
// producer queue. (Excerpt; "..." marks elided source.)
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm, rd_dolock_t do_lock) {
        // Determine the partition number (via the configured partitioner).
        ...
        // Look up the topic+partition (toppar) object.
        s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
        ...
        rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
        rd_atomic64_add(&rktp_new->rktp_c.producer_enq_msgs, 1);
        /* Update message partition */
        if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
                rkm->rkm_partition = partition;
        // Enqueue the message on the partition's message queue.
        rd_kafka_toppar_enq_msg(rktp_new, rkm);
        ...
}
rd_kafka_msg_partitioner內部經過rd_kafka_toppar_enq_msg將消息加入分區隊列。
(4)rd_kafka_toppar_enq_msg
// Enqueue a message on the partition's producer queue: FIFO append, or a
// sorted insert when the topic uses a non-FIFO queuing strategy.
void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
        ...
        // FIFO for unassigned partitions or an explicit FIFO strategy ...
        if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA || rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) {
                queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
        } else {
                // ... otherwise keep the queue ordered by the topic's comparator.
                queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt, &rktp->rktp_msgq, rkm);
        }
        ...
}
(5)rd_kafka_msgq_enq
// Append a message to the tail of a message queue and update the queue's
// byte and message counters. Returns the new message count.
static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm) {
        TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
        rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;
        return (int)++rkmq->rkmq_msg_cnt;
}
(6)rd_kafka_msgq_enq_sorted
rd_kafka_msgq_enq_sorted函數位於rdkafka_msg.c文件:
// Insert a message keeping the queue ordered by the topic's configured
// message-order comparator; the message must already have a msgid assigned.
int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt, rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm) {
        rd_dassert(rkm->rkm_u.producer.msgid != 0);
        return rd_kafka_msgq_enq_sorted0(rkmq, rkm, rkt->rkt_conf.msg_order_cmp);
}

// Sorted-insert helper: TAILQ_INSERT_SORTED scans for the insertion point,
// then the queue's byte/message counters are updated.
int rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm, int (*order_cmp) (const void *, const void *)) {
        TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *, rkm_link, order_cmp);
        rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;
        return ++rkmq->rkmq_msg_cnt;
}
隊列的操做位於rdsysqueue.h文件中。
rd_kafka_broker_add函數位於rdkafka_broker.c文件:
// Allocate a broker object, fill in its attributes, and spawn the broker's
// dedicated thread running rd_kafka_broker_thread_main.
rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk, rd_kafka_confsource_t source, rd_kafka_secproto_t proto, const char *name, uint16_t port, int32_t nodeid) {
        rd_kafka_broker_t *rkb;
        rkb = rd_calloc(1, sizeof(*rkb));
        // Set the rd_kafka_broker_t object's attributes.
        ...
        if (thrd_create(&rkb->rkb_thread, rd_kafka_broker_thread_main, rkb) != thrd_success) {
                ...
        }
}
rd_kafka_broker_add建立Broker線程,啓動執行rd_kafka_broker_thread_main函數。
// Broker thread entry point: names the thread, then runs the serve loop.
static int rd_kafka_broker_thread_main (void *arg) {
        ...
        rd_kafka_set_thread_name("%s", rkb->rkb_name);
        rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid);
        ...
        rd_kafka_broker_serve(rkb, ...);
        ...
}

// Dispatch to the serve loop matching the broker source / client type.
static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb, int timeout_ms) {
        ...
        if (rkb->rkb_source == RD_KAFKA_INTERNAL)
                rd_kafka_broker_internal_serve(rkb, abs_timeout);
        else if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)
                rd_kafka_broker_producer_serve(rkb, abs_timeout);
        else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER)
                rd_kafka_broker_consumer_serve(rkb, abs_timeout);
}

// Producer serve loop: push queued partition messages, then serve ops + I/O.
static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb, rd_ts_t abs_timeout) {
        //
        rd_kafka_broker_produce_toppars(rkb, now, &next_wakeup, do_timeout_scan);
        rd_kafka_broker_ops_io_serve(rkb, next_wakeup);
}

// Serve the broker's op queue and transport I/O.
static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb, rd_ts_t abs_timeout) {
        ...
        rd_kafka_broker_ops_serve(rkb, rd_timeout_remains_us(abs_timeout));
        ...
}

// Pop and serve ops from rkb_ops; only the first pop may block, subsequent
// pops use RD_POLL_NOWAIT. Returns the number of ops served.
static int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb, rd_ts_t timeout_us) {
        rd_kafka_op_t *rko;
        int cnt = 0;
        while ((rko = rd_kafka_q_pop(rkb->rkb_ops, timeout_us, 0)) && (cnt++, rd_kafka_broker_op_serve(rkb, rko)))
                timeout_us = RD_POLL_NOWAIT;
        return cnt;
}
rdkafka_broker.c文件:
// Send a serialized request slice over the broker's transport.
static ssize_t rd_kafka_broker_send (rd_kafka_broker_t *rkb, rd_slice_t *slice) {
        ...
        r = rd_kafka_transport_send(rkb->rkb_transport, slice, errstr, sizeof(errstr));
        ...
}
rdkafka_transport.c文件:
// Transport-level send: dispatches to the socket send implementation.
ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans, rd_slice_t *slice, char *errstr, size_t errstr_size) {
        ..
        r = rd_kafka_transport_socket_send(rktrans, slice, errstr, errstr_size);
        ...
}

// On POSIX, use the sendmsg()-based scatter/gather path; on Windows
// (_MSC_VER) fall through to the plain send()-based path.
static ssize_t rd_kafka_transport_socket_send (rd_kafka_transport_t *rktrans, rd_slice_t *slice, char *errstr, size_t errstr_size) {
#ifndef _MSC_VER
        /* FIXME: Use sendmsg() with iovecs if there's more than one segment
         * remaining, otherwise (or if platform does not have sendmsg)
         * use plain send(). */
        return rd_kafka_transport_socket_sendmsg(rktrans, slice, errstr, errstr_size);
#endif
        return rd_kafka_transport_socket_send0(rktrans, slice, errstr, errstr_size);
}

// Non-blocking sendmsg() of the slice's segments.
static ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans, rd_slice_t *slice, char *errstr, size_t errstr_size) {
        ...
        r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT ...
}
十、Consumer消費消息過程
(1)開啓消息消費
RdKafka提供了rd_kafka_consume_start、rd_kafka_consume、rd_kafka_consume_start_queue、rd_kafka_consume_queue接口用於消息消費。
// Common start-consuming implementation shared by rd_kafka_consume_start()
// and rd_kafka_consume_start_queue().
// rkq: NULL to use the partition's own fetch queue, or an application queue.
// Returns 0 on success, -1 on error (see rd_kafka_last_error()).
int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition, int64_t offset, rd_kafka_q_t *rkq) {
        shptr_rd_kafka_toppar_t *s_rktp;
        if (partition < 0) {
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, ESRCH);
                return -1;
        }
        // The simple (legacy) consumer is mutually exclusive with the
        // high-level consumer on the same handle.
        if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return -1;
        }
        rd_kafka_topic_wrlock(rkt);
        s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
        rd_kafka_topic_wrunlock(rkt);
        /* Verify offset */
        if (offset == RD_KAFKA_OFFSET_BEGINNING || offset == RD_KAFKA_OFFSET_END || offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
                /* logical offsets */
        } else if (offset == RD_KAFKA_OFFSET_STORED) {
                /* offset manager */
                if (rkt->rkt_conf.offset_store_method == RD_KAFKA_OFFSET_METHOD_BROKER && RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
                        /* Broker based offsets require a group id. */
                        rd_kafka_toppar_destroy(s_rktp);
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                        return -1;
                }
        } else if (offset < 0) {
                rd_kafka_toppar_destroy(s_rktp);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return -1;
        }
        // Ask the partition's handler to start fetching from `offset`.
        rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset, rkq, RD_KAFKA_NO_REPLYQ);
        rd_kafka_toppar_destroy(s_rktp);
        rd_kafka_set_last_error(0, 0);
        return 0;
}

// Start consuming a partition; messages go to the partition's fetch queue.
int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition, int64_t offset) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START", "Start consuming partition %"PRId32,partition);
        return rd_kafka_consume_start0(rkt, partition, offset, NULL);
}

// Start consuming a partition, routing messages to an application queue.
int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition, int64_t offset, rd_kafka_queue_t *rkqu) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
        return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
}

// Common consume implementation: pop ops from `rkq` until a FETCH (message)
// or CONSUMER_ERR op arrives; other op types are dispatched to
// rd_kafka_poll_cb. Returns NULL on timeout or yield.
static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
        rd_kafka_op_t *rko;
        rd_kafka_message_t *rkmessage = NULL;
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);
        rd_kafka_yield_thread = 0;
        while ((rko = rd_kafka_q_pop(rkq, rd_timeout_remains_us(abs_timeout), 0))) {
                rd_kafka_op_res_t res;
                res = rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
                // PASS means the op is a message/error meant for the caller.
                if (res == RD_KAFKA_OP_RES_PASS)
                        break;
                if (unlikely(res == RD_KAFKA_OP_RES_YIELD || rd_kafka_yield_thread)) {
                        /* Callback called rd_kafka_yield(), we must
                         * stop dispatching the queue and return. */
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, EINTR);
                        rd_kafka_app_polled(rk);
                        return NULL;
                }
                /* Message was handled by callback. */
                continue;
        }
        if (!rko) {
                /* Timeout reached with no op returned. */
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT, ETIMEDOUT);
                rd_kafka_app_polled(rk);
                return NULL;
        }
        rd_kafka_assert(rk, rko->rko_type == RD_KAFKA_OP_FETCH || rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
        /* Get rkmessage from rko */
        rkmessage = rd_kafka_message_get(rko);
        /* Store offset */
        rd_kafka_op_offset_store(rk, rko);
        rd_kafka_set_last_error(0, 0);
        rd_kafka_app_polled(rk);
        return rkmessage;
}

// Consume one message from a specific topic+partition's fetch queue.
rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt, int32_t partition, int timeout_ms) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
        shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        rd_kafka_message_t *rkmessage;
        rd_kafka_topic_rdlock(rkt);
        s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
        if (unlikely(!s_rktp))
                s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
        rd_kafka_topic_rdunlock(rkt);
        if (unlikely(!s_rktp)) {
                /* No such toppar known */
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, ESRCH);
                return NULL;
        }
        rktp = rd_kafka_toppar_s2i(s_rktp);
        rkmessage = rd_kafka_consume0(rkt->rkt_rk, rktp->rktp_fetchq, timeout_ms);
        rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */
        return rkmessage;
}

// Consume one message from an application-provided queue.
rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu, int timeout_ms) {
        return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
}
(2)Poll輪詢消息隊列
// Serve the handle's main reply queue (rk_rep), dispatching each op to its
// registered callback; returns the number of ops served.
int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) {
        int r;
        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);
        r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
        rd_kafka_app_polled(rk);
        return r;
}

// High-level consumer poll: pops one message from the consumer group queue.
rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms) {
        rd_kafka_cgrp_t *rkcg;
        if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {
                // No consumer group configured: report via an error message.
                rd_kafka_message_t *rkmessage = rd_kafka_message_new();
                rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
                return rkmessage;
        }
        return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
}

// Event-style poll on an application queue: returns the popped op as an
// event, or NULL on timeout.
rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) {
        rd_kafka_op_t *rko;
        if (timeout_ms)
                rd_kafka_app_poll_blocking(rkqu->rkqu_rk);
        rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0, RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);
        rd_kafka_app_polled(rkqu->rkqu_rk);
        if (!rko)
                return NULL;
        return rko;
}
2、RdKafka C++源碼分析
一、C++ API對C API的封裝
C++ API主要是對RdKafka C API的封裝,根據不一樣的功能模塊封裝爲不一樣功能類,類定義在rdkafkacpp.h文件中,並使用RdKafka命名空間進行限定,主要類以下:Conf、Handle、TopicPartition、Topic、Message、Queue、KafkaConsumer、Consumer、Producer、BrokerMetadata、PartitionMetadata、TopicMetadata、Metadata、DeliveryReportCb、PartitionerCb、PartitionerKeyPointerCb、EventCb、Event、ConsumeCb、RebalanceCb、OffsetCommitCb、SocketCb、OpenCb。
二、Consumer與KafkaConsumer
Consumer對partition、offset有徹底的控制能力;KafkaConsumer提供了Topic訂閱接口,默認使用latest消費方式,能夠經過assign方法指定開始消費的partition和offset。
三、Producer生產消息過程
(1)Producer建立
// Factory: build a Producer from a global Conf.
// On failure returns NULL and fills `errstr`. The duplicated C conf's
// ownership passes to rd_kafka_new() only on success, hence the explicit
// destroy on the error path.
RdKafka::Producer *RdKafka::Producer::create (RdKafka::Conf *conf, std::string &errstr) {
  char errbuf[512];
  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
  RdKafka::ProducerImpl *rkp = new RdKafka::ProducerImpl();
  rd_kafka_conf_t *rk_conf = NULL;
  if (confimpl) {
    if (!confimpl->rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      delete rkp;
      return NULL;
    }
    rkp->set_common_config(confimpl);
    rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);
    // Install the delivery-report trampoline if the app set a dr callback.
    if (confimpl->dr_cb_) {
      rd_kafka_conf_set_dr_msg_cb(rk_conf, dr_msg_cb_trampoline);
      rkp->dr_cb_ = confimpl->dr_cb_;
    }
  }
  rd_kafka_t *rk;
  if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errbuf, sizeof(errbuf)))) {
    errstr = errbuf;
    // rd_kafka_new() takes ownership only if succeeds
    if (rk_conf)
      rd_kafka_conf_destroy(rk_conf);
    delete rkp;
    return NULL;
  }
  rkp->rk_ = rk;
  return rkp;
}
建立Producer時須要準備好Conf對象。
(2)生產消息
// Thin wrapper over the C rd_kafka_produce(); maps the -1/errno-style
// failure to an RdKafka::ErrorCode. An absent key is passed as NULL/0.
RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
  if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags, payload, len, key ? key->c_str() : NULL, key ? key->size() : 0, msg_opaque) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
  return RdKafka::ERR_NO_ERROR;
}
生產消息時須要指定Topic對象。
(3)Poll輪詢
// Poll the handle's main queue, firing registered callbacks; returns the
// number of events served (delegates to the C rd_kafka_poll()).
int RdKafka::HandleImpl::poll(int timeout_ms) {
  return rd_kafka_poll(rk_, timeout_ms);
}
produce生產消息是異步的,將消息放入到內部隊列後會馬上返回,所以須要由poll返回最終寫入結果。 produce是盡力送達的,會持續嘗試直至超過message.timeout.ms才彙報失敗。
四、Consumer消費消息過程
(1)建立KafkaConsumer
// Factory: build a high-level KafkaConsumer from a global Conf.
// Requires group.id to be configured; on failure returns NULL and fills
// `errstr`.
RdKafka::KafkaConsumer *RdKafka::KafkaConsumer::create (RdKafka::Conf *conf, std::string &errstr) {
  char errbuf[512];
  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
  RdKafka::KafkaConsumerImpl *rkc = new RdKafka::KafkaConsumerImpl();
  rd_kafka_conf_t *rk_conf = NULL;
  size_t grlen;
  if (!confimpl || !confimpl->rk_conf_) {
    errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
    delete rkc;
    return NULL;
  }
  // group.id is mandatory for the high-level consumer.
  if (rd_kafka_conf_get(confimpl->rk_conf_, "group.id", NULL, &grlen) != RD_KAFKA_CONF_OK || grlen <= 1 /* terminating null only */) {
    errstr = "\"group.id\" must be configured";
    delete rkc;
    return NULL;
  }
  rkc->set_common_config(confimpl);
  rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);
  rd_kafka_t *rk;
  if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf, errbuf, sizeof(errbuf)))) {
    errstr = errbuf;
    // rd_kafka_new() takes ownership only if succeeds
    rd_kafka_conf_destroy(rk_conf);
    delete rkc;
    return NULL;
  }
  rkc->rk_ = rk;
  /* Redirect handle queue to cgrp's queue to provide a single queue point */
  rd_kafka_poll_set_consumer(rk);
  return rkc;
}
(2)訂閱Topic
// Subscribe to a set of topics: converts the std::string vector into a C
// topic-partition list (partition left as RD_KAFKA_PARTITION_UA so the
// group assigns partitions), subscribes, then frees the temporary list.
RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::subscribe (const std::vector<std::string> &topics) {
  rd_kafka_topic_partition_list_t *c_topics;
  rd_kafka_resp_err_t err;
  c_topics = rd_kafka_topic_partition_list_new((int)topics.size());
  for (unsigned int i = 0 ; i < topics.size() ; i++)
    rd_kafka_topic_partition_list_add(c_topics, topics[i].c_str(), RD_KAFKA_PARTITION_UA);
  err = rd_kafka_subscribe(rk_, c_topics);
  rd_kafka_topic_partition_list_destroy(c_topics);
  return static_cast<RdKafka::ErrorCode>(err);
}
(3)消費消息
// Poll for one message. A NULL poll result (timeout) is wrapped in an
// ERR__TIMED_OUT Message object, so the caller always receives a non-NULL
// Message it must delete.
RdKafka::Message *RdKafka::KafkaConsumerImpl::consume (int timeout_ms) {
  rd_kafka_message_t *rkmessage;
  rkmessage = rd_kafka_consumer_poll(this->rk_, timeout_ms);
  if (!rkmessage)
    return new RdKafka::MessageImpl(NULL, RdKafka::ERR__TIMED_OUT);
  return new RdKafka::MessageImpl(rkmessage);
}
3、RdKafka多線程設計
一、Producer/Consumer多線程設計
RdKafka內部使用多線程對硬件資源進行充分利用,RdKafka API是線程安全的,應用程序能夠在任意時間調用其線程內的任意API函數。
每一個Producer/Consumer實例會建立線程以下:
(1)應用線程,處理具體應用業務邏輯。
(2)Kafka Handler線程:每建立一個Producer/Consumer即會建立一個Handler線程,即RdKafka主線程,線程名稱爲rdk:main,線程執行函數爲rd_kafka_thread_main。
(3)Kafka Broker線程:對於增長到Producer/Consumer的每一個Broker會建立一個線程,負責與Broker通訊,線程執行函數爲rd_kafka_broker_thread_main,線程名稱爲rdk:brokerxxx。
(4)Inner Broker線程,用於處理未分配分區的OP操做隊列。
(5)後臺線程
若是配置對象設置了background_event_cb,Kafka Handler建立時會建立相應的後臺線程和後臺隊列,線程執行函數爲rd_kafka_background_thread_main。
二、線程查看
Linux查看KafkaConsumer進程的線程的方法:
ps -T -p pid 或 top -H -p pid
Consumer線程查看結果:
Producer線程查看結果: