Kafka快速入門(十)——C++客戶端

Kafka快速入門(十)——C++客戶端

1、C++ API

一、數據結構

RdKafka::DeliveryReportCb:Delivery Report回調類
RdKafka::PartitionerCb:Partitioner回調類
RdKafka::PartitionerKeyPointerCb:帶key指針的Partitioner回調類
RdKafka::EventCb:Event回調類
RdKafka::Event:Event類
RdKafka::ConsumeCb:Consume回調類
RdKafka::RebalanceCb:KafkaConsunmer: Rebalance回調類
RdKafka::OffsetCommitCb:Offset Commit回調類
RdKafka::SocketCb:Socket回調類
RdKafka::OpenCb:Open回調類
RdKafka::Conf:配置接口類
RdKafka::Handle:客戶端基類
RdKafka::TopicPartition:Topic+Partion類
RdKafka::Topic:Topic Handle
RdKafka::Message:消息對象類
RdKafka::Queue:隊列接口
RdKafka::KafkaConsumer:KafkaConsumer高級接口
RdKafka::Consumer:簡單Consumer類
RdKafka::Producer:Producer類
RdKafka::BrokerMetadata:Broker元數據信息類
RdKafka::PartitionMetadata:Partition元數據信息類
RdKafka::TopicMetadata:Topic元數據信息類
RdKafka::Metadata:元數據容器
librdkafka C++ API定義在rdkafkacpp.h文件中,兼容STD C++ 03標準,遵循Google編碼規範。ios

二、通用API

int RdKafka::version ();
獲取librdkafka版本
std::string RdKafka::version_str();
獲取librdkafka版本
std::string RdKafka::get_debug_contexts ();
獲取librdkafka調試環境
int RdKafka::wait_destroyed(int timeout_ms);
等待全部的 rd_kafka_t對象銷燬
std::string RdKafka::err2str(RdKafka::ErrorCode err);
將Kafka錯誤代碼轉換成可讀字符串c++

三、RdKafka::Conf

enum  ConfType{ 
  CONF_GLOBAL, // 全局配置
  CONF_TOPIC // Topic配置
};
enum  ConfResult{ 
  CONF_UNKNOWN = -2, 
  CONF_INVALID = -1, 
  CONF_OK = 0 
};

static Conf * create(ConfType type);
建立配置對象
Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
設置配置對象的屬性值,成功返回CONF_OK,錯誤時錯誤信息輸出到errstr。
Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
設置dr_cb屬性值
Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
設置event_cb屬性值
Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr);
設置用於自動訂閱Topic的默認Topic配置
Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
設置partitioner_cb屬性值,配置對象必須是CONF_TOPIC類型。
Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb, std::string &errstr);
設置partitioner_key_pointer_cb屬性值
Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr);
設置socket_cb屬性值
Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr);
設置open_cb屬性值
Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr);
設置rebalance_cb屬性值
Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr);
設置offset_commit_cb屬性值
Conf::ConfResult get(const std::string &name, std::string &value) const;
查詢單條屬性配置值
std::list<std::string> * dump ();
按name,value元組序列化配置對象的屬性名稱和屬性值到鏈表
virtual struct rd_kafka_conf_s *c_ptr_global () = 0;
若是是CONF_GLOBAL類型配置對象,返回底層數據結構rd_kafka_conf_t句柄,不然返回NULL。
virtual struct rd_kafka_topic_conf_s *c_ptr_topic () = 0;
若是是CONF_TOPIC類型配置對象,返回底層數據結構的rd_kafka_topic_conf_t句柄,不然返回0。git

四、RdKafka::Topic

static Topic * create(Handle *base, const std::string &topic_str, Conf *conf, std::string &errstr);
使用conf配置建立名爲topic_str的Topic句柄
const std::string name ();
獲取Topic名稱
bool partition_available(int32_t partition) const;
獲取parition分區是否可用,只能在 RdKafka::PartitionerCb回調函數內被調用。
ErrorCode offset_store(int32_t partition, int64_t offset);
存儲Topic的partition分區的offset位移,只能用於RdKafka::Consumer,不能用於 RdKafka::KafkaConsumer高級接口類。使用本接口時,auto.commit.enable參數必須設置爲false。
virtual struct rd_kafka_topic_s *c_ptr () = 0;
返回底層數據結構的rd_kafka_topic_t句柄,不推薦利用rd_kafka_topic_t句柄調用C API,但若是C++ API沒有提供相應功能,能夠直接使用C API和librdkafka核心交互。
static const int32_t PARTITION_UA = -1;
未賦值分區
static const int64_t OFFSET_BEGINNING = -2;
特殊位移,從開始消費
static const int64_t OFFSET_END = -1;
特殊位移,從末尾消費
static const int64_t OFFSET_STORED = -1000;
使用offset存儲github

五、RdKafka::Message

Message表示一條消費或生產的消息,或是事件。
std::string errstr() const;
若是消息是一條錯誤事件,返回錯誤字符串,不然返回控字符串。
ErrorCode err() const;
若是消息是一條錯誤事件,返回錯誤代碼,不然返回0
Topic * topic() const;
返回消息的Topic對象。若是消息的Topic對象沒有顯示使用RdKafka::Topic::create()建立,須要使用topic_name函數。
std::string topic_name() const;
返回消息的Topic名稱
int32_t partition() const;
若是分區可用,返回分區號
void * payload() const;
返回消息數據
size_t len() const;
返回消息數據的長度
const std::string * key() const;
返回字符串類型的消息key
const void * key_pointer() const;
返回void類型的消息key
size_t key_len() const;
返回消息key的二進制長度
int64_t offset () const;
返回消息或錯誤的位移
void * msg_opaque() const;
返回RdKafka::Producer::produce()提供的msg_opaque
virtual MessageTimestamp timestamp() const = 0;
返回消息時間戳
virtual int64_t latency() const = 0;
返回produce函數內生產消息的微秒級時間延遲,若是延遲不可用,返回-1。
virtual struct rd_kafka_message_s *c_ptr () = 0;
返回底層數據結構的C rd_kafka_message_t句柄
virtual Status status () const = 0;
返回消息在Topic Log的持久化狀態
virtual RdKafka::Headers *headers () = 0;
返回消息頭
virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;
返回消息頭,錯誤信息會輸出到errdocker

