RabbitMQ基礎知識篇

一、Linux安裝RabbitMQ。html

參考網址:RPM安裝RabbitMQ   仔細閱讀。python

先安裝erlang:react

su -c 'rpm -Uvh http://mirrors.neusoft.edu.cn/epel/epel-release-latest-7.noarch.rpm'
...
su -c 'yum install foo'

####################################
建議使用這個安裝erlang庫
#  這裏還有http://www.rabbitmq.com/releases/erlang/  erlang的各個版本下載,安裝這個便可。
####################################wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

yum install erlang

下載rabbitmq 的rpm包並安裝json

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.2-1.noarch.rpm

安裝時可能會報須要安裝esl-erlang_16.b.3。這個能夠到這裏下載https://www.erlang-solutions.com/resources/download.html服務器

 

二、開始編寫rabbitmq代碼,參照http://www.rabbitmq.com/getstarted.html,重點,下面就不寫具體代碼了。app

cp /usr/share/doc/rabbitmq-server-3.6.2/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config ,service rabbitmq-server restartide

 

1)遇到的問題,報錯:pika.exceptions.ProbableAuthenticationError,能夠查看日誌(重要):函數

tail -f /var/log/rabbitmq/rabbit\@Minion.logoop

=ERROR REPORT==== 27-Jun-2016::02:06:19 ===
Error on AMQP connection <0.542.0> (192.168.243.131:43681 -> 192.168.243.131:5672, state: starting):
PLAIN login refused: user 'guest' can only connect via localhostfetch

解決辦法:修改配置文件:%% {loopback_users, [<<"guest">>]},   爲 {loopback_users, []}     ,service rabbitmq restart,參考http://www.rabbitmq.com/configure.html

二、rabbitmq-server啓動,發送、接收、等各類命令執行慢的出奇,緣由是dns配置問題。

三、no_ack = False(默認),就拿最簡單的"hello world「來講,啓動兩個recive.py,callback()函數裏面根據接收的消息的dot數來sleep,在send.py端連續發送7個消息(帶6個點),這時中止一個recive.py,會看到這7個消息會發送到另外一個recive.py。可是這裏你會發現執行rabbitmqctl list_queues顯示隊列的消息數並無減小。

這裏呀no_ack = False應該只表示 revice告訴queue,我接收完消息會發acK的,可是發不發ack由:ch.basic_ack(delivery_tag = method.delivery_tag)控制,這個能夠寫到callback()最後面。

rabbitmqctl list_queues name messages_ready messages_unacknowledged能夠看到沒有收到ack的消息數量。

四、消息和隊列持久化

隊列持久化:
channel.queue_declare(queue='hello', durable=True)
消息持久化:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))

Note on message persistence

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.

五、公平分發

In order to defeat that we can use the basic.qos method with the prefetch_count=1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

channel.basic_qos(prefetch_count=1)

 

 六、消息廣播 ,下面這種方式當先開send的話,send的消息會丟失。

發送端定義exchange:

channel.exchange_declare(exchange='logs', type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body=message)

 接收端綁定queue到exchange。

result = channel.queue_declare(exclusive=True) # exclusive=Ture,當客戶端斷開,則此queue也將隨之被銷燬。
channel.queue_bind(exchange='logs', queue=result.method.queue) # 綁定queue 到 exchange

[root@test2 ~]# rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-O86WeoCyhQu0YvjtJo7Dew queue amq.gen-O86WeoCyhQu0YvjtJo7Dew []
logs exchange amq.gen-fYeLgPULbT7hVLgahg21jQ queue amq.gen-fYeLgPULbT7hVLgahg21jQ []

七、消息路由,發送時指定exchange和routing_key,綁定了此routing_key的exchange和queue會接收到此消息。

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.

Bindings can take an extra routing_key parameter. To avoid the confusion with abasic_publish parameter we're going to call it a binding key. This is how we could create a binding with a key:

channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black') 

The meaning of a binding key depends on the exchange type. The fanout exchanges, which we used previously, simply ignored its value.

We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing keyof the message.

To illustrate that, consider the following setup:

In this setup, we can see the direct exchange X with two queues bound to it. The first queue is bound with binding key orange, and the second has two bindings, one with binding key blackand the other one with green.

In such a setup a message published to the exchange with a routing key orange will be routed to queue Q1. Messages with a routing key of black or green will go to Q2. All other messages will be discarded.

一個binding_key也能夠綁定到多個queue。

It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.

