librdkafka是用c語言實現的一個高性能的kafka客戶端,由於性能強大,開發者們基於librdkafka開發了各類語言的kafka客戶端,好比librdkafkad(c++),, node-rdkafka(Node.js), confulent-kafka-python(Python)等。
librdkafka的高性能主要體如今其多線程的設計以及儘量的下降內存拷貝。node
librdkafka github地址:https://github.com/edenhill/l... ,
其中,C語言API能夠參考src/rdkafka.h頭文件,簡要介紹幾個關鍵的對象python
建立這幾個對象所使用的函數:c++
librdkafka支持多種協議以控制kafka服務器的訪問權限,如SASL_PALIN, PLAINTEXT, SASL_SSL等,在使用librdkafka時須要經過security.protocol參數指定協議類型,再輔以相應協議所需的其它參數完成權限認證。git
若是使用SASL協議進行權限認證,須要對librdkafka添加SASL庫依賴並從新編譯。例如:在CentOS下安裝以下依賴包:github
yum -y install cyrus-sasl cyrus-sasl-devel
通過從新編譯librdkafka後,進入examples目錄下,執行bootstrap
./rdkafka_example -X builtin.features
結果爲:api
builtin.features = gzip,snappy,ssl,sasl,regex
能夠看到librdkafka已經有了sasl特性,後續能夠經過sasl協議進行訪問認證。安全
初始化producer服務器
int KafkaApi::init_producer(const std::string &brokers, const std::string &username, const std::string &password) { char errstr[512]; /* Kafka configuration */ if (NULL == conf_) { conf_ = rd_kafka_conf_new(); } rd_kafka_conf_set(conf_, "queued.min.messages", "20", NULL, 0); rd_kafka_conf_set(conf_, "bootstrap.servers", brokers.c_str(), errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "sasl.username", username.c_str(), errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "sasl.password", password.c_str(), errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "api.version.request", "true", errstr, sizeof(errstr)); rd_kafka_conf_set_dr_msg_cb(conf_, dr_msg_cb_trampoline); /* Create Kafka handle */ if (!(rk_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to init producer: %s\n", errstr); exit(1); } return 0; }
初始化過程介紹:多線程
一、首先經過rd_kafka_conf_new()函數建立rd_kafka_conf_t對象
二、設置rd_kafka_conf_t對象,設置kafka客戶端參數,示例參數爲:
三、調用rd_kafka_new()函數建立rd_kafka_t對象
發送消息
int KafkaApi::send_message(const std::string &topic, const char *data, const int &data_len) { rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk_, topic.c_str(), NULL); if (!rkt) { COMMLIB_LOG_ERR("kafka: create topic failed, err:%s", rd_kafka_err2str(rd_kafka_errno2err(errno))); return rt::KDFKA_PRODUCE_ERR; } int ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, const_cast<char *>(data), data_len, NULL, 0, NULL); if (ret == -1) { COMMLIB_LOG_ERR("kafka: send message failed, err:%s", rd_kafka_err2str(rd_kafka_errno2err(errno))); return rt::KDFKA_PRODUCE_ERR; } COMMLIB_LOG_DEBUG("produce message [%s]", data); rd_kafka_poll(rk_, 0); return rt::SUCCESS; }
發送消息過程介紹:
一、經過rd_kafka_topic_new()方法建立rd_kafka_topic_t對象,注意topic是自動建立的(須要broker端設置可否自動建立topic的參數:auto.create.topics.enable=true), 除此以外,topic可否建立成功還與認證用戶的權限有關,若是認證用戶在broker端爲super.users,則topic可以自動建立成功,不然則會報錯: 用戶無權限,須要先給用戶添加ACL權限才行;最後一點,對於已經存在的topic, rd_kafka_topic_new()方法仍然返回的是舊的對象
二、發送消息經過調用rd_kafka_produce()函數完成,該函數的參數爲:
三、調用rd_kafka_poll()函數,使得消息發送的回調函數可以觸發, 該函數第一個參數爲rd_kafka_t對象,第二個參數爲timeout_ms,設置爲0表示爲非阻塞
注意事項
在使用librdkafka帶鑑權認證訪問kafka服務器的過程當中,解決消息發送失敗問題的關鍵點有: