Kafka快速入門(十一)——RdKafka源碼分析

Kafka快速入門(十一)——RdKafka源碼分析

1、RdKafka C源碼分析

一、Kafka OP隊列

RdKafka將與Kafka Broker的交互、內部實現的操做都封裝成Operator結構,而後放入OP處理隊列裏統一處理。Kafka OP隊列是線程間通訊的管道。
RdKafka隊列定義在rdkafka_queue.h文件中,隊列相關操做封裝在rdsysqueue.h文件中。
(1)Kafka OP隊列

node

// Internal op (operation) queue: the pipe used for inter-thread
// communication inside RdKafka.
typedef struct rd_kafka_q_s rd_kafka_q_t;
struct rd_kafka_q_s
{
    mtx_t  rkq_lock;// protects all queue operations
    cnd_t  rkq_cond; // signalled when a new element is enqueued, to wake waiting threads
    struct rd_kafka_q_s *rkq_fwdq; // Forwarded/Routed queue
    struct rd_kafka_op_tailq rkq_q; // tail queue holding the enqueued ops
    int           rkq_qlen;      /* Number of entries in queue */
    int64_t       rkq_qsize;     /* Size of all entries in queue */
    int           rkq_refcnt; // reference count
    int           rkq_flags; // current queue state flags
    rd_kafka_t   *rkq_rk;// Kafka handle this queue is associated with
    struct rd_kafka_q_io *rkq_qio; // fd-based wakeup: written to when a new element is enqueued
    rd_kafka_q_serve_cb_t *rkq_serve; // callback invoked when an op on this queue is served
    void *rkq_opaque; // opaque passed to rkq_serve
    const char *rkq_name; // queue name
};

// Kafka op queue: the public (application-facing) wrapper around an
// internal rd_kafka_q_t.
typedef struct rd_kafka_queue_s rd_kafka_queue_t;
struct rd_kafka_queue_s
{
    rd_kafka_q_t *rkqu_q;// underlying internal op queue
    rd_kafka_t   *rkqu_rk;// Kafka handle the queue is associated with
    int rkqu_is_owner; // NOTE(review): presumably non-zero when this wrapper owns the queue reference — confirm against rd_kafka_queue_destroy()
};
/**
 * @brief Create a new application-facing op queue for \p rk.
 *
 * Allocates an internal queue, wraps it in a public handle, then drops
 * the local queue reference (the wrapper holds its own reference).
 */
rd_kafka_queue_t *rd_kafka_queue_new (rd_kafka_t *rk)
{
    rd_kafka_q_t *internal_q = rd_kafka_q_new(rk);
    rd_kafka_queue_t *handle = rd_kafka_queue_new0(rk, internal_q);

    /* rd_kafka_queue_new0() took its own reference on the queue. */
    rd_kafka_q_destroy(internal_q);

    return handle;
}

建立OP隊列

/**
 * @brief Return a public queue handle for the main reply queue of \p rk.
 */
rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk)
{
    rd_kafka_q_t *reply_q = rk->rk_rep;

    return rd_kafka_queue_new0(rk, reply_q);
}

獲取RdKafka與應用程序交互使用的OP隊列

/**
 * @brief Return a public queue handle for the consumer group queue.
 *
 * @returns NULL if \p rk has no consumer group (i.e. is not a
 *          high-level consumer).
 */
rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk) {
    if (rk->rk_cgrp)
        return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q);
    return NULL;
}

獲取消費者的OP隊列

/**
 * @brief Return a public queue handle for the fetch queue of the given
 *        topic+partition, creating the toppar if it does not exist yet.
 *
 * @returns NULL for producer handles, or if the toppar lookup fails.
 */
rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
                                                const char *topic,
                                                int32_t partition) {
        shptr_rd_kafka_toppar_t *toppar_shptr;
        rd_kafka_queue_t *rkqu = NULL;

        /* Only consumers have per-partition fetch queues. */
        if (rk->rk_type == RD_KAFKA_PRODUCER)
                return NULL;

        toppar_shptr = rd_kafka_toppar_get2(rk, topic,
                                            partition,
                                            0, /* no ua_on_miss */
                                            1 /* create_on_miss */);

        if (toppar_shptr) {
                rd_kafka_toppar_t *toppar =
                        rd_kafka_toppar_s2i(toppar_shptr);
                rkqu = rd_kafka_queue_new0(rk, toppar->rktp_fetchq);
                rd_kafka_toppar_destroy(toppar_shptr); /* drop local ref */
        }

        return rkqu;
}

獲取Topic的分區的OP隊列

// Serve a single op from the queue, filtered by version; waits up to
// timeout_us for a servable op before giving up.
rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, rd_ts_t timeout_us,
                     int32_t version,
                                     rd_kafka_q_cb_type_t cb_type,
                                     rd_kafka_q_serve_cb_t *callback,
                     void *opaque);

處理OP隊列中的一個OP操做,按version過濾出可處理的OP,沒有則等待,若是超時,函數退出。

// Serve up to max_cnt ops from the queue in a batch, waiting at most
// timeout_ms for the first op.
int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
                      rd_kafka_q_cb_type_t cb_type,
                      rd_kafka_q_serve_cb_t *callback,
                      void *opaque);

批量處理OP隊列的OP

// Serve RD_KAFKA_OP_FETCH ops, extracting their messages into the
// rkmessages array (at most rkmessages_size entries).
int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
                                 rd_kafka_message_t **rkmessages,
                                 size_t rkmessages_size);

處理RD_KAFKA_OP_FETCH OP操做

// Purge (discard) all ops currently in the queue; do_lock selects
// whether the queue lock is taken internally.
int  rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock);
#define rd_kafka_q_purge(rkq) rd_kafka_q_purge0(rkq, 1/*lock*/)

清除OP隊列中的全部OP操做
rd_kafka_queue_t *rd_kafka_queue_get_background (rd_kafka_t *rk);
獲取Kafka Handle的後臺OP隊列

併發

二、Kafka OP操做

RdKafka OP操做封裝在rdkafka_op.h文件中。

// All op types routed through RdKafka's internal op queues.
typedef enum
{
    RD_KAFKA_OP_NONE, // unspecified type
    RD_KAFKA_OP_FETCH, // Kafka thread -> Application
    RD_KAFKA_OP_ERR, // Kafka thread -> Application
    RD_KAFKA_OP_CONSUMER_ERR, // Kafka thread -> Application
    RD_KAFKA_OP_DR, // Kafka thread->Application:Produce message delivery report
    RD_KAFKA_OP_STATS, // Kafka thread -> Application
    RD_KAFKA_OP_OFFSET_COMMIT, // any -> toppar's Broker thread
    RD_KAFKA_OP_NODE_UPDATE, // any -> Broker thread: node update
    RD_KAFKA_OP_XMIT_BUF, // transmit buffer: any -> broker thread
    RD_KAFKA_OP_RECV_BUF, // received response buffer: broker thr -> any
    RD_KAFKA_OP_XMIT_RETRY, // retry buffer xmit: any -> broker thread
    RD_KAFKA_OP_FETCH_START, // Application -> toppar's handler thread
    RD_KAFKA_OP_FETCH_STOP,  // Application -> toppar's handler thread
    RD_KAFKA_OP_SEEK, // Application -> toppar's handler thread
    RD_KAFKA_OP_PAUSE, // Application -> toppar's handler thread
    RD_KAFKA_OP_OFFSET_FETCH, // Broker->broker thread: fetch offsets for topic
    RD_KAFKA_OP_PARTITION_JOIN, // cgrp op:add toppar to cgrp,broker op:add toppar to broker
    RD_KAFKA_OP_PARTITION_LEAVE, // cgrp op:remove toppar from cgrp,broker op:remove toppar from rkb
    RD_KAFKA_OP_REBALANCE, // broker thread -> app:group rebalance
    RD_KAFKA_OP_TERMINATE, // For generic use
    RD_KAFKA_OP_COORD_QUERY,  // Query for coordinator
    RD_KAFKA_OP_SUBSCRIBE, // New subscription
    RD_KAFKA_OP_ASSIGN,  // New assignment
    RD_KAFKA_OP_GET_SUBSCRIPTION,// Get current subscription Reuses u.subscribe
    RD_KAFKA_OP_GET_ASSIGNMENT, // Get current assignment Reuses u.assign
    RD_KAFKA_OP_THROTTLE, // Throttle info
    RD_KAFKA_OP_NAME, // Request name
    RD_KAFKA_OP_OFFSET_RESET, // Offset reset
    RD_KAFKA_OP_METADATA, //  Metadata response
    RD_KAFKA_OP_LOG, //  Log
    RD_KAFKA_OP_WAKEUP, // Wake-up signaling
    RD_KAFKA_OP_CREATETOPICS, // Admin: CreateTopics: u.admin_request
    RD_KAFKA_OP_DELETETOPICS, // Admin: DeleteTopics: u.admin_request
    RD_KAFKA_OP_CREATEPARTITIONS,// Admin: CreatePartitions: u.admin_request
    RD_KAFKA_OP_ALTERCONFIGS, //  Admin: AlterConfigs: u.admin_request
    RD_KAFKA_OP_DESCRIBECONFIGS, // Admin: DescribeConfigs: u.admin_request
    RD_KAFKA_OP_ADMIN_RESULT, //  Admin API .._result_t
    RD_KAFKA_OP_PURGE, // Purge queues
    RD_KAFKA_OP_CONNECT, // Connect (to broker)
    RD_KAFKA_OP_OAUTHBEARER_REFRESH, // Refresh OAUTHBEARER token
    RD_KAFKA_OP_MOCK, // Mock cluster command
    RD_KAFKA_OP_BROKER_MONITOR, // Broker state change
    RD_KAFKA_OP_TXN, // Transaction command
    RD_KAFKA_OP__END // end-of-list sentinel
} rd_kafka_op_type_t;

