RabbitMQ-C客戶端使用說明

rabbitmq-c是一個用於C語言的,與AMQP server進行交互的client庫,AMQP協議爲版本0-9-1。rabbitmq-c與server進行交互前須要首先進行login操做,在操做後,能夠根據AMQP協議規範,執行一系列操做。html

這裏,根據項目需求,只進行部分接口說明,文後附demo的github地址。python

 

接口描述:git

amqp_connection_state_t amqp_new_connection(void);github

    接口說明:聲明一個新的amqp connectionsocket

 

int amqp_open_socket(char const *hostname, int portnumber);fetch

    接口說明:獲取socket.ui

    參數說明:hostname         RabbitMQ server所在主機code

                  portnumber      RabbitMQ server監聽端口   server

 

void amqp_set_sockfd(amqp_connection_state_t state,int sockfd);htm

     接口說明:將amqp connection和sockfd進行綁定

 

amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,int channel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method, ...);

    接口說明:用於登陸RabbitMQ server,主要目的爲了進行權限管理;

    參數說明:state    amqp connection

                  vhost   rabbit-mq的虛機主機,是rabbit-mq進行權限管理的最小單位

                  channel_max  最大連接數,此處設成0便可

                  frame_max  和客戶端通訊時所容許的最大的frame size.默認值爲131072,增大這個值有助於提升吞吐,下降這個值有利於下降時延

                  heartbeat 含義未知,默認值填0

                  sasl_method  用於SSL鑑權,默認值參考後文demo

 

amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);

    接口說明:用於關聯conn和channel

 

amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments); 

    接口說明:聲明declare

    參數說明:state

                  channel

                  exchange

                  type     "fanout"  "direct" "topic"三選一

                  passive

                  curable

                  arguments

 

amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments); 

    接口說明:聲明queue

    參數說明:state   amqp connection

                  channel 

                  queue  queue name

                  passive 

                  durable  隊列是否持久化

                  exclusive  當前鏈接不在時,隊列是否自動刪除

                  aoto_delete 沒有consumer時,隊列是否自動刪除

                  arguments 用於拓展參數,好比x-ha-policy用於mirrored queue

 

amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_tab le_t arguments);

    接口說明:聲明binding    

 

amqp_basic_qos_ok_t *amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global);

     接口說明:qos是 quality of service,咱們這裏使用主要用於控制預取消息數,避免消息按條數均勻分配,須要和no_ack配合使用

     參數說明:state

                   channel

                   prefetch_size 以bytes爲單位,0爲unlimited

                   prefetch_count 預取的消息條數

                   global

 

amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments); 

     接口說明:開始一個queue consumer

     參數說明:state

                   channel

                   queue

                   consumer_tag

                   no_local

                   no_ack    是否須要確認消息後再從隊列中刪除消息

                   exclusive

                   arguments

 

int amqp_basic_ack(amqp_connection_state_t state,amqp_channel_t channel,uint64_t delivery_tag,amqp_boolean_t multiple);

                     

int amqp_basic_publish(amqp_connection_state_t state,amqp_channel_t channel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_t mandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const *properties,amqp_bytes_t body);

    接口說明:發佈消息

    參數說明:state 

                  channel

                  exchange  

                  routing_key  當exchange爲默認「」時,此處填寫queue_name,當exchange爲direct,此處爲binding_key

                  mandatory 參見參考文獻2

                  immediate 同上

                  properties 更多屬性,如何設置消息持久化,參見文後demo

                  body 消息體

 

amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,int code);

 

amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,int code);

 

int amqp_destroy_connection(amqp_connection_state_t state);

 

如何consume消息,參見文後demo。

 

demo

https://github.com/liuhaobupt/rabbitmq_work_queues_demo-with-rabbit-c-client-lib

其中 rmq_new_task.c和rmq_worker.c對應於RabbitMQ tutorial裏的work queues章節(http://www.rabbitmq.com/tutorials/tutorial-two-python.html),emit_log_direct.c和receive_logs_direct.c對應於RabbitMQ tutorial裏的routing章節(http://www.rabbitmq.com/tutorials/tutorial-four-python.html),這兩個demo覆蓋了RabbitMQ的經常使用應用場景。

編譯須要librabbitmq.a庫,同時須要rabbitmq-c提供的幾個頭文件(amqp.h和amqp_framing.h)以及utils.c文件,這些在github project頁面都可得到。

參考文獻

  1. rabbitmq-c主頁 http://hg.rabbitmq.com/rabbitmq-c/summary
  2. http://www.rabbitmq.com/amqp-0-9-1-reference.html
相關文章
相關標籤/搜索