發送端:

Like always we need to create an exchange first:

channel.exchange_declare(exchange='direct_logs', type='direct') 

And we're ready to send a message:

channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) 

To simplify things we will assume that 'severity' can be one of 'info', 'warning', 'error'.

接收端:

Receiving messages will work just like in the previous tutorial, with one exception - we're going to create a new binding for each severity we're interested in.

result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

啓動接收端:python receive_logs_direct.py info warning error

在rabbitmq-server服務器:rabbitmqctl list_bindings

direct_logs exchange amq.gen-W4eTG2YC97A9aMwBVxeJJQ queue error []
direct_logs exchange amq.gen-W4eTG2YC97A9aMwBVxeJJQ queue info []

direct_logs     exchange        amq.gen-2jS9NghfwuydRFdY5XHkhw  queue   info    []
direct_logs exchange amq.gen-W4eTG2YC97A9aMwBVxeJJQ queue warning []

 

八、更復雜的消息路由

In this example, we're going to send messages which all describe animals. The messages will be sent with a routing key that consists of three words (two dots). The first word in the routing key will describe a celerity, second a colour and third a species: "<celerity>.<colour>.<species>".

We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#".

These bindings can be summarised as:

  • Q1 is interested in all the orange animals.
  • Q2 wants to hear everything about rabbits, and everything about lazy animals.

A message with a routing key set to "quick.orange.rabbit" will be delivered to both queues. Message "lazy.orange.elephant" also will go to both of them. On the other hand "quick.orange.fox" will only go to the first queue, and "lazy.brown.fox" only to the second. "lazy.pink.rabbit" will be delivered to the second queue only once, even though it matches two bindings. "quick.brown.fox" doesn't match any binding so it will be discarded.

What happens if we break our contract and send a message with one or four words, like "orange" or "quick.orange.male.rabbit"? Well, these messages won't match any bindings and will be lost.

On the other hand "lazy.orange.male.rabbit", even though it has four words, will match the last binding and will be delivered to the second queue.

九、消息屬性,經過在send端,basic_publish(properties = pika.BasicProperties(xx=xxx)配置。

Message properties

The AMQP protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:

  • delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
  • content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to:application/json.
  • reply_to: Commonly used to name a callback queue.
  • correlation_id: Useful to correlate RPC responses with requests.

十、RPC

 

 

Our RPC will work like this:

  • When the Client starts up, it creates an anonymous exclusive callback queue.
  • For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • The request is sent to an rpc_queue queue.
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.

The code for rpc_server.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

The server code is rather straightforward:

  • (4) As usual we start by establishing the connection and declaring the queue.
  • (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, it's probably the slowest recursive implementation possible).
  • (19) We declare a callback for basic_consume, the core of the RPC server. It's executed when the request is received. It does the work and sends the response back.
  • (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetch_count setting.

The code for rpc_client.py:

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

The client code is slightly more involved:

  • (7) We establish a connection, channel and declare an exclusive 'callback' queue for replies.
  • (16) We subscribe to the 'callback' queue, so that we can receive RPC responses.
  • (18) The 'on_response' callback executed on every response is doing a very simple job, for every response message it checks if the correlation_id is the one we're looking for. If so, it saves the response in self.response and breaks the consuming loop.
  • (23) Next, we define our main call method - it does the actual RPC request.
  • (24) In this method, first we generate a unique correlation_id number and save it - the 'on_response' callback function will use this value to catch the appropriate response.
  • (25) Next, we publish the request message, with two properties: reply_to andcorrelation_id.
  • (32) At this point we can sit back and wait until the proper response arrives.
  • (33) And finally we return the response back to the user.

Our RPC service is now ready. We can start the server:

$ python rpc_server.py
 [x] Awaiting RPC requests 

To request a fibonacci number run the client:

$ python rpc_client.py
 [x] Requesting fib(30) 

The presented design is not the only possible implementation of a RPC service, but it has some important advantages:

  • If the RPC server is too slow, you can scale up by just running another one. Try running a second rpc_server.py in a new console.
  • On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queue_declare are required. As a result the RPC client needs only one network round trip for a single RPC request.

Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:

  • How should the client react if there are no servers running?
  • Should a client have some kind of timeout for the RPC?
  • If the server malfunctions and raises an exception, should it be forwarded to the client?
  • Protecting against invalid incoming messages (eg checking bounds) before processing.

 

下章將分析上面這幾個問題。

相關文章
相關標籤/搜索