rd_kafka_op_type_t枚舉類型定義了RdKafka全部OP操做類型。

// Op priority levels.
typedef enum
{
    RD_KAFKA_PRIO_NORMAL = 0,  // normal priority (default)
    RD_KAFKA_PRIO_MEDIUM,  // medium priority
    RD_KAFKA_PRIO_HIGH,  // high priority
    RD_KAFKA_PRIO_FLASH // highest priority: immediate
} rd_kafka_prio_t;

rd_kafka_prio_t枚舉類型定義了Kafka OP操做的全部優先級。

// Result returned after an op has been served/handled.
typedef enum
{
    RD_KAFKA_OP_RES_PASS,  // Not handled, pass to caller
    RD_KAFKA_OP_RES_HANDLED, // Op was handled (through callbacks)
    RD_KAFKA_OP_RES_KEEP, // Op was handled by a callback but must not be destroyed by op_handle()
    RD_KAFKA_OP_RES_YIELD // Callback called yield
} rd_kafka_op_res_t;

rd_kafka_op_res_t枚舉類型定義了OP被處理後的返回結果類型,
若是返回RD_KAFKA_OP_RES_YIELD,handler處理函數須要肯定是否須要將OP從新入隊列仍是將OP銷燬。

// How ops should be dispatched when a queue is served.
typedef enum
{
    RD_KAFKA_Q_CB_INVALID, // invalid, not used
    RD_KAFKA_Q_CB_CALLBACK, // trigger the callback based on the op
    RD_KAFKA_Q_CB_RETURN,  // return the op instead of triggering the callback
    RD_KAFKA_Q_CB_FORCE_RETURN, // return the op regardless of any callback
    RD_KAFKA_Q_CB_EVENT // return an Event op instead of triggering the callback
} rd_kafka_q_cb_type_t;

rd_kafka_q_cb_type_t枚舉類型定義了OP隊列中OP操做執行回調函數的全部類型。
OP隊列執行回調函數類型定義以下:

// Queue serve callback: invoked for each op when the queue is served.
typedef rd_kafka_op_res_t
(rd_kafka_q_serve_cb_t) (rd_kafka_t *rk,
                         struct rd_kafka_q_s *rkq,
                         struct rd_kafka_op_s *rko,
                         rd_kafka_q_cb_type_t cb_type, void *opaque);

OP回調函數定義以下:

// Per-op callback, invoked by rd_kafka_op_call() for callback-type ops.
typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk,
        rd_kafka_q_t *rkq,
        struct rd_kafka_op_s *rko);

OP執行結果數據結構定義以下:

// Reply queue: destination for an op's result, tagged with a version
// so outdated replies can be filtered out.
typedef struct rd_kafka_replyq_s
{
    rd_kafka_q_t *q;// queue where the op's result is enqueued
    int32_t       version;// version used to filter outdated replies
} rd_kafka_replyq_t;

Kafka OP數據結構定義以下:

// Kafka op: the unit of work passed between RdKafka threads via op
// queues. The rko_u union carries the per-op-type payload.
struct rd_kafka_op_s
{
    TAILQ_ENTRY(rd_kafka_op_s) rko_link;// TAILQ linkage field
    rd_kafka_op_type_t    rko_type; // op type
    rd_kafka_event_type_t rko_evtype;// event type
    int                   rko_flags; // op flags
    int32_t               rko_version;// op version
    rd_kafka_resp_err_t   rko_err;// error code carried by the op
    int32_t               rko_len; // value length (meaning depends on op type)
    rd_kafka_prio_t       rko_prio; // op priority
    shptr_rd_kafka_toppar_t *rko_rktp;// associated topic+partition
    rd_kafka_replyq_t rko_replyq;// where to enqueue this op's reply
    rd_kafka_q_serve_cb_t *rko_serve;// op-specific serve callback
    void *rko_serve_opaque;// opaque passed to rko_serve
    rd_kafka_t     *rko_rk;// Kafka handle
    rd_kafka_op_cb_t *rko_op_cb; // op callback
    // Per-op-type payload.
    union
    {
        struct
        {
            rd_kafka_buf_t *rkbuf;
            rd_kafka_msg_t  rkm;
            int evidx;
        } fetch;
        struct
        {
            rd_kafka_topic_partition_list_t *partitions;
            int do_free; // free .partitions on destroy()
        } offset_fetch;
        struct
        {
            rd_kafka_topic_partition_list_t *partitions;
            void (*cb) (rd_kafka_t *rk,
                        rd_kafka_resp_err_t err,
                        rd_kafka_topic_partition_list_t *offsets,
                        void *opaque);
            void *opaque;
            int silent_empty; // Fail silently if there are no offsets to commit.
            rd_ts_t ts_timeout;
            char *reason;
        } offset_commit;

        struct
        {
            rd_kafka_topic_partition_list_t *topics;
        } subscribe;

        struct
        {
            rd_kafka_topic_partition_list_t *partitions;
        } assign;
        struct
        {
            rd_kafka_topic_partition_list_t *partitions;
        } rebalance;
        struct
        {
            char *str;
        } name;
        struct
        {
            int64_t offset;
            char *errstr;
            rd_kafka_msg_t rkm;
            int fatal;
        } err;

        struct
        {
            int throttle_time;
            int32_t nodeid;
            char *nodename;
        } throttle;

        struct
        {
            char *json;
            size_t json_len;
        } stats;

        struct
        {
            rd_kafka_buf_t *rkbuf;
        } xbuf;

        // RD_KAFKA_OP_METADATA
        struct
        {
            rd_kafka_metadata_t *md;
            int force; // force request regardless of outstanding metadata requests.
        } metadata;

        struct
        {
            shptr_rd_kafka_itopic_t *s_rkt;
            rd_kafka_msgq_t msgq;
            rd_kafka_msgq_t msgq2;
            int do_purge2;
        } dr;

        struct
        {
            int32_t nodeid;
            char    nodename[RD_KAFKA_NODENAME_SIZE];
        } node;

        struct
        {
            int64_t offset;
            char *reason;
        } offset_reset;

        struct
        {
            int64_t offset;
            struct rd_kafka_cgrp_s *rkcg;
        } fetch_start; // reused for SEEK

        struct
        {
            int pause;
            int flag;
        } pause;

        struct
        {
            char fac[64];
            int  level;
            char *str;
        } log;

        struct
        {
            rd_kafka_AdminOptions_t options;
            rd_ts_t abs_timeout; // Absolute timeout
            rd_kafka_timer_t tmr; // Timeout timer
            struct rd_kafka_enq_once_s *eonce; // enqueue the op only once; used to (re)trigger the op request on broker state changes
            rd_list_t args; // Type depends on request, e.g. rd_kafka_NewTopic_t for CreateTopics
            rd_kafka_buf_t *reply_buf; // Protocol reply
            struct rd_kafka_admin_worker_cbs *cbs;
            // Worker state
            enum
            {
                RD_KAFKA_ADMIN_STATE_INIT,
                RD_KAFKA_ADMIN_STATE_WAIT_BROKER,
                RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER,
                RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST,
                RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE,
            } state;

            int32_t broker_id; // Requested broker id to communicate with.
            // Application's reply queue
            rd_kafka_replyq_t replyq;
            rd_kafka_event_type_t reply_event_type;
        } admin_request;

        struct
        {
            rd_kafka_op_type_t reqtype; // Request op type
            char *errstr; // error message
            rd_list_t results; // Type depends on request type:
            void *opaque; // Application's opaque as set by rd_kafka_AdminOptions_set_opaque
        } admin_result;

        struct
        {
            int flags; // purge_flags from rd_kafka_purge()
        } purge;

        // Mock cluster command
        struct
        {
            enum
            {
                RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR,
                RD_KAFKA_MOCK_CMD_TOPIC_CREATE,
                RD_KAFKA_MOCK_CMD_PART_SET_LEADER,
                RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,
                RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,
                RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN,
                RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
                RD_KAFKA_MOCK_CMD_COORD_SET,
                RD_KAFKA_MOCK_CMD_APIVERSION_SET,
            } cmd;

            rd_kafka_resp_err_t err; // Error for:TOPIC_SET_ERROR
            char *name; // For:TOPIC_SET_ERROR,TOPIC_CREATE,PART_SET_FOLLOWER,PART_SET_FOLLOWER_WMARKS,BROKER_SET_RACK,COORD_SET (key_type)
            char *str; // For:COORD_SET (key)
            int32_t partition; // For:PART_SET_FOLLOWER,PART_SET_FOLLOWER_WMARKS,PART_SET_LEADER,APIVERSION_SET (ApiKey)
            int32_t broker_id; // For:PART_SET_FOLLOWER,PART_SET_LEADER,BROKER_SET_UPDOWN,BROKER_SET_RACK,COORD_SET
            int64_t lo; // Low offset, for:TOPIC_CREATE (part cnt),PART_SET_FOLLOWER_WMARKS,BROKER_SET_UPDOWN, APIVERSION_SET (minver);
            int64_t hi; // High offset, for:TOPIC_CREATE (repl fact),PART_SET_FOLLOWER_WMARKS,APIVERSION_SET (maxver)
        } mock;

        struct
        {
            struct rd_kafka_broker_s *rkb; // broker whose state changed
            void (*cb) (struct rd_kafka_broker_s *rkb);// callback to trigger on the op-handler thread
        } broker_monitor;

