kafka之c接口經常使用API------librdkafka

1 安裝方法以及相關庫文件git

  https://github.com/edenhill/librdkafkagithub

api

  • High-level producer
  • High-level consumer
  • Simple (Low-level) consumer
  • 壓縮:snappy, gzip, lz4
  • SSL
  • SASL

  consumer有兩套API,高級(high-level)和底層(simple)的,應該叫底層API或者低級API,它跟高級API的區別是沒有自動負載均衡,而高級API會自動進行負載均衡。app

3 kafka主要的用途負載均衡

  發數據---->produceride

    //發一條spa

    rd_kafka_produce().net

    //發多條code

    rd_kafka_produce_batch()blog

  收數據---->consumer

 

 

 

 

 

  在收發數據以前至少須要一個統一的句柄,方便kafka內部準備好連接brokers集羣,初始化kafka內部結構等

  

  創建這個kafka句柄須要知道鏈接到哪一個broker

  

  發佈消息使用rd_kafka_top_t, 

 1 // 對rd_kafka_topic_partition_list_t結構的操做  2 // 建立
 3 rd_kafka_topic_partition_list_new();  4 // 增長元素
 5 rd_kafka_topic_partition_list_add();  6 // 刪除元素
 7 rd_kafka_topic_partition_list_del();  8 // 查找元素
 9 rd_kafka_topic_partition_list_find(); 10 
11 // 對rd_kafka_topic_t的操做 12 // 建立
13 rd_kafka_topic_new(); 14 // 刪除
15 rd_kafka_topic_destroy(); 16 // 獲取該topic的名字
17 rd_kafka_topic_name(); 18 // 獲取該topic傳入的應用參數
19 rd_kafka_topic_opaque(); 20 
21 // 使用rd_kafka_topic_partition_list_t的時候,topic+partition是連在一塊兒的, 22 // 因此給kafka句柄的時候只用一個參數就夠了 23 // 訂閱消息
24 rd_kafka_subscribe (rd_kafka_t *rk, 25                             const rd_kafka_topic_partition_list_t *topics); 26 // 指定消費的partition,能夠在運行時更換
27 rd_kafka_assign (rd_kafka_t *rk, 28                             const rd_kafka_topic_partition_list_t *partitions); 29 
30 // 用rd_kafka_topic_t比較麻煩,須要配合一個partition才行 31 // 直接啓動consumer了
32 rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, 33  int64_t offset); 34 // 每次接收也要帶上partition
35 rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, 36                             int timeout_ms); 37 
38 
39 // 發送的時候使用 rd_kafka_topic_t
40 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, 41                             int msgflags, 42                             void *payload, size_t len, 43                             const void *key, size_t keylen, 44                             void *msg_opaque);
View Code

後續用了繼續作筆記,關於錯誤處理,topic_partition_list操做等

https://blog.csdn.net/lijinqi1987/article/details/76582067

http://suntus.github.io/2016/07/07/librdkafka--kafka%20C%20api%E4%BB%8B%E7%BB%8D/

相關文章
相關標籤/搜索