六、RdKafka::TopicPartition

static TopicPartition * create(const std::string &topic, int partition);
建立一個TopicPartition對象
static TopicPartition *create (const std::string &topic, int partition,int64_t offset);
建立TopicPartition對象
static void destroy (std::vector<TopicPartition*> &partitions);
銷燬全部TopicPartition對象
const std::string & topic () const;
返回Topic名稱
int partition ();
返回分區號
int64_t offset();
返回位移
void set_offset(int64_t offset);
設置位移
ErrorCode err();
返回錯誤碼bootstrap

七、RdKafka::Handle

Kafka快速入門(十)——C++客戶端
客戶端handle基類
const std::string name();
返回Handle的名稱
const std::string memberid() const;
返回客戶端組成員ID
int poll (int timeout_ms);
輪詢處理指定的Kafka句柄的Event,返回事件數量。事件會觸發應用程序提供的回調函數調用。timeout_ms參數指定回調函數指定阻塞等待的最大時間間隔;對於非阻塞調用,指定timeout_ms參數爲0;永遠等待事件,設置timeout_ms參數爲-1。RdKafka::KafkaConsumer實例禁止使用poll方法,使用RdKafka::KafkaConsumer::consume()方法代替。
int outq_len();
返回當前出隊列的長度,出隊列包含等待發送到Broker的消息、請求和Broker要確認的消息、請求。
ErrorCode metadata(bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms);
從Broker請求元數據,成功返回RdKafka::ERR_NO_ERROR,超時返回RdKafka::ERR_TIMED_OUT,錯誤返回其它錯誤碼。
virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
暫停分區鏈表中分區的消費和生產,返回ErrorCode::NO_ERROR。partitions中分區會返回成功或錯誤信息。
virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
恢復分區鏈表中分區的生產和消費,返回ErrorCode::NO_ERROR。
partitions中分區會返回成功或錯誤信息。
virtual ErrorCode query_watermark_offsets (const std::string &topic,int32_t partition,int64_t *low, int64_t *high,int timeout_ms) = 0;
查詢topic主題partition分區的高水位和低水位,高水位輸出到high,低水位輸出到low,成功返回RdKafka::ERR_NO_ERROR,失敗返回錯誤碼。
virtual ErrorCode get_watermark_offsets (const std::string &topic,int32_t partition,int64_t *low, int64_t *high) = 0;
獲取topic主題partition分區的高水位和低水位,高水位輸出到high,低水位輸出到low,成功返回RdKafka::ERR_NO_ERROR,失敗返回錯誤碼。
virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,int timeout_ms) = 0;
經過時間戳查詢給定分區的位移,每一個分區返回的位移是最新的位移,阻塞timeout_ms。
virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;
獲取指定TopicPartition的消息隊列,成功返回從指定分區獲取的隊列,不然返回NULL。
virtual ErrorCode set_log_queue (Queue *queue) = 0;
將rdkafka logs轉移到指定消息隊列。queue是要轉移rdkafka logs到的消息隊列,若是爲NULL,則轉移到主消息隊列。Log.queue屬性必須設置爲true。
virtual void yield () = 0;
取消當前回調函數調度器,如Handle::poll(),KafkaConsumer::consume()。只能再RdKafka回調函數內調用。
virtual const std::string clusterid (int timeout_ms) = 0;
返回Broker元數據報告的集羣ID,要求Kafka 0.10.0以上版本。
virtual struct rd_kafka_s *c_ptr () = 0;
返回底層數據的rd_kafka_t句柄
virtual int32_t controllerid (int timeout_ms) = 0;
返回Broker元數據報告的當前控制器ID,要求Kafka 0.10.0以上版本,而且api.version.request=true
virtual ErrorCode fatal_error (std::string &errstr) = 0;
返回客戶端實例的第一個fatal錯誤的錯誤代碼api

virtual ErrorCode oauthbearer_set_token (const std::string &token_value,
                                         int64_t md_lifetime_ms,
                                         const std::string &md_principal_name,
                                         const std::list<std::string> &extensions,
                                         std::string &errstr) = 0;

設置SASL/OAUTHBEARER令牌和元數據
virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr) = 0;
設置SASL/OAUTHBEARER刷新失敗指示器數組

八、RdKafka::Producer

