一、Linux安裝RabbitMQ。html
參考網址:RPM安裝RabbitMQ 仔細閱讀。python
先安裝erlang:react
su -c 'rpm -Uvh http://mirrors.neusoft.edu.cn/epel/epel-release-latest-7.noarch.rpm' ... su -c 'yum install foo'
####################################
建議使用這個安裝erlang庫
# 這裏還有http://www.rabbitmq.com/releases/erlang/ erlang的各個版本下載,安裝這個便可。
####################################wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
yum install erlang
下載rabbitmq 的rpm包並安裝json
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc yum install rabbitmq-server-3.6.2-1.noarch.rpm
安裝時可能會報須要安裝esl-erlang_16.b.3。這個能夠到這裏下載https://www.erlang-solutions.com/resources/download.html服務器
二、開始編寫rabbitmq代碼,參照http://www.rabbitmq.com/getstarted.html,重點,下面就不寫具體代碼了。app
cp /usr/share/doc/rabbitmq-server-3.6.2/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config ,service rabbitmq-server restartide
1)遇到的問題,報錯:pika.exceptions.ProbableAuthenticationError,能夠查看日誌(重要):函數
tail -f /var/log/rabbitmq/rabbit\@Minion.logoop
=ERROR REPORT==== 27-Jun-2016::02:06:19 ===
Error on AMQP connection <0.542.0> (192.168.243.131:43681 -> 192.168.243.131:5672, state: starting):
PLAIN login refused: user 'guest' can only connect via localhostfetch
解決辦法:修改配置文件:%% {loopback_users, [<<"guest">>]}, 爲 {loopback_users, []} ,service rabbitmq restart,參考http://www.rabbitmq.com/configure.html
二、rabbitmq-server啓動,發送、接收、等各類命令執行慢的出奇,緣由是dns配置問題。
三、no_ack = False(默認),就拿最簡單的"hello world「來講,啓動兩個recive.py,callback()函數裏面根據接收的消息的dot數來sleep,在send.py端連續發送7個消息(帶6個點),這時中止一個recive.py,會看到這7個消息會發送到另外一個recive.py。可是這裏你會發現執行rabbitmqctl list_queues顯示隊列的消息數並無減小。
這裏呀no_ack = False應該只表示 revice告訴queue,我接收完消息會發acK的,可是發不發ack由:ch.basic_ack(delivery_tag = method.delivery_tag)控制,這個能夠寫到callback()最後面。
rabbitmqctl list_queues name messages_ready messages_unacknowledged能夠看到沒有收到ack的消息數量。
四、消息和隊列持久化
隊列持久化:
channel.queue_declare(queue='hello', durable=True)
消息持久化:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.
五、公平分發
In order to defeat that we can use the basic.qos method with the prefetch_count=1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
channel.basic_qos(prefetch_count=1)
六、消息廣播 ,下面這種方式當先開send的話,send的消息會丟失。
發送端定義exchange:
channel.exchange_declare(exchange='logs', type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body=message)
接收端綁定queue到exchange。
result = channel.queue_declare(exclusive=True) # exclusive=Ture,當客戶端斷開,則此queue也將隨之被銷燬。
channel.queue_bind(exchange='logs', queue=result.method.queue) # 綁定queue 到 exchange
[root@test2 ~]# rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-O86WeoCyhQu0YvjtJo7Dew queue amq.gen-O86WeoCyhQu0YvjtJo7Dew []
logs exchange amq.gen-fYeLgPULbT7hVLgahg21jQ queue amq.gen-fYeLgPULbT7hVLgahg21jQ []
七、消息路由,發送時指定exchange和routing_key,綁定了此routing_key的exchange和queue會接收到此消息。
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with abasic_publish parameter we're going to call it a binding key. This is how we could create a binding with a key:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
The meaning of a binding key depends on the exchange type. The fanout exchanges, which we used previously, simply ignored its value.
We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing keyof the message.
To illustrate that, consider the following setup:
In this setup, we can see the direct exchange X with two queues bound to it. The first queue is bound with binding key orange, and the second has two bindings, one with binding key blackand the other one with green.
In such a setup a message published to the exchange with a routing key orange will be routed to queue Q1. Messages with a routing key of black or green will go to Q2. All other messages will be discarded.
一個binding_key也能夠綁定到多個queue。
It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.
發送端:
Like always we need to create an exchange first:
channel.exchange_declare(exchange='direct_logs', type='direct')
And we're ready to send a message:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
To simplify things we will assume that 'severity' can be one of 'info', 'warning', 'error'.
接收端:
Receiving messages will work just like in the previous tutorial, with one exception - we're going to create a new binding for each severity we're interested in.
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
啓動接收端:python receive_logs_direct.py info warning error
在rabbitmq-server服務器:rabbitmqctl list_bindings
direct_logs exchange amq.gen-W4eTG2YC97A9aMwBVxeJJQ queue error []
direct_logs exchange amq.gen-W4eTG2YC97A9aMwBVxeJJQ queue info []
direct_logs exchange amq.gen-2jS9NghfwuydRFdY5XHkhw queue info []
direct_logs exchange amq.gen-W4eTG2YC97A9aMwBVxeJJQ queue warning []
八、更復雜的消息路由
In this example, we're going to send messages which all describe animals. The messages will be sent with a routing key that consists of three words (two dots). The first word in the routing key will describe a celerity, second a colour and third a species: "<celerity>.<colour>.<species>".
We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#".
These bindings can be summarised as:
A message with a routing key set to "quick.orange.rabbit" will be delivered to both queues. Message "lazy.orange.elephant" also will go to both of them. On the other hand "quick.orange.fox" will only go to the first queue, and "lazy.brown.fox" only to the second. "lazy.pink.rabbit" will be delivered to the second queue only once, even though it matches two bindings. "quick.brown.fox" doesn't match any binding so it will be discarded.
What happens if we break our contract and send a message with one or four words, like "orange" or "quick.orange.male.rabbit"? Well, these messages won't match any bindings and will be lost.
On the other hand "lazy.orange.male.rabbit", even though it has four words, will match the last binding and will be delivered to the second queue.
九、消息屬性,經過在send端,basic_publish(properties = pika.BasicProperties(xx=xxx)配置。
The AMQP protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
十、RPC
Our RPC will work like this:
The code for rpc_server.py:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
The server code is rather straightforward:
The code for rpc_client.py:
#!/usr/bin/env python import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
The client code is slightly more involved:
Our RPC service is now ready. We can start the server:
$ python rpc_server.py
[x] Awaiting RPC requests
To request a fibonacci number run the client:
$ python rpc_client.py
[x] Requesting fib(30)
The presented design is not the only possible implementation of a RPC service, but it has some important advantages:
Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
下章將分析上面這幾個問題。