        struct
        {
            rd_kafka_error_t *error; // error object
            char *group_id; // consumer group id to commit offsets for
            int   timeout_ms; /**< Operation timeout */
            rd_ts_t abs_timeout; /**< Absolute time */
            rd_kafka_topic_partition_list_t *offsets;// offsets to commit
        } txn;

    } rko_u;
};
// An application-visible event is just an op.
typedef struct rd_kafka_op_s rd_kafka_event_t;

const char *rd_kafka_op2str (rd_kafka_op_type_t type);
返回OP類型的相應字符串
void rd_kafka_op_destroy (rd_kafka_op_t *rko);
銷燬OP對象


rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type);
#define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type)

生成OP對象

rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk,
                                    rd_kafka_q_t *rkq, rd_kafka_op_t *rko);

調用OP的回調函數

// Standard op handling: handle the op kinds with fixed semantics,
// returning PASS for anything the caller should process itself.
rd_kafka_op_res_t rd_kafka_op_handle_std (rd_kafka_t *rk, rd_kafka_q_t *rkq,
                        rd_kafka_op_t *rko, int cb_type)
{
    if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
        return RD_KAFKA_OP_RES_PASS;
    else if (unlikely(rd_kafka_op_is_ctrl_msg(rko)))
    {
        /* Control messages are not exposed to the application:
         * store their offset and swallow the op. */
        rd_kafka_op_offset_store(rk, rko);
        return RD_KAFKA_OP_RES_HANDLED;
    }
    else if (cb_type != RD_KAFKA_Q_CB_EVENT &&
             rko->rko_type & RD_KAFKA_OP_CB)
        return rd_kafka_op_call(rk, rkq, rko);
    else if (rko->rko_type == RD_KAFKA_OP_RECV_BUF)
        rd_kafka_buf_handle_op(rko, rko->rko_err); /* falls through to HANDLED below */
    else if (cb_type != RD_KAFKA_Q_CB_RETURN &&
             rko->rko_type & RD_KAFKA_OP_REPLY &&
             rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
        return RD_KAFKA_OP_RES_HANDLED; 
    else
        return RD_KAFKA_OP_RES_PASS;

    return RD_KAFKA_OP_RES_HANDLED; /* reached only via the RECV_BUF branch */
}

對OP進行標準化處理

/**
 * @brief Handle a single op: standard handling first, then the serve
 *        callback for any op that standard handling passed through.
 *
 * An op-level serve callback (rko_serve) overrides the supplied
 * \p callback / \p opaque and is consumed (cleared) here.
 */
rd_kafka_op_res_t
rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                    rd_kafka_q_cb_type_t cb_type, void *opaque,
                    rd_kafka_q_serve_cb_t *callback)
{
    rd_kafka_op_res_t res;

    /* Latch and clear the op-specific serve callback, if any. */
    if (rko->rko_serve) {
        callback              = rko->rko_serve;
        opaque                = rko->rko_serve_opaque;
        rko->rko_serve        = NULL;
        rko->rko_serve_opaque = NULL;
    }

    res = rd_kafka_op_handle_std(rk, rkq, rko, cb_type);
    switch (res) {
    case RD_KAFKA_OP_RES_KEEP:
        /* Handled, but the op must not be destroyed. */
        return res;
    case RD_KAFKA_OP_RES_HANDLED:
        rd_kafka_op_destroy(rko);
        return res;
    case RD_KAFKA_OP_RES_YIELD:
        return res;
    default:
        break; /* RD_KAFKA_OP_RES_PASS: fall through to callback */
    }

    if (callback)
        res = callback(rk, rkq, rko, cb_type, opaque);

    return res;
}

處理OP

三、Kafka Message

rd_kafka_message_t定義在rdkafka.h文件:

// Public Kafka message, shared by producer and consumer APIs.
typedef struct rd_kafka_message_s
{
    rd_kafka_resp_err_t err; // non-zero indicates an error message
    rd_kafka_topic_t *rkt; // associated topic
    int32_t partition; // partition
    void   *payload; // message payload, or error string when err is set
    size_t  len; // payload length when err==0, error string length otherwise
    void   *key; // message key when err==0
    size_t  key_len; // message key length when err==0
    int64_t offset; // offset
    void  *_private; // consumer: RdKafka private pointer; producer: the per-message opaque passed to dr_msg_cb
} rd_kafka_message_t;

Kafka Producer生產的數據在application層調用接口後最終會將數據封裝成rd_kafka_message_t結構,Consumer從Broker消費的數據回調給application層時也會封裝成rd_kafka_message_t結構。
rd_kafka_msg_t和rd_kafka_msgq_t定義在rdkafka_msg.h文件:

// Internal message wrapper around the public rd_kafka_message_t.
typedef struct rd_kafka_msg_s
{
    rd_kafka_message_t rkm_rkmessage;  // public Kafka message; MUST be the first field (enables casting)
    TAILQ_ENTRY(rd_kafka_msg_s)  rkm_link;// TAILQ linkage field
    int        rkm_flags; // message type flags
    rd_kafka_timestamp_type_t rkm_tstype; // message timestamp type
    int64_t    rkm_timestamp;// timestamp for the V1 message format
    rd_kafka_headers_t *rkm_headers;
    rd_kafka_msg_status_t rkm_status; // message persistence status
    union
    {
        struct
        {
            rd_ts_t ts_timeout; // message timeout
            rd_ts_t ts_enq; // enqueue/produce timestamp
            rd_ts_t ts_backoff; 
            uint64_t msgid; // message id used for ordering, starts at 1
            uint64_t last_msgid; // 
            int     retries; // retry count
        } producer;
#define rkm_ts_timeout rkm_u.producer.ts_timeout
#define rkm_ts_enq     rkm_u.producer.ts_enq
#define rkm_msgid      rkm_u.producer.msgid
        struct
        {
            rd_kafkap_bytes_t binhdrs;
        } consumer;
    } rkm_u;
} rd_kafka_msg_t;

// Kafka message queue: a TAILQ of rd_kafka_msg_t plus counters.
TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);
typedef struct rd_kafka_msgq_s {
        struct rd_kafka_msgs_head_s rkmq_msgs;  /* TAILQ_HEAD */
        int32_t rkmq_msg_cnt;
        int64_t rkmq_msg_bytes;
} rd_kafka_msgq_t;
// Downcast a public rd_kafka_message_t to its enclosing rd_kafka_msg_t.
// Valid because rkm_rkmessage is the first member of rd_kafka_msg_s.
static rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
    return (rd_kafka_msg_t *)rkmessage;
}

將rd_kafka_message_t類型消息轉換爲rd_kafka_msg_t類型消息

int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
              int msgflags,
              char *payload, size_t len,
              const void *keydata, size_t keylen,
              void *msg_opaque);

建立一條新的Kafka消息並將其入對到相應分區的消息隊列。
static void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,rd_kafka_msgq_t *src);
將src消息隊列的全部消息合併到dst消息隊列尾部,src會被清空。
static void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,rd_kafka_msgq_t *src);
將src消息隊列的全部元素移動到dst消息隊列,src會被清空
static void rd_kafka_msgq_purge (rd_kafka_t *rk, rd_kafka_msgq_t *rkmq);
清空Kafka Handle的消息隊列





static rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
                   rd_kafka_msg_t *rkm,
                   int do_count);

將rkm消息從消息隊列rkmq中刪除
static rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq);
將rkm消息從消息隊列rkmq中刪除

int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,
                              rd_kafka_msgq_t *rkmq,
                              rd_kafka_msg_t *rkm);

將rkm消息按照消息ID排序插入rnkmq消息隊列
static void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,rd_kafka_msg_t *rkm);
將rkm消息插入消息隊列rkmq頭部
static int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,rd_kafka_msg_t *rkm);
將rkm消息追加到rkmq消息隊列



int rd_kafka_msgq_age_scan (struct rd_kafka_toppar_s *rktp,
                            rd_kafka_msgq_t *rkmq,
                            rd_kafka_msgq_t *timedout,
                            rd_ts_t now,
                            rd_ts_t *abs_next_timeout);

掃描rkmq消息隊列,將超時的消息增長到timeout消息隊列,並從rkmq消息隊列將其刪除。

int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
                              rd_dolock_t do_lock);

對寫入rkt主題的rkm消息進行分區分配
rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko);
從OP操做提取消息
rd_kafka_message_t *rd_kafka_message_new (void);
建立空的Kafka消息



四、Kafka Topic

Kafka Topic相關封裝位於rdkafka_topic.h文件中。

// Internal topic object.
struct rd_kafka_itopic_s
{
    TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;
    rd_refcnt_t        rkt_refcnt; // reference count
    rwlock_t           rkt_lock;
    rd_kafkap_str_t   *rkt_topic; // topic name
    shptr_rd_kafka_toppar_t  *rkt_ua; // unassigned (UA) partition
    shptr_rd_kafka_toppar_t **rkt_p; // pointers to the topic's partitions
    int32_t            rkt_partition_cnt; // partition count
    rd_list_t          rkt_desp;
    rd_ts_t            rkt_ts_metadata; // timestamp of last metadata update
    mtx_t              rkt_app_lock;
    rd_kafka_topic_t *rkt_app_rkt; // application-facing topic pointer
    int               rkt_app_refcnt;
    enum
    {
        RD_KAFKA_TOPIC_S_UNKNOWN,
        RD_KAFKA_TOPIC_S_EXISTS,
        RD_KAFKA_TOPIC_S_NOTEXISTS,
    } rkt_state; // topic state
    int               rkt_flags; // topic flags
    rd_kafka_t       *rkt_rk; // Kafka handle
    rd_avg_t          rkt_avg_batchsize;
    rd_avg_t          rkt_avg_batchcnt;
    shptr_rd_kafka_itopic_t *rkt_shptr_app;
    rd_kafka_topic_conf_t rkt_conf; // topic configuration
};
shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, const char *topic,
                                              rd_kafka_topic_conf_t *conf,
                                              int *existing, int do_lock);