static Producer * create(Conf *conf, std::string &errstr);
建立一個新的Producer客戶端對象,conf用於替換默認配置對象,本函數調用後conf能夠重用。
成功返回新的Producer客戶端對象,失敗返回NULL,errstr可讀錯誤信息。
ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque);
生產和發送單條消息到Broker。
topic:主題
partition:分區
msgflags:可選項爲RK_MSG_BLOCK、RK_MSG_FREE、RK_MSG_COPY。RK_MSG_FREE表示RdKafka調用produce完成後會釋放payload數據;RK_MSG_COPY表示payload數據會被拷貝,在produce調用完成後RdKafka不會使用payload指針;RK_MSG_BLOCK表示在消息隊列滿時阻塞produce函數,若是dr_cb回調函數被使用,應用程序必須調用rd_kafka_poll函數確保投遞消息隊列的投遞消息投遞完。當消息隊列滿時,失敗會致使produce函數的永久阻塞。RK_MSG_FREE和RK_MSG_COPY是互斥操做。
若是produce函數調用時指定了RK_MSG_FREE,並返回了錯誤碼,
與payload指針相關的內存數據必須由使用者負責釋放。
payload:長度爲len的消息負載數據
len:payload消息數據的長度。
key:key是可選的消息key,若是非NULL,會被傳遞給主題partitioner,並被隨消息發送到Broker和傳遞給Consumer。
msg_opaque:msg_opaque是可選的應用程序提供給每條消息的opaque指針,opaque指針會在dr_cb回調函數內提供。
返回錯誤碼:
ERR_NO_ERROR:消息成功發送併入對列。
ERR_QUEUE_FULL:最大消息數量達到queue.buffering.max.message。
ERR_MSG_SIZE_TOO_LARGE:消息數據大小太大,超過messages.max.bytes配置的值。
ERR_UNKNOWN_PARTITION:請求一個Kafka集羣內的未知分區。
ERR_UNKNOWN_TOPIC:topic是Kafka集羣的未知主題。
ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque);
生產和發送單條消息到Broker,傳遞key數據指針和key長度。
ErrorCode produce(Topic *topic, int32_t partition, const std::vector&lt; char &gt; *payload, const std::vector&lt; char &gt; *key, void *msg_opaque);
生產和發送單條消息到Broker,傳遞消息數組和key數組。緩存

ErrorCode produce (Topic *topic, int32_t partition,
                             const std::vector<char> *payload,
                             const std::vector<char> *key,
                             void *msg_opaque)

生產和發送消息到Broker,接受數組類型的key和payload,數組會被複制。
ErrorCode flush (int timeout_ms)
等待全部未完成的全部Produce請求完成。
爲了確保全部隊列和已經執行的Produce請求在停止前完成,flush操做優先於銷燬生產者實例完成。
本函數會調用Producer::poll()函數,所以會觸發回調函數。
ErrorCode purge (int purge_flags)
清理生產者當前處理的消息。本函數調用時可能會阻塞必定時間,當後臺線程隊列在清理時。
應用程序須要在調用poll或flush函數後,執行清理消息的dr_cb回調函數。
virtual Error *init_transactions (int timeout_ms) = 0;
初始化Producer實例的事務。
失敗返回RdKafka::Error錯誤對象,成功返回NULL。
經過調用RdKafka::Error::is_retriable()函數能夠檢查返回的錯誤對象是否有權限重試,調用RdKafka::Error::is_fatal()檢查返回的錯誤對象是不是嚴重錯誤。返回的錯誤對象必須delete。
virtual Error *begin_transaction () = 0;
啓動事務。
本函數調用前,init_transactions()函數必須被成功調用。
成功返回NULL,失敗返回錯誤對象。經過調用RdKafka::Error::is_fatal_error()函數能夠檢查是不是嚴重錯誤,返回的錯誤對象必須delete。
virtual Error *send_offsets_to_transaction (const std::vector&lt;TopicPartition*&gt; &offsets,const ConsumerGroupMetadata *group_metadata,int timeout_ms) = 0;
發送TopicPartition位移鏈表到由group_metadata指定的Consumer Group協調器,若是事務提交成功,位移纔會被提交。
virtual Error *commit_transaction (int timeout_ms) = 0;
提交當前事務。在實際提交事務時,任何未完成的消息會被完成投遞。
成功返回NULL,失敗返回錯誤對象。經過調用錯誤對象的方法能夠檢查是否有權限重試,是不是嚴重錯誤、可停止錯誤等。
virtual Error *abort_transaction (int timeout_ms) = 0;
中止事務。本函數從非嚴重錯誤、可終止事務中用於恢復。
未完成消息會被清理。bash

九、RdKafka::Consumer

RdKafka::Consumer是簡單的非Rebalance、非Group的消費者。
static Consumer * create(Conf *conf, std::string &errstr);
建立一個Kafka Consumer客戶端對象
static int64_t OffsetTail(int64_t offset);
從Topic尾部轉換位移爲邏輯位移
ErrorCode start(Topic *topic, int32_t partition, int64_t offset);
從topic主題partition分區的offset位移開始消費消息,offset能夠是普通位移,也能夠是OFFSET_BEGINNING或OFFSET_END,rdkafka會試圖從Broker重複拉取批量消息到本地隊列使其維持queued.min.messages參數值數量的消息。start函數在沒有調用stop函數中止消費時不能對同一個TopicPartition調用屢次。
應用程序會使用consume函數從本地隊列消費消息。
ErrorCode start(Topic *topic, int32_t partition, int64_t offset, Queue *queue);
在消息隊列queue的topic主題的partition分區開始消費。
ErrorCode stop(Topic *topic, int32_t partition);
中止從topic主題的partition分區消費消息,並清理本地隊列的全部消息。應用程序須要在銷燬全部Consumer對象前中止全部消費者。
ErrorCode seek (Topic *topic, int32_t partition, int64_t offset, int timeout_ms)
定位topic的partition分區的Consumer位移到offset
Message * consume(Topic *topic, int32_t partition, int timeout_ms);
從topic主題和partition分區消費一條消息。timeout_ms是等待獲取消息的最大時間。消費者必須提早調用start函數。應用程序須要檢查消費的消息是正常消息仍是錯誤消息。應用程序完成時消息對象必須銷燬。
Message * consume(Queue *queue, int timeout_ms);
從指定消息隊列queue消費一條消息
int consume_callback(Topic *topic, int32_t partition, int timeout_ms, ConsumeCb *consume_cb, void *opaque);
從topic主題和partition分區消費消息,並對每條消費的消息使用指定回調函數處理。consume_callback提供了比consume更高的吞吐量。
opaque參數回被傳遞給consume_cb的回調函數。
int consume_callback(Queue *queue, int timeout_ms, RdKafka::ConsumeCb *consume_cb, void *opaque);
從消息隊列queue消費消息,並對每條消費的消息使用指定回調函數處理。

