【轉】c++(11)使用librdkafka庫實現kafka的消費實例

版權聲明:本文爲博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處連接和本聲明。
本文連接:https://blog.csdn.net/lijinqi1987/article/details/76691170c++

 

librdkafka在c語言的基礎上封裝了一層c++的API,能夠實現kafka的消費操做,基本操做步驟以下bootstrap


一、建立kafka 配置fetch

RdKafka::Conf *conf = nullptr;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);.net

 

二、設置kafka各項參數server


/*設置broker list*/
conf->set("bootstrap.servers", brokers_, errstr);
blog

/*設置consumer group*/
conf->set("group.id", groupid_, errstr);
kafka

/*每次從單個分區中拉取消息的最大尺寸*/
conf->set("max.partition.fetch.bytes", strfetch_num, errstr);it


三、建立kafka topic配置
RdKafka::Conf *tconf = nullptr;
tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);io


四、設置kafka topic參數class

if(tconf->set("auto.offset.reset", "smallest", errstr)


五、建立kafka consumer實例

kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);


六、建立kafka topic

RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);


七、啓動kafka consumer實例

RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);


八、消費kafka

kafka_consumer_->consume(topic_, partition_, timeout_ms);


九、阻塞等待消息

kafka_consumer_->poll(0);


十、中止消費

kafka_consumer_->stop(topic_, partition_);


十一、銷燬consumer實例

RdKafka::wait_destroyed(5000);

相關文章
相關標籤/搜索