建立rd_kafka_itopic_s對象
void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics);
獲取當前rd_kafka_t對象持有的全部topic名字,保存在一個rd_list中
void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now);
掃描Kafka Handle持有的全部topic的全部分區,篩選出未分配分區的超時消息、須要在Broker上建立的Topic、Meta數據太舊須要被更新的Topic、Leader未知的分區。



static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
                        int32_t partition_cnt);

更新topic的partition個數,若是分區數量有變化,返回1,不然返回0。

rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
                                      rd_kafka_topic_conf_t *conf);

建立Topic對象

static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
                                       rd_kafka_resp_err_t err);

分配未分配分區上的消息到可用分區

int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
                    int32_t partition);

查詢Topic的分區是否可用,即分區是否有Leader

五、Kafka TopicPartition

rd_kafka_topic_partition_t定義在rdkafka.h文件中。

// Public topic+partition descriptor.
typedef struct rd_kafka_topic_partition_s
{
    char        *topic; // topic name
    int32_t      partition; // partition
    int64_t      offset;  // offset
    void        *metadata; // metadata
    size_t       metadata_size;
    void        *opaque;
    rd_kafka_resp_err_t err;
    void       *_private;
} rd_kafka_topic_partition_t;
// Growable array of topic+partition descriptors.
typedef struct rd_kafka_topic_partition_list_s {
        int cnt; // current element count             
        int size; // allocated array size            
        rd_kafka_topic_partition_t *elems; // element array
} rd_kafka_topic_partition_list_t;
// Internal topic+partition (toppar) object.
struct rd_kafka_toppar_s
{
    TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink;
    TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink;
    CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_activelink;
    TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink;
    TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;
    TAILQ_ENTRY(rd_kafka_toppar_s)  rktp_txnlink;
    rd_kafka_itopic_t       *rktp_rkt;
    shptr_rd_kafka_itopic_t *rktp_s_rkt; // shared pointer to the topic object
    int32_t            rktp_partition; // partition
    int32_t            rktp_leader_id; // current leader broker id
    int32_t            rktp_broker_id; // current broker id
    rd_kafka_broker_t *rktp_leader; // current leader broker
    rd_kafka_broker_t *rktp_broker; // current preferred broker
    rd_kafka_broker_t *rktp_next_broker; // next preferred broker
    rd_refcnt_t        rktp_refcnt; // reference count
    mtx_t              rktp_lock;
    rd_kafka_q_t      *rktp_msgq_wakeup_q; // wake-up message queue
    rd_kafka_msgq_t    rktp_msgq; // application->toppar message queue
    rd_kafka_msgq_t    rktp_xmit_msgq;
    int                rktp_fetch;
    rd_kafka_q_t      *rktp_fetchq; // queue for messages fetched from the broker
    rd_kafka_q_t      *rktp_ops; // main-thread op queue
    rd_atomic32_t      rktp_msgs_inflight;
    uint64_t           rktp_msgid; // current/latest message id
    struct
    {
        rd_kafka_pid_t pid;
        uint64_t acked_msgid;
        uint64_t epoch_base_msgid;
        int32_t next_ack_seq;
        int32_t next_err_seq;
        rd_bool_t wait_drain;
    } rktp_eos;
    rd_atomic32_t      rktp_version; // latest op version
    int32_t            rktp_op_version; // op version of the current command received from the broker
    int32_t            rktp_fetch_version; // op version of the current fetch
    enum
    {
        RD_KAFKA_TOPPAR_FETCH_NONE = 0,
        RD_KAFKA_TOPPAR_FETCH_STOPPING,
        RD_KAFKA_TOPPAR_FETCH_STOPPED,
        RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
        RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
        RD_KAFKA_TOPPAR_FETCH_ACTIVE,
    } rktp_fetch_state;
    int32_t            rktp_fetch_msg_max_bytes;
    rd_ts_t            rktp_ts_fetch_backoff;
    int64_t            rktp_query_offset;
    int64_t            rktp_next_offset;
    int64_t            rktp_last_next_offset;
    int64_t            rktp_app_offset; // last offset delivered to the application
    int64_t            rktp_stored_offset; // last stored offset, may not be committed yet
    int64_t            rktp_committing_offset; // offset currently being committed
    int64_t            rktp_committed_offset; // last committed offset
    rd_ts_t            rktp_ts_committed_offset; // timestamp of last commit
    struct offset_stats rktp_offsets; // current offset state
    struct offset_stats rktp_offsets_fin; // finalized offset state
    int64_t rktp_ls_offset; // current last stable offset
    int64_t rktp_hi_offset; // current high watermark
    int64_t rktp_lo_offset; // current low watermark
    rd_ts_t            rktp_ts_offset_lag;
    char              *rktp_offset_path; // offset file path
    FILE              *rktp_offset_fp; // offset file handle
    rd_kafka_cgrp_t   *rktp_cgrp;
    int                rktp_assigned;
    rd_kafka_replyq_t  rktp_replyq; // reply queue
    int                rktp_flags; // partition state flags
    shptr_rd_kafka_toppar_t *rktp_s_for_desp; // pointer on the rkt_desp list
    shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; // pointer on the rkcg_toppars list
    shptr_rd_kafka_toppar_t *rktp_s_for_rkb; // pointer on the rkb_toppars list
    rd_kafka_timer_t rktp_offset_query_tmr; // offset query timer
    rd_kafka_timer_t rktp_offset_commit_tmr; // offset commit timer
    rd_kafka_timer_t rktp_offset_sync_tmr; // offset-file sync timer
    rd_kafka_timer_t rktp_consumer_lag_tmr; // consumer lag monitoring timer
    rd_interval_t      rktp_lease_intvl; // preferred-replica lease
    rd_interval_t      rktp_new_lease_intvl; // interval between creating new preferred-replica leases
    rd_interval_t      rktp_new_lease_log_intvl; // log interval for new leases
    rd_interval_t      rktp_metadata_intvl; // max frequency of metadata requests for preferred replicas
    int rktp_wait_consumer_lag_resp;
    struct rd_kafka_toppar_err rktp_last_err;
    struct
    {
        rd_atomic64_t tx_msgs; // messages sent by the producer
        rd_atomic64_t tx_msg_bytes; // bytes sent by the producer
        rd_atomic64_t rx_msgs; // messages received by the consumer
        rd_atomic64_t rx_msg_bytes; // bytes consumed by the consumer
        rd_atomic64_t producer_enq_msgs; // messages enqueued by the producer
        rd_atomic64_t rx_ver_drops; // outdated messages dropped by the consumer
    } rktp_c;
};

六、Kafka Transport

RdKafka與Broker網絡通訊不須要支持高併發,所以RdKafka選擇了Poll網絡IO模型,對transport數據傳輸層進行了封裝。
RdKafka與Kafka Broker間採用TCP鏈接,所以須要根據Kafka Message協議進行拆包組包:前4個字節是payload長度;payload部分分爲header和body兩部分,接收數據時先收4字節,即payload長度,再根據payload長度收取payload內容。
rd_kafka_transport_s定義在rdkafka_transport_init.h文件:

// Transport layer: one TCP connection to a broker.
struct rd_kafka_transport_s
{
    rd_socket_t rktrans_s;  // socket fd for communicating with the broker
    rd_kafka_broker_t *rktrans_rkb; // broker this transport is connected to
    struct
    {
        void *state;
        int           complete;
        struct msghdr msg;
        struct iovec  iov[2];

        char          *recv_buf;
        int            recv_of;
        int            recv_len;
    } rktrans_sasl; // SASL authentication state
    rd_kafka_buf_t *rktrans_recv_buf; // receive buffer
    rd_pollfd_t rktrans_pfd[2]; // poll fds: TCP socket and wake-up fd
    int rktrans_pfd_cnt; // number of fds in rktrans_pfd
    size_t rktrans_rcvbuf_size; // socket receive buffer size
    size_t rktrans_sndbuf_size; // socket send buffer size
};

typedef struct rd_kafka_transport_s rd_kafka_transport_t;
// Establish a TCP connection to the broker at address sinx, create and
// initialize the rd_kafka_transport_t, and return it (NULL + errstr on
// failure). Uses the configured socket_cb/connect_cb when set.
rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
        const rd_sockaddr_inx_t *sinx,
        char *errstr,
        size_t errstr_size)
{
    rd_kafka_transport_t *rktrans;
    int s = -1;
    int r;

    rkb->rkb_addr_last = sinx;

    /* Create the socket through the configurable socket callback. */
    s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,
                                       SOCK_STREAM, IPPROTO_TCP,
                                       rkb->rkb_rk->rk_conf.opaque);
    if (s == -1)
    {
        rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
                    rd_socket_strerror(rd_socket_errno));
        return NULL;
    }

    rktrans = rd_kafka_transport_new(rkb, s, errstr, errstr_size);
    if (!rktrans)
        goto err;

    rd_rkb_dbg(rkb, BROKER, "CONNECT", "Connecting to %s (%s) "
               "with socket %i",
               rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY |
                               RD_SOCKADDR2STR_F_PORT),
               rd_kafka_secproto_names[rkb->rkb_proto], s);

    /* Connect to broker */
    if (rkb->rkb_rk->rk_conf.connect_cb)
    {
        rd_kafka_broker_lock(rkb); /* for rkb_nodename */
        r = rkb->rkb_rk->rk_conf.connect_cb(
                s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
                rkb->rkb_nodename, rkb->rkb_rk->rk_conf.opaque);
        rd_kafka_broker_unlock(rkb);
    }
    else
    {
        /* Non-blocking connect: EINPROGRESS is not an error, the
         * outcome is detected via POLLOUT below. */
        if (connect(s, (struct sockaddr *)sinx,
                    RD_SOCKADDR_INX_LEN(sinx)) == RD_SOCKET_ERROR &&
                (rd_socket_errno != EINPROGRESS
                ))
            r = rd_socket_errno;
        else
            r = 0;
    }

    if (r != 0)
    {
        rd_rkb_dbg(rkb, BROKER, "CONNECT",
                   "couldn't connect to %s: %s (%i)",
                   rd_sockaddr2str(sinx,
                                   RD_SOCKADDR2STR_F_PORT |
                                   RD_SOCKADDR2STR_F_FAMILY),
                   rd_socket_strerror(r), r);
        rd_snprintf(errstr, errstr_size,
                    "Failed to connect to broker at %s: %s",
                    rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
                    rd_socket_strerror(r));
        goto err;
    }

    /* Set up transport handle */
    rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;
    if (rkb->rkb_wakeup_fd[0] != -1)
    {
        rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
        rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0];
    }

    /* Poll writability to trigger on connection success/failure. */
    rd_kafka_transport_poll_set(rktrans, POLLOUT);

    return rktrans;