十、RdKafka::KafkaConsumer

KafkaConsumer是高級API,要求Kafka 0.9.0以上版本,當前支持range和roundrobin分區分配策略。
static KafkaConsumer * create(Conf *conf, std::string &errstr);
建立KafkaConsumer對象,conf對象必須配置Consumer要加入的消費者組。使用KafkaConsumer::close()進行關閉。
ErrorCode assignment(std::vector&lt; RdKafka::TopicPartition * &gt; &partitions);
返回由RdKafka::KafkaConsumer::assign() 設置的當前分區
ErrorCode subscription(std::vector&lt; std::string &gt; &topics);
返回由RdKafka::KafkaConsumer::subscribe() 設置的當前訂閱Topic
ErrorCode subscribe(const std::vector&lt; std::string &gt; &topics);
更新訂閱Topic分區
ErrorCode unsubscribe();
將當前訂閱Topic取消訂閱分區
ErrorCode assign(const std::vector&lt; TopicPartition * &gt; &partitions);
將分配分區更新爲partitions
ErrorCode unassign();
中止消費並刪除當前分配的分區
Message * consume(int timeout_ms);
消費消息或獲取錯誤事件,觸發回調函數,會自動調用註冊的回調函數,包括RebalanceCb、EventCb、OffsetCommitCb等。須要使用delete釋放消息。應用程序必須確保consume在指定時間間隔內調用,爲了執行等待調用的回調函數,即便沒有消息。當RebalanceCb被註冊時,在須要調用和適當處理內部Consumer同步狀態時,確保consume在指定時間間隔內調用極爲重要。應用程序必須禁止對KafkaConsumer對象調用poll函數。
若是RdKafka::Message::err()是ERR_NO_ERROR,則返回正常的消息;若是RdKafka::Message::err()是ERR_NO_ERRO,返回錯誤事件;若是RdKafka::Message::err()是ERR_TIMED_OUT,則超時。
ErrorCode commitSync();
提交當前分配分區的位移,同步操做,會阻塞直到位移被提交或提交失敗。若是註冊了RdKafka::OffsetCommitCb回調函數,其會在KafkaConsumer::consume()函數內調用並提交位移。
ErrorCode commitAsync();
異步提交位移
ErrorCode commitSync(Message *message);
基於消息對單個topic+partition對象同步提交位移
virtual ErrorCode commitSync (std::vector&lt;TopicPartition*&gt; &offsets) = 0;
對指定多個TopicPartition同步提交位移
ErrorCode commitAsync(Message *message);
基於消息對單個TopicPartition異步提交位移
virtual ErrorCode commitAsync (const std::vector&lt;TopicPartition*&gt; &offsets) = 0;
對多個TopicPartition異步提交位移
ErrorCode close();
正常關閉,會阻塞直到四個操做完成(觸發避免當前分區分配的局部再平衡,中止當前賦值消費,提交位移,離開分組)
virtual ConsumerGroupMetadata *groupMetadata () = 0;
返回本Consumer實例的Consumer Group的元數據
ErrorCode position (std::vector&lt;TopicPartition*&gt; &partitions)
獲取TopicPartition對象中當前位移,會別填充TopicPartition對象的offset字段。
ErrorCode seek (const TopicPartition &partition, int timeout_ms)
定位TopicPartition的Consumer到位移。
timeout_ms爲0,會開始Seek並當即返回;timeout_ms非0,Seek會等待timeout_ms時間。
ErrorCode offsets_store (std::vector&lt;TopicPartition*&gt; &offsets)
爲TopicPartition存儲位移,位移會在auto.commit.interval.ms時提交或是被手動提交。
enable.auto.offset.store屬性必須設置爲fasle。

十一、RdKafka::Event

enum  Type{ 
  EVENT_ERROR, //錯誤條件事件
  EVENT_STATS, // Json文檔統計事件
  EVENT_LOG, // Log消息事件
  EVENT_THROTTLE // 來自Broker的throttle級信號事件
};

virtual Type type() const =0;
返回事件類型
virtual ErrorCode err() const =0;
返回事件錯誤代碼
virtual Severity severity() const =0;
返回log嚴重級別
virtual std::string fac() const =0;
返回log基礎字符串
virtual std::string str () const =0;
返回Log消息字符串
virtual int throttle_time() const =0;
返回throttle時間
virtual std::string broker_name() const =0;
返回Broker名稱
virtual int broker_id() const =0;
返回Broker ID

十二、RdKafka::Queue

建立新的消息隊列,消息隊列運行客戶端從多個topic+partitions對象從新路由消費消息到單個消息隊列。包含多個topic+partitions對象的消息隊列會運行一次consume(),而不是針對每一個topic+partitions對象都執行。
static Queue * create(Handle *handle);
建立Kafka客戶端的消息隊列
消息隊列容許應用程序轉發從多個Topic+Partition消費的消息到隊列點。
virtual ErrorCode forward (Queue *dst) = 0;
將消息隊列消息轉移到dst消息隊列。不管dst是否爲NULL,調用本函數後,src不會轉移其fetch隊列到消費者隊列。
virtual Message *consume (int timeout_ms) = 0;
從消息隊列中消費消息或獲取錯誤事件。釋放消息須要使用delete。
virtual int poll (int timeout_ms) = 0;
poll消息隊列,在任何入隊回調函數會運行。禁止對包含消息的隊列使用。返回事件數量,超時返回0。
virtual void io_event_enable (int fd, const void *payload, size_t size) = 0;
開啓消息隊列的IO事件觸發。fd=-1,關閉事件觸發。RdKafka會維護一個payload的拷貝。使用轉移隊列時,IO事件觸發必須打開。

