RabbitMQ: Installation, Configuration, and Basic Usage

Installation

On Ubuntu, see: https://www.rabbitmq.com/install-debian.html

Start/stop:

$ sudo service rabbitmq-server start|stop

Raise the open-file limit to improve concurrency

The ulimit command reports and sets resource limits:

Syntax
      ulimit [-acdfHlmnpsStuv] [limit]

Options

   -S   Change and report the soft limit associated with a resource. 
   -H   Change and report the hard limit associated with a resource. 

   -a   All current limits are reported. 
   -c   The maximum size of core files created. 
   -d   The maximum size of a process's data segment. 
   -f   The maximum size of files created by the shell(default option) 
   -l   The maximum size that can be locked into memory. 
   -m   The maximum resident set size. 
   -n   The maximum number of open file descriptors. 
   -p   The pipe buffer size. 
   -s   The maximum stack size. 
   -t   The maximum amount of cpu time in seconds. 
   -u   The maximum number of processes available to a single user. 
   -v   The maximum amount of virtual memory available to the process.

Complete the two steps below, then reboot for them to take effect.

1. Enable pam_limits.so

On Ubuntu, add the following to /etc/pam.d/common-session:

session    required   pam_limits.so

If /etc/pam.d/common-session-noninteractive exists, add the same line there too.

2. Edit /etc/security/limits.conf

Add the following:

*               soft     nofile          65536
*               hard     nofile          65536

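These values can be raised further. The soft value must not exceed the hard value.

Only root can raise a hard limit; a process can change its own soft limit, up to the hard limit.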

The hard limit is the ceiling for the soft limit. The soft limit is what is actually enforced for a session or process. This allows the administrator (or user) to set the hard limit to the maximum usage they wish to allow. Other users and processes can then use the soft limit to self-limit their resource usage to even lower levels if they so desire.
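
The soft/hard distinction can also be inspected from inside a process. The sketch below uses Python 3's standard `resource` module (Unix only); the helper names `nofile_limits` and `set_soft_nofile` are made up for this illustration and are not part of RabbitMQ.

```python
import resource


def nofile_limits():
    """Return the (soft, hard) open-file limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def set_soft_nofile(target):
    """Set the soft open-file limit to `target`, capped at the hard limit.

    The hard limit is passed back unchanged, so no root privileges are
    needed: a process may move its soft limit anywhere up to the ceiling.
    """
    soft, hard = nofile_limits()
    if hard != resource.RLIM_INFINITY:
        target = min(target, hard)
    resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return nofile_limits()


if __name__ == "__main__":
    soft, hard = nofile_limits()
    print("soft=%s hard=%s" % (soft, hard))
```

The change only affects the calling process and its children, which is the same scope as running `ulimit -n` in a shell.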

After rebooting:

$ ulimit -n
65536

A more convenient alternative is to run ulimit before starting rabbitmq-server:

ulimit -n 32768

This command can be placed in /etc/default/rabbitmq-server.

The limits.conf file documents itself:

$ cat /etc/security/limits.conf 
# /etc/security/limits.conf
#
#Each line describes a limit for a user in the form:
#
#<domain>        <type>  <item>  <value>
#
#Where:
#<domain> can be:
#        - a user name
#        - a group name, with @group syntax
#        - the wildcard *, for default entry
#        - the wildcard %, can be also used with %group syntax,
#                 for maxlogin limit
#        - NOTE: group and wildcard limits are not applied to root.
#          To apply a limit to the root user, <domain> must be
#          the literal username root.
#
#<type> can have the two values:
#        - "soft" for enforcing the soft limits
#        - "hard" for enforcing hard limits
#
#<item> can be one of the following:
#        - core - limits the core file size (KB)
#        - data - max data size (KB)
#        - fsize - maximum filesize (KB)
#        - memlock - max locked-in-memory address space (KB)
#        - nofile - max number of open files
#        - rss - max resident set size (KB)
#        - stack - max stack size (KB)
#        - cpu - max CPU time (MIN)
#        - nproc - max number of processes
#        - as - address space limit (KB)
#        - maxlogins - max number of logins for this user
#        - maxsyslogins - max number of logins on the system
#        - priority - the priority to run user process with
#        - locks - max number of file locks the user can hold
#        - sigpending - max number of pending signals
#        - msgqueue - max memory used by POSIX message queues (bytes)
#        - nice - max nice priority allowed to raise to values: [-20, 19]
#        - rtprio - max realtime priority
#        - chroot - change root to directory (Debian-specific)
#
#<domain>      <type>  <item>         <value>
#

#*               soft    core            0
#root            hard    core            100000
#*               hard    rss             10000
#@student        hard    nproc           20
#@faculty        soft    nproc           20
#@faculty        hard    nproc           50
#ftp             hard    nproc           0
#ftp             -       chroot          /ftp
#@student        -       maxlogins       4

# End of file
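
The four-column format described in those comments is easy to read programmatically. The following is a minimal Python 3 sketch, not the parser pam_limits itself uses; `Limit`, `parse_limits`, and `SAMPLE` are names invented here, and the sketch assumes that everything from a `#` onward is a comment.

```python
from collections import namedtuple

# One limits.conf entry: <domain> <type> <item> <value>
Limit = namedtuple("Limit", "domain type item value")


def parse_limits(text):
    """Parse limits.conf-style text into a list of Limit tuples.

    Text after '#' is dropped, blank lines are skipped, and lines that
    do not have exactly four whitespace-separated fields are ignored.
    Values stay strings because some items (e.g. chroot) take a path.
    """
    entries = []
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        fields = line.split()
        if len(fields) != 4:
            continue
        entries.append(Limit(*fields))
    return entries


SAMPLE = """\
# raise the open-file limit for everyone except root
*               soft     nofile          65536
*               hard     nofile          65536
#@student        hard    nproc           20
"""

for entry in parse_limits(SAMPLE):
    print(entry)
```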

Configuration

Ports used:

4369 (epmd), 25672 (Erlang distribution)
5672, 5671 (AMQP 0-9-1 without and with TLS)
15672 (if management plugin is enabled)
61613, 61614 (if STOMP is enabled)
1883, 8883 (if MQTT is enabled)
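
To see which of these ports are actually listening on a machine, a plain TCP connect is enough. This is a rough Python 3 sketch using only the standard library; `port_is_open` and `RABBITMQ_PORTS` are illustrative names, and a successful connect only shows that something is listening, not that it is RabbitMQ.

```python
import socket

# Port numbers taken from the list above.
RABBITMQ_PORTS = {
    4369: "epmd",
    25672: "Erlang distribution",
    5672: "AMQP 0-9-1",
    5671: "AMQP 0-9-1 with TLS",
    15672: "management plugin",
    61613: "STOMP",
    61614: "STOMP with TLS",
    1883: "MQTT",
    8883: "MQTT with TLS",
}


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except OSError:
        # Refused, timed out, or unresolvable host: treat as closed.
        return False
    sock.close()
    return True


if __name__ == "__main__":
    for port, role in sorted(RABBITMQ_PORTS.items()):
        state = "open" if port_is_open("127.0.0.1", port) else "closed"
        print("%5d  %-22s %s" % (port, role, state))
```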

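Configuration files:

Configuration (official site)
RabbitMQ installation, configuration, and monitoring

Notes on the Chinese introductory tutorial

The tutorial is at http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/ and is a translation of the official tutorial.

Install the Python library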

sudo pip install pika==0.9.5

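Some concepts

Routing is done by an exchange.

Before publishing, make sure the destination queue exists, declaring it if necessary; a message routed to a queue that does not exist is dropped.

In RabbitMQ a message is never sent to a queue directly; it is always published to an exchange. The examples that pass exchange='' use the default exchange, which routes to the queue named by routing_key.

RabbitMQ does not let you redeclare an existing queue with different parameters; doing so returns an error.

A queue can be declared durable so that it survives a broker restart. A durable queue does not make its messages persistent, so messages must be marked persistent as well. Even then persistence is not a hard guarantee: RabbitMQ does not call fsync(2) for every message, so a message may sit in cache for a while before it is written to disk.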

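In the pika examples below, each script opens one channel and works with one queue on it, although a channel is not limited to a single queue.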

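Message acknowledgements are on by default. In pika the consumer must acknowledge explicitly by calling basic_ack, unless it consumes with no_ack=True.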
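
What acknowledgements buy you is redelivery: a message that was delivered but never acknowledged goes back to the queue when its consumer disappears. The toy model below illustrates that bookkeeping in plain Python 3. It does not use pika or talk to a broker, and `AckQueue` with its methods is a hypothetical class written only for this note.

```python
from collections import deque


class AckQueue:
    """Toy in-memory queue that models acknowledgements (not RabbitMQ)."""

    def __init__(self, messages):
        self.ready = deque(messages)  # waiting to be delivered
        self.unacked = {}             # delivery_tag -> message
        self._next_tag = 1

    def deliver(self):
        """Hand out the next message and remember it until it is acked."""
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = message
        return tag, message

    def ack(self, tag):
        """The consumer finished the work, so the message is forgotten."""
        del self.unacked[tag]

    def consumer_died(self):
        """Requeue, in original order, everything delivered but not acked."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


q = AckQueue(["a", "b", "c"])
tag_a, _ = q.deliver()
tag_b, _ = q.deliver()
q.ack(tag_a)       # "a" is done for good
q.consumer_died()  # "b" was never acked, so it is queued again
print(list(q.ready))
```

With no_ack=True the broker would have forgotten "b" at delivery time, and it would be lost when the consumer died.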

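A message can be delivered to a single consumer or to several. Delivering to several consumers is the publish/subscribe pattern.

Any number of queues can be bound to an exchange. The producer sends a message to the exchange, which forwards it to its bound queues according to the exchange type.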
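
The relationship between exchanges, bindings, and queues can be mimicked in a few lines. This is a simplified Python 3 sketch covering only the fanout and direct types; the `Exchange` class is invented for illustration, and queues are plain Python lists rather than broker objects.

```python
class Exchange:
    """Toy model of an exchange with bound queues (plain Python lists)."""

    def __init__(self, kind):
        if kind not in ("fanout", "direct"):
            raise ValueError("unsupported exchange type: %r" % kind)
        self.kind = kind
        self.bindings = []  # list of (queue, binding_key)

    def bind(self, queue, binding_key=""):
        self.bindings.append((queue, binding_key))

    def publish(self, routing_key, body):
        """Copy `body` once into every queue the exchange type selects."""
        delivered = []
        for queue, binding_key in self.bindings:
            # fanout ignores keys; direct needs an exact key match.
            matches = self.kind == "fanout" or binding_key == routing_key
            if matches and not any(queue is q for q in delivered):
                queue.append(body)
                delivered.append(queue)


errors, everything = [], []
logs = Exchange("direct")
logs.bind(errors, "error")
logs.bind(everything, "error")
logs.bind(everything, "info")

logs.publish("info", "disk ok")
logs.publish("error", "disk full")
print(errors)
print(everything)
```

Switching the type to "fanout" would put both messages in both lists, since a fanout exchange ignores the routing key.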

Commands

Which queues exist, and how many messages does each hold?

$ sudo rabbitmqctl list_queues
Listing queues ...
hello    0
...done.
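
If this output needs to feed a script, it can be turned into a dictionary. The sketch below is plain Python 3 and assumes the two-column text layout shown above; other rabbitmqctl versions may print different banner or header lines, which this hypothetical `parse_list_queues` helper simply skips.

```python
def parse_list_queues(output):
    """Map queue name -> message count from `rabbitmqctl list_queues` text.

    Any line whose last field is not a non-negative integer (banner,
    trailer, header) is ignored.
    """
    queues = {}
    for line in output.splitlines():
        parts = line.strip().rsplit(None, 1)
        if len(parts) != 2 or not parts[1].isdigit():
            continue
        name, count = parts
        queues[name] = int(count)
    return queues


SAMPLE = """Listing queues ...
hello    0
...done.
"""

print(parse_list_queues(SAMPLE))
```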

List all exchanges and their types:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

The amq.* exchanges are created by default.

List the bindings between exchanges and queues:

$ sudo rabbitmqctl list_bindings

Code example 1

From http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/tutorials_with_python/%5B1%5DHello_World/

send.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

receive.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()

Code example 2

From http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/tutorials_with_python/%5B2%5DWork_Queues/

new_task.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

worker.py:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
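
The effect of basic_qos(prefetch_count=1) in worker.py can be estimated without a broker. By default RabbitMQ hands messages to consumers in turn (round-robin); with a prefetch count of 1 a worker gets its next message only after acknowledging the previous one. The Python 3 simulation below is a simplification under those assumptions, and `round_robin` and `fair_dispatch` are names made up for it.

```python
import heapq


def round_robin(durations, workers):
    """Assign task i to worker i % workers; return each worker's busy time."""
    totals = [0] * workers
    for i, duration in enumerate(durations):
        totals[i % workers] += duration
    return totals


def fair_dispatch(durations, workers):
    """Give each task to whichever worker becomes free first.

    This approximates prefetch_count=1: a worker receives a new message
    only once it has finished (acknowledged) the previous one.
    """
    # Heap of (time_when_free, worker_index); ties go to the lower index.
    free_at = [(0, w) for w in range(workers)]
    heapq.heapify(free_at)
    totals = [0] * workers
    for duration in durations:
        now, w = heapq.heappop(free_at)
        totals[w] += duration
        heapq.heappush(free_at, (now + duration, w))
    return totals


# Alternating slow (5 s) and fast (1 s) tasks, two workers.
tasks = [5, 1, 5, 1, 5, 1]
print(round_robin(tasks, 2))    # [15, 3]
print(fair_dispatch(tasks, 2))  # [11, 7]
```

With round-robin one worker ends up with every slow task; with fair dispatch the load evens out.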

Code example 3

From http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/tutorials_with_python/%5B3%5DPublish_Subscribe/

emit_log.py :

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()

receive_logs.py :

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Code example 4

From http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/tutorials_with_python/%5B4%5DRouting/

emit_log_direct.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

receive_logs_direct.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Code example 5

From http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/tutorials_with_python/%5B5%5DTopics/

emit_log_topic.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

receive_logs_topic.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
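
The binding keys passed to receive_logs_topic.py follow the topic exchange rules: keys are dot-separated words, `*` stands for exactly one word, and `#` stands for zero or more words. The matcher below is a small Python 3 sketch of those rules for experimenting with keys offline; `topic_matches` is a hypothetical helper, not how RabbitMQ implements matching.

```python
def topic_matches(binding_key, routing_key):
    """Return True if `routing_key` matches the topic `binding_key`.

    Words are separated by dots; '*' stands for exactly one word and
    '#' for zero or more words.
    """
    def match(pattern, words):
        if not pattern:
            # Pattern exhausted: match only if no words are left over.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # Let '#' swallow 0, 1, ..., len(words) words.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            return match(rest, words[1:])
        return False

    return match(binding_key.split("."), routing_key.split("."))


for key in ("kern.critical", "kern.critical.disk", "anonymous.info"):
    print(key, topic_matches("kern.*", key), topic_matches("#", key))
```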