err:
    /* Cleanup: close the raw socket only if no transport owns it yet. */
    if (s != -1)
        rd_kafka_transport_close0(rkb->rkb_rk, s);

    if (rktrans)
        rd_kafka_transport_close(rktrans);

    return NULL;
}

創建與Broker創建的TCP鏈接,初始化rd_kafka_transport_s對象並返回
int rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans, int timeout_ms);
Poll並處理IO操做
void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans, int events);
處理IO操做



ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,
                                   rd_slice_t *slice,
                                   char *errstr, size_t errstr_size);

系統調用sendmsg方法的封裝

ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans,
                                 rd_slice_t *slice,
                                 char *errstr, size_t errstr_size);

系統調用send方法的封裝

ssize_t rd_kafka_transport_recv (rd_kafka_transport_t *rktrans,
                                 rd_buf_t *rbuf,
                                 char *errstr, size_t errstr_size);

系統調用recv方法的封裝

rd_kafka_transport_t *rd_kafka_transport_new (rd_kafka_broker_t *rkb,
        rd_socket_t s,
        char *errstr,
        size_t errstr_size);

使用已有Socket建立rd_kafka_transport_t對象
int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout);
Poll方法封裝

// Receive into buf, picking the platform-appropriate syscall wrapper.
// NOTE: the original article's comments had the branches swapped:
// `#ifndef _MSC_VER` selects non-MSVC (POSIX/Linux) builds.
ssize_t rd_kafka_transport_socket_recv (rd_kafka_transport_t *rktrans,
                                rd_buf_t *buf,
                                char *errstr, size_t errstr_size) {
#ifndef _MSC_VER
        // POSIX/Linux path: recvmsg() wrapper (function returns here,
        // so the call below is compiled out of reach on these builds)
        return rd_kafka_transport_socket_recvmsg(rktrans, buf,
                                                 errstr, errstr_size);
#endif  
        // Windows (MSVC) path: plain recv() wrapper
        return rd_kafka_transport_socket_recv0(rktrans, buf,
                                               errstr, errstr_size);
}

七、Kafka Meta

Kafka集羣的Meta Data包括:全部Broker的信息:IP和Port;
全部Topic的信息:Topic名稱,Partition數量,每一個Partition的Leader,ISR,Replica集合等。
Kafka集羣的每一臺Broker都會緩存整個集羣的Meta Data,當Broker或某一個Topic的Meta Data信息發生變化時, Kafka集羣的Controller都會感知到做相應的狀態轉換,同時把發生變化的新Meta Data信息廣播到全部的Broker。
RdKafka對Meta Data的封裝和操做包括Meta Data獲取、定時刷新以及引用的操做,如Partition Leader遷移,Partition個數的變化,Broker上下線等等。
Meta Data分爲Broker、Topic、Partition三種,定義在rdkafka.h中。



/**
 * @brief Metadata: broker information.
 */
typedef struct rd_kafka_metadata_broker
{
    int32_t     id; // Broker ID
    char       *host; // Broker hostname
    int         port; // Broker listening port
} rd_kafka_metadata_broker_t;

/**
 * @brief Metadata: partition information.
 */
typedef struct rd_kafka_metadata_partition
{
    int32_t     id;  // Partition ID
    rd_kafka_resp_err_t err; // Partition error as reported by the broker
    int32_t     leader; // Leader broker of the partition
    int         replica_cnt; // Number of brokers in the replica set
    int32_t    *replicas; // Replica broker list
    int         isr_cnt; // Number of ISR brokers in the ISR list
    int32_t    *isrs; // ISR broker list
} rd_kafka_metadata_partition_t;

/**
 * @brief Topic information
 */
typedef struct rd_kafka_metadata_topic
{
    char       *topic; // Topic name
    int         partition_cnt; // Number of partitions
    struct rd_kafka_metadata_partition *partitions; // Per-partition metadata array
    rd_kafka_resp_err_t err; // Topic error as reported by the broker
} rd_kafka_metadata_topic_t;

/**
 * @brief Metadata container: full cluster view returned by a
 *        metadata request.
 */
typedef struct rd_kafka_metadata
{
    int         broker_cnt; // Number of brokers
    struct rd_kafka_metadata_broker *brokers; // Broker metadata array
    int         topic_cnt; // Number of topics
    struct rd_kafka_metadata_topic *topics; // Topic metadata array
    int32_t     orig_broker_id; // Broker ID the metadata originated from
    char       *orig_broker_name; // Name of the originating broker
} rd_kafka_metadata_t;
rd_kafka_resp_err_t
rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
                   rd_kafka_topic_t *only_rkt,
                   const struct rd_kafka_metadata **metadatap,
                   int timeout_ms);

請求Meta Data數據,阻塞操做

struct rd_kafka_metadata *
rd_kafka_metadata_copy (const struct rd_kafka_metadata *md, size_t size);

深度拷貝Meta Data

rd_kafka_resp_err_t rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
                         rd_kafka_buf_t *request,
                         rd_kafka_buf_t *rkbuf,
                         struct rd_kafka_metadata **mdp);

處理Meta Data請求響應

size_t
rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
                               const rd_kafka_topic_partition_list_t *match);

從當前緩存的Meta Data中查找與match匹配的Topic,並將其加入tinfos

size_t
rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,
                                const rd_kafka_topic_partition_list_t *match);

增長緩存Meta Data中與match匹配的全部Topic到tinfos

rd_kafka_resp_err_t
rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                  const rd_list_t *topics, int force,
                                  const char *reason);

刷新指定topics的全部Meta Data

rd_kafka_resp_err_t
rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                        int force, const char *reason);

刷新已知Topic的Meta Data

rd_kafka_resp_err_t
rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                   const char *reason);

根據Broker刷新Meta Data

rd_kafka_resp_err_t
rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                               const char *reason);

刷新集羣中全部Topic的Meta Data

rd_kafka_resp_err_t
rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                           const rd_list_t *topics,
                           const char *reason, rd_kafka_op_t *rko);

Meta Data請求
void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk);
快速刷新分區Leader的Meta Data

八、Kafka Handle對象建立

Kafka生產者、消費者客戶端對象經過rd_kafka_new函數進行建立,rd_kafka_new源碼以下:

/**
 * Create a Kafka handle (producer or consumer) of \p type.
 * Excerpt: conf setup, queue creation, delivery-report mode selection,
 * consumer-group creation, thread startup and initial broker list.
 */
rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
                          char *errstr, size_t errstr_size){
    ...
    // Use the application-supplied conf, or create a default one
    if (!app_conf)
        conf = rd_kafka_conf_new();
    else
        conf = app_conf;
    ...
    /* Call on_new() interceptors */
    rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
    ...
    // Create the reply (app-facing) queue and the internal op queue
    rk->rk_rep = rd_kafka_q_new(rk);
    rk->rk_ops = rd_kafka_q_new(rk);
    rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
    rk->rk_ops->rkq_opaque = rk;
    ...
    /* Select delivery-report mode: callback, event, or none */
    if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
        rk->rk_drmode = RD_KAFKA_DR_MODE_CB;
    else if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR)
        rk->rk_drmode = RD_KAFKA_DR_MODE_EVENT;
    else
        rk->rk_drmode = RD_KAFKA_DR_MODE_NONE;
    if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE)
        rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;

    /* Enable events implied by configured callbacks */
    if (rk->rk_conf.rebalance_cb)
        rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
    if (rk->rk_conf.offset_commit_cb)
        rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
    if (rk->rk_conf.error_cb)
        rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR;

    rk->rk_controllerid = -1;
    ...
    /* Consumers with a non-empty group.id get a consumer group handler */
    if (type == RD_KAFKA_CONSUMER &&
            RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0)
        rk->rk_cgrp = rd_kafka_cgrp_new(rk,
                                        rk->rk_group_id,
                                        rk->rk_client_id);
    ...
    // Create the background thread and background event queue
    if (rk->rk_conf.background_event_cb)
    {
        /* Hold off background thread until thrd_create() is done. */
        rd_kafka_wrlock(rk);
        rk->rk_background.q = rd_kafka_q_new(rk);
        rk->rk_init_wait_cnt++;
        if ((thrd_create(&rk->rk_background.thread,
                         rd_kafka_background_thread_main, rk)) != thrd_success)

            ...
    }
    /* Create handler thread */
    rk->rk_init_wait_cnt++;
    if ((thrd_create(&rk->rk_thread, rd_kafka_thread_main, rk)) != thrd_success)
    {
        ...
    }
    // Start the internal (logical) broker thread
    rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
                          RD_KAFKA_PROTO_PLAINTEXT,
                          "", 0, RD_KAFKA_NODEID_UA);
    // Add brokers from the configured bootstrap list
    if (rk->rk_conf.brokerlist)
    {
        if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
            rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
                            "No brokers configured");
    }
    ...
}