1三、RdKafka::BrokerMetadata

virtual int32_t id() const =0;
返回Broker的ID
virtual const std::string host() const =0;
返回Broker主機
virtual int port() const =0;
返回Broker監聽端口

1四、RdKafka::Metadata

typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
typedef std::vector<const TopicMetadata*> TopicMetadataVector;
typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
typedef TopicMetadataVector::const_iterator TopicMetadataIterator;

virtual const BrokerMetadataVector * brokers() const =0;
返回Broker鏈表
virtual const TopicMetadataVector * topics() const =0;
返回Topic鏈表
virtual int32_t orig_broker_id() const =0;
返回metadata所在Broker的ID
virtual const std::string orig_broker_name() const =0;
返回metadata所在Broker的名稱

1五、RdKafka::ConsumeCb

virtual void consume_cb(Message &message, void *opaque)=0;
ConsumeCb用於RdKafka::Consumer::consume_callback()接口,對消費的每條消息會調用ConsumeCb回調函數。

1六、RdKafka::DeliveryReportCb

每收到一條RdKafka::Producer::produce()函數生產的消息,調用一次投遞報告回調函數,RdKafka::Message::err()將會標識Produce請求的結果。爲了使用隊列化的投遞報告回調函數,必須調用RdKafka::poll()函數。
virtual void dr_cb(Message &message)=0;
當一條消息成功生產或是rdkafka遇到永久失敗或是重試次數耗盡,
投遞報告回調函數會被調用。

1七、RdKafka::EventCb

事件是從RdKafka傳遞錯誤、統計信息、日誌等消息到應用程序的通用接口。
virtual void event_cb(Event &event)=0;
事件回調函數

1八、RdKafka::OffsetCommitCb

virtual void offset_commit_cb(RdKafka::ErrorCode err, std::vector&lt; TopicPartition * &gt; &offsets)=0;
用於消費者組的位移提交回調函數。
自動或手動提交位移的結果回被位移提交回調函數並被RdKafka::KafkaConsumer::consume()函數使用。
若是沒有分區有合法的位移要提交,位移提交回調函數會被調用,此時err爲ERR_NO_OFFSET。
offsets鏈表包含每一個分區的信息,提交的Topic、Partition、offset、提交錯誤。

1九、RdKafka::OpenCb

virtual int open_cb(const std::string &path, int flags, int mode)=0;
Open回調函數用於使用flags、mode打開指定path的文件。

20、RdKafka::PartitionerCb

PartitionerCb用實現自定義分區策略,須要使用RdKafka::Conf::set()設置partitioner_cb屬性。
virtual int32_t partitioner_cb(const Topic *topic, const std::string *key, int32_t partition_cnt, void *msg_opaque)=0;
Partitioner回調函數
返回topic主題中使用key的分區,key能夠是NULL或字符串。
返回值必須在0到partition_cnt間,若是分區失敗可能返回RD_KAFKA_PARTITION_UA(-1)。
msg_opaque與RdKafka::Producer::produce()調用提供的msg_opaque相同。

2一、RdKafka::PartitionerKeyPointerCb

virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0;
變體partitioner回調函數
使用key指針及其長度替代字符串類型key。
key能夠爲NULL,key_len能夠爲0。

2二、RdKafka::PartitionMetadata

typedef std::vector<int32_t> ReplicasVector;
typedef std::vector<int32_t> ISRSVector;
typedef ReplicasVector::const_iterator ReplicasIterator;
typedef ISRSVector::const_iterator ISRSIterator;

virtual int32_t id() const =0;
返回分區ID
virtual ErrorCode err() const =0;
返回Broker報告的分區錯誤
virtual int32_t leader() const =0;
返回分區Leader的Broker ID
virtual const std::vector&lt;int32_t&gt; * replicas() const =0;
返回備份Broker鏈表
virtual const std::vector&lt;int32_t&gt; * isrs() const =0;
返回ISR Broker鏈表,Broker可能會返回一個緩存或過時的ISR鏈表。

2三、RdKafka::RebalanceCb

virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector&lt; TopicPartition * &gt; &partitions)=0;
用於RdKafka::KafkaConsunmer的組再平衡回調函數
註冊rebalance_cb回調函數會關閉rdkafka的自動分區賦值和再分配並替換應用程序的rebalance_cb回調函數。
再平衡回調函數負責對基於RdKafka::ERR_ASSIGN_PARTITIONS和 RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分區分配,也能處理任意前二者錯誤除外其它再平衡失敗錯誤。對於RdKafka::ERR_ASSIGN_PARTITIONS和 RdKafka::ERR_REVOKE_PARTITIONS事件以外的其它再平衡失敗錯誤,必須調用unassign()同步狀態。
沒有再平衡回調函數,rdkafka也能自動完成再平衡過程,但註冊一個再平衡回調函數可使應用程序在執行其它操做時擁有更大的靈活性,例如從指定位置獲取位移或手動提交位移。

class MyRebalanceCb : public RdKafka::RebalanceCb
{
public:
    void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                       RdKafka::ErrorCode err,
                       std::vector<RdKafka::TopicPartition*> &partitions)
    {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
        {
            // application may load offets from arbitrary external
            // storage here and update \p partitions
            consumer->assign(partitions);

        }
        else if (err == RdKafka::ERR__REVOKE_PARTITIONS)
        {
            // Application may commit offsets manually here
            // if auto.commit.enable=false
            consumer->unassign();
        }
        else
        {
            std::cerr << "Rebalancing error: " <<
                      RdKafka::err2str(err) << std::endl;
            consumer->unassign();
        }
    }
};

