librdkafka帶鑑權認證訪問kafka服務器

librdkafka簡介

librdkafka是用c語言實現的一個高性能的kafka客戶端,由於性能強大,開發者們基於librdkafka開發了各類語言的kafka客戶端,好比librdkafkad(c++),, node-rdkafka(Node.js), confulent-kafka-python(Python)等。
librdkafka的高性能主要體如今其多線程的設計以及儘量的下降內存拷貝。node

librdkakfa API 簡介

librdkafka github地址:https://github.com/edenhill/l... ,
其中,C語言API能夠參考src/rdkafka.h頭文件,簡要介紹幾個關鍵的對象python

  • rd_kafka_t: kafka客戶端對象
  • rd_kafka_conf_t: kafka客戶端配置對象
  • rd_kafka_topic_t: kafka topic對象

建立這幾個對象所使用的函數:c++

  • rd_kafka_new()
  • rd_kafka_conf_new()
  • rd_kafka_topic_new()

librdkafka支持多種協議以控制kafka服務器的訪問權限,如SASL_PALIN, PLAINTEXT, SASL_SSL等,在使用librdkafka時須要經過security.protocol參數指定協議類型,再輔以相應協議所需的其它參數完成權限認證。git

若是使用SASL協議進行權限認證,須要對librdkafka添加SASL庫依賴並從新編譯。例如:在CentOS下安裝以下依賴包:github

yum -y install cyrus-sasl cyrus-sasl-devel

通過從新編譯librdkafka後,進入examples目錄下,執行bootstrap

./rdkafka_example -X builtin.features

結果爲:api

builtin.features = gzip,snappy,ssl,sasl,regex

能夠看到librdkafka已經有了sasl特性,後續能夠經過sasl協議進行訪問認證。安全

producer 代碼示例

初始化producer服務器

int KafkaApi::init_producer(const std::string &brokers,
                            const std::string &username,
                            const std::string &password) {
  char errstr[512];
  /* Kafka configuration */
  if (NULL == conf_) {
    conf_ = rd_kafka_conf_new();
  }

  rd_kafka_conf_set(conf_, "queued.min.messages", "20", NULL, 0);
  rd_kafka_conf_set(conf_, "bootstrap.servers", brokers.c_str(), errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "security.protocol", "sasl_plaintext", errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
  rd_kafka_conf_set(conf_, "sasl.username", username.c_str(), errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "sasl.password", password.c_str(), errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "api.version.request", "true", errstr,
                    sizeof(errstr));
  rd_kafka_conf_set_dr_msg_cb(conf_, dr_msg_cb_trampoline);

  /* Create Kafka handle */
  if (!(rk_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr)))) {
    fprintf(stderr, "%% Failed to init producer: %s\n", errstr);
    exit(1);
  }

  return 0;
}

初始化過程介紹:多線程

一、首先經過rd_kafka_conf_new()函數建立rd_kafka_conf_t對象

二、設置rd_kafka_conf_t對象,設置kafka客戶端參數,示例參數爲:

  • bootstrap.servers: broker地址列表
  • security.protocol: 安全協議類型,示例爲SASL_PLAINTEXT
  • sasl.mechanisms: sasl協議機制,示例爲PLAIN, 表示普通文本
  • sasl.username: 認證用戶名
  • sasl.password: 認證密碼
  • api.version.request:
    可選,librdkafka與kafka服務器版本適配參數,該參數爲true表示容許librdkafka向broker發送請求詢問broker支持的API版本列表(Apache
    Kafka
    v0.10.0版本後支持),以完成版本適配,更多版本適配要點見https://github.com/edenhill/l...
  • 設置發送消息的回調函數,由於librdkafka發送消息爲非阻塞的,須要經過rd_kafka_poll()方法輪詢消息是否發送成功,並設置響應的回調函數確認消息是否發送成功

三、調用rd_kafka_new()函數建立rd_kafka_t對象

發送消息

int KafkaApi::send_message(const std::string &topic, const char *data,
                           const int &data_len) {
  rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk_, topic.c_str(), NULL);

  if (!rkt) {
    COMMLIB_LOG_ERR("kafka: create topic failed, err:%s",
                    rd_kafka_err2str(rd_kafka_errno2err(errno)));
    return rt::KDFKA_PRODUCE_ERR;
  }

  int ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                             const_cast<char *>(data), data_len, NULL, 0, NULL);
  if (ret == -1) {
    COMMLIB_LOG_ERR("kafka: send message failed, err:%s",
                    rd_kafka_err2str(rd_kafka_errno2err(errno)));
    return rt::KDFKA_PRODUCE_ERR;
  }

  COMMLIB_LOG_DEBUG("produce message [%s]", data);
  rd_kafka_poll(rk_, 0);
  return rt::SUCCESS;
}

發送消息過程介紹:

一、經過rd_kafka_topic_new()方法建立rd_kafka_topic_t對象,注意topic是自動建立的(須要broker端設置可否自動建立topic的參數:auto.create.topics.enable=true), 除此以外,topic可否建立成功還與認證用戶的權限有關,若是認證用戶在broker端爲super.users,則topic可以自動建立成功,不然則會報錯: 用戶無權限,須要先給用戶添加ACL權限才行;最後一點,對於已經存在的topic, rd_kafka_topic_new()方法仍然返回的是舊的對象

二、發送消息經過調用rd_kafka_produce()函數完成,該函數的參數爲:

  • rd_kafka_topic_t對象
  • partition: RD_KAFKA_PARTITION_UA表示爲不設置
  • msgflags: 可設置爲0或RD_KAFKA_MSG_F_COPY, RD_KAFKA_MSG_F_FREE,
    RD_KAFKA_MSG_F_BLOCK,
    RD_KAFKA_MSG_F_COPY表示發送的消息內容參數爲值傳遞,rd_kafka_produce()函數返回以後將不會仍持有消息內容的引用
  • payload, 消息內容指針
  • len, 消息長度
  • key, 消息的key
  • msg_opaque: 每條消息的透明度指針,在消息發送的回調函數中使用

三、調用rd_kafka_poll()函數,使得消息發送的回調函數可以觸發, 該函數第一個參數爲rd_kafka_t對象,第二個參數爲timeout_ms,設置爲0表示爲非阻塞

注意事項
在使用librdkafka帶鑑權認證訪問kafka服務器的過程當中,解決消息發送失敗問題的關鍵點有:

  • librdkafka的SASL依賴有沒有添加
  • SASL認證的參數配置有沒有正確,須要確認用戶在broker端是否已經添加,以及確認用戶擁有的權限
  • api.version.request參數,該參數設置不正確,將直接致使消息發送失敗,使用過程當中須要注意librdkafka的版本與broker的版本
相關文章
相關標籤/搜索