版權聲明:本文爲博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處連接和本聲明。
本文連接:https://blog.csdn.net/lijinqi1987/article/details/76691170c++
librdkafka在c語言的基礎上封裝了一層c++的API,能夠實現kafka的消費操做,基本操做步驟以下bootstrap
一、建立kafka 配置fetch
RdKafka::Conf *conf = nullptr;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);.net
二、設置kafka各項參數server
/*設置broker list*/
conf->set("bootstrap.servers", brokers_, errstr);
blog
/*設置consumer group*/
conf->set("group.id", groupid_, errstr);
kafka
/*每次從單個分區中拉取消息的最大尺寸*/
conf->set("max.partition.fetch.bytes", strfetch_num, errstr);it
三、建立kafka topic配置
RdKafka::Conf *tconf = nullptr;
tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);io
四、設置kafka topic參數class
if(tconf->set("auto.offset.reset", "smallest", errstr)
五、建立kafka consumer實例
kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);
六、建立kafka topic
RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);
七、啓動kafka consumer實例
RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
八、消費kafka
kafka_consumer_->consume(topic_, partition_, timeout_ms);
九、阻塞等待消息
kafka_consumer_->poll(0);
十、中止消費
kafka_consumer_->stop(topic_, partition_);
十一、銷燬consumer實例
RdKafka::wait_destroyed(5000);