2四、RdKafka::SocketCb

SocketCb回調函數用於打開一個Socket套接字。
virtual int socket_cb(int domain, int type, int protocol)=0;
Socket回調函數
用於打開使用domain、type、protocol建立的Socket鏈接。

2五、RdKafka::Error

static Error *create (ErrorCode code, const std::string *errstr);
建立Kafka錯誤對象,RdKafka::Error對象必需要顯示釋放。
virtual ErrorCode code () const = 0;
返回Kafka錯誤的錯誤碼
virtual std::string name () const = 0;
返回Kafka錯誤的錯誤碼名稱
virtual std::string str () const = 0;
返回Kafka錯誤的錯誤描述
virtual bool is_fatal () const = 0;
若是Kafka錯誤會致使客戶端不可用的fatal錯誤,返回1,不然返回0。
virtual bool is_retriable () const = 0;
若是操做可重試,返回1,不然返回0。
virtual bool txn_requires_abort () const = 0;
若是Kafka錯誤是可終止的事務型錯誤,返回1,不然返回0。

2六、RdKafka::TopicMetadata

typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;

virtual const std::string topic() const = 0;
返回Topic名稱。
virtual const PartitionMetadataVector *partitions() const = 0;
返回Partition列表
virtual ErrorCode err() const = 0;
返回Broker報告的Topic錯誤。

2、Kafka Producer C++ API封裝

一、Kafka Producer使用流程

(1)建立Kafka配置實例
RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)
(2)建立Topic配置實例
RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC)
(3)設置Kafka配置實例Broker屬性
RdKafka::Conf::ConfResult RdKafka::Conf::set(const std::string &name, const std::string &value, std::string &errstr)
(4)設置Topic配置實例屬性
RdKafka::Conf::ConfResult RdKafka::Conf::set (const std::string &name, const std::string &value, std::string &errstr)
(5)註冊回調函數

Conf::ConfResult RdKafka::Conf::set ("dr_cb", RdKafka::DeliveryReportCb *dr_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("event_cb", RdKafka::EventCb *event_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("socket_cb", RdKafka::SocketCb *socket_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("open_cb", RdKafka::OpenCb *open_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("offset_commit_cb", RdKafka::OffsetCommitCb *offset_commit_cb, std::string &errstr);

分區策略回調函數須要註冊到Topic配置實例:

Conf::ConfResult RdKafka::Conf::set ("partitioner_cb", RdKafka::PartitionerCb *dr_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("partitioner_key_pointer_cb", RdKafka::PartitionerKeyPointerCb *dr_cb, std::string &errstr);

(6)建立Kafka Producer客戶端實例
static RdKafka::Producer* RdKafka::Producer::create(RdKafka::Conf *conf, std::string &errstr);
conf爲Kafka配置實例
(7)建立Topic實例

static RdKafka::Topic* RdKafka::Topic::create(RdKafka::Handle *base, 
                                              const std::string &topic_str,
                                              RdKafka::Conf *conf, 
                                              std::string &errstr);

conf爲Topic配置實例
(8)生產消息

RdKafka::ErrorCode RdKafka::Producer::produce(RdKafka::Topic *topic,
                                              int32_t partition,
                                              int msgflags,
                                              void *payload,
                                              size_t len,
                                              const std::string *key,
                                              void *msg_opaque);

(9)阻塞等待Producer生產消息完成
int RdKafka::Producer::poll (int timeout_ms);
(10)等待Produce請求完成
RdKafka::ErrorCode RdKafka::Producer::flush(int timeout_ms);
(11)銷燬Kafka Producer客戶端實例
int RdKafka::wait_destroyed(int timeout_ms);

二、Kafka Producer實例

KafkaProducer.h文件:

#ifndef KAFKAPRODUCER_H
#define KAFKAPRODUCER_H

#pragma once
#include <string>
#include <iostream>
#include "rdkafkacpp.h"

class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
    void dr_cb(RdKafka::Message &message)
    {
        if (message.err())
            std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
        else
            std::cerr << "Message delivered to topic " << message.topic_name()
                      << " [" << message.partition() << "] at offset "
                      << message.offset() << std::endl;
    }
};

class ProducerEventCb : public RdKafka::EventCb
{
public:
    void event_cb(RdKafka::Event &event)
    {
        switch(event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
            break;
        }
    }
};

class HashPartitionerCb : public RdKafka::PartitionerCb
{
public:
    int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
                            int32_t partition_cnt, void *msg_opaque)
    {
        char msg[128] = {0};
        sprintf(msg, "HashPartitionerCb:[%s][%s][%d]", topic->name().c_str(),
                key->c_str(), partition_cnt);
        std::cout << msg << std::endl;
        return generate_hash(key->c_str(), key->size()) % partition_cnt;
    }
private:

    static inline unsigned int generate_hash(const char *str, size_t len)
    {
        unsigned int hash = 5381;
        for (size_t i = 0 ; i < len ; i++)
            hash = ((hash << 5) + hash) + str[i];
        return hash;
    }
};

class KafkaProducer
{
public:
    /**
     * @brief KafkaProducer
     * @param brokers
     * @param topic
     * @param partition
     */
    explicit KafkaProducer(const std::string& brokers, const std::string& topic,
                           int partition);
    /**
     * @brief push Message to Kafka
     * @param str, message data
     */
    void pushMessage(const std::string& str, const std::string& key);
    ~KafkaProducer();

protected:
    std::string m_brokers;//Broker列表,多個使用逗號分隔
    std::string m_topicStr;// Topic名稱
    int m_partition;// 分區
    RdKafka::Conf* m_config;// Kafka Conf對象
    RdKafka::Conf* m_topicConfig;// Topic Conf對象
    RdKafka::Topic* m_topic;// Topic對象
    RdKafka::Producer* m_producer;// Producer對象
    RdKafka::DeliveryReportCb* m_dr_cb;
    RdKafka::EventCb* m_event_cb;
    RdKafka::PartitionerCb* m_partitioner_cb;
};

#endif // KAFKAPRODUCER_H

KafkaProducer.cpp文件:

#include "KafkaProducer.h"

KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition)
{
    m_brokers = brokers;
    m_topicStr = topic;
    m_partition = partition;
    // 建立Kafka Conf對象
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if(m_config == NULL)
    {
        std::cout << "Create RdKafka Conf failed." << std::endl;
    }
    // 建立Topic Conf對象
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if(m_topicConfig == NULL)
    {
        std::cout << "Create RdKafka Topic Conf failed." << std::endl;
    }
    // 設置Broker屬性
    RdKafka::Conf::ConfResult errCode;
    m_dr_cb = new ProducerDeliveryReportCb;
    std::string errorStr;
    errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    m_event_cb = new ProducerEventCb;
    errCode = m_config->set("event_cb", m_event_cb, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }

    m_partitioner_cb = new HashPartitionerCb;
    errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }

    errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }

    errCode = m_config->set("message.max.bytes", "10240000", errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    // 建立Producer
    m_producer = RdKafka::Producer::create(m_config, errorStr);
    if(m_producer == NULL)
    {
        std::cout << "Create Producer failed:" << errorStr << std::endl;
    }
    // 建立Topic對象
    m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
    if(m_topic == NULL)
    {
        std::cout << "Create Topic failed:" << errorStr << std::endl;
    }
}

void KafkaProducer::pushMessage(const std::string& str, const std::string& key)
{
    int32_t len = str.length();
    void* payload = const_cast<void*>(static_cast<const void*>(str.data()));
    RdKafka::ErrorCode errorCode = m_producer->produce(m_topic, RdKafka::Topic::PARTITION_UA,
                                   RdKafka::Producer::RK_MSG_COPY,
                                   payload, len, &key, NULL);
    m_producer->poll(0);
    if (errorCode != RdKafka::ERR_NO_ERROR)
    {
        std::cerr << "Produce failed: " << RdKafka::err2str(errorCode) << std::endl;
        if(errorCode ==  RdKafka::ERR__QUEUE_FULL)
        {
            m_producer->poll(1000);
        }
    }
}

KafkaProducer::~KafkaProducer()
{
    while (m_producer->outq_len() > 0)
    {
        std::cerr << "Waiting for " << m_producer->outq_len() << std::endl;
        m_producer->flush(5000);
    }
    delete m_config;
    delete m_topicConfig;
    delete m_topic;
    delete m_producer;
    delete m_dr_cb;
    delete m_event_cb;
    delete m_partitioner_cb;
}

main.cpp:

#include <iostream>
#include "KafkaProducer.h"
using namespace std;

int main()
{
    // 建立Producer
    KafkaProducer producer("192.168.0.105:9092", "test", 0);
    for(int i = 0; i < 10000; i++)
    {
        char msg[64] = {0};
        sprintf(msg, "%s%4d", "Hello RdKafka", i);
        // 生產消息
        char key[8] = {0};
        sprintf(key, "%d", i);
        producer.pushMessage(msg, key);
    }
    RdKafka::wait_destroyed(5000);
}

CMakeList.txt:

cmake_minimum_required(VERSION 2.8)

project(KafkaProducer)

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_COMPILER "g++")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")
set(CMAKE_INCLUDE_CURRENT_DIR ON)

# Kafka頭文件路徑
include_directories(/usr/local/include/librdkafka)
# Kafka庫路徑
link_directories(/usr/local/lib)

aux_source_directory(. SOURCE)

add_executable(${PROJECT_NAME} ${SOURCE})
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)

三、Kafka消息查看

進入kafka容器:
docker exec -it kafka-test /bin/bash
查看Topic的消息:
kafka-console-consumer.sh --bootstrap-server kafka-test:9092 --topic test --from-beginning

3、Kafka Consumer C++ API封裝

一、Kafka Consumer使用流程

RdKafka提供了兩種消費者API,低級API的Consumer和高級API的KafkaConsumer,本文使用KafkaConsumer。
(1)建立Kafka配置實例
RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)
(2)建立Topic配置實例
RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC)
(3)設置Kafka配置實例Broker屬性
RdKafka::Conf::ConfResult RdKafka::Conf::set(const std::string &name, const std::string &value, std::string &errstr)
(4)設置Topic配置實例屬性
RdKafka::Conf::ConfResult RdKafka::Conf::set (const std::string &name, const std::string &value, std::string &errstr)
(5)註冊回調函數

Conf::ConfResult RdKafka::Conf::set ("event_cb", RdKafka::EventCb *dr_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("socket_cb", RdKafka::SocketCb *socket_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("open_cb", RdKafka::OpenCb *open_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("rebalance_cb", RdKafka::RebalanceCb *rebalance_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("offset_commit_cb", RdKafka::OffsetCommitCb *offset_commit_cb, std::string &errstr);
Conf::ConfResult RdKafka::Conf::set ("consume_cb", RdKafka::ConsumeCb *consume_cb, std::string &errstr);

(6)建立Kafka Consumer客戶端實例
static RdKafka::KafkaConsumer* RdKafka::KafkaConsumer::create(RdKafka::Conf *conf, std::string &errstr);
conf爲Kafka配置實例
(7)建立Topic實例

static RdKafka::Topic* RdKafka::Topic::create(RdKafka::Handle *base, 
                                              const std::string &topic_str,
                                              RdKafka::Conf *conf, 
                                              std::string &errstr);

conf爲Topic配置實例
(8)訂閱主題
RdKafka::ErrorCode RdKafka::KafkaConsumer::subscribe(const std::vector&lt;std::string&gt; &topics);
(9)消費消息
RdKafka::Message* RdKafka::KafkaConsumer::consume (int timeout_ms);
(10)關閉消費者實例
RdKafka::ErrorCode RdKafka::KafkaConsumer::close();
(11)銷燬釋放RdKafka資源
int RdKafka::wait_destroyed(int timeout_ms);

二、Kafka Consumer實例

KafkaConsumer.h文件:

#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H

#pragma once

#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>
#include "rdkafkacpp.h"

class ConsumerEventCb : public RdKafka::EventCb
{
public:
    void event_cb (RdKafka::Event &event)
    {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            if (event.fatal())
            {
                std::cerr << "FATAL ";
            }
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                      event.str() << std::endl;
            break;

        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;

        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    event.severity(), event.fac().c_str(), event.str().c_str());
            break;

        case RdKafka::Event::EVENT_THROTTLE:
            std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
                      event.broker_name() << " id " << (int)event.broker_id() << std::endl;
            break;

        default:
            std::cerr << "EVENT " << event.type() <<
                      " (" << RdKafka::err2str(event.err()) << "): " <<
                      event.str() << std::endl;
            break;
        }
    }
};

class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
    static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions)
    {
        for (unsigned int i = 0 ; i < partitions.size() ; i++)
            std::cerr << partitions[i]->topic() <<
                      "[" << partitions[i]->partition() << "], ";
        std::cerr << "\n";
    }

public:
    void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                       RdKafka::ErrorCode err,
                       std::vector<RdKafka::TopicPartition*> &partitions)
    {
        std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
        printTopicPartition(partitions);
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
        {
            consumer->assign(partitions);
            partition_count = (int)partitions.size();
        }
        else
        {
            consumer->unassign();
            partition_count = 0;
        }
    }