rd_kafka_new主要工做以下;
(1)根據配置設置屬性;
(2)建立Kafka Handle對象的OP隊列;
(3)建立後臺線程和後臺事件隊列;
(4)建立RdKafka主線程,執行rd_kafka_thread_main函數,主線程名稱爲rdk:main;
(5)建立Broker內部線程;
(6)根據配置建立Broker線程(每一個Broker一個線程)。





/**
 * Public API: add brokers from a comma-separated \p brokerlist.
 * Thin wrapper around the internal rd_kafka_brokers_add0().
 */
int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)
{
    return rd_kafka_brokers_add0(rk, brokerlist);
}
/**
 * Parse \p brokerlist and register each broker. Excerpt: an already
 * known, configured broker is only counted; otherwise a new broker
 * (and its thread) is created via rd_kafka_broker_add().
 */
int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist)
{
    ...
    /* Broker already known and configured: just count it */
    if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) &&
            rkb->rkb_source == RD_KAFKA_CONFIGURED)
    {
        cnt++;
    }
    else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED,
                                 proto, host, port,
                                 RD_KAFKA_NODEID_UA) != NULL)
        cnt++;
    ...
}
/**
 * Create a broker object and spawn its dedicated broker thread
 * running rd_kafka_broker_thread_main(). Excerpt.
 */
rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
                                        rd_kafka_confsource_t source,
                                        rd_kafka_secproto_t proto,
                                        const char *name, uint16_t port,
                                        int32_t nodeid)
{
    ...
    thrd_create(&rkb->rkb_thread, rd_kafka_broker_thread_main, rkb);
    ...
}
/**
 * Broker thread entry point. Excerpt: names the thread
 * ("rdk:broker<nodeid>") and runs the broker serve loops.
 */
static int rd_kafka_broker_thread_main (void *arg)
{
    rd_kafka_set_thread_name("%s", rkb->rkb_name);
    rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid);
    ...
    rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);
    ...
    rd_kafka_broker_ops_serve(rkb, RD_POLL_NOWAIT);
    ...
}

九、Producer生產消息過程

(1)rd_kafka_produce
rd_kafka_produce函數位於rdkafka_msg.c文件:

/**
 * Produce a single message to \p rkt / \p partition.
 *
 * Thin public wrapper: converts the application topic handle to the
 * internal topic object and hands the message off to
 * rd_kafka_msg_new(), returning its result.
 */
int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
                      int msgflags,
                      void *payload, size_t len,
                      const void *key, size_t keylen,
                      void *msg_opaque) {
        rd_kafka_itopic_t *irkt = rd_kafka_topic_a2i(rkt);

        return rd_kafka_msg_new(irkt, partition, msgflags,
                                payload, len, key, keylen, msg_opaque);
}

(2)rd_kafka_msg_new
rd_kafka_msg_new函數位於rdkafka_msg.c文件:

/**
 * Build a message object and assign it to a partition. Excerpt:
 * creation via rd_kafka_msg_new0() followed by partitioning.
 */
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
                      int msgflags,
                      char *payload, size_t len,
                      const void *key, size_t keylen,
                      void *msg_opaque)
{
    ...
    // Create the rd_kafka_msg_t message
    rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags,
                            payload, len, key, keylen, msg_opaque,
                            &err, &errnox, NULL, 0, rd_clock());
    ...
    // Assign the message to a partition
    err = rd_kafka_msg_partitioner(rkt, rkm, 1);
    ...
}

rd_kafka_msg_new內部經過rd_kafka_msg_new0建立Kafka消息,使用rd_kafka_msg_partitioner對Kafka消息進行分區分配。
(3)rd_kafka_msg_partitioner
rd_kafka_msg_partitioner函數位於rdkafka_msg.c文件:

/**
 * Assign \p rkm to a partition of \p rkt and enqueue it on that
 * partition's message queue. Excerpt.
 */
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
                              rd_dolock_t do_lock)
{
    // Determine the partition number
    ...
    // Look up the partition (toppar) object
    s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
    ...
    rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
    rd_atomic64_add(&rktp_new->rktp_c.producer_enq_msgs, 1);
    /* Update message partition */
    if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
        rkm->rkm_partition = partition;
    // Enqueue the message on the partition's queue
    rd_kafka_toppar_enq_msg(rktp_new, rkm);
    ...
}

rd_kafka_msg_partitioner內部經過rd_kafka_toppar_enq_msg將消息加入分區隊列。
(4)rd_kafka_toppar_enq_msg

/**
 * Enqueue \p rkm on toppar \p rktp's message queue. Excerpt: FIFO
 * append, or sorted insert depending on the queuing strategy.
 */
void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm)
{
    ...
    /* Enqueue: plain tail append for unassigned partitions or FIFO
     * strategy, otherwise keep the queue sorted */
    if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
            rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO)
    {
        queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
    }
    else
    {
        queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt, &rktp->rktp_msgq, rkm);
    }
    ...
}

(5)rd_kafka_msgq_enq

/**
 * Append \p rkm to the tail of message queue \p rkmq (FIFO order)
 * and update the queue's byte and message counters.
 *
 * @returns the new number of messages in the queue.
 */
static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
        rd_kafka_msg_t *rkm)
{
    /* Account for payload + key bytes, then link the message in. */
    rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;
    TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
    rkmq->rkmq_msg_cnt++;
    return (int)rkmq->rkmq_msg_cnt;
}

(6)rd_kafka_msgq_enq_sorted
rd_kafka_msgq_enq_sorted函數位於rdkafka_msg.c文件:

/**
 * Insert \p rkm into \p rkmq keeping the queue ordered by the
 * topic's configured message-order comparator.
 *
 * @returns the new number of messages in the queue.
 */
int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,
                              rd_kafka_msgq_t *rkmq,
                              rd_kafka_msg_t *rkm)
{
    int (*cmp) (const void *, const void *) = rkt->rkt_conf.msg_order_cmp;

    /* Sorted insertion requires a producer-assigned message id. */
    rd_dassert(rkm->rkm_u.producer.msgid != 0);

    return rd_kafka_msgq_enq_sorted0(rkmq, rkm, cmp);
}
/**
 * Insert \p rkm into \p rkmq at the position determined by
 * \p order_cmp and update the queue's byte and message counters.
 *
 * @returns the new number of messages in the queue.
 */
int rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,
                               rd_kafka_msg_t *rkm,
                               int (*order_cmp) (const void *, const void *))
{
    /* Account for payload + key bytes of the newly queued message. */
    rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;
    TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *,
                        rkm_link, order_cmp);
    rkmq->rkmq_msg_cnt++;
    return rkmq->rkmq_msg_cnt;
}

隊列的操做位於rdsysqueue.h文件中。
rd_kafka_broker_add函數位於rdkafka_broker.c文件:

/**
 * Allocate and initialize a broker object and spawn its dedicated
 * thread running rd_kafka_broker_thread_main(). Excerpt.
 */
rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
                                        rd_kafka_confsource_t source,
                                        rd_kafka_secproto_t proto,
                                        const char *name, uint16_t port,
                                        int32_t nodeid)
{
    rd_kafka_broker_t *rkb;
    rkb = rd_calloc(1, sizeof(*rkb));
    // Initialize the rd_kafka_broker_t object's fields
    ...
    if (thrd_create(&rkb->rkb_thread, rd_kafka_broker_thread_main, rkb) != thrd_success)
    {
       ...
    }
}

rd_kafka_broker_add建立Broker線程,啓動執行rd_kafka_broker_thread_main函數。

/**
 * Broker thread entry point. Excerpt: names the thread and enters
 * the broker serve loop.
 */
static int rd_kafka_broker_thread_main (void *arg)
{
   ...
    rd_kafka_set_thread_name("%s", rkb->rkb_name);
    rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid);
    ...
    rd_kafka_broker_serve(rkb, ...);
    ...
}
/**
 * Dispatch to the proper serve loop based on broker source
 * (internal) or client type (producer/consumer). Excerpt.
 */
static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb, int timeout_ms) {
        ...
        if (rkb->rkb_source == RD_KAFKA_INTERNAL)
                rd_kafka_broker_internal_serve(rkb, abs_timeout);
        else if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)
                rd_kafka_broker_producer_serve(rkb, abs_timeout);
        else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER)
                rd_kafka_broker_consumer_serve(rkb, abs_timeout);
}
/**
 * Producer serve loop body. Excerpt: transmit queued messages for
 * this broker's toppars, then serve ops and IO.
 */
static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb,
        rd_ts_t abs_timeout)
{
    // Produce messages from this broker's partition queues
    rd_kafka_broker_produce_toppars(rkb, now, &next_wakeup,
                                    do_timeout_scan);
    rd_kafka_broker_ops_io_serve(rkb, next_wakeup);
}
/**
 * Serve the broker's op queue (and IO). Excerpt.
 */
static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb,
        rd_ts_t abs_timeout)
{
    ...
    rd_kafka_broker_ops_serve(rkb, rd_timeout_remains_us(abs_timeout));
    ...
}
/**
 * Pop and serve queued ops from the broker's op queue.
 *
 * Only the first rd_kafka_q_pop() may block (up to \p timeout_us);
 * after that the timeout is reset to RD_POLL_NOWAIT so the loop
 * merely drains what is currently queued.
 *
 * @returns the number of ops served.
 */
