rabbitmq消息流轉分析

鏈接(connection):消費者或者生產者與消息中間件創建的tcp鏈接; app

頻道(channel):也叫信道,tcp鏈接創建以後,必須如今鏈接上開頻道,才能進行其餘操做(緣由?socket

登陸(logging):創建頻道以後,要登陸到特定的虛擬機,一組虛擬機持有一組交換機和隊列,其餘虛擬機用戶沒法訪問當前用戶對應的虛擬機中的交換機和隊列; tcp

交換機(exchange):在rabbitmq消息中間件啓動時就會建立一個默認的交換機(固然也能夠人爲建立),與鏈接無關,負責整個消息中間件中消息的投遞;交換機不會存儲消息, ui

若是沒有任何隊列與之綁定,那麼交換機會丟棄收到的消息; this

隊列(queue):用來存儲交換機投遞過來的消息,經過路由鍵與交換機綁定,進行消息的持久化存儲; spa

隊列由消費者或者生產者連上消息中間件後自行建立,人爲指定隊列名稱,若是當前建立的隊列rabbitmq上已經存在,rabbitmq不會重複建立; rest

路由鍵(routingkey):交換機和隊列進行消息投遞的識別碼,人爲指定; 中間件

1、生產者發送消息: rabbitmq

conn = amqp_new_connection();// 隊列

socket = amqp_tcp_socket_new(conn);

status = amqp_socket_open(socket, hostname, port);

amqp_login(conn, "ois", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "ois", "1");

amqp_channel_open(conn, 1);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

amqp_bytes_t queuename;

{

     amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 0, 0, 1,

         amqp_empty_table);

     die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

     queuename = amqp_bytes_malloc_dup(r->queue);

     if (queuename.bytes == NULL) {

         fprintf(stderr, "Out of memory while copying queue name");

         return 1;

     }

}//建立名稱爲myqueue的隊列

amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey),

     amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");//將隊列myqueue經過路由鍵routingkey綁定到交換機exchange上

 

for (;;)

{

amqp_basic_properties_t props;

props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;

props.content_type = amqp_cstring_bytes("text/plain");

props.delivery_mode = 2; /* persistent delivery mode */

die_on_error(amqp_basic_publish(conn,

1,

amqp_cstring_bytes(exchange),

amqp_cstring_bytes(routingkey),

0,

0,

&props,

amqp_cstring_bytes("test message")),

"Publishing");

    microsleep(1*1000*100);

}//生產者發佈消息,發佈消息須要指定接收消息的交換機,以及路由鍵,交換機須要根據路由鍵投遞消息到具體的隊列中

2、消費者獲取消息進行處理

static void run(amqp_connection_state_t conn)

{

uint64_t start_time = now_microseconds();

int received = 0;

int previous_received = 0;

uint64_t previous_report_time = start_time;

uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

 

amqp_frame_t frame;

 

uint64_t now;

 

for (;;) {

amqp_rpc_reply_t ret;

amqp_envelope_t envelope;

 

now = now_microseconds();

if (now > next_summary_time) {

int countOverInterval = received - previous_received;

double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

printf("%d ms: Received %d - %d since last report (%d Hz)\n",

(int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);

 

previous_received = received;

previous_report_time = now;

next_summary_time += SUMMARY_EVERY_US;

}

 

amqp_maybe_release_buffers(conn);

ret = amqp_consume_message(conn, &envelope, NULL, 0);//定時獲取隊列中的消息

 

if (AMQP_RESPONSE_NORMAL != ret.reply_type) {

if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&

AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {

if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {

return;

}

 

if (AMQP_FRAME_METHOD == frame.frame_type) {

switch (frame.payload.method.id) {

case AMQP_BASIC_ACK_METHOD:

/* if we've turned publisher confirms on, and we've published a message

* here is a message being confirmed

*/

 

break;

case AMQP_BASIC_RETURN_METHOD:

/* if a published message couldn't be routed and the mandatory flag was set

* this is what would be returned. The message then needs to be read.

*/

{

amqp_message_t message;

ret = amqp_read_message(conn, frame.channel, &message, 0);

if (AMQP_RESPONSE_NORMAL != ret.reply_type) {

return;

}

 

amqp_destroy_message(&message);

}

 

break;

 

case AMQP_CHANNEL_CLOSE_METHOD:

/* a channel.close method happens when a channel exception occurs, this

* can happen by publishing to an exchange that doesn't exist for example

*

* In this case you would need to open another channel redeclare any queues

* that were declared auto-delete, and restart any consumers that were attached

* to the previous channel

*/

return;

 

case AMQP_CONNECTION_CLOSE_METHOD:

/* a connection.close method happens when a connection exception occurs,

* this can happen by trying to use a channel that isn't open for example.

*

* In this case the whole connection must be restarted.

*/

return;

 

default:

fprintf(stderr ,"An unexpected method was received %u\n", frame.payload.method.id);

return;

}

}

}

 

} else {

amqp_destroy_envelope(&envelope);

}

 

received++;

}

}

 

int main(int argc, char const *const *argv)

{

conn = amqp_new_connection();

socket = amqp_tcp_socket_new(conn);

status = amqp_socket_open(socket, hostname, port);

die_on_amqp_error(amqp_login(conn, "ot", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

"Logging in");

amqp_channel_open(conn, 1);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

{

amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,

amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

queuename = amqp_bytes_malloc_dup(r->queue);

if (queuename.bytes == NULL) {

fprintf(stderr, "Out of memory while copying queue name");

return 1;

}

}

 

amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),

amqp_empty_table);//綁定消息隊列

die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

 

amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");//指定這是一個消費者

 

run(conn);

 

die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");

die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");

die_on_error(amqp_destroy_connection(conn), "Ending connection");

 

return 0;

}

相關文章
相關標籤/搜索