private:
    int partition_count;
};

class KafkaConsumer
{
public:/**
     * @brief KafkaConsumer
     * @param brokers
     * @param groupID
     * @param topics
     * @param partition
     */
    explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
                           const std::vector<std::string>& topics, int partition);
    void pullMessage();
    ~KafkaConsumer();
protected:
    std::string m_brokers;
    std::string m_groupID;
    std::vector<std::string> m_topicVector;
    int m_partition;
    RdKafka::Conf* m_config;
    RdKafka::Conf* m_topicConfig;
    RdKafka::KafkaConsumer* m_consumer;
    RdKafka::EventCb* m_event_cb;
    RdKafka::RebalanceCb* m_rebalance_cb;
};

#endif // KAFKACONSUMER_H

KafkaConsumer.cpp文件:

#include "KafkaConsumer.h"

KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
                             const std::vector<std::string>& topics, int partition)
{
    m_brokers = brokers;
    m_groupID = groupID;
    m_topicVector = topics;
    m_partition = partition;

    std::string errorStr;
    RdKafka::Conf::ConfResult errorCode;
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    m_event_cb = new ConsumerEventCb;
    errorCode = m_config->set("event_cb", m_event_cb, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    m_rebalance_cb = new ConsumerRebalanceCb;
    errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    errorCode = m_config->set("enable.partition.eof", "false", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    errorCode = m_config->set("group.id", m_groupID, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    // 獲取最新的消息數據
    errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Topic Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
    if(m_consumer == NULL)
    {
        std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
    }
    std::cout << "Created consumer " << m_consumer->name() << std::endl;
}

void msg_consume(RdKafka::Message* msg, void* opaque)
{
    switch (msg->err())
    {
    case RdKafka::ERR__TIMED_OUT:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    case RdKafka::ERR_NO_ERROR:
        std::cout << " Message in " << msg->topic_name() << " ["
                  << msg->partition() << "] at offset " << msg->offset()
                  << "key: " << msg->key() << " payload: "
                  << (char*)msg->payload() << std::endl;
        break;
    default:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    }
}

void KafkaConsumer::pullMessage()
{
    // 訂閱Topic
    RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
    if (errorCode != RdKafka::ERR_NO_ERROR)
    {
        std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
    }
    // 消費消息
    while(true)
    {
        RdKafka::Message *msg = m_consumer->consume(1000);
        msg_consume(msg, NULL);
        delete msg;
    }
}

KafkaConsumer::~KafkaConsumer()
{
    m_consumer->close();
    delete m_config;
    delete m_topicConfig;
    delete m_consumer;
    delete m_event_cb;
    delete m_rebalance_cb;

}

main.cpp文件:

#include "KafkaConsumer.h"

int main()
{
    std::string brokers = "192.168.0.105:9092";
    std::vector<std::string> topics;
    topics.push_back("test");
    topics.push_back("test2");
    std::string group = "testGroup";
    KafkaConsumer consumer(brokers, group, topics, RdKafka::Topic::OFFSET_BEGINNING);
    consumer.pullMessage();

    RdKafka::wait_destroyed(5000);
    return 0;
}

CMakeList.txt:

cmake_minimum_required(VERSION 2.8)

project(KafkaConsumer)

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_COMPILER "g++")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")
set(CMAKE_INCLUDE_CURRENT_DIR ON)

# Kafka頭文件路徑
include_directories(/usr/local/include/librdkafka)
# Kafka庫路徑
link_directories(/usr/local/lib)

aux_source_directory(. SOURCE)

add_executable(${PROJECT_NAME} ${SOURCE})
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)

https://github.com/scorpiostudio/Kafka

相關文章
相關標籤/搜索