static int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb,
                                      rd_ts_t timeout_us)
{
    int served = 0;

    for (;;) {
        rd_kafka_op_t *rko = rd_kafka_q_pop(rkb->rkb_ops, timeout_us, 0);

        if (!rko)
            break;

        served++;

        /* A zero return from op_serve terminates the loop early. */
        if (!rd_kafka_broker_op_serve(rkb, rko))
            break;

        /* Don't block on subsequent pops. */
        timeout_us = RD_POLL_NOWAIT;
    }

    return served;
}

rdkafka_broker.c文件:

/**
 * Send \p slice on the broker's transport. Excerpt.
 */
static ssize_t rd_kafka_broker_send (rd_kafka_broker_t *rkb, rd_slice_t *slice)
{
    ...
    r = rd_kafka_transport_send(rkb->rkb_transport, slice,
                                errstr, sizeof(errstr));
    ...
}

rdkafka_transport.c文件:

/**
 * Transport-level send: forwards to the platform socket send
 * wrapper. Excerpt.
 */
ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans,
                                 rd_slice_t *slice, char *errstr, size_t errstr_size)
{
    ..
    r = rd_kafka_transport_socket_send(rktrans, slice,
                                       errstr, errstr_size);
    ...
}
/**
 * Send the remaining bytes of \p slice on the broker socket.
 * Dispatches to the platform-specific socket write wrapper.
 */
static ssize_t rd_kafka_transport_socket_send (rd_kafka_transport_t *rktrans,
                                rd_slice_t *slice,
                                char *errstr, size_t errstr_size) {
#ifndef _MSC_VER
        /* POSIX build: sendmsg()-based wrapper (scatter/gather). */
        /* FIXME: Use sendmsg() with iovecs if there's more than one segment
         * remaining, otherwise (or if platform does not have sendmsg)
         * use plain send(). */
        return rd_kafka_transport_socket_sendmsg(rktrans, slice,
                                                 errstr, errstr_size);
#endif
        /* Windows (MSVC) build: plain send()-based wrapper; unreachable
         * on POSIX because of the return above. */
        return rd_kafka_transport_socket_send0(rktrans, slice,
                                               errstr, errstr_size);
}
/**
 * POSIX sendmsg() wrapper: non-blocking scatter/gather send of the
 * slice's segments. Excerpt (call shown is truncated in this listing).
 */
static ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,
                                   rd_slice_t *slice,
                                   char *errstr, size_t errstr_size)
{
    ...
    r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT
    ...
}

十、Consumer消費消息過程

(1)開啓消息消費
RdKafka提供了rd_kafka_consume_start、rd_kafka_consume、rd_kafka_consume_start_queue、rd_kafka_consume_queue接口用於消息消費。

/**
 * Start fetching \p partition of \p rkt at \p offset (simple/legacy
 * consumer API). Messages are delivered to \p rkq, or to the
 * partition's own fetch queue when \p rkq is NULL.
 *
 * @returns 0 on success, or -1 with rd_kafka_set_last_error() set
 *          (invalid partition, consumer-mode conflict, or bad offset).
 */
int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition,
                    int64_t offset, rd_kafka_q_t *rkq) {
    shptr_rd_kafka_toppar_t *s_rktp;

    if (partition < 0) {
        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                    ESRCH);
        return -1;
    }

        /* Register the handle as a simple (legacy) consumer; a failure
         * here presumably means the handle is already used in another
         * consumer mode — NOTE(review): inferred from the call name. */
        if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return -1;
        }

    /* Add the partition to the desired list under the topic write lock. */
    rd_kafka_topic_wrlock(rkt);
    s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
    rd_kafka_topic_wrunlock(rkt);

        /* Verify offset */
    if (offset == RD_KAFKA_OFFSET_BEGINNING ||
        offset == RD_KAFKA_OFFSET_END ||
            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
                /* logical offsets */

    } else if (offset == RD_KAFKA_OFFSET_STORED) {
        /* offset manager */

                if (rkt->rkt_conf.offset_store_method ==
                    RD_KAFKA_OFFSET_METHOD_BROKER &&
                    RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
                        /* Broker based offsets require a group id. */
                        rd_kafka_toppar_destroy(s_rktp);
            rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
                        EINVAL);
                        return -1;
                }

    } else if (offset < 0) {
        /* Any other negative offset is invalid. */
        rd_kafka_toppar_destroy(s_rktp);
        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
                    EINVAL);
        return -1;

        }
        /* Ask the toppar to start fetching; then drop our reference. */
        rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset,
                       rkq, RD_KAFKA_NO_REPLYQ);
        rd_kafka_toppar_destroy(s_rktp);
    rd_kafka_set_last_error(0, 0);
    return 0;
}

/**
 * Start consuming \p partition of topic \p app_rkt at \p offset,
 * delivering messages to the partition's own fetch queue (hence the
 * NULL queue argument to rd_kafka_consume_start0()).
 */
int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
                int64_t offset) {
        rd_kafka_itopic_t *itopic = rd_kafka_topic_a2i(app_rkt);

        rd_kafka_dbg(itopic->rkt_rk, TOPIC, "START",
                     "Start consuming partition %"PRId32,partition);

        return rd_kafka_consume_start0(itopic, partition, offset, NULL);
}

/**
 * Start consuming \p partition of \p app_rkt at \p offset, routing
 * messages to the application-provided queue \p rkqu.
 */
int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
                  int64_t offset, rd_kafka_queue_t *rkqu) {
        rd_kafka_q_t *destq = rkqu->rkqu_q;

        return rd_kafka_consume_start0(rd_kafka_topic_a2i(app_rkt),
                                       partition, offset, destq);
}

/**
 * Common consume implementation: pop ops from \p rkq until one can be
 * returned to the application as a message (FETCH or CONSUMER_ERR);
 * ops handled internally by rd_kafka_poll_cb() are consumed and the
 * loop continues.
 *
 * @returns a message on success, or NULL with last_error set to
 *          ERR__TIMED_OUT on timeout, or ERR__INTR if a callback
 *          yielded.
 */
static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
                                              rd_kafka_q_t *rkq,
                          int timeout_ms) {
    rd_kafka_op_t *rko;
    rd_kafka_message_t *rkmessage = NULL;
    rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* Mark the app as blocking-in-poll when a wait is possible. */
        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

    rd_kafka_yield_thread = 0;
        while ((rko = rd_kafka_q_pop(rkq,
                                     rd_timeout_remains_us(abs_timeout), 0))) {
                rd_kafka_op_res_t res;

                res = rd_kafka_poll_cb(rk, rkq, rko,
                                       RD_KAFKA_Q_CB_RETURN, NULL);

                /* PASS means this op is for the application: fall
                 * through to message extraction below. */
                if (res == RD_KAFKA_OP_RES_PASS)
                        break;

                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                             rd_kafka_yield_thread)) {
                        /* Callback called rd_kafka_yield(), we must
                         * stop dispatching the queue and return. */
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
                                                EINTR);
                        rd_kafka_app_polled(rk);
                        return NULL;
                }

                /* Message was handled by callback. */
                continue;
        }

    if (!rko) {
        /* Timeout reached with no op returned. */
        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
                    ETIMEDOUT);
                rd_kafka_app_polled(rk);
        return NULL;
    }

        rd_kafka_assert(rk,
                        rko->rko_type == RD_KAFKA_OP_FETCH ||
                        rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
    /* Get rkmessage from rko */
    rkmessage = rd_kafka_message_get(rko);
    /* Store offset */
    rd_kafka_op_offset_store(rk, rko);
    rd_kafka_set_last_error(0, 0);
        rd_kafka_app_polled(rk);
    return rkmessage;
}

/**
 * Consume a single message from \p partition of \p app_rkt, blocking
 * at most \p timeout_ms.
 *
 * Resolves the toppar under the topic read lock (falling back to the
 * desired list), then pops one message from its fetch queue.
 *
 * @returns a message, or NULL on error/timeout
 *          (see rd_kafka_last_error()).
 */
rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
                                      int32_t partition,
                      int timeout_ms) {
        rd_kafka_itopic_t *itopic = rd_kafka_topic_a2i(app_rkt);
        shptr_rd_kafka_toppar_t *toppar_ref;
        rd_kafka_message_t *msg;

        /* Resolve the partition object under the topic read lock. */
        rd_kafka_topic_rdlock(itopic);
        toppar_ref = rd_kafka_toppar_get(itopic, partition, 0/*no ua on miss*/);
        if (unlikely(!toppar_ref))
                toppar_ref = rd_kafka_toppar_desired_get(itopic, partition);
        rd_kafka_topic_rdunlock(itopic);

        if (unlikely(!toppar_ref)) {
                /* No such toppar known */
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                                        ESRCH);
                return NULL;
        }

        msg = rd_kafka_consume0(itopic->rkt_rk,
                                rd_kafka_toppar_s2i(toppar_ref)->rktp_fetchq,
                                timeout_ms);

        rd_kafka_toppar_destroy(toppar_ref); /* refcnt from .._get() */
        return msg;
}

/**
 * Consume a single message from queue \p rkqu, blocking at most
 * \p timeout_ms.
 */
rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
                        int timeout_ms) {
    rd_kafka_t *rk = rkqu->rkqu_rk;

    return rd_kafka_consume0(rk, rkqu->rkqu_q, timeout_ms);
}

(2)Poll輪詢消息隊列

/**
 * Poll the main (app-facing) reply queue and fire enqueued
 * callbacks via rd_kafka_poll_cb().
 *
 * @returns the number of events served (from rd_kafka_q_serve()).
 */
