AMQP 0-9-1 Model Explained Why does the queue memory grow and shrink when publishing/consuming? AMQP

 AMQP 0-9-1 Model Explained | RabbitMQ http://next.rabbitmq.com/tutorials/amqp-concepts.html

AMQP 0-9-1 Model Explained

About This Guide

This guide provides an overview of the AMQP 0-9-1 protocol, one of the protocols supported by RabbitMQ.

High-level Overview of AMQP 0-9-1 and the AMQP Model

What is AMQP 0-9-1?

AMQP 0-9-1 (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

Brokers and Their Role

Messaging brokers receive messages from publishers (applications that publish them, also known as producers) and route them to consumers (applications that process them).

Since it is a network protocol, the publishers, consumers and the broker can all reside on different machines.

AMQP 0-9-1 Model in Brief

The AMQP 0-9-1 Model has the following view of the world: messages are published to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings. Then AMQP brokers either deliver messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand.
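
The flow described above can be sketched as a toy in-memory model. This is only an illustration, not a real AMQP client: the `Broker` class, its method names, and the exchange, routing key and queue names are all made up for this example, and it models only exact-match (direct-style) routing.

```python
from collections import defaultdict, deque


class Broker:
    """Toy in-memory model of a direct-style exchange; not a real AMQP client."""

    def __init__(self):
        self.queues = defaultdict(deque)    # queue name -> pending messages
        self.bindings = defaultdict(list)   # (exchange, routing key) -> queue names

    def bind(self, exchange, routing_key, queue):
        self.queues[queue]  # touching the key makes sure the queue exists
        self.bindings[(exchange, routing_key)].append(queue)

    def publish(self, exchange, routing_key, body):
        """Copy the message to every queue bound with this routing key."""
        targets = self.bindings.get((exchange, routing_key), [])
        for queue in targets:
            self.queues[queue].append(body)
        return len(targets)

    def get(self, queue):
        """Pull one message on demand, or None if the queue is empty."""
        pending = self.queues[queue]
        return pending.popleft() if pending else None


broker = Broker()
broker.bind("orders", "created", "billing")
broker.bind("orders", "created", "audit")
copies = broker.publish("orders", "created", "order-1")
```

Each bound queue receives its own copy of the message, which is why one publish can be consumed independently by several applications.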

 

 

 

publisher ---> messages ---> exchanges (post offices, mailboxes) ---> queues ... consumers

 

Exchanges copy messages to queues according to rules called bindings.

 

AMQP brokers deliver messages to the consumers subscribed to a queue, or consumers fetch/pull messages on demand.

 

[Parts of a message that are opaque to the broker]

Message meta-data has two parts: one that the broker may use, and one that is opaque to the broker.

 

When publishing a message, publishers may specify various message attributes (message meta-data). Some of this meta-data may be used by the broker; the rest of it is completely opaque to the broker and is only used by applications that receive the message.

Message acknowledgements

After receiving a message, a consumer may or may not send an acknowledgement. If it does, the broker removes the message from the queue.

Networks are unreliable and applications may fail to process messages, so the AMQP model has a notion of message acknowledgements: when a message is delivered to a consumer, the consumer notifies the broker, either automatically or as soon as the application developer chooses to do so. When message acknowledgements are in use, a broker will only completely remove a message from a queue when it receives a notification for that message (or group of messages).
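
A minimal sketch of this bookkeeping, assuming a single queue and a single consumer. The `AckQueue` class and its method names are hypothetical, and putting unacknowledged messages back on the queue in `requeue_unacked` is this sketch's simplification of what happens when a consumer fails.

```python
from collections import deque


class AckQueue:
    """Toy queue that keeps a delivered message until it is acknowledged."""

    def __init__(self):
        self.ready = deque()   # messages waiting for delivery
        self.unacked = {}      # delivery tag -> delivered but unconfirmed message
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self):
        """Hand out the next message and remember it under a delivery tag."""
        if not self.ready:
            return None
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = self.ready.popleft()
        return tag, self.unacked[tag]

    def ack(self, tag):
        """Only at this point is the message completely removed."""
        del self.unacked[tag]

    def requeue_unacked(self):
        """Simulate a consumer that failed before acknowledging."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))

    def size(self):
        return len(self.ready) + len(self.unacked)


q = AckQueue()
q.publish("job-1")
tag, body = q.deliver()
size_before_ack = q.size()   # the delivered message still counts
q.ack(tag)
size_after_ack = q.size()
```

The queue still holds the message between delivery and acknowledgement, so a message is not lost if processing fails in between.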

[When a message cannot be routed: dead letter queue]

When a message cannot be routed, the broker may place it in a dead letter queue; the publisher decides how this case is handled.

In certain situations, for example, when a message cannot be routed, messages may be returned to publishers, dropped, or, if the broker implements an extension, placed into a so-called "dead letter queue". Publishers choose how to handle situations like this by publishing messages using certain parameters.
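
As a rough illustration of "returned" versus "dropped", the sketch below models the `mandatory` flag of the AMQP 0-9-1 `basic.publish` method on a toy exact-match exchange. The `route` function and the sample bindings are invented for this example, and the dead letter queue extension is deliberately not modelled.

```python
def route(bindings, routing_key, mandatory=False):
    """Return (outcome, queues) for a toy exact-match exchange.

    `bindings` maps routing keys to lists of queue names. `mandatory`
    mirrors the basic.publish flag of the same name.
    """
    queues = bindings.get(routing_key, [])
    if queues:
        return "routed", queues
    if mandatory:
        return "returned", []   # handed back to the publisher
    return "dropped", []        # silently discarded


bindings = {"created": ["billing"]}
outcome_default = route(bindings, "deleted")
outcome_mandatory = route(bindings, "deleted", mandatory=True)
```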

Queues, exchanges and bindings are collectively referred to as AMQP entities.

 


 

 

AMQP is a Programmable Protocol

AMQP 0-9-1 is a programmable protocol in the sense that AMQP 0-9-1 entities and routing schemes are primarily defined by applications themselves, not a broker administrator. Accordingly, provision is made for protocol operations that declare queues and exchanges, define bindings between them, subscribe to queues and so on.

This gives application developers a lot of freedom but also requires them to be aware of potential definition conflicts. In practice, definition conflicts are rare and often indicate a misconfiguration.

[Rationale for deleting exchanges and queues]

Applications declare the AMQP 0-9-1 entities that they need, define necessary routing schemes and may choose to delete AMQP 0-9-1 entities when they are no longer used.

 

[The protocol is an international standard]

 Home | AMQP http://www.amqp.org/

Advanced Message Queuing Protocol 1.0 approved as an International Standard

Click above for the press release. The International Standard (ISO/IEC 19464) can be downloaded here.

See this presentation to learn more about AMQP and its value.

 

Advanced Message Queuing Protocol 1.0 becomes OASIS Standard

Click above for the press release. The Standard can be downloaded here.

See the executive briefing paper on the value proposition of OASIS AMQP to learn more.

AMQP (Baidu Baike) https://baike.baidu.com/item/AMQP/8354716?fr=aladdin

 

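[Hands-on notes]


1. Start the server;
2. Configure user permissions;
3. Publish and receive messages over the external/internal network.

systemctl start rabbitmq-server

/ is RabbitMQ's default virtual host (vhost); earlier connections all used it by default
Create a user
rabbitmqctl add_user username password
Assign a role (tag) to the user
rabbitmqctl set_user_tags username administrator
# Tags can be: administrator, monitoring, management
Set access permissions
rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"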

 

[Documentation]

http://www.rabbitmq.com/admin-guide.html

http://www.rabbitmq.com/clients.html

http://www.rabbitmq.com/man/rabbitmqctl.8.html

NAME

rabbitmqctl — command line for managing a RabbitMQ broker

SYNOPSIS

rabbitmqctl [-q] [-l] [-n node] [-t timeout] command [command_options]

DESCRIPTION

RabbitMQ is a multi-protocol open source messaging broker.

 

rabbitmqctl is a command line tool for managing a RabbitMQ broker. It performs all actions by connecting to one of the broker's nodes.

 

Diagnostic information is displayed if the broker was not running, could not be reached, or rejected the connection due to mismatching Erlang cookies.

OPTIONS

-n node
Default node is "rabbit@server", where server is the local host. On a host named "myserver.example.com", the node name of the RabbitMQ Erlang node will usually be "rabbit@myserver" (unless RABBITMQ_NODENAME has been set to some non-default value at broker startup time). The output of "hostname -s" is usually the correct suffix to use after the "@" sign. See rabbitmq-server(8) for details of configuring the RabbitMQ broker.
-q, --quiet
Quiet output mode is selected. Informational messages are suppressed when quiet mode is in effect.
--dry-run
Do not run the command. Only print informational messages.
-t timeout, --timeout timeout
Operation timeout in seconds. Only applicable to "list" commands. Default is infinity.
-l, --longnames
Use long names for Erlang distribution. If the RabbitMQ broker uses long node names for Erlang distribution, this option must be specified.
--erlang-cookie cookie
Erlang distribution cookie. If the RabbitMQ node is using a custom Erlang cookie value, the cookie value must be set with this parameter.
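
The default node name rule described for the -n option can be sketched as follows. The helper `default_node_name` is hypothetical and simply mimics `hostname -s` by keeping the part of the host name before the first dot; it does not consult RABBITMQ_NODENAME or the long names option.

```python
def default_node_name(hostname, prefix="rabbit"):
    """Build the usual default node name from a host name.

    Mimics `hostname -s` by keeping the part before the first dot.
    """
    short = hostname.split(".", 1)[0]
    return f"{prefix}@{short}"


node = default_node_name("myserver.example.com")
```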

COMMANDS

help [ -l] [command_name]
Prints usage for all available commands.
-l, --list-commands
List command usages only, without parameter explanation.
command_name
Prints usage for the specified command.

Application Management

force_reset
Forcefully returns a RabbitMQ node to its virgin state.
 
The  force_reset command differs from  reset in that it resets the node unconditionally, regardless of the current management database state and cluster configuration. It should only be used as a last resort if the database or cluster configuration has been corrupted.
 
For  reset and  force_reset to succeed the RabbitMQ application must have been stopped, e.g. with  stop_app.
 
For example, to reset the RabbitMQ node:
 
rabbitmqctl force_reset
hipe_compile  directory
Performs HiPE-compilation and caches resulting  .beam-files in the given directory.
 
Parent directories are created if necessary. Any existing  .beam files from the directory are automatically deleted prior to compilation.
 
To use these precompiled files, you should set the RABBITMQ_SERVER_CODE_PATH environment variable to the directory specified in the hipe_compile invocation.
 
For example, to HiPE-compile modules and store them to  /tmp/rabbit-hipe/ebin directory:
 
rabbitmqctl hipe_compile /tmp/rabbit-hipe/ebin
reset
Returns a RabbitMQ node to its virgin state.
 
Removes the node from any cluster it belongs to, removes all data from the management database, such as configured users and vhosts, and deletes all persistent messages.
 
For  reset and  force_reset to succeed the RabbitMQ application must have been stopped, e.g. with  stop_app.
 
For example, to reset the RabbitMQ node:
 
rabbitmqctl reset
rotate_logs
Instructs the RabbitMQ node to perform internal log rotation.
 
Log rotation is performed according to lager settings specified in configuration file.
 
Note that there is no need to call this command in case of external log rotation (e.g. from logrotate(8)), because lager detects renames and automatically reopens log files.
 
For example, this command starts internal log rotation process:
 
rabbitmqctl rotate_logs
 
Rotation is performed asynchronously, so there is no guarantee that it will be completed when this command returns.
shutdown
Shuts down the Erlang process on which RabbitMQ is running. The command is blocking and will return after the Erlang process exits. If RabbitMQ fails to stop, it will return a non-zero exit code.
 
Unlike the stop command, the shutdown command:
  • does not require a pid_file to wait for the Erlang process to exit
  • returns a non-zero exit code if RabbitMQ node is not running
 
For example, to shut down the Erlang process on which RabbitMQ is running:
 
rabbitmqctl shutdown
start_app
Starts the RabbitMQ application.
 
This command is typically run after performing other management actions that required the RabbitMQ application to be stopped, e.g.  reset.
 
For example, to instruct the RabbitMQ node to start the RabbitMQ application:
 
rabbitmqctl start_app
stop [ pid_file]
Stops the Erlang node on which RabbitMQ is running. To restart the node follow the instructions for "Running the Server" in the installation guide.
 
If a  pid_file is specified, also waits for the process specified there to terminate. See the description of the  wait command for details on this file.
 
For example, to instruct the RabbitMQ node to terminate:
 
rabbitmqctl stop
stop_app
Stops the RabbitMQ application, leaving the Erlang node running.
 
This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g.  reset.
 
For example, to instruct the RabbitMQ node to stop the RabbitMQ application:
 
rabbitmqctl stop_app
wait pid_file, wait --pid pid
Waits for the RabbitMQ application to start.
 
This command will wait for the RabbitMQ application to start at the node. It will wait for the pid file to be created if  pidfile is specified, then for a process with a pid specified in the pid file or the  --pid argument, and then for the RabbitMQ application to start in that process. It will fail if the process terminates without starting the RabbitMQ application.
 
If the specified pidfile is not created or erlang node is not started within  --timeout the command will fail. Default timeout is 10 seconds.
 
A suitable pid file is created by the  rabbitmq-server(8) script. By default this is located in the Mnesia directory. Modify the  RABBITMQ_PID_FILE environment variable to change the location.
 
For example, this command will return when the RabbitMQ node has started up:
 
rabbitmqctl wait /var/run/rabbitmq/pid

Cluster Management

join_cluster  clusternode [ --ram]
clusternode
Node to cluster with.
--ram
If provided, the node will join the cluster as a RAM node.
 
Instructs the node to become a member of the cluster that the specified node is in. Before clustering, the node is reset, so be careful when using this command. For this command to succeed the RabbitMQ application must have been stopped, e.g. with  stop_app.
 
Cluster nodes can be of two types: disc or RAM. Disc nodes replicate data in RAM and on disc, thus providing redundancy in the event of node failure and recovery from global events such as power failure across all nodes. RAM nodes replicate data in RAM only (with the exception of queue contents, which can reside on disc if the queue is persistent or too big to fit in memory) and are mainly used for scalability. RAM nodes are more performant only when managing resources (e.g. adding/removing queues, exchanges, or bindings). A cluster must always have at least one disc node, and usually should have more than one.
 
The node will be a disc node by default. If you wish to create a RAM node, provide the  --ram flag.
 
After executing the  join_cluster command, whenever the RabbitMQ application is started on the current node it will attempt to connect to the nodes that were in the cluster when the node went down.
 
To leave a cluster,  reset the node. You can also remove nodes remotely with the  forget_cluster_node command.
 
For more details see the  Clustering guide.
 
For example, this command instructs the RabbitMQ node to join the cluster that "hare@elena" is part of, as a RAM node:
 
rabbitmqctl join_cluster hare@elena --ram
cluster_status
Displays all the nodes in the cluster grouped by node type, together with the currently running nodes.
 
For example, this command displays the nodes in the cluster:
 
rabbitmqctl cluster_status
change_cluster_node_type  type
Changes the type of the cluster node.
 
The type must be one of the following: disc, ram.
 
The node must be stopped for this operation to succeed, and when turning a node into a RAM node the node must not be the only disc node in the cluster.
 
For example, this command will turn a RAM node into a disc node:
 
rabbitmqctl change_cluster_node_type disc
forget_cluster_node [--offline] clusternode
--offline
Enables node removal from an offline node. This is only useful in the situation where all the nodes are offline and the last node to go down cannot be brought online, thus preventing the whole cluster from starting. It should not be used in any other circumstances since it can lead to inconsistencies.
 
Removes a cluster node remotely. The node that is being removed must be offline, while the node we are removing from must be online, except when using the  --offline flag.
 
When using the --offline flag, rabbitmqctl will not attempt to connect to a node as normal; instead it will temporarily become the node in order to make the change. This is useful if the node cannot be started normally. In this case the node will become the canonical source for cluster metadata (e.g. which queues exist), even if it was not before. Therefore you should use this command on the latest node to shut down if at all possible.
 
For example, this command will remove the node "rabbit@stringer" from the node "hare@mcnulty":
 
rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer
rename_cluster_node  oldnode1  newnode1 [ oldnode2 newnode2 ...]
Supports renaming of cluster nodes in the local database.
 
This subcommand causes  rabbitmqctl to temporarily become the node in order to make the change. The local cluster node must therefore be completely stopped; other nodes can be online or offline.
 
This subcommand takes an even number of arguments, in pairs representing the old and new names for nodes. You must specify the old and new names for this node and for any other nodes that are stopped and being renamed at the same time.
 
It is possible to stop all nodes and rename them all simultaneously (in which case old and new names for all nodes must be given to every node) or stop and rename nodes one at a time (in which case each node only needs to be told how its own name is changing).
 
For example, this command will rename the node "rabbit@misshelpful" to the node "rabbit@cordelia":
 
rabbitmqctl rename_cluster_node rabbit@misshelpful rabbit@cordelia
update_cluster_nodes  clusternode
clusternode
The node to consult for up-to-date information.
 
Instructs an already clustered node to contact  clusternode to cluster when waking up. This is different from  join_cluster since it does not join any cluster - it checks that the node is already in a cluster with  clusternode.
 
The need for this command is motivated by the fact that clusters can change while a node is offline. Consider the situation in which nodes A and B are clustered. A goes down, C clusters with B, and then B leaves the cluster. When A wakes up, it will try to contact B, but this will fail since B is not in the cluster anymore. The following command will solve this situation:
 
rabbitmqctl -n A update_cluster_nodes C
force_boot
Ensures that the node will start next time, even if it was not the last to shut down.
 
Normally when you shut down a RabbitMQ cluster altogether, the first node you restart should be the last one to go down, since it may have seen things happen that other nodes did not. But sometimes that's not possible: for instance if the entire cluster loses power then all nodes may think they were not the last to shut down.
 
In such a case you can invoke  force_boot while the node is down. This will tell the node to unconditionally start next time you ask it to. If any changes happened to the cluster after this node shut down, they will be lost.
 
If the last node to go down is permanently lost then you should use  forget_cluster_node  --offline in preference to this command, as it will ensure that mirrored queues which were mastered on the lost node get promoted.
 
For example, this will force the node not to wait for other nodes next time it is started:
 
rabbitmqctl force_boot
sync_queue [-p vhost] queue
queue
The name of the queue to synchronise.
 
Instructs a mirrored queue with unsynchronised slaves to synchronise itself. The queue will block while synchronisation takes place (all publishers to and consumers from the queue will block). The queue must be mirrored for this command to succeed.
 
Note that unsynchronised queues from which messages are being drained will become synchronised eventually. This command is primarily useful for queues which are not being drained.
cancel_sync_queue [-p vhost] queue
queue
The name of the queue to cancel synchronisation for.
 
Instructs a synchronising mirrored queue to stop synchronising itself.
purge_queue [-p vhost] queue
queue
The name of the queue to purge.
 
Purges a queue (removes all messages in it).
set_cluster_name  name
Sets the cluster name to  name. The cluster name is announced to clients on connection, and used by the federation and shovel plugins to record where a message has been. The cluster name is by default derived from the hostname of the first node in the cluster, but can be changed.
 
For example, this sets the cluster name to "london":
 
rabbitmqctl set_cluster_name london

User Management

Note that rabbitmqctl manages the RabbitMQ internal user database. Users from any alternative authentication backend will not be visible to rabbitmqctl.

add_user  username  password
username
The name of the user to create.
password
The password the created user will use to log in to the broker.
 
For example, this command instructs the RabbitMQ broker to create a (non-administrative) user named "tonyg" with (initial) password "changeit":
 
rabbitmqctl add_user tonyg changeit
delete_user  username
username
The name of the user to delete.
 
For example, this command instructs the RabbitMQ broker to delete the user named "tonyg":
 
rabbitmqctl delete_user tonyg
change_password  username  newpassword
username
The name of the user whose password is to be changed.
newpassword
The new password for the user.
 
For example, this command instructs the RabbitMQ broker to change the password for the user named "tonyg" to "newpass":
 
rabbitmqctl change_password tonyg newpass
clear_password  username
username
The name of the user whose password is to be cleared.
 
For example, this command instructs the RabbitMQ broker to clear the password for the user named "tonyg":
 
rabbitmqctl clear_password tonyg
 
This user now cannot log in with a password (but may be able to through e.g. SASL EXTERNAL if configured).
authenticate_user  username  password
username
The name of the user.
password
The password of the user.
 
For example, this command instructs the RabbitMQ broker to authenticate the user named "tonyg" with password "verifyit":
 
rabbitmqctl authenticate_user tonyg verifyit
set_user_tags  username [ tag ...]
username
The name of the user whose tags are to be set.
tag
Zero, one or more tags to set. Any existing tags will be removed.
 
For example, this command instructs the RabbitMQ broker to ensure the user named "tonyg" is an administrator:
 
rabbitmqctl set_user_tags tonyg administrator
 
This has no effect when the user logs in via AMQP, but can be used to permit the user to manage users, virtual hosts and permissions when the user logs in via some other means (for example with the management plugin).
 
This command instructs the RabbitMQ broker to remove any tags from the user named "tonyg":
 
rabbitmqctl set_user_tags tonyg
list_users
Lists users. Each result row will contain the user name followed by a list of the tags set for that user.
 
For example, this command instructs the RabbitMQ broker to list all users:
 
rabbitmqctl list_users

Access Control

Note that rabbitmqctl manages the RabbitMQ internal user database. Permissions for users from any alternative authorisation backend will not be visible to rabbitmqctl.

add_vhost  vhost
vhost
The name of the virtual host entry to create.
 
Creates a virtual host.
 
For example, this command instructs the RabbitMQ broker to create a new virtual host called "test":
 
rabbitmqctl add_vhost test
delete_vhost  vhost
vhost
The name of the virtual host entry to delete.
 
Deletes a virtual host.
 
Deleting a virtual host deletes all its exchanges, queues, bindings, user permissions, parameters and policies.
 
For example, this command instructs the RabbitMQ broker to delete the virtual host called "test":
 
rabbitmqctl delete_vhost test
list_vhosts [ vhostinfoitem ...]
Lists virtual hosts.
 
The  vhostinfoitem parameter is used to indicate which virtual host information items to include in the results. The column order in the results will match the order of the parameters.  vhostinfoitem can take any value from the list that follows:
name
The name of the virtual host with non-ASCII characters escaped as in C.
tracing
Whether tracing is enabled for this virtual host.
 
If no  vhostinfoitem are specified then the vhost name is displayed.
 
For example, this command instructs the RabbitMQ broker to list all virtual hosts:
 
rabbitmqctl list_vhosts name tracing
set_permissions [-p vhost] user conf write read
vhost
The name of the virtual host to which to grant the user access, defaulting to "/".
user
The name of the user to grant access to the specified virtual host.
conf
A regular expression matching resource names for which the user is granted configure permissions.
write
A regular expression matching resource names for which the user is granted write permissions.
read
A regular expression matching resource names for which the user is granted read permissions.
 
Sets user permissions.
 
For example, this command instructs the RabbitMQ broker to grant the user named "tonyg" access to the virtual host called "/myvhost", with configure permissions on all resources whose names start with "tonyg-", and write and read permissions on all resources:
 
rabbitmqctl set_permissions -p /myvhost tonyg "^tonyg-.*" ".*" ".*"
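
To see how the three regular expressions act on resource names, here is a small sketch. The `is_allowed` helper is hypothetical, and the use of an unanchored `re.search` is an assumption of this sketch rather than a statement about how the broker evaluates patterns; the only rule taken from the man page is that an empty string grants no permissions.

```python
import re


def is_allowed(pattern, resource_name):
    """Check a resource name against one permission regular expression.

    An empty pattern grants nothing. Using re.search (unanchored unless
    the pattern anchors itself) is an assumption of this sketch.
    """
    if pattern == "":
        return False
    return re.search(pattern, resource_name) is not None


# conf, write and read patterns from the set_permissions example
permissions = {"configure": "^tonyg-.*", "write": ".*", "read": ".*"}
can_declare_own = is_allowed(permissions["configure"], "tonyg-orders")
can_declare_other = is_allowed(permissions["configure"], "other-queue")
```

With these patterns the user can declare only resources whose names begin with the user's prefix, but can publish to and consume from any resource in the virtual host.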
clear_permissions [-p vhost] username
vhost
The name of the virtual host to which to deny the user access, defaulting to "/".
username
The name of the user to deny access to the specified virtual host.
 
Clears user permissions.
 
For example, this command instructs the RabbitMQ broker to deny the user named "tonyg" access to the virtual host called "/myvhost":
 
rabbitmqctl clear_permissions -p /myvhost tonyg
list_permissions [ -p vhost]
vhost
The name of the virtual host for which to list the users that have been granted access to it, and their permissions. Defaults to "/".
 
Lists permissions in a virtual host.
 
For example, this command instructs the RabbitMQ broker to list all the users which have been granted access to the virtual host called "/myvhost", and the permissions they have for operations on resources in that virtual host. Note that an empty string means no permissions granted:
 
rabbitmqctl list_permissions -p /myvhost
list_user_permissions  username
username
The name of the user for which to list the permissions.
 
Lists user permissions.
 
For example, this command instructs the RabbitMQ broker to list all the virtual hosts to which the user named "tonyg" has been granted access, and the permissions the user has for operations on resources in these virtual hosts:
 
rabbitmqctl list_user_permissions tonyg
set_topic_permissions [-p vhost] user exchange write read
vhost
The name of the virtual host to which to grant the user access, defaulting to "/".
user
The name of the user the permissions apply to in the target virtual host.
exchange
The name of the topic exchange the authorisation check will be applied to.
write
A regular expression matching the routing key of the published message.
read
A regular expression matching the routing key of the consumed message.
 
Sets user topic permissions.
 
For example, this command instructs the RabbitMQ broker to let the user named "tonyg" publish and consume messages going through the "amq.topic" exchange of the "/myvhost" virtual host with a routing key starting with "tonyg-":
 
rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic "^tonyg-.*" "^tonyg-.*"
 
Topic permissions support variable expansion for the following variables: username, vhost, and client_id. Note that client_id is expanded only when using MQTT. The previous example could be made more generic by using "^{username}-.*":
 
rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic "^{username}-.*" "^{username}-.*"
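
The effect of variable expansion can be illustrated with a short sketch. The `expand` and `may_use_routing_key` helpers are hypothetical; in particular, regex-escaping the substituted values and matching with `re.search` are assumptions of this sketch, not documented broker behaviour.

```python
import re


def expand(pattern, variables):
    """Replace {name} placeholders with regex-escaped values."""
    for name, value in variables.items():
        pattern = pattern.replace("{" + name + "}", re.escape(value))
    return pattern


def may_use_routing_key(pattern, routing_key, variables):
    """Check a routing key against a topic permission pattern."""
    return re.search(expand(pattern, variables), routing_key) is not None


ctx = {"username": "tonyg", "vhost": "/myvhost"}
own_key = may_use_routing_key("^{username}-.*", "tonyg-events", ctx)
foreign_key = may_use_routing_key("^{username}-.*", "alice-events", ctx)
```

Because the pattern is expanded per user, the same permission definition can be reused for every user instead of hard-coding each name.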
clear_topic_permissions [-p vhost] username [exchange]
vhost
The name of the virtual host to which to clear the topic permissions, defaulting to "/".
username
The name of the user to clear topic permissions to the specified virtual host.
exchange
The name of the topic exchange to clear topic permissions, defaulting to all the topic exchanges the given user has topic permissions for.
 
Clear user topic permissions.
 
For example, this command instructs the RabbitMQ broker to remove topic permissions for the user named "tonyg" for the topic exchange "amq.topic" in the virtual host called "/myvhost":
 
rabbitmqctl clear_topic_permissions -p /myvhost tonyg amq.topic
list_topic_permissions [ -p vhost]
vhost
The name of the virtual host for which to list the users' topic permissions. Defaults to "/".
 
Lists topic permissions in a virtual host.
 
For example, this command instructs the RabbitMQ broker to list all the users which have been granted topic permissions in the virtual host called "/myvhost":
 
rabbitmqctl list_topic_permissions -p /myvhost
list_user_topic_permissions  username
username
The name of the user for which to list the topic permissions.
 
Lists user topic permissions.
 
For example, this command instructs the RabbitMQ broker to list all the virtual hosts to which the user named "tonyg" has been granted access, and the topic permissions the user has in these virtual hosts:
 
rabbitmqctl list_user_topic_permissions tonyg

Parameter Management

Certain features of RabbitMQ (such as the federation plugin) are controlled by dynamic, cluster-wide parameters. There are 2 kinds of parameters: parameters scoped to a virtual host and global parameters. Each vhost-scoped parameter consists of a component name, a name and a value. The component name and name are strings, and the value is an Erlang term. A global parameter consists of a name and value. The name is a string and the value is an Erlang term. Parameters can be set, cleared and listed. In general you should refer to the documentation for the feature in question to see how to set parameters.

set_parameter [-p vhost] component_name name value
Sets a parameter.
component_name
The name of the component for which the parameter is being set.
name
The name of the parameter being set.
value
The value for the parameter, as a JSON term. In most shells you are very likely to need to quote this.
 
For example, this command sets the parameter "node01" for the "federation-upstream" component in the default virtual host to the following JSON:
 
rabbitmqctl set_parameter federation-upstream node01 '{"uri":"amqp://user:password@server/%2F","ack-mode":"on-publish"}'
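
Since the JSON value usually needs shell quoting, one way to avoid quoting mistakes is to generate the command line. This is a sketch only: `set_parameter_command` is a hypothetical helper that builds a string with the standard library's `json.dumps` and `shlex.quote`; it does not run rabbitmqctl.

```python
import json
import shlex


def set_parameter_command(component, name, value, vhost=None):
    """Build a shell-safe `rabbitmqctl set_parameter` command line."""
    argv = ["rabbitmqctl", "set_parameter"]
    if vhost is not None:
        argv += ["-p", vhost]
    # Compact separators keep the JSON term free of spaces.
    argv += [component, name, json.dumps(value, separators=(",", ":"))]
    return " ".join(shlex.quote(arg) for arg in argv)


command = set_parameter_command(
    "federation-upstream",
    "node01",
    {"uri": "amqp://user:password@server/%2F", "ack-mode": "on-publish"},
)
```

The resulting string wraps the JSON term in single quotes, matching the quoting used in the example command.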
clear_parameter [-p vhost] component_name name
Clears a parameter.
component_name
The name of the component for which the parameter is being cleared.
name
The name of the parameter being cleared.
 
For example, this command clears the parameter "node01" for the "federation-upstream" component in the default virtual host:
 
rabbitmqctl clear_parameter federation-upstream node01
list_parameters [ -p vhost]
Lists all parameters for a virtual host.
 
For example, this command lists all parameters in the default virtual host:
 
rabbitmqctl list_parameters
set_global_parameter  name  value
Sets a global runtime parameter. This is similar to  set_parameter but the key-value pair isn't tied to a virtual host.
name
The name of the global runtime parameter being set.
value
The value for the global runtime parameter, as a JSON term. In most shells you are very likely to need to quote this.
 
For example, this command sets the global runtime parameter "mqtt_default_vhosts" to the JSON term {"O=client,CN=guest":"/"}:
 
rabbitmqctl set_global_parameter mqtt_default_vhosts '{"O=client,CN=guest":"/"}'
clear_global_parameter  name
Clears a global runtime parameter. This is similar to  clear_parameter but the key-value pair isn't tied to a virtual host.
name
The name of the global runtime parameter being cleared.
 
For example, this command clears the global runtime parameter "mqtt_default_vhosts":
 
rabbitmqctl clear_global_parameter mqtt_default_vhosts
list_global_parameters
Lists all global runtime parameters. This is similar to  list_parameters but the global runtime parameters are not tied to any virtual host.
 
For example, this command lists all global parameters:
 
rabbitmqctl list_global_parameters

Policy Management

Policies are used to control and modify the behaviour of queues and exchanges on a cluster-wide basis. Policies apply within a given vhost, and consist of a name, pattern, definition and an optional priority. Policies can be set, cleared and listed.

set_policy [-p vhost] [--priority priority] [--apply-to apply-to] name pattern definition
Sets a policy.
name
The name of the policy.
pattern
The regular expression which, when it matches a given resource, causes the policy to apply.
definition
The definition of the policy, as a JSON term. In most shells you are very likely to need to quote this.
priority
The priority of the policy as an integer. Higher numbers indicate greater precedence. The default is 0.
apply-to
Which types of object this policy should apply to. Possible values are: queues, exchanges, all. The default is all.
 
For example, this command sets the policy "federate-me" in the default virtual host so that built-in exchanges are federated:
 
rabbitmqctl set_policy federate-me "^amq\." '{"federation-upstream-set":"all"}'
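
How pattern and priority interact can be sketched as below. The `effective_policy` function is hypothetical, the "short-ttl" policy is invented for illustration, and the sketch assumes that a single policy, the matching one with the highest priority, takes effect for a given resource; tie-breaking between equal priorities is not modelled.

```python
import re


def effective_policy(policies, resource_name):
    """Pick the matching policy with the highest priority, or None.

    Each policy is a dict with "name", "pattern" and an optional
    "priority" (default 0, as for set_policy).
    """
    matching = [p for p in policies if re.search(p["pattern"], resource_name)]
    if not matching:
        return None
    return max(matching, key=lambda p: p.get("priority", 0))


policies = [
    {"name": "federate-me", "pattern": r"^amq\."},
    {"name": "short-ttl", "pattern": r"^amq\.topic$", "priority": 5},  # hypothetical
]
chosen = effective_policy(policies, "amq.topic")
```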
clear_policy [-p vhost] name
Clears a policy.
name
The name of the policy being cleared.
 
For example, this command clears the "federate-me" policy in the default virtual host:
 
rabbitmqctl clear_policy federate-me
list_policies [ -p vhost]
Lists all policies for a virtual host.
 
For example, this command lists all policies in the default virtual host:
 
rabbitmqctl list_policies
set_operator_policy [-p vhost] [--priority priority] [--apply-to apply-to] name pattern definition
Sets an operator policy that overrides a subset of arguments in user policies. Arguments are identical to those of  set_policy.
 
Supported arguments are:
  • expires
  • message-ttl
  • max-length
  • max-length-bytes
clear_operator_policy [-p vhost] name
Clears an operator policy. Arguments are identical to those of  clear_policy.
list_operator_policies [ -p vhost]
Lists operator policy overrides for a virtual host. Arguments are identical to those of  list_policies.

Virtual Host Limits

It is possible to enforce certain limits on virtual hosts.

set_vhost_limits [ -p vhost] definition
Sets virtual host limits.
definition
The definition of the limits, as a JSON term. In most shells you are very likely to need to quote this.
 
Recognised limits are:
  • max-connections
  • max-queues
 
Use a negative value to specify "no limit".
 
For example, this command limits the max number of concurrent connections in vhost "qa_env" to 64:
 
rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": 64}'
 
This command limits the max number of queues in vhost "qa_env" to 256:
 
rabbitmqctl set_vhost_limits -p qa_env '{"max-queues": 256}'
 
This command clears the max number of connections limit in vhost "qa_env":
 
rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": -1}'
 
This command disables client connections in vhost "qa_env":
 
rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": 0}'
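When these commands are driven from a script, the shell quoting of the JSON definition can be avoided by building the argument list directly. The following is a minimal sketch under that assumption; `set_vhost_limits_argv` and `RECOGNISED_LIMITS` are hypothetical helper names, and only the two limits listed above are accepted.

```python
import json

# The limits named in the text above; anything else is rejected early.
RECOGNISED_LIMITS = ("max-connections", "max-queues")


def set_vhost_limits_argv(vhost, limits):
    """Build the argument list for `rabbitmqctl set_vhost_limits`.

    `limits` maps a recognised limit name to an integer. A negative
    value means "no limit"; 0 for max-connections disables client
    connections, as described above.
    """
    unknown = sorted(set(limits) - set(RECOGNISED_LIMITS))
    if unknown:
        raise ValueError("unrecognised limits: %s" % ", ".join(unknown))
    # json.dumps produces the JSON term; no shell quoting is involved
    # because the list is meant to be passed to the OS without a shell.
    definition = json.dumps(limits, sort_keys=True)
    return ["rabbitmqctl", "set_vhost_limits", "-p", vhost, definition]


argv = set_vhost_limits_argv("qa_env", {"max-connections": 64})
```

The resulting list can be handed to `subprocess.run(argv, check=True)` on a machine where rabbitmqctl is installed; since no shell interprets the arguments, the braces and double quotes need no escaping.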
clear_vhost_limits [ -p vhost]
Clears virtual host limits.
 
For example, this command clears vhost limits in vhost "qa_env":
 
rabbitmqctl clear_vhost_limits -p qa_env
list_vhost_limits [ -p vhost] [--global]
Displays configured virtual host limits.
--global
Show limits for all vhosts. Suppresses the  -p parameter.

Server Status

The server status queries interrogate the server and return a list of results with tab-delimited columns. Some queries (list_queues, list_exchanges, list_bindings and list_consumers) accept an optional vhost parameter. This parameter, if present, must be specified immediately after the query.

 

The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is "/".

list_queues [ -p vhost] [--offline | --online | --local] [queueinfoitem ...]
Returns queue details. Queue details of the "/" virtual host are returned if the  -p flag is absent. The  -p flag can be used to override this default.
 
Displayed queues can be filtered by their status or location using one of the following mutually exclusive options:
--offline
List only those durable queues that are not currently available (more specifically, their master node isn't).
--online
List queues that are currently available (their master node is).
--local
List only those queues whose master process is located on the current node.
 
The  queueinfoitem parameter is used to indicate which queue information items to include in the results. The column order in the results will match the order of the parameters.  queueinfoitem can take any value from the list that follows:
name
The name of the queue with non-ASCII characters escaped as in C.
durable
Whether or not the queue survives server restarts.
auto_delete
Whether the queue will be deleted automatically when no longer used.
arguments
Queue arguments.
policy
Policy name applying to the queue.
pid
Id of the Erlang process associated with the queue.
owner_pid
Id of the Erlang process representing the connection which is the exclusive owner of the queue. Empty if the queue is non-exclusive.
exclusive
True if queue is exclusive (i.e. has owner_pid), false otherwise.
exclusive_consumer_pid
Id of the Erlang process representing the channel of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.
exclusive_consumer_tag
Consumer tag of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.
messages_ready
Number of messages ready to be delivered to clients.
messages_unacknowledged
Number of messages delivered to clients but not yet acknowledged.
messages
Sum of ready and unacknowledged messages (queue depth).
messages_ready_ram
Number of messages from messages_ready which are resident in ram.
messages_unacknowledged_ram
Number of messages from messages_unacknowledged which are resident in ram.
messages_ram
Total number of messages which are resident in ram.
messages_persistent
Total number of persistent messages in the queue (will always be 0 for transient queues).
message_bytes
Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.
message_bytes_ready
Like  message_bytes but counting only those messages ready to be delivered to clients.
message_bytes_unacknowledged
Like  message_bytes but counting only those messages delivered to clients but not yet acknowledged.
message_bytes_ram
Like  message_bytes but counting only those messages which are in RAM.
message_bytes_persistent
Like  message_bytes but counting only those messages which are persistent.
head_message_timestamp
The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.
disk_reads
Total number of times messages have been read from disk by this queue since it started.
disk_writes
Total number of times messages have been written to disk by this queue since it started.
consumers
Number of consumers.
consumer_utilisation
Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.
memory
Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.
slave_pids
If the queue is mirrored, this gives the IDs of the current slaves.
synchronised_slave_pids
If the queue is mirrored, this gives the IDs of the current slaves which are synchronised with the master - i.e. those which could take over from the master without message loss.
state
The state of the queue. Normally "running", but may be "{syncing,  message_count}" if the queue is synchronising.
 
Queues which are located on cluster nodes that are currently down will be shown with a status of "down" (and most other  queueinfoitem will be unavailable).
 
If no  queueinfoitem are specified then queue name and depth are displayed.
 
For example, this command displays the depth and number of consumers for each queue of the virtual host named "/myvhost":
 
rabbitmqctl list_queues -p /myvhost messages consumers
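Because the results are tab-delimited and the column order follows the order of the requested items, the output is straightforward to post-process. The sketch below is one way to do it, with `parse_listing` as a hypothetical helper name and a made-up sample output. Lines whose field count does not match the requested columns are skipped, on the assumption that they are informational banners such as "Listing queues"; with a single requested column that filter cannot tell a banner from a row, so running rabbitmqctl with its quiet flag (-q) is the safer choice there.

```python
def parse_listing(output, columns):
    """Turn tab-delimited rabbitmqctl output into a list of dicts.

    `columns` must be given in the same order as the info items passed
    on the command line, since the output columns follow that order.
    """
    rows = []
    for line in output.splitlines():
        fields = line.split("\t")
        if len(fields) != len(columns):
            continue  # assumed to be a banner or blank line, not a row
        rows.append(dict(zip(columns, fields)))
    return rows


# Made-up sample of what `list_queues name messages consumers` might print.
sample = "Listing queues\norders\t12\t2\nemails\t0\t1\n"
rows = parse_listing(sample, ("name", "messages", "consumers"))

# Values stay strings; convert explicitly where numbers are needed.
backlog = [row["name"] for row in rows if int(row["messages"]) > 0]
```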
list_exchanges [ -p vhost] [exchangeinfoitem ...]
Returns exchange details. Exchange details of the "/" virtual host are returned if the  -p flag is absent. The  -p flag can be used to override this default.
 
The  exchangeinfoitem parameter is used to indicate which exchange information items to include in the results. The column order in the results will match the order of the parameters.  exchangeinfoitem can take any value from the list that follows:
name
The name of the exchange with non-ASCII characters escaped as in C.
type
The exchange type, such as:
  • direct
  • topic
  • headers
  • fanout
durable
Whether or not the exchange survives server restarts.
auto_delete
Whether the exchange will be deleted automatically when no longer used.
internal
Whether the exchange is internal, i.e. cannot be directly published to by a client.
arguments
Exchange arguments.
policy
Policy name for applying to the exchange.
 
If no  exchangeinfoitem are specified then exchange name and type are displayed.
 
For example, this command displays the name and type for each exchange of the virtual host named "/myvhost":
 
rabbitmqctl list_exchanges -p /myvhost name type
list_bindings [ -p vhost] [bindinginfoitem ...]
Returns binding details. By default the bindings for the "/" virtual host are returned. The  -p flag can be used to override this default.
 
The  bindinginfoitem parameter is used to indicate which binding information items to include in the results. The column order in the results will match the order of the parameters.  bindinginfoitem can take any value from the list that follows:
source_name
The name of the source of messages to which the binding is attached, with non-ASCII characters escaped as in C.
source_kind
The kind of the source of messages to which the binding is attached, with non-ASCII characters escaped as in C. Currently always exchange.
destination_name
The name of the destination of messages to which the binding is attached, with non-ASCII characters escaped as in C.
destination_kind
The kind of the destination of messages to which the binding is attached, with non-ASCII characters escaped as in C.
routing_key
The binding's routing key, with non-ASCII characters escaped as in C.
arguments
The binding's arguments.
 
If no  bindinginfoitem are specified then all above items are displayed.
 
For example, this command displays the source (exchange) name and destination name of the bindings in the virtual host named "/myvhost":
 
rabbitmqctl list_bindings -p /myvhost source_name destination_name
list_connections [ connectioninfoitem ...]
Returns TCP/IP connection statistics.
 
The  connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters.  connectioninfoitem can take any value from the list that follows:
pid
Id of the Erlang process associated with the connection.
name
Readable name for the connection.
port
Server port.
host
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.
peer_port
Peer port.
peer_host
Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
ssl
Boolean indicating whether the connection is secured with SSL.
ssl_protocol
SSL protocol (e.g. "tlsv1").
ssl_key_exchange
SSL key exchange algorithm (e.g. "rsa").
ssl_cipher
SSL cipher algorithm (e.g. "aes_256_cbc").
ssl_hash
SSL hash function (e.g. "sha").
peer_cert_subject
The subject of the peer's SSL certificate, in RFC4514 form.
peer_cert_issuer
The issuer of the peer's SSL certificate, in RFC4514 form.
peer_cert_validity
The period for which the peer's SSL certificate is valid.
state
Connection state; one of:
  • starting
  • tuning
  • opening
  • running
  • flow
  • blocking
  • blocked
  • closing
  • closed
channels
Number of channels using the connection.
protocol
Version of the AMQP protocol in use; currently one of:
  • {0,9,1}
  • {0,8,0}
 
Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.
auth_mechanism
SASL authentication mechanism used, such as "PLAIN".
user
Username associated with the connection.
vhost
Virtual host name with non-ASCII characters escaped as in C.
timeout
Connection timeout / negotiated heartbeat interval, in seconds.
frame_max
Maximum frame size (bytes).
channel_max
Maximum number of channels on this connection.
client_properties
Informational properties transmitted by the client during connection establishment.
recv_oct
Octets received.
recv_cnt
Packets received.
send_oct
Octets sent.
send_cnt
Packets sent.
send_pend
Send queue size.
connected_at
Date and time this connection was established, as timestamp.
 
If no  connectioninfoitem are specified then user, peer host, peer port, time since flow control and memory block state are displayed.
 
For example, this command displays the send queue size and server port for each connection:
 
rabbitmqctl list_connections send_pend port
list_channels [ channelinfoitem ...]
Returns information on all current channels, the logical containers executing most AMQP commands. This includes channels that are part of ordinary AMQP connections, and channels created by various plug-ins and other extensions.
 
The  channelinfoitem parameter is used to indicate which channel information items to include in the results. The column order in the results will match the order of the parameters.  channelinfoitem can take any value from the list that follows:
pid
Id of the Erlang process associated with the channel.
connection
Id of the Erlang process associated with the connection to which the channel belongs.
name
Readable name for the channel.
number
The number of the channel, which uniquely identifies it within a connection.
user
Username associated with the channel.
vhost
Virtual host in which the channel operates.
transactional
True if the channel is in transactional mode, false otherwise.
confirm
True if the channel is in confirm mode, false otherwise.
consumer_count
Number of logical AMQP consumers retrieving messages via the channel.
messages_unacknowledged
Number of messages delivered via this channel but not yet acknowledged.
messages_uncommitted
Number of messages received in an as yet uncommitted transaction.
acks_uncommitted
Number of acknowledgements received in an as yet uncommitted transaction.
messages_unconfirmed
Number of published messages not yet confirmed. On channels not in confirm mode, this remains 0.
prefetch_count
QoS prefetch limit for new consumers, 0 if unlimited.
global_prefetch_count
QoS prefetch limit for the entire channel, 0 if unlimited.
 
If no  channelinfoitem are specified then pid, user, consumer_count, and messages_unacknowledged are assumed.
 
For example, this command displays the connection process and count of unacknowledged messages for each channel:
 
rabbitmqctl list_channels connection messages_unacknowledged
list_consumers [ -p vhost]
Lists consumers, i.e. subscriptions to a queue's message stream. Each line printed shows, separated by tab characters, the name of the queue subscribed to, the id of the channel process via which the subscription was created and is managed, the consumer tag which uniquely identifies the subscription within a channel, a boolean indicating whether acknowledgements are expected for messages delivered to this consumer, an integer indicating the prefetch limit (with 0 meaning "none"), and any arguments for this consumer.
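The six tab-separated fields described above can be given names when post-processing the output. This is a sketch only: `Consumer` and `parse_consumer_line` are hypothetical names, the sample line is invented, and the code assumes the boolean is printed as `true` or `false`.

```python
from typing import NamedTuple


class Consumer(NamedTuple):
    """One line of list_consumers output, in the documented field order."""
    queue: str
    channel_pid: str
    consumer_tag: str
    ack_required: bool
    prefetch_count: int  # 0 means no prefetch limit
    arguments: str       # kept as raw text; its format is not parsed here


def parse_consumer_line(line):
    # Split into at most six fields so tabs inside the trailing
    # arguments field, if any, stay part of that field.
    queue, pid, tag, ack, prefetch, arguments = line.rstrip("\n").split("\t", 5)
    # Assumption: the acknowledgement flag is printed as true/false.
    return Consumer(queue, pid, tag, ack == "true", int(prefetch), arguments)


# Invented sample line for illustration.
consumer = parse_consumer_line("orders\t<rabbit@tanto.4262.0>\tctag-1\ttrue\t10\t[]")
```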
status
Displays broker status information such as the running applications on the current Erlang node, RabbitMQ and Erlang versions, OS name, memory and file descriptor statistics. (See the  cluster_status command to find out which nodes are clustered and running.)
 
For example, this command displays information about the RabbitMQ broker:
 
rabbitmqctl status
node_health_check
Health check of the RabbitMQ node. Verifies the rabbit application is running, list_queues and list_channels return, and alarms are not set.
 
For example, this command performs a health check on the RabbitMQ node:
 
rabbitmqctl node_health_check -n rabbit@stringer
environment
Displays the name and value of each variable in the application environment for each running application.
report
Generate a server status report containing a concatenation of all server status information for support purposes. The output should be redirected to a file when accompanying a support request.
 
For example, this command creates a server report which may be attached to a support request email:
 
rabbitmqctl report > server_report.txt
eval  expr
Evaluate an arbitrary Erlang expression.
 
For example, this command returns the name of the node to which  rabbitmqctl has connected:
 
rabbitmqctl eval 'node().'

Miscellaneous

close_connection  connectionpid  explanation
connectionpid
Id of the Erlang process associated with the connection to close.
explanation
Explanation string.
 
Instructs the broker to close the connection associated with the Erlang process id  connectionpid (see also the  list_connections command), passing the  explanation string to the connected client as part of the AMQP connection shutdown protocol.
 
For example, this command instructs the RabbitMQ broker to close the connection associated with the Erlang process id "<rabbit@tanto.4262.0>", passing the explanation "go away" to the connected client:
 
rabbitmqctl close_connection "<rabbit@tanto.4262.0>" "go away"
close_all_connections [ -p vhost] [--global] [--per-connection-delay delay] [--limit limit] explanation
-p  vhost
The name of the virtual host for which connections should be closed. Ignored when  --global is specified.
--global
Closes connections for all vhosts. Overrides  -p.
--per-connection-delay  delay
Time in milliseconds to wait after each connection closing.
--limit  limit
Number of connections to close. Only works per vhost. Ignored when  --global is specified.
explanation
Explanation string.
 
Instructs the broker to close all connections for the specified vhost or entire RabbitMQ node.
 
For example, this command instructs the RabbitMQ broker to close 10 connections on the "qa_env" vhost, passing the explanation "Please close":
 
rabbitmqctl close_all_connections -p qa_env --limit 10 'Please close'
 
This command instructs the broker to close all connections to the node:
 
rabbitmqctl close_all_connections --global
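The option rules above (--global overrides -p, and --limit only works per vhost) can be encoded once in a small helper so that scripts never emit a combination that is silently ignored. The function below is a sketch with a hypothetical name and parameter names; it only builds the argument list and does not run anything.

```python
def close_all_connections_argv(explanation, vhost=None, global_=False,
                               delay_ms=None, limit=None):
    """Build the argument list for `rabbitmqctl close_all_connections`."""
    argv = ["rabbitmqctl", "close_all_connections"]
    if global_:
        # -p and --limit are ignored with --global, so leave them out
        # rather than suggest they have an effect.
        argv.append("--global")
    else:
        if vhost is not None:
            argv += ["-p", vhost]
        if limit is not None:
            argv += ["--limit", str(limit)]
    if delay_ms is not None:
        # Time in milliseconds to wait after each connection closing.
        argv += ["--per-connection-delay", str(delay_ms)]
    argv.append(explanation)
    return argv


per_vhost = close_all_connections_argv("Please close", vhost="qa_env", limit=10)
node_wide = close_all_connections_argv("bye", vhost="qa_env", global_=True, limit=3)
```

Here `per_vhost` reproduces the first example above, while `node_wide` shows the vhost and limit being dropped once `global_` is set.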
 
trace_on [ -p vhost]
vhost
The name of the virtual host for which to start tracing.
 
Starts tracing. Note that the trace state is not persistent; it will revert to being off if the server is restarted.
trace_off [ -p vhost]
vhost
The name of the virtual host for which to stop tracing.
 
Stops tracing.
set_vm_memory_high_watermark  fraction
fraction
The new memory threshold fraction at which flow control is triggered, as a floating point number greater than or equal to 0.
set_vm_memory_high_watermark absolute  memory_limit
memory_limit
The new memory limit at which flow control is triggered, expressed in bytes as an integer number greater than or equal to 0 or as a string with memory units (e.g. 512M or 1G). Available units are:
k, kiB
kibibytes (2^10 bytes)
M, MiB
mebibytes (2^20 bytes)
G, GiB
gibibytes (2^30 bytes)
kB
kilobytes (10^3 bytes)
MB
megabytes (10^6 bytes)
GB
gigabytes (10^9 bytes)
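To make the difference between the binary and decimal units concrete, the unit table can be written as a lookup. This is an illustrative sketch, not the broker's parser: `parse_memory_limit` and `UNITS` are hypothetical names, and only whole numbers followed by one of the units listed above are accepted.

```python
import re

# Multipliers taken from the unit list above.
UNITS = {
    "k": 2**10, "kiB": 2**10,
    "M": 2**20, "MiB": 2**20,
    "G": 2**30, "GiB": 2**30,
    "kB": 10**3, "MB": 10**6, "GB": 10**9,
}


def parse_memory_limit(value):
    """Convert a limit such as 512M, 1GB or a plain byte count to bytes."""
    match = re.fullmatch(r"(\d+)([A-Za-z]*)", str(value).strip())
    if match is None:
        raise ValueError("not a non-negative integer with optional unit: %r" % (value,))
    number, unit = match.groups()
    if unit == "":
        return int(number)  # plain integer: already in bytes
    if unit not in UNITS:
        raise ValueError("unknown memory unit: %r" % unit)
    return int(number) * UNITS[unit]


half_gib = parse_memory_limit("512M")   # 512 * 2^20 bytes
one_gb = parse_memory_limit("1GB")      # 10^9 bytes, smaller than 1G
```

Note that 1G and 1GB differ by roughly 7%: the first is 2^30 bytes, the second 10^9 bytes.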
set_disk_free_limit  disk_limit
disk_limit
Lower bound limit as an integer in bytes or a string with memory units (see vm_memory_high_watermark), e.g. 512M or 1G. Once free disk space reaches the limit, a disk alarm will be set.
set_disk_free_limit mem_relative  fraction
fraction
Limit relative to the total amount of available RAM, as a non-negative floating point number. Values lower than 1.0 can be dangerous and should be used carefully.
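As a worked example of the relative form, the resulting byte limit is simply the fraction multiplied by the amount of RAM. The helper name below is hypothetical, and the 16 GiB figure is an arbitrary assumption chosen for illustration.

```python
def disk_free_limit_from_ram(total_ram_bytes, fraction):
    """Return the disk free limit in bytes for a mem_relative fraction."""
    if fraction < 0:
        raise ValueError("fraction must be non-negative")
    return int(total_ram_bytes * fraction)


GIB = 2**30
# On a machine with 16 GiB of RAM (an assumed figure), a fraction of 1.5
# asks for 24 GiB of free disk space before the disk alarm is set.
limit = disk_free_limit_from_ram(16 * GIB, 1.5)
```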
encode  value  passphrase [ --cipher cipher] [--hash hash] [--iterations iterations]
value  passphrase
Value to encrypt and passphrase.
 
For example:
 
rabbitmqctl encode '<<"guest">>' mypassphrase
--cipher  cipher  --hash  hash  --iterations  iterations
Options to specify the encryption settings. They can be used independently.
 
For example:
 
rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase
decode  value  passphrase [ --cipher cipher] [--hash hash] [--iterations iterations]
value  passphrase
Value to decrypt (as produced by the encode command) and passphrase.
 
For example:
 
rabbitmqctl decode '{encrypted, <<"...">>}' mypassphrase
--cipher  cipher  --hash  hash  --iterations  iterations
Options to specify the decryption settings. They can be used independently.
 
For example:
 
rabbitmqctl decode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '{encrypted,<<"...">>}' mypassphrase
list_hashes
Lists hash functions supported by encoding commands.
 
For example, this command instructs the RabbitMQ broker to list all hash functions supported by encoding commands:
 
rabbitmqctl list_hashes
list_ciphers
Lists cipher suites supported by encoding commands.
 
For example, this command instructs the RabbitMQ broker to list all cipher suites supported by encoding commands:
 
rabbitmqctl list_ciphers

PLUGIN COMMANDS

RabbitMQ plugins can extend the rabbitmqctl tool with new commands when enabled. Currently available commands can be found in the rabbitmqctl help output. The following commands are added by RabbitMQ plugins available in the default distribution:

Shovel plugin

shovel_status
Prints a list of configured shovels.
delete_shovel [ -p vhost] name
Instructs the RabbitMQ node to delete the configured shovel by  name.

Federation plugin

federation_status [ --only-down]
Prints a list of federation links.
--only-down
Only list federation links which are not running.
restart_federation_link  link_id
Instructs the RabbitMQ node to restart the federation link with specified  link_id.

AMQP-1.0 plugin

list_amqp10_connections [ amqp10_connectioninfoitem ...]
Similar to the  list_connections command, but returns fields which make sense for AMQP-1.0 connections. The  amqp10_connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters.  amqp10_connectioninfoitem can take any value from the list that follows:
pid
Id of the Erlang process associated with the connection.
auth_mechanism
SASL authentication mechanism used, such as "PLAIN".
host
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.
frame_max
Maximum frame size (bytes).
timeout
Connection timeout / negotiated heartbeat interval, in seconds.
user
Username associated with the connection.
state
Connection state; one of:
  • starting
  • waiting_amqp0100
  • securing
  • running
  • blocking
  • blocked
  • closing
  • closed
recv_oct
Octets received.
recv_cnt
Packets received.
send_oct
Octets sent.
send_cnt
Packets sent.
ssl
Boolean indicating whether the connection is secured with SSL.
ssl_protocol
SSL protocol (e.g. "tlsv1").
ssl_key_exchange
SSL key exchange algorithm (e.g. "rsa").
ssl_cipher
SSL cipher algorithm (e.g. "aes_256_cbc").
ssl_hash
SSL hash function (e.g. "sha").
peer_cert_subject
The subject of the peer's SSL certificate, in RFC4514 form.
peer_cert_issuer
The issuer of the peer's SSL certificate, in RFC4514 form.
peer_cert_validity
The period for which the peer's SSL certificate is valid.
node
The node name of the RabbitMQ node to which the connection is established.

MQTT plugin

list_mqtt_connections [ mqtt_connectioninfoitem]
Similar to the  list_connections command, but returns fields which make sense for MQTT connections. The  mqtt_connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters.  mqtt_connectioninfoitem can take any value from the list that follows:
host
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.
port
Server port.
peer_host
Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
peer_port
Peer port.
protocol
MQTT protocol version, which can be one of the following:
  • {'MQTT', N/A}
  • {'MQTT', 3.1.0}
  • {'MQTT', 3.1.1}
channels
Number of channels using the connection.
channel_max
Maximum number of channels on this connection.
frame_max
Maximum frame size (bytes).
client_properties
Informational properties transmitted by the client during connection establishment.
ssl
Boolean indicating whether the connection is secured with SSL.
ssl_protocol
SSL protocol (e.g. "tlsv1").
ssl_key_exchange
SSL key exchange algorithm (e.g. "rsa").
ssl_cipher
SSL cipher algorithm (e.g. "aes_256_cbc").
ssl_hash
SSL hash function (e.g. "sha").
conn_name
Readable name for the connection.
connection_state
Connection state; one of:
  • starting
  • running
  • blocked
connection
Id of the Erlang process associated with the internal amqp direct connection.
consumer_tags
A tuple of consumer tags for QOS0 and QOS1.
message_id
The last Packet ID sent in a control message.
client_id
MQTT client identifier for the connection.
clean_sess
MQTT clean session flag.
will_msg
MQTT Will message sent in CONNECT frame.
exchange
Exchange to route MQTT messages configured in rabbitmq_mqtt application environment.
ssl_login_name
SSL peer cert auth name
retainer_pid
Id of the Erlang process associated with retain storage for the connection.
user
Username associated with the connection.
vhost
Virtual host name with non-ASCII characters escaped as in C.

STOMP plugin

list_stomp_connections [ stomp_connectioninfoitem]
Similar to the  list_connections command, but returns fields which make sense for STOMP connections. The  stomp_connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters.  stomp_connectioninfoitem can take any value from the list that follows:
conn_name
Readable name for the connection.
connection
Id of the Erlang process associated with the internal amqp direct connection.
connection_state
Connection state; one of:
  • running
  • blocking
  • blocked
session_id
STOMP protocol session identifier
channel
AMQP channel associated with the connection
version
Negotiated STOMP protocol version for the connection.
implicit_connect
Indicates if the connection was established using implicit connect (without CONNECT frame)
auth_login
Effective username for the connection.
auth_mechanism
STOMP authorization mechanism. Can be one of:
  • config
  • ssl
  • stomp_headers
port
Server port.
host
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
peer_port
Peer port.
peer_host
Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
protocol
STOMP protocol version, which can be one of the following:
  • {'STOMP', 0}
  • {'STOMP', 1}
  • {'STOMP', 2}
channels
Number of channels using the connection.
channel_max
Maximum number of channels on this connection.
frame_max
Maximum frame size (bytes).
client_properties
Informational properties transmitted by the client during connection establishment.
ssl
Boolean indicating whether the connection is secured with SSL.
ssl_protocol
SSL protocol (e.g. "tlsv1").
ssl_key_exchange
SSL key exchange algorithm (e.g. "rsa").
ssl_cipher
SSL cipher algorithm (e.g. "aes_256_cbc").
ssl_hash
SSL hash function (e.g. "sha").

Management agent plugin

reset_stats_db [ --all]
Reset management stats database for the RabbitMQ node.
--all
Reset stats database for all nodes in the cluster.

SEE ALSO

rabbitmq-env.conf(5), rabbitmq-echopid(8), rabbitmq-plugins(8), rabbitmq-server(8), rabbitmq-service(8)

AUTHOR

The RabbitMQ Team <info@rabbitmq.com>

 

[References]

RabbitMQ startup error - 糉先生 - cnblogs (博客園) https://www.cnblogs.com/straycats/p/7719933.html

[Test code]

Asynchronous publisher example — pika 0.12.0 documentation https://pika.readthedocs.io/en/stable/examples/asynchronous_publisher_example.html

 

# -*- coding: utf-8 -*-

import logging
import pika
import json

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)


class ExamplePublisher(object):
    """This is an example publisher that will handle unexpected interactions
    with RabbitMQ such as channel and connection closures.

    If RabbitMQ closes the connection, it will reopen it. You should
    look at the output, as there are limited reasons why the connection may
    be closed, which usually are tied to permission related issues or
    socket timeouts.

    It uses delivery confirmations and illustrates one way to keep track of
    messages that have been sent and if they've been confirmed by RabbitMQ.

    """
    EXCHANGE = 'messageTEST'
    EXCHANGE_TYPE = 'topic'
    PUBLISH_INTERVAL = 1
    QUEUE = 'textTEST'
    ROUTING_KEY = 'example.text'

    def __init__(self, amqp_url):
        """Setup the example publisher object, passing in the URL we will use
        to connect to RabbitMQ.

        :param str amqp_url: The URL for connecting to RabbitMQ

        """
        self._connection = None
        self._channel = None

        self._deliveries = None
        self._acked = None
        self._nacked = None
        self._message_number = None

        self._stopping = False
        self._url = amqp_url

    def connect(self):
        """This method connects to RabbitMQ, returning the connection handle.
        When the connection is established, the on_connection_open method
        will be invoked by pika. If you want the reconnection to work, make
        sure you set stop_ioloop_on_close to False, which is not the default
        behavior of this adapter.

        :rtype: pika.SelectConnection

        """
        LOGGER.info('Connecting to %s', self._url)
        return pika.SelectConnection(pika.URLParameters(self._url),
                                     on_open_callback=self.on_connection_open,
                                     on_close_callback=self.on_connection_closed,
                                     stop_ioloop_on_close=False)

    def on_connection_open(self, unused_connection):
        """This method is called by pika once the connection to RabbitMQ has
        been established. It passes the handle to the connection object in
        case we need it, but in this case, we'll just mark it unused.

        :type unused_connection: pika.SelectConnection

        """
        LOGGER.info('Connection opened')
        self.open_channel()

    def on_connection_closed(self, connection, reply_code, reply_text):
        """This method is invoked by pika when the connection to RabbitMQ is
        closed unexpectedly. Since it is unexpected, we will reconnect to
        RabbitMQ if it disconnects.

        :param pika.connection.Connection connection: The closed connection obj
        :param int reply_code: The server provided reply_code if given
        :param str reply_text: The server provided reply_text if given

        """
        self._channel = None
        if self._stopping:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
                           reply_code, reply_text)
            self._connection.add_timeout(5, self._connection.ioloop.stop)

    def open_channel(self):
        """This method will open a new channel with RabbitMQ by issuing the
        Channel.Open RPC command. When RabbitMQ confirms the channel is open
        by sending the Channel.OpenOK RPC reply, the on_channel_open method
        will be invoked.

        """
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        """This method is invoked by pika when the channel has been opened.
        The channel object is passed in so we can make use of it.

        Since the channel is now open, we'll declare the exchange to use.

        :param pika.channel.Channel channel: The channel object

        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        """This method tells pika to call the on_channel_closed method if
        RabbitMQ unexpectedly closes the channel.

        """
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reply_code, reply_text):
        """Invoked by pika when RabbitMQ unexpectedly closes the channel.
        Channels are usually closed if you attempt to do something that
        violates the protocol, such as re-declare an exchange or queue with
        different parameters. In this case, we'll close the connection
        to shutdown the object.

        :param pika.channel.Channel channel: The closed channel
        :param int reply_code: The numeric reason the channel was closed
        :param str reply_text: The text reason the channel was closed

        """
        LOGGER.warning('Channel was closed: (%s) %s', reply_code, reply_text)
        self._channel = None
        if not self._stopping:
            self._connection.close()

    def setup_exchange(self, exchange_name):
        """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
        command. When it is complete, the on_exchange_declareok method will
        be invoked by pika.

        :param str|unicode exchange_name: The name of the exchange to declare

        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        self._channel.exchange_declare(self.on_exchange_declareok,
                                       exchange_name,
                                       self.EXCHANGE_TYPE)

    def on_exchange_declareok(self, unused_frame):
        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
        command.

        :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame

        """
        LOGGER.info('Exchange declared')
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
        command. When it is complete, the on_queue_declareok method will
        be invoked by pika.

        :param str|unicode queue_name: The name of the queue to declare.

        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(self.on_queue_declareok, queue_name)

    def on_queue_declareok(self, method_frame):
        """Method invoked by pika when the Queue.Declare RPC call made in
        setup_queue has completed. In this method we will bind the queue
        and exchange together with the routing key by issuing the Queue.Bind
        RPC command. When this command is complete, the on_bindok method will
        be invoked by pika.

        :param pika.frame.Method method_frame: The Queue.DeclareOk frame

        """
        LOGGER.info('Binding %s to %s with %s',
                    self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
        self._channel.queue_bind(self.on_bindok, self.QUEUE,
                                 self.EXCHANGE, self.ROUTING_KEY)

    def on_bindok(self, unused_frame):
        """This method is invoked by pika when it receives the Queue.BindOk
        response from RabbitMQ. Since we know we're now setup and bound, it's
        time to start publishing."""
        LOGGER.info('Queue bound')
        self.start_publishing()

    def start_publishing(self):
        """This method will enable delivery confirmations and schedule the
        first message to be sent to RabbitMQ

        """
        LOGGER.info('Issuing publisher related RPC commands')
        self.enable_delivery_confirmations()
        self.schedule_next_message()

    def enable_delivery_confirmations(self):
        """Send the Confirm.Select RPC method to RabbitMQ to enable delivery
        confirmations on the channel. The only way to turn this off is to close
        the channel and create a new one.

        When the message is confirmed from RabbitMQ, the
        on_delivery_confirmation method will be invoked passing in a Basic.Ack
        or Basic.Nack method from RabbitMQ that will indicate which messages it
        is confirming or rejecting.

        """
        LOGGER.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
        command, passing in either a Basic.Ack or Basic.Nack frame with
        the delivery tag of the message that was published. The delivery tag
        is an integer counter indicating the message number that was sent
        on the channel via Basic.Publish. Here we're just doing house keeping
        to keep track of stats and remove message numbers that we expect
        a delivery confirmation of from the list used to keep track of messages
        that are pending confirmation.

        :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame

        """
        confirmation_type = method_frame.method.NAME.split('.')[1].lower()
        LOGGER.info('Received %s for delivery tag: %i',
                    confirmation_type,
                    method_frame.method.delivery_tag)
        if confirmation_type == 'ack':
            self._acked += 1
        elif confirmation_type == 'nack':
            self._nacked += 1
        self._deliveries.remove(method_frame.method.delivery_tag)
        LOGGER.info('Published %i messages, %i have yet to be confirmed, '
                    '%i were acked and %i were nacked',
                    self._message_number, len(self._deliveries),
                    self._acked, self._nacked)

    def schedule_next_message(self):
        """If we are not closing our connection to RabbitMQ, schedule another
        message to be delivered in PUBLISH_INTERVAL seconds.

        """
        LOGGER.info('Scheduling next message for %0.1f seconds',
                    self.PUBLISH_INTERVAL)
        self._connection.add_timeout(self.PUBLISH_INTERVAL,
                                     self.publish_message)

    def publish_message(self):
        """If the class is not stopping, publish a message to RabbitMQ,
        appending a list of deliveries with the message number that was sent.
        This list will be used to check for delivery confirmations in the
        on_delivery_confirmation method.

        Once the message has been sent, schedule another message to be sent.
        The main reason I put scheduling in was just so you can get a good idea
        of how the process is flowing by slowing down and speeding up the
        delivery intervals by changing the PUBLISH_INTERVAL constant in the
        class.

        """
        if self._channel is None or not self._channel.is_open:
            return

        hdrs = {u'مفتاح': u' قيمة',
                u'鍵': u'值',
                u'キー': u'値'}
        properties = pika.BasicProperties(app_id='example-publisher',
                                          content_type='application/json',
                                          headers=hdrs)

        message = u'مفتاح قيمة 鍵 值 キー 値'
        self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
                                    json.dumps(message, ensure_ascii=False),
                                    properties)
        self._message_number += 1
        self._deliveries.append(self._message_number)
        LOGGER.info('Published message # %i', self._message_number)
        self.schedule_next_message()

    def run(self):
        """Run the example code by connecting and then starting the IOLoop.

        """
        while not self._stopping:
            self._connection = None
            self._deliveries = []
            self._acked = 0
            self._nacked = 0
            self._message_number = 0

            try:
                self._connection = self.connect()
                self._connection.ioloop.start()
            except KeyboardInterrupt:
                self.stop()
                if (self._connection is not None and
                        not self._connection.is_closed):
                    # Finish closing
                    self._connection.ioloop.start()

        LOGGER.info('Stopped')

    def stop(self):
        """Stop the example by closing the channel and connection. We
        set a flag here so that we stop scheduling new messages to be
        published. The IOLoop is started because this method is
        invoked by the try/except in run() when KeyboardInterrupt is caught.
        Starting the IOLoop again will allow the publisher to cleanly
        disconnect from RabbitMQ.

        """
        LOGGER.info('Stopping')
        self._stopping = True
        self.close_channel()
        self.close_connection()

    def close_channel(self):
        """Invoke this command to close the channel with RabbitMQ by sending
        the Channel.Close RPC command.

        """
        if self._channel is not None:
            LOGGER.info('Closing the channel')
            self._channel.close()

    def close_connection(self):
        """This method closes the connection to RabbitMQ."""
        if self._connection is not None:
            LOGGER.info('Closing connection')
            self._connection.close()


def main():
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)

    # Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
    # example = ExamplePublisher('amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat_interval=3600')
    rmqAddr, rmqPort, rmqQueue = '101.201.41.72', 5672, 'video_article'
    rmqU, rmqP = 'liaohy', 'liaohy'

    C = 'amqp://{}:{}@{}:{}/%2F?connection_attempts=3&heartbeat_interval=3600'.format(rmqU, rmqP, rmqAddr, rmqPort)
    example = ExamplePublisher(C)
    example.run()


if __name__ == '__main__':
    main()
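
The on_delivery_confirmation handler above removes exactly one delivery tag per confirmation. RabbitMQ may also set the multiple flag on a Basic.Ack or Basic.Nack, in which case the frame settles every outstanding tag up to and including the given one. The following is a minimal, broker-free sketch of that bookkeeping. The class PendingConfirms and its method names are hypothetical and are not part of pika.

```python
class PendingConfirms:
    """Track published delivery tags until the broker confirms them.

    Hypothetical helper for illustration only; it is not part of pika.
    """

    def __init__(self):
        self.pending = []   # delivery tags awaiting confirmation, in publish order
        self.acked = 0
        self.nacked = 0

    def published(self, delivery_tag):
        """Record a tag right after the message has been published."""
        self.pending.append(delivery_tag)

    def confirmed(self, delivery_tag, ack=True, multiple=False):
        """Apply one Basic.Ack/Basic.Nack and return the tags it settled."""
        if multiple:
            # multiple=True settles every pending tag up to and including this one
            settled = [t for t in self.pending if t <= delivery_tag]
        else:
            settled = [t for t in self.pending if t == delivery_tag]
        self.pending = [t for t in self.pending if t not in settled]
        if ack:
            self.acked += len(settled)
        else:
            self.nacked += len(settled)
        return settled


tracker = PendingConfirms()
for tag in (1, 2, 3, 4):
    tracker.published(tag)

first = tracker.confirmed(2, ack=True, multiple=True)   # settles tags 1 and 2
second = tracker.confirmed(4, ack=False)                # settles only tag 4
```

In a pika callback the two inputs would come from method_frame.method.delivery_tag and method_frame.method.multiple.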

  

 

 

Why does the queue memory grow and shrink when publishing/consuming?

http://www.rabbitmq.com/memory-use.html#queue-memory-usage

 

 

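AMQP (Advanced Message Queuing Protocol) is an open application-layer standard that provides unified messaging services and is designed for message-oriented middleware. Clients and message middleware built on this protocol can exchange messages regardless of differences in client or middleware product or in programming language. RabbitMQ is one implementation, written in Erlang.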

Chinese name: 高級消息隊列協議
Foreign name: Advanced Message Queuing Protocol
Nature: application-layer standard protocol
Field: computing

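Introduction

The AMQP Protocol
Chapter 1. Overview
1.3. Summary
1.3.1. What Is AMQP
The Advanced Message Queuing Protocol enables full functional interoperability between conforming client applications and messaging middleware servers.
1.3.2. Why AMQP
Our goal is a standard messaging middleware technology that is widely used across the industry, so as to reduce the cost of enterprise and systems integration and to offer industrial-grade integration services to a broad audience.
Our aim is that, through AMQP, the capabilities of messaging middleware ultimately become part of the network itself, and that the wide availability of messaging middleware leads to a range of useful applications.
1.3.3. Scope of AMQP
To make messaging middleware fully interoperable, both the network protocol and the functional semantics of the broker services must be sufficiently specified.
AMQP therefore defines the network protocol and the broker services as follows:
A defined set of messaging capabilities, the "Advanced Message Queuing Protocol Model". The AMQP Model consists of a set of components that route and store messages, plus a set of rules for exchanging messages between these components.
A network wire-level protocol (the data transfer format) that client applications use to talk to the broker and to the AMQP Model it implements.
The server semantics can be partly inferred from the AMQP protocol specification, but we believe that describing them explicitly makes the protocol easier to understand.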

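Related Protocols

1.3.4.1. The AMQP Model
The server semantics need to be defined explicitly, because every server implementation must keep them consistent; otherwise interoperability is impossible.
The AMQP Model therefore describes a modular set of components and standard rules for connecting them.
In the server, three main types of component are connected into a processing chain that provides the desired functionality:
The "exchange" receives messages from publisher applications and routes them to "message queues" according to certain rules.
The "message queue" stores messages until a consumer has safely processed them.
The "binding" defines the relationship between an exchange and a message queue and provides the routing rules.
With this model it is easy to emulate the classic messaging middleware concepts of store-and-forward queues and topic subscriptions.
An AMQP server is analogous to an email server: each exchange acts like a message transfer agent (an email concept) and each message queue acts like a mailbox. Bindings define the routing table of each transfer agent. Publishers send messages to a particular transfer agent, the transfer agent routes them into mailboxes, and consumers take messages out of those mailboxes.
In earlier middleware systems, by contrast, publishers sent messages directly to mailboxes or mailing lists.
The difference is that users control the rules that connect message queues to exchanges, which makes many interesting things possible, such as a rule that says "place a copy of every message containing such-and-such a header into this message queue".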
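
The exchange, message queue, and binding roles described above can be sketched as a toy in-memory model. This is only an illustration of the routing idea with exact-match (direct) routing; the ToyBroker class and its method names are hypothetical and do not correspond to any real broker API.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory model of exchanges, queues and bindings (direct routing only)."""

    def __init__(self):
        self.queues = defaultdict(deque)     # queue name -> stored messages
        self.bindings = defaultdict(list)    # exchange name -> [(binding key, queue name)]

    def bind(self, exchange, queue, routing_key):
        """Add a routing rule from an exchange to a queue."""
        self.queues[queue]  # touching the defaultdict creates the queue if missing
        self.bindings[exchange].append((routing_key, queue))

    def publish(self, exchange, routing_key, body):
        """Copy the message into every queue whose binding key matches.

        Returns the number of queues that received a copy.
        """
        targets = [q for key, q in self.bindings[exchange] if key == routing_key]
        for queue in targets:
            self.queues[queue].append(body)
        return len(targets)

    def get(self, queue):
        """Pull one message from a queue, or None if the queue is empty."""
        return self.queues[queue].popleft() if self.queues[queue] else None


broker = ToyBroker()
broker.bind("orders", "billing", "order.created")
broker.bind("orders", "audit", "order.created")
delivered = broker.publish("orders", "order.created", "order #1")   # two matching bindings
unrouted = broker.publish("orders", "order.deleted", "order #2")    # no matching binding
```

The publisher only names the exchange and a routing key; which mailboxes receive a copy is decided entirely by the bindings.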
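The AMQP Model has the following goals:
To support the semantics required by the financial services industry.
To provide the performance required by the financial services industry.
To be easy to extend with new kinds of message routing and queuing.
To let applications program the server's specific semantics through the AMQP protocol (the protocol is one part of AMQP as a whole).
To be simple yet flexible.
1.3.4.2. The AMQP Protocol
The AMQP protocol is a binary protocol with modern features: it is multi-channel, negotiated, asynchronous, secure, portable, neutral, and efficient.
AMQP is usually divided into three layers:
The model layer defines a set of commands (grouped by function) that client applications use to do their work.
The session layer carries commands from the client application to the server and carries the server's responses back; it provides reliability, synchronization, and error handling for this exchange.
The transport layer provides framing, channel multiplexing, failure detection, and data representation.
Implementers can replace the transport layer with any transport protocol, as long as the functionality visible to client applications does not change. Implementers can also use the session layer with other higher-level protocols.
The design of the AMQP Model was driven by these requirements:
To guarantee interoperability between conforming server implementations.
To provide explicit control over quality of service.
To support all messaging middleware functions: message exchange, file transfer, streaming, remote procedure calls, and so on.
To be compatible with existing messaging API specifications (for example, Sun's JMS).
To be consistent and explicit in naming.
To allow complete configuration of server wiring (the connections between server components) through the AMQP protocol.
To use a command notation that maps easily onto application-level APIs.
To be clear, so that each operation does exactly one thing.
The design of the AMQP transport layer was driven by these main requirements, in no particular order:
To be compact, using a binary encoding that packs and unpacks quickly.
To handle messages of any size.
To permit zero-copy data transfer (for example, remote DMA).
To carry multiple sessions over one connection.
To let sessions recover from network errors and server failures.
To be long-lived, with no significant in-built limitations.
To transfer messages asynchronously.
To be easy to extend for new and changed requirements.
To keep later versions of the AMQP specification compatible with earlier versions.
To be repairable, using a strong assertion model.
To be neutral with respect to programming languages.
To be suited to generating protocol handlers with code generation tools.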
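
As an illustration of the "compact binary encoding" and "framing" requirements listed above, here is a minimal sketch of packing and unpacking one frame. It assumes the general frame layout of AMQP 0-9-1 (one-octet type, two-octet channel, four-octet payload size, payload, then a frame-end octet 0xCE); other protocol revisions lay frames out differently, and the function names are hypothetical.

```python
import struct

FRAME_END = 0xCE        # frame-end marker in AMQP 0-9-1
FRAME_HEARTBEAT = 8     # heartbeat frame type in AMQP 0-9-1


def encode_frame(frame_type, channel, payload):
    """Pack type (1 octet), channel (2 octets), size (4 octets), payload, end marker."""
    header = struct.pack(">BHI", frame_type, channel, len(payload))
    return header + payload + bytes([FRAME_END])


def decode_frame(data):
    """Unpack one frame and return (frame_type, channel, payload)."""
    frame_type, channel, size = struct.unpack(">BHI", data[:7])
    payload = data[7:7 + size]
    if len(payload) != size or len(data) < 8 + size or data[7 + size] != FRAME_END:
        raise ValueError("malformed or truncated frame")
    return frame_type, channel, payload


# A heartbeat travels on channel 0 and carries an empty payload.
heartbeat = encode_frame(FRAME_HEARTBEAT, 0, b"")
```

The fixed seven-octet header is what lets a receiver read the size first and then consume exactly one frame from the stream.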

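Functional Scope

We support a variety of messaging architectures:
Store-and-forward (many message senders, one message receiver).
Distributed transactions (many message senders, many message receivers).
Publish-subscribe (many message senders, many message receivers).
Content-based routing (many message senders, many message receivers).
Queued file transfer (many message senders, many message receivers).
Point-to-point connection (one message sender, one message receiver).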

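Document Structure

This document is divided into two parts:
The "Concepts" part gives a brief introduction to AMQP concepts, describing how AMQP works and what it is used for.
The "Standards" part precisely defines each component of the AMQP model layer and session layer, and defines the binary message structures that AMQP carries on the wire.
We use the terms MUST, MUST NOT, SHOULD, SHOULD NOT, and MAY as defined by IETF RFC 2119 (see reference [1]).
When discussing the specific behavior of a server that conforms to the AMQP specification, we use the term "server".
When discussing the specific behavior of a client application that conforms to the AMQP specification, we use the term "client".
We use "peer" to mean "server or client".
All numbers are decimal unless otherwise stated.
Protocol constants are written as uppercase names. AMQP implementations that define and use these constants in code or documentation must use these names.
Property names, command or control arguments, and frame fields are written as lowercase names. AMQP implementations must use them consistently in code and documentation.
1.5. Conventions
1.5.1. Definitions
1.5.2. Version Numbering
An AMQP version is expressed with two numbers: the major number and the minor number. By convention, a version is written as the major number, a hyphen, and the minor number (for example, 1-3 means major 1, minor 3).
Major and minor numbers may take any value from 0 to 255.
Minor numbers are incremented while the major number stays the same. When the AMQP working group increments the major number, the minor number is reset to 0. A sequence such as 1-2, 1-3, 1-4, 2-0, 2-1 ... is therefore possible.
Once the protocol has been released (major number 1 or greater), minor numbers reaching 9 should be avoided where possible. Before release (versions 0-x) this convention need not be followed, because the protocol is revised frequently.
Once the protocol has been released (major number 1 or greater), implementations of different minor versions within the same major version must be backward compatible. Before release, minor versions need not be compatible.
Major numbers of 99 and above are reserved for testing and development.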
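
The version rules above can be checked mechanically. Below is a small sketch that parses the two-part "major-minor" form, enforces the 0 to 255 range, and flags test versions. The function names are hypothetical, and three-part identifiers with a revision (such as 0-9-1) are deliberately rejected because the text above only describes two numbers.

```python
def parse_amqp_version(text):
    """Parse 'major-minor' (for example '1-3') into a (major, minor) tuple."""
    major_text, sep, minor_text = text.partition("-")
    if not sep or not major_text.isdigit() or not minor_text.isdigit():
        raise ValueError("expected 'major-minor', got %r" % (text,))
    major, minor = int(major_text), int(minor_text)
    if major > 255 or minor > 255:
        raise ValueError("major and minor must be between 0 and 255")
    return major, minor


def is_test_version(version):
    """Major numbers of 99 and above are set aside for testing and development."""
    return version[0] >= 99


# Tuples compare element by element, so sorting reproduces the release order.
versions = sorted(parse_amqp_version(v) for v in ["2-0", "1-4", "1-2", "2-1", "1-3"])
```

Representing a version as a tuple keeps comparisons correct without any string handling.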

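Technical Terms

AMQP Model: A logical framework of key entities and semantics that a server conforming to the AMQP specification must provide. Clients can send commands to control the AMQP server in order to use the semantics defined in this specification.
Connection: A network connection, for example a TCP/IP socket connection.
Session: A named dialogue between peers. Within the context of a session, exactly-once delivery is guaranteed.
Channel: An independent bidirectional stream of data within a multiplexed connection. It provides the physical transport for a session.
Client: The initiator of an AMQP connection or session. AMQP is asymmetric: clients produce and consume messages, while servers store and route them.
Server: The process that accepts client connections and implements the AMQP message queuing and routing functions. Also called a "broker".
Peer: Either party in an AMQP dialogue. An AMQP connection involves two peers (one client and one server).
Partner: Shorthand for the "other" peer when describing an interaction between two peers. For example, if peer A and peer B are communicating, then B is A's partner and A is B's partner.
Assembly: An ordered collection of segments that forms a logical unit of work.
Segment: An ordered collection of frames that forms a complete subunit of an assembly.
Frame: The atomic unit of transmission in AMQP. A frame is an arbitrary fragment of a segment.
Control: A one-way instruction that the AMQP specification assumes is transmitted unreliably.
Command: An acknowledged instruction that the AMQP specification requires to be transmitted reliably.
Exception: An error condition that may occur while executing one or more commands.
Class: A collection of AMQP commands or controls that describe a specific area of functionality.
Header: A specific type of segment that describes properties of the message data.
Body: A specific type of segment that contains application data. The body segment is entirely opaque to the server: the server cannot examine or modify the body.
Content: The message data contained in a body segment.
Exchange: An entity within the server that receives messages from producers and routes them to queues within the server.
Exchange Type: The class of exchange, based on different routing semantics.
Message Queue: A named entity that holds messages until they are sent to consumers.
Binding: The relationship between a message queue and an exchange.
Binding Key: The name of a binding. Some exchange types may use it as a pattern that defines the routing behavior of the binding.
Routing Key: A message header that an exchange may use to decide how to route a specific message.
Durable: A server resource whose stored message data is not lost when the server restarts.
Transient: A server resource whose stored message data is lost when the server restarts.
Persistent: The server stores the message on reliable disk storage, so the message is not lost when the server restarts.
Non-Persistent: The server keeps the message in memory, so the message may be lost when the server restarts.
Consumer: A client application that requests messages from a message queue.
Producer: A client application that publishes messages to an exchange.
Virtual Host: A collection of exchanges, message queues, and associated objects. Virtual hosts are independent server domains that share a common authentication and encryption environment. A client application can select a virtual host after logging in to the server.
The following terms have no special meaning in the context of the AMQP specification:
Topic: Usually a means of distributing messages; the AMQP specification implements topics using one or more types of exchange.
Service: Usually synonymous with server. The AMQP specification uses the term "server" to conform with IETF standard terminology and to make clear the role of each party in the protocol (both sides may be AMQP services).
Broker: Synonymous with server. The AMQP specification uses the terms "client" and "server" to conform with IETF standard terminology.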
 
 

RabbitMQ libraries

RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open, general-purpose protocol for messaging. There are a number of clients for RabbitMQ in many different languages. In this tutorial series we're going to use Pika 1.0.0, which is the Python client recommended by the RabbitMQ team. 

 

In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. But let's not get dragged down by the details ‒ you can read more about exchanges in the third part of this tutorial. All we need to know now is how to use a default exchange identified by an empty string. This exchange is special ‒ it allows us to specify exactly to which queue the message should go. The queue name needs to be specified in the routing_key parameter:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

Before exiting the program we need to make sure the network buffers were flushed and our message was actually delivered to RabbitMQ. We can do it by gently closing the connection.

connection.close()
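
Put together, the fragments above amount to roughly the following. This is a minimal sketch that assumes the pika 1.x keyword arguments and a broker listening on localhost; the helper name publish_hello is hypothetical. The pika import sits inside main() so that the helper can be exercised with a stand-in channel object.

```python
def publish_hello(channel, queue="hello", body="Hello World!"):
    """Declare the queue, then publish to it through the default exchange."""
    channel.queue_declare(queue=queue)        # creates the queue only if it is missing
    channel.basic_publish(exchange="",        # "" selects the default exchange
                          routing_key=queue,  # the default exchange routes by queue name
                          body=body)
    return queue


def main():
    """Connect to a broker on localhost (an assumption), publish once, and close."""
    import pika  # imported here so publish_hello can be tried without pika installed
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    try:
        publish_hello(connection.channel())
        print(" [x] Sent 'Hello World!'")
    finally:
        connection.close()  # closing gently lets the network buffers flush
```

Calling main() with a broker running should leave one message in the hello queue.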

 

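A message cannot be sent directly to a queue; it has to go through an exchange.

The initial default exchange is named "" (the empty string); the management UI lists it as Exchange: (AMQP default).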

 

Publish/subscribe

The scenario where the same message is delivered to multiple consumers.

https://www.rabbitmq.com/tutorials/tutorial-three-python.html

What This Tutorial Focuses On

In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".

To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.

In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.

Essentially, published log messages are going to be broadcast to all the receivers.

 

The publisher does not know about queues, only about the exchange; the exchange decides which queue a message is sent to.

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of that type, and call it logs:

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout') 

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.
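
A receiver for this logging setup needs its own queue bound to the logs exchange. The sketch below assumes the pika 1.x keyword arguments; the helper names bind_log_queue and emit_log are hypothetical, and the channel is expected to come from an open pika connection.

```python
def bind_log_queue(channel, exchange="logs"):
    """Declare the fanout exchange, create a private queue, and bind the two."""
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    # An empty queue name asks the broker to generate one; exclusive=True
    # deletes the queue when the declaring connection closes.
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=exchange, queue=queue_name)
    return queue_name


def emit_log(channel, message, exchange="logs"):
    """Publish to the fanout exchange; fanout ignores the routing key."""
    channel.basic_publish(exchange=exchange, routing_key="", body=message)
```

Each running receiver calls bind_log_queue once, so every receiver gets its own copy of every message passed to emit_log.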

Writing the same data to several queues is a matter of the exchange type:

exchange_type='fanout'

Note:

 

Add binding from this exchange

An exchange can be bound to another exchange or to a queue.

 

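Creation order

NOT_FOUND - no queue 'queue_19_ehr_analysis_log' in vhost '/'

The exchange and the queue can be created in parallel or in either order.

The binding is made afterwards, once both exist; binding to a queue that has not been declared yet produces the NOT_FOUND error shown here.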
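
The ordering can be sketched as one helper that declares both sides before binding them. It assumes the pika 1.x keyword arguments; the function name declare_topology is hypothetical, and the direct type and durable flag are taken from the management UI listing in these notes.

```python
def declare_topology(channel, exchange, queue, routing_key):
    """Declare the exchange and the queue first, then bind them."""
    channel.exchange_declare(exchange=exchange, exchange_type="direct", durable=True)
    channel.queue_declare(queue=queue, durable=True)
    # Binding comes last: the broker answers NOT_FOUND if either side is missing.
    channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
```

Declaring is safe to repeat as long as the parameters match what already exists on the broker, so the helper can run on every start-up.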

 

 

 

Exchange: exchange_19_ehr_analysis_log

Bindings from this exchange:

To: queue_19_ehr_analysis_log
Routing key: routing_key_19_ehr_analysis_log
Arguments: (empty)

 

 

Exchange exchange_19_ehr_analysis_log

Type: direct
Parameters: durable: true
Policy: (empty)

Note: for dual or multiple writes, the Type can be fanout instead.

Queue queue_19_ehr_analysis_log

Parameters: durable: true
Policy: (empty)
Exclusive owner: None

Bindings to this queue

From: (Default exchange binding)
From: exchange_19_ehr_analysis_log, Routing key: routing_key_19_ehr_analysis_log, Arguments: (empty)

 

#!/usr/bin/env python
# coding: utf-8
import pika
import time

RABBITMQ_DEV_SERVER_ANALYSI = '6.7.9.65'

username, password = "guest", "guest"
host, port = RABBITMQ_DEV_SERVER_ANALYSI, 5672
virtual_host = '/'  # default vhost; pika 1.x expects a str here rather than None
"""A credentials object for the default authentication methodology with
RabbitMQ.

If you do not pass in credentials to the ConnectionParameters object, it
will create credentials for 'guest' with the password of 'guest'.

If you pass True to erase_on_connect the credentials will not be stored
in memory after the Connection attempt has been made.

:param str username: The username to authenticate with
:param str password: The password to authenticate with
:param bool erase_on_connect: erase credentials on connect.

"""
mq_user = pika.PlainCredentials(username=username, password=password)
"""Connection parameters object that is passed into the connection adapter
upon construction.

:param str host: Hostname or IP Address to connect to
:param int port: TCP port to connect to
:param str virtual_host: RabbitMQ virtual host to use
:param pika.credentials.Credentials credentials: auth credentials
:param int channel_max: Maximum number of channels to allow
:param int frame_max: The maximum byte size for an AMQP frame
:param int heartbeat_interval: How often to send heartbeats
:param bool ssl: Enable SSL
:param dict ssl_options: Arguments passed to ssl.wrap_socket as
:param int connection_attempts: Maximum number of retry attempts
:param int|float retry_delay: Time to wait in seconds, before the next
:param int|float socket_timeout: Use for high latency networks
:param str locale: Set the locale value
:param bool backpressure_detection: Toggle backpressure detection

"""
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=host,
    port=port,
    virtual_host=virtual_host,
    credentials=mq_user
))
channel = connection.channel()

analysi_task = {"queue": 'queue_19_ehr_analysis_log', 'exchange': 'exchange_19_ehr_analysis_log',
                'routing_key': 'routing_key_19_ehr_analysis_log'}  # configuration of the log interface system

exchange, routing_key, queue = analysi_task["exchange"], analysi_task["routing_key"], analysi_task["queue"]
"""Declare queue, create if needed. This method creates or checks a
queue. When creating a new queue the client can specify various
properties that control the durability of the queue and its contents,
and the level of sharing for the queue.

Leave the queue name empty for a auto-named queue in RabbitMQ

:param method callback: The method to call on Queue.DeclareOk
:param queue: The queue name
:type queue: str or unicode
:param bool passive: Only check to see if the queue exists
:param bool durable: Survive reboots of the broker
:param bool exclusive: Only allow access by the current connection
:param bool auto_delete: Delete after consumer cancels or disconnects
:param bool nowait: Do not wait for a Queue.DeclareOk
:param dict arguments: Custom key/value arguments for the queue

"""
msgBody = "HelloWorld{}-UCAN".format(time.ctime())

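# Publish through the named exchange; the binding's routing key selects the queue.
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msgBody)
# Publish through the default exchange "", which routes by queue name.
channel.basic_publish(exchange="", routing_key=queue, body=msgBody)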

# The producer never sends a message directly to a queue and does not need to
# know about queues; it only publishes to an exchange, and the exchange type
# decides which queues receive the message.
# See https://www.rabbitmq.com/tutorials/tutorial-one-python.html
connection.close()

if False:
    # https://www.rabbitmq.com/uri-spec.html
    BROKER_URL = 'amqp://guest:guest@%s:5672/' % RABBITMQ_DEV_SERVER_ANALYSI  # RabbitMQ broker URL
    params = pika.URLParameters(BROKER_URL)
    params.socket_timeout = 5
    connection = pika.BlockingConnection(params)

    msgBody = "HW----HelloWorld{}-UCAN--".format(time.ctime())
    channel = connection.channel()
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msgBody)
    connection.close()

 

 

Requeue: yes means the message is not removed from the queue.
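
The same effect can be had from client code by fetching a message without acknowledging it and then rejecting it with requeue enabled. This is a sketch under the assumption of the pika 1.x BlockingChannel API (basic_get with auto_ack, basic_nack with requeue); the helper name peek_message is hypothetical.

```python
def peek_message(channel, queue):
    """Fetch one message and hand it straight back to the queue."""
    method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
    if method is None:
        return None  # the queue was empty
    # requeue=True returns the message to the queue instead of discarding it.
    channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    return body
```

A message that has been requeued this way is marked as redelivered the next time a consumer receives it.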