int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) {
        int served;

        /* Mark the app as blocking-in-poll when a wait is possible. */
        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

        served = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,
                                  RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb,
                                  NULL);

        rd_kafka_app_polled(rk);

        return served;
}

/**
 * Poll the consumer group queue for a single message or event.
 *
 * @returns a message, or NULL on timeout (via rd_kafka_consume0());
 *          if \p rk has no consumer group, returns a message whose
 *          err is RD_KAFKA_RESP_ERR__UNKNOWN_GROUP.
 */
rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
                                            int timeout_ms) {
        rd_kafka_cgrp_t *cgrp = rd_kafka_cgrp_get(rk);

        if (unlikely(cgrp == NULL)) {
                /* Not a group consumer: report an error message. */
                rd_kafka_message_t *errmsg = rd_kafka_message_new();
                errmsg->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
                return errmsg;
        }

        return rd_kafka_consume0(rk, cgrp->rkcg_q, timeout_ms);
}

/**
 * Pop a single event from queue \p rkqu, waiting at most
 * \p timeout_ms.
 *
 * @returns the event (an op served in EVENT mode) or NULL on timeout.
 */
rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) {
        rd_kafka_t *rk = rkqu->rkqu_rk;
        rd_kafka_op_t *op;

        /* Mark the app as blocking-in-poll when a wait is possible. */
        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

        op = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
                                  RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb,
                                  NULL);

        rd_kafka_app_polled(rk);

        /* NULL op means timeout; otherwise the op is the event. */
        return op;
}

2、RdKafka C++源碼分析

一、C++ API對C API的封裝

C++ API主要是對RdKafka C API的封裝，根據不一樣的功能模塊封裝爲不一樣功能類，類定義在rdkafkacpp.h文件中，並使用RdKafka命名空間進行限定，主要類以下：Conf、Handle、TopicPartition、Topic、Message、Queue、KafkaConsumer、Consumer、Producer、BrokerMetadata、PartitionMetadata、TopicMetadata、Metadata、DeliveryReportCb、PartitionerCb、PartitionerKeyPointerCb、EventCb、Event、ConsumeCb、RebalanceCb、OffsetCommitCb、SocketCb、OpenCb。

二、Consumer與KafkaConsumer

Consumer對partition、offset有徹底的控制能力;KafkaConsumer提供了Topic訂閱接口,默認使用latest消費方式,能夠經過assign方法指定開始消費的partition和offset。

三、Producer生產消息過程

(1)Producer建立

/**
 * Create a new Producer handle from \p conf.
 *
 * On failure returns NULL and sets \p errstr. Note that
 * rd_kafka_new() takes ownership of the duplicated C conf only on
 * success, so the dup is destroyed explicitly on the error path.
 */
RdKafka::Producer *RdKafka::Producer::create (RdKafka::Conf *conf,
                                              std::string &errstr) {
  char errbuf[512];
  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
  RdKafka::ProducerImpl *impl = new RdKafka::ProducerImpl();
  rd_kafka_conf_t *c_conf = NULL;

  if (confimpl) {
    if (!confimpl->rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      delete impl;
      return NULL;
    }

    impl->set_common_config(confimpl);

    /* Duplicate so the application keeps ownership of its Conf. */
    c_conf = rd_kafka_conf_dup(confimpl->rk_conf_);

    if (confimpl->dr_cb_) {
      /* Route C-level delivery reports to the C++ callback. */
      rd_kafka_conf_set_dr_msg_cb(c_conf, dr_msg_cb_trampoline);
      impl->dr_cb_ = confimpl->dr_cb_;
    }
  }

  rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, c_conf,
                                errbuf, sizeof(errbuf));
  if (!rk) {
    errstr = errbuf;
    // rd_kafka_new() takes ownership only if succeeds
    if (c_conf)
      rd_kafka_conf_destroy(c_conf);
    delete impl;
    return NULL;
  }

  impl->rk_ = rk;

  return impl;
}

建立Producer時須要準備好Conf對象。
(2)生產消息

/**
 * Produce a message through the underlying C API.
 *
 * @returns ERR_NO_ERROR on success, otherwise the error code mapped
 *          from rd_kafka_last_error().
 */
RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
                                                   int32_t partition,
                                                   int msgflags,
                                                   void *payload, size_t len,
                                                   const std::string *key,
                                                   void *msg_opaque) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
  const void *keydata = key ? key->c_str() : NULL;
  size_t keylen = key ? key->size() : 0;

  int rc = rd_kafka_produce(topicimpl->rkt_, partition, msgflags,
                            payload, len, keydata, keylen, msg_opaque);

  if (rc == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}

生產消息時須要指定Topic對象。
(3)Poll輪詢

/**
 * Serve queued callbacks/events on this handle's main queue by
 * forwarding to the C-level rd_kafka_poll().
 *
 * @returns the number of events served.
 */
int RdKafka::HandleImpl::poll(int timeout_ms)
{
    const int served = rd_kafka_poll(rk_, timeout_ms);
    return served;
}

produce生產消息是異步的，將消息放入到內部隊列後會馬上返回，所以須要由poll返回最終寫入結果。produce是盡力送達的，會持續嘗試直至超過message.timeout.ms才彙報失敗。

四、Consumer消費消息過程

(1)建立KafkaConsumer

/**
 * Create a KafkaConsumer (high-level, group-based consumer).
 *
 * Requires a global Conf with "group.id" configured. On failure
 * returns NULL and sets \p errstr.
 */
RdKafka::KafkaConsumer *RdKafka::KafkaConsumer::create (RdKafka::Conf *conf,
                                                        std::string &errstr) {
  char errbuf[512];
  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
  RdKafka::KafkaConsumerImpl *rkc = new RdKafka::KafkaConsumerImpl();
  rd_kafka_conf_t *rk_conf = NULL;
  size_t grlen;

  if (!confimpl || !confimpl->rk_conf_) {
    errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
    delete rkc;
    return NULL;
  }

  /* The group consumer requires a non-empty group.id. */
  if (rd_kafka_conf_get(confimpl->rk_conf_, "group.id",
                        NULL, &grlen) != RD_KAFKA_CONF_OK ||
      grlen <= 1 /* terminating null only */) {
    errstr = "\"group.id\" must be configured";
    delete rkc;
    return NULL;
  }

  rkc->set_common_config(confimpl);

  /* Duplicate so the application keeps ownership of its Conf. */
  rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);

  rd_kafka_t *rk;
  if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf,
                          errbuf, sizeof(errbuf)))) {
    errstr = errbuf;
    // rd_kafka_new() takes ownership only if succeeds
    rd_kafka_conf_destroy(rk_conf);
    delete rkc;
    return NULL;
  }

  rkc->rk_ = rk;

  /* Redirect handle queue to cgrp's queue to provide a single queue point */
  rd_kafka_poll_set_consumer(rk);

  return rkc;
}

(2)訂閱Topic

/**
 * Subscribe to the given topic names (group-managed assignment).
 *
 * Builds a temporary C partition list with unassigned partitions,
 * passes it to rd_kafka_subscribe() and frees it again.
 */
RdKafka::ErrorCode
RdKafka::KafkaConsumerImpl::subscribe (const std::vector<std::string> &topics) {
  rd_kafka_topic_partition_list_t *c_topics =
      rd_kafka_topic_partition_list_new((int)topics.size());

  for (std::vector<std::string>::const_iterator it = topics.begin();
       it != topics.end(); ++it)
    rd_kafka_topic_partition_list_add(c_topics, it->c_str(),
                                      RD_KAFKA_PARTITION_UA);

  rd_kafka_resp_err_t err = rd_kafka_subscribe(rk_, c_topics);

  rd_kafka_topic_partition_list_destroy(c_topics);

  return static_cast<RdKafka::ErrorCode>(err);
}

(3)消費消息

/**
 * Poll for a single message from the consumer group queue.
 *
 * Always returns a Message object; on timeout the returned message
 * carries RdKafka::ERR__TIMED_OUT.
 */
RdKafka::Message *RdKafka::KafkaConsumerImpl::consume (int timeout_ms) {
  rd_kafka_message_t *c_msg = rd_kafka_consumer_poll(this->rk_, timeout_ms);

  if (c_msg)
    return new RdKafka::MessageImpl(c_msg);

  return new RdKafka::MessageImpl(NULL, RdKafka::ERR__TIMED_OUT);
}

3、RdKafka多線程設計

一、Producer/Consumer多線程設計

RdKafka內部使用多線程對硬件資源進行充分利用,RdKafka API是線程安全的,應用程序能夠在任意時間調用其線程內的任意API函數。
每一個Producer/Consumer實例會建立線程以下:
(1)應用線程,處理具體應用業務邏輯。
(2)Kafka Handler線程：每建立一個Producer/Consumer即會建立一個Handler線程，即RdKafka主線程，線程名稱爲rdk:main，線程執行函數爲rd_kafka_thread_main。
(3)Kafka Broker線程：對於增長到Producer/Consumer的每一個Broker會建立一個線程，負責與Broker通訊，線程執行函數爲rd_kafka_broker_thread_main，線程名稱爲rdk:brokerxxx。
(4)Inner Broker線程，用於處理未分配分區的OP操做隊列。
(5)後臺線程
若是配置對象設置了background_event_cb,Kafka Handler建立時會建立相應的後臺線程和後臺隊列,線程執行函數爲rd_kafka_background_thread_main。






二、線程查看

Linux查看KafkaConsumer進程的線程的方法:

ps -T -p pid
top -H -p pid

Consumer線程查看結果:
Kafka快速入門(十一)——RdKafka源碼分析
Producer線程查看結果:
Kafka快速入門(十一)——RdKafka源碼分析

相關文章
相關標籤/搜索