AMQP 0-9-1 Model Explained — RabbitMQ http://next.rabbitmq.com/tutorials/amqp-concepts.htmlhtml
AMQP 0-9-1 Model Explained
This guide provides an overview of the the AMQP 0-9-1 protocol, one of the protocols supported by RabbitMQ.node
AMQP 0-9-1 (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.python
Messaging brokers receive messages from publishers (applications that publish them, also known as producers) and route them to consumers (applications that process them).shell
Since it is a network protocol, the publishers, consumers and the broker can all reside on different machines.express
The AMQP 0-9-1 Model has the following view of the world: messages are published toexchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings. Then AMQP brokers either deliver messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand.編程

publisher發佈者--->messages消息--->exchages郵局,信箱---->queues隊列 ...consumers消費者json
exchange郵局按照必定的規則rules/bindings將消息message複製copy到隊列queue安全
AMQPbrokers將消息發送deliver給訂閱隊列queue的消費者consumerbash
或者服務器
消費者consumer按照須要on demand去拉取fetch/pull消息messages
【對代理不透明部分】
消息的元數據中對代理brocker:透明opaque、不透明部分
When publishing a message, publishers may specify various message attributes (message meta-data). Some of this meta-data may be used by the broker, however, the rest of it is completely opaque to the broker and is only used by applications that receive the message.
【message acknowledgements】
消費者在收到消息後,能夠發送消息確認,也能夠不發送;如發送,則代理broker將消息從隊列中刪除remove
Networks are unreliable and applications may fail to process messages therefore the AMQP model has a notion of message acknowledgements: when a message is delivered to a consumer the consumer notifies the broker, either automatically or as soon as the application developer chooses to do so. When message acknowledgements are in use, a broker will only completely remove a message from a queue when it receives a notification for that message (or group of messages).
【沒有接收路由的狀況dead letter queue】
在消息message不能被路由routed的狀況下,消息被broker投入死信隊列dead letter queue;這種狀況的處理由發佈者publisher決定
In certain situations, for example, when a message cannot be routed, messages may bereturned to publishers, dropped, or, if the broker implements an extension, placed into a so-called "dead letter queue". Publishers choose how to handle situations like this by publishing messages using certain parameters.
Queues, exchanges and bindings are collectively referred to as AMQP entities.
【Queues, exchanges and bindings are collectively referred to as AMQP entities】
AMQP is a Programmable Protocol
AMQP 0-9-1 is a programmable protocol in the sense that AMQP 0-9-1 entities and routing schemes are primarily defined by applications themselves, not a broker administrator. Accordingly, provision is made for protocol operations that declare queues and exchanges, define bindings between them, subscribe to queues and so on.
This gives application developers a lot of freedom but also requires them to be aware of potential definition conflicts. In practice, definition conflicts are rare and often indicate a misconfiguration.
【刪除郵局、隊列的理論依據】
Applications declare the AMQP 0-9-1 entities that they need, define necessary routing schemes and may choose to delete AMQP 0-9-1 entities when they are no longer used.
【協議是國際標準】
Home | AMQP http://www.amqp.org/
Click above for the press release. The International Standard (ISO/IEC 19464) can be down loadedhere.
See this presentation to learn more about AMQP and its value.
Click above for the press release. The Standard can be down loaded here.
See the executive briefing paper on the value proposition of OASIS AMQP to learn more.
AMQP_百度百科 https://baike.baidu.com/item/AMQP/8354716?fr=aladdin
【動手所得】
一、流程啓動server;
二、配置用戶權限;
三、外網/內網發佈、接收消息;
systemctl start rabbitmq-server
/是rabbitmq默認的虛擬機,以前默認鏈接的都是它
建立一個用戶
rabbitmqctl add_user username password
爲用戶分配角色
rabbitmqctl set_user_tags username administrator
#Tags 能夠是:administrator, monitoring, management
設置訪問權限
rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"
【文檔】
http://www.rabbitmq.com/admin-guide.html
http://www.rabbitmq.com/clients.html
http://www.rabbitmq.com/man/rabbitmqctl.8.html http://www.rabbitmq.com/man/rabbitmqctl.8.html
rabbitmqctl — command line for managing a RabbitMQ broker
rabbitmqctl |
[-q] [-l] [-n node] [-t timeout] command [command_options] |
RabbitMQ is a multi-protocol open source messaging broker.
rabbitmqctl is a command line tool for managing a RabbitMQ broker. It performs all actions by connecting to one of the broker's nodes.
Diagnostic information is displayed if the broker was not running, could not be reached, or rejected the connection due to mismatching Erlang cookies.
-
-n
node
-
Default node is 「rabbit@
server」, where
server is the local host. On a host named 「myserver.example.com」, the node name of the RabbitMQ Erlang node will usually be 「rabbit@myserver」 (unless
RABBITMQ_NODENAME
has been set to some non-default value at broker startup time). The output of 「hostname -s」 is usually the correct suffix to use after the 「@」 sign. See
rabbitmq-server(8) for details of configuring the RabbitMQ broker.
-
-q,
--quiet
-
Quiet output mode is selected. Informational messages are suppressed when quiet mode is in effect.
-
--dry-run
-
Do not run the command. Only print information message.
-
-t
timeout,
--timeout
timeout
-
Operation timeout in seconds. Only applicable to 「list」 commands. Default is
infinity.
-
-l,
--longnames
-
Use longnames for erlang distribution. If RabbitMQ broker uses long node names for erlang distribution, the option must be specified.
-
--erlang-cookie
cookie
-
Erlang distribution cookie. If RabbitMQ node is using a custom erlang cookie value, the cookie value must be set vith this parameter.
-
help [
-l] [command_name]
-
Prints usage for all available commands.
-
-l,
--list-commands
-
List command usages only, without parameter explanation.
-
command_name
-
Prints usage for the specified command.
-
force_reset
-
Forcefully returns a RabbitMQ node to its virgin state.
The
force_reset command differs from
reset in that it resets the node unconditionally, regardless of the current management database state and cluster configuration. It should only be used as a last resort if the database or cluster configuration has been corrupted.
For
reset and
force_reset to succeed the RabbitMQ application must have been stopped, e.g. with
stop_app.
For example, to reset the RabbitMQ node:
rabbitmqctl force_reset
-
hipe_compile
directory
-
Performs HiPE-compilation and caches resulting
.beam-files in the given directory.
Parent directories are created if necessary. Any existing
.beam files from the directory are automatically deleted prior to compilation.
To use this precompiled files, you should set
RABBITMQ_SERVER_CODE_PATH
environment variable to directory specified in
hipe_compile invokation.
For example, to HiPE-compile modules and store them to
/tmp/rabbit-hipe/ebin directory:
rabbitmqctl hipe_compile /tmp/rabbit-hipe/ebin
-
reset
-
Returns a RabbitMQ node to its virgin state.
Removes the node from any cluster it belongs to, removes all data from the management database, such as configured users and vhosts, and deletes all persistent messages.
For
reset and
force_reset to succeed the RabbitMQ application must have been stopped, e.g. with
stop_app.
For example, to resets the RabbitMQ node:
rabbitmqctl reset
-
rotate_logs
-
Instructs the RabbitMQ node to perform internal log rotation.
Log rotation is performed according to lager settings specified in configuration file.
Note that there is no need to call this command in case of external log rotation (e.g. from logrotate(8)), because lager detects renames and automatically reopens log files.
For example, this command starts internal log rotation process:
rabbitmqctl rotate_logs
Rotation is performed asynchronously, so there is no guarantee that it will be completed when this command returns.
-
shutdown
-
Shuts down the Erlang process on which RabbitMQ is running. The command is blocking and will return after the Erlang process exits. If RabbitMQ fails to stop, it will return a non-zero exit code.
Unlike the stop command, the shutdown command:
- does not require a pid_file to wait for the Erlang process to exit
- returns a non-zero exit code if RabbitMQ node is not running
For example, to shut down the Erlang process on which RabbitMQ is running:
rabbitmqctl shutdown
-
start_app
-
Starts the RabbitMQ application.
This command is typically run after performing other management actions that required the RabbitMQ application to be stopped, e.g.
reset.
For example, to instruct the RabbitMQ node to start the RabbitMQ application:
rabbitmqctl start_app
-
stop [
pid_file]
-
Stops the Erlang node on which RabbitMQ is running. To restart the node follow the instructions for 「Running the Server」 in the
installation guide.
If a
pid_file is specified, also waits for the process specified there to terminate. See the description of the
wait command for details on this file.
For example, to instruct the RabbitMQ node to terminate:
rabbitmqctl stop
-
stop_app
-
Stops the RabbitMQ application, leaving the Erlang node running.
This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g.
reset.
For example, to instruct the RabbitMQ node to stop the RabbitMQ application:
rabbitmqctl stop_app
-
wait
pid_file,
wait
--pid
pid
-
Waits for the RabbitMQ application to start.
This command will wait for the RabbitMQ application to start at the node. It will wait for the pid file to be created if
pidfile is specified, then for a process with a pid specified in the pid file or the
--pid argument, and then for the RabbitMQ application to start in that process. It will fail if the process terminates without starting the RabbitMQ application.
If the specified pidfile is not created or erlang node is not started within
--timeout the command will fail. Default timeout is 10 seconds.
A suitable pid file is created by the
rabbitmq-server(8) script. By default this is located in the Mnesia directory. Modify the
RABBITMQ_PID_FILE
environment variable to change the location.
For example, this command will return when the RabbitMQ node has started up:
rabbitmqctl wait /var/run/rabbitmq/pid
-
join_cluster
clusternode [
--ram]
-
-
clusternode
-
Node to cluster with.
-
--ram
-
If provided, the node will join the cluster as a RAM node.
Instructs the node to become a member of the cluster that the specified node is in. Before clustering, the node is reset, so be careful when using this command. For this command to succeed the RabbitMQ application must have been stopped, e.g. with
stop_app.
Cluster nodes can be of two types: disc or RAM. Disc nodes replicate data in RAM and on disc, thus providing redundancy in the event of node failure and recovery from global events such as power failure across all nodes. RAM nodes replicate data in RAM only (with the exception of queue contents, which can reside on disc if the queue is persistent or too big to fit in memory) and are mainly used for scalability. RAM nodes are more performant only when managing resources (e.g. adding/removing queues, exchanges, or bindings). A cluster must always have at least one disc node, and usually should have more than one.
The node will be a disc node by default. If you wish to create a RAM node, provide the
--ram flag.
After executing the
join_cluster command, whenever the RabbitMQ application is started on the current node it will attempt to connect to the nodes that were in the cluster when the node went down.
To leave a cluster,
reset the node. You can also remove nodes remotely with the
forget_cluster_node command.
For more details see the
Clustering guide.
For example, this command instructs the RabbitMQ node to join the cluster that 「hare@elena」 is part of, as a ram node:
rabbitmqctl join_cluster hare@elena --ram
-
cluster_status
-
Displays all the nodes in the cluster grouped by node type, together with the currently running nodes.
For example, this command displays the nodes in the cluster:
rabbitmqctl cluster_status
-
change_cluster_node_type
type
-
Changes the type of the cluster node.
The
type must be one of the following:
The node must be stopped for this operation to succeed, and when turning a node into a RAM node the node must not be the only disc node in the cluster.
For example, this command will turn a RAM node into a disc node:
rabbitmqctl change_cluster_node_type disc
-
forget_cluster_node [
--offline]
-
-
--offline
-
Enables node removal from an offline node. This is only useful in the situation where all the nodes are offline and the last node to go down cannot be brought online, thus preventing the whole cluster from starting. It should not be used in any other circumstances since it can lead to inconsistencies.
Removes a cluster node remotely. The node that is being removed must be offline, while the node we are removing from must be online, except when using the
--offline flag.
When using the
--offline flag ,
rabbitmqctl will not attempt to connect to a node as normal; instead it will temporarily become the node in order to make the change. This is useful if the node cannot be started normally. In this case the node will become the canonical source for cluster metadata (e.g. which queues exist), even if it was not before. Therefore you should use this command on the latest node to shut down if at all possible.
For example, this command will remove the node 「rabbit@stringer」 from the node 「hare@mcnulty」:
rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer
-
rename_cluster_node
oldnode1
newnode1 [
oldnode2 newnode2 ...]
-
Supports renaming of cluster nodes in the local database.
This subcommand causes
rabbitmqctl to temporarily become the node in order to make the change. The local cluster node must therefore be completely stopped; other nodes can be online or offline.
This subcommand takes an even number of arguments, in pairs representing the old and new names for nodes. You must specify the old and new names for this node and for any other nodes that are stopped and being renamed at the same time.
It is possible to stop all nodes and rename them all simultaneously (in which case old and new names for all nodes must be given to every node) or stop and rename nodes one at a time (in which case each node only needs to be told how its own name is changing).
For example, this command will rename the node 「rabbit@misshelpful」 to the node 「rabbit@cordelia」
rabbitmqctl rename_cluster_node rabbit@misshelpful rabbit@cordelia
-
update_cluster_nodes
clusternode
-
-
clusternode
-
The node to consult for up-to-date information.
Instructs an already clustered node to contact
clusternode to cluster when waking up. This is different from
join_cluster since it does not join any cluster - it checks that the node is already in a cluster with
clusternode.
The need for this command is motivated by the fact that clusters can change while a node is offline. Consider the situation in which node
A and
B are clustered.
A goes down,
C clusters with
B, and then
B leaves the cluster. When
Awakes up, it'll try to contact
B, but this will fail since
B is not in the cluster anymore. The following command will solve this situation:
update_cluster_nodes -n A C
-
force_boot
-
Ensures that the node will start next time, even if it was not the last to shut down.
Normally when you shut down a RabbitMQ cluster altogether, the first node you restart should be the last one to go down, since it may have seen things happen that other nodes did not. But sometimes that's not possible: for instance if the entire cluster loses power then all nodes may think they were not the last to shut down.
In such a case you can invoke
force_boot while the node is down. This will tell the node to unconditionally start next time you ask it to. If any changes happened to the cluster after this node shut down, they will be lost.
If the last node to go down is permanently lost then you should use
forget_cluster_node
--offline in preference to this command, as it will ensure that mirrored queues which were mastered on the lost node get promoted.
For example, this will force the node not to wait for other nodes next time it is started:
rabbitmqctl force_boot
-
sync_queue [
-p vhost] queue
-
-
queue
-
The name of the queue to synchronise.
Instructs a mirrored queue with unsynchronised slaves to synchronise itself. The queue will block while synchronisation takes place (all publishers to and consumers from the queue will block). The queue must be mirrored for this command to succeed.
Note that unsynchronised queues from which messages are being drained will become synchronised eventually. This command is primarily useful for queues which are not being drained.
-
cancel_sync_queue [
-p vhost] queue
-
-
queue
-
The name of the queue to cancel synchronisation for.
Instructs a synchronising mirrored queue to stop synchronising itself.
-
purge_queue [
-p vhost] queue
-
-
queue
-
The name of the queue to purge.
Purges a queue (removes all messages in it).
-
set_cluster_name
name
-
Sets the cluster name to
name. The cluster name is announced to clients on connection, and used by the federation and shovel plugins to record where a message has been. The cluster name is by default derived from the hostname of the first node in the cluster, but can be changed.
For example, this sets the cluster name to 「london」:
rabbitmqctl set_cluster_name london
Note that rabbitmqctl manages the RabbitMQ internal user database. Users from any alternative authentication backend will not be visible to rabbitmqctl.
-
add_user
username
password
-
-
username
-
The name of the user to create.
-
password
-
The password the created user will use to log in to the broker.
For example, this command instructs the RabbitMQ broker to create a (non-administrative) user named 「tonyg」 with (initial) password 「changeit」:
rabbitmqctl add_user tonyg changeit
-
delete_user
username
-
-
username
-
The name of the user to delete.
For example, this command instructs the RabbitMQ broker to delete the user named 「tonyg」:
rabbitmqctl delete_user tonyg
-
change_password
username
newpassword
-
-
username
-
The name of the user whose password is to be changed.
-
newpassword
-
The new password for the user.
For example, this command instructs the RabbitMQ broker to change the password for the user named 「tonyg」 to 「newpass」:
rabbitmqctl change_password tonyg newpass
-
clear_password
username
-
-
username
-
The name of the user whose password is to be cleared.
For example, this command instructs the RabbitMQ broker to clear the password for the user named 「tonyg」:
rabbitmqctl clear_password tonyg
This user now cannot log in with a password (but may be able to through e.g. SASL EXTERNAL if configured).
-
authenticate_user
username
password
-
-
username
-
The name of the user.
-
password
-
The password of the user.
For example, this command instructs the RabbitMQ broker to authenticate the user named 「tonyg」 with password 「verifyit」:
rabbitmqctl authenticate_user tonyg verifyit
-
set_user_tags
username [
tag ...]
-
-
username
-
The name of the user whose tags are to be set.
-
tag
-
Zero, one or more tags to set. Any existing tags will be removed.
For example, this command instructs the RabbitMQ broker to ensure the user named 「tonyg」 is an administrator:
rabbitmqctl set_user_tags tonyg administrator
This has no effect when the user logs in via AMQP, but can be used to permit the user to manage users, virtual hosts and permissions when the user logs in via some other means (for example with the management plugin).
This command instructs the RabbitMQ broker to remove any tags from the user named 「tonyg」:
rabbitmqctl set_user_tags tonyg
-
list_users
-
Lists users. Each result row will contain the user name followed by a list of the tags set for that user.
For example, this command instructs the RabbitMQ broker to list all users:
rabbitmqctl list_users
Note that rabbitmqctl manages the RabbitMQ internal user database. Permissions for users from any alternative authorisation backend will not be visible to rabbitmqctl.
-
add_vhost
vhost
-
-
vhost
-
The name of the virtual host entry to create.
Creates a virtual host.
For example, this command instructs the RabbitMQ broker to create a new virtual host called 「test」:
rabbitmqctl add_vhost test
-
delete_vhost
vhost
-
-
vhost
-
The name of the virtual host entry to delete.
Deletes a virtual host.
Deleting a virtual host deletes all its exchanges, queues, bindings, user permissions, parameters and policies.
For example, this command instructs the RabbitMQ broker to delete the virtual host called 「test」:
rabbitmqctl delete_vhost test
-
list_vhosts [
vhostinfoitem ...]
-
Lists virtual hosts.
The
vhostinfoitem parameter is used to indicate which virtual host information items to include in the results. The column order in the results will match the order of the parameters.
vhostinfoitem can take any value from the list that follows:
-
name
-
The name of the virtual host with non-ASCII characters escaped as in C.
-
tracing
-
Whether tracing is enabled for this virtual host.
If no
vhostinfoitem are specified then the vhost name is displayed.
For example, this command instructs the RabbitMQ broker to list all virtual hosts:
rabbitmqctl list_vhosts name tracing
-
set_permissions [
-p vhost] user conf write read
-
-
vhost
-
The name of the virtual host to which to grant the user access, defaulting to 「/」.
-
user
-
The name of the user to grant access to the specified virtual host.
-
conf
-
A regular expression matching resource names for which the user is granted configure permissions.
-
write
-
A regular expression matching resource names for which the user is granted write permissions.
-
read
-
A regular expression matching resource names for which the user is granted read permissions.
Sets user permissions.
For example, this command instructs the RabbitMQ broker to grant the user named 「tonyg」 access to the virtual host called 「/myvhost」, with configure permissions on all resources whose names starts with 「tonyg-」, and write and read permissions on all resources:
rabbitmqctl set_permissions -p /myvhost tonyg 「^tonyg-.*」 「.*」 「.*」
-
clear_permissions [
-p vhost] username
-
-
vhost
-
The name of the virtual host to which to deny the user access, defaulting to 「/」.
-
username
-
The name of the user to deny access to the specified virtual host.
Sets user permissions.
For example, this command instructs the RabbitMQ broker to deny the user named 「tonyg」 access to the virtual host called 「/myvhost」:
rabbitmqctl clear_permissions -p /myvhost tonyg
-
list_permissions [
-p vhost]
-
-
vhost
-
The name of the virtual host for which to list the users that have been granted access to it, and their permissions. Defaults to 「/」.
Lists permissions in a virtual host.
For example, this command instructs the RabbitMQ broker to list all the users which have been granted access to the virtual host called 「/myvhost」, and the permissions they have for operations on resources in that virtual host. Note that an empty string means no permissions granted:
rabbitmqctl list_permissions -p /myvhost
-
list_user_permissions
username
-
-
username
-
The name of the user for which to list the permissions.
Lists user permissions.
For example, this command instructs the RabbitMQ broker to list all the virtual hosts to which the user named 「tonyg」 has been granted access, and the permissions the user has for operations on resources in these virtual hosts:
rabbitmqctl list_user_permissions tonyg
-
set_topic_permissions [
-p vhost] user exchange write read
-
-
vhost
-
The name of the virtual host to which to grant the user access, defaulting to 「/」.
-
user
-
The name of the user the permissions apply to in the target virtual host.
-
exchange
-
The name of the topic exchange the authorisation check will be applied to.
-
write
-
A regular expression matching the routing key of the published message.
-
read
-
A regular expression matching the routing key of the consumed message.
Sets user topic permissions.
For example, this command instructs the RabbitMQ broker to let the user named 「tonyg」 publish and consume messages going through the 「amp.topic」 exchange of the 「/myvhost」 virtual host with a routing key starting with 「tonyg-」:
rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic 「^tonyg-.*」 「^tonyg-.*」
Topic permissions support variable expansion for the following variables: username, vhost, and client_id. Note that client_id is expanded only when using MQTT. The previous example could be made more generic by using 「^{username}-.*」:
rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic 「^{username}-.*」 「^{username}-.*」
-
clear_topic_permissions [
-p vhost] username [exchange]
-
-
vhost
-
The name of the virtual host to which to clear the topic permissions, defaulting to 「/」.
-
username
-
The name of the user to clear topic permissions to the specified virtual host.
-
exchange
-
The name of the topic exchange to clear topic permissions, defaulting to all the topic exchanges the given user has topic permissions for.
Clear user topic permissions.
For example, this command instructs the RabbitMQ broker to remove topic permissions for user named 「tonyg」 for the topic exchange 「amq.topic」 in the virtual host called 「/myvhost」:
rabbitmqctl clear_topic_permissions -p /myvhost tonyg amq.topic
-
list_topic_permissions [
-p vhost]
-
-
vhost
-
The name of the virtual host for which to list the users topic permissions. Defaults to 「/」.
Lists topic permissions in a virtual host.
For example, this command instructs the RabbitMQ broker to list all the users which have been granted topic permissions in the virtual host called 「/myvhost:」
rabbitmqctl list_topic_permissions -p /myvhost
-
list_user_topic_permissions
username
-
-
username
-
The name of the user for which to list the topic permissions.
Lists user topic permissions.
For example, this command instructs the RabbitMQ broker to list all the virtual hosts to which the user named 「tonyg」 has been granted access, and the topic permissions the user has in these virtual hosts:
rabbitmqctl list_topic_user_permissions tonyg
Certain features of RabbitMQ (such as the federation plugin) are controlled by dynamic, cluster-wide parameters. There are 2 kinds of parameters: parameters scoped to a virtual host and global parameters. Each vhost-scoped parameter consists of a component name, a name and a value. The component name and name are strings, and the value is an Erlang term. A global parameter consists of a name and value. The name is a string and the value is an Erlang term. Parameters can be set, cleared and listed. In general you should refer to the documentation for the feature in question to see how to set parameters.
-
set_parameter [
-p vhost] component_name name value
-
Sets a parameter.
-
component_name
-
The name of the component for which the parameter is being set.
-
name
-
The name of the parameter being set.
-
value
-
The value for the parameter, as a JSON term. In most shells you are very likely to need to quote this.
For example, this command sets the parameter 「node01」 for the 「federation-upstream」 component in the default virtual host to following JSON:
rabbitmqctl set_parameter federation-upstream node01 '{"uri":"amqp://user:password@server/%2F","ack-mode":"on-publish"}'
-
clear_parameter [
-p vhost] component_name key
-
Clears a parameter.
-
component_name
-
The name of the component for which the parameter is being cleared.
-
name
-
The name of the parameter being cleared.
For example, this command clears the parameter 「node01」 for the 「federation-upstream」 component in the default virtual host:
rabbitmqctl clear_parameter federation-upstream node01
-
list_parameters [
-p vhost]
-
Lists all parameters for a virtual host.
For example, this command lists all parameters in the default virtual host:
rabbitmqctl list_parameters
-
set_global_parameter
name
value
-
Sets a global runtime parameter. This is similar to
set_parameter but the key-value pair isn't tied to a virtual host.
-
name
-
The name of the global runtime parameter being set.
-
value
-
The value for the global runtime parameter, as a JSON term. In most shells you are very likely to need to quote this.
For example, this command sets the global runtime parameter 「mqtt_default_vhosts」 to the JSON term {"O=client,CN=guest":"/"}:
rabbitmqctl set_global_parameter mqtt_default_vhosts '{"O=client,CN=guest":"/"}'
-
clear_global_parameter
name
-
Clears a global runtime parameter. This is similar to
clear_parameter but the key-value pair isn't tied to a virtual host.
-
name
-
The name of the global runtime parameter being cleared.
For example, this command clears the global runtime parameter 「mqtt_default_vhosts」:
rabbitmqctl clear_global_parameter mqtt_default_vhosts
-
list_global_parameters
-
Lists all global runtime parameters. This is similar to
list_parameters but the global runtime parameters are not tied to any virtual host.
For example, this command lists all global parameters:
rabbitmqctl list_global_parameters
Policies are used to control and modify the behaviour of queues and exchanges on a cluster-wide basis. Policies apply within a given vhost, and consist of a name, pattern, definition and an optional priority. Policies can be set, cleared and listed.
-
set_policy [
-p vhost] [--priority priority] [--apply-to apply-to] name pattern definition
-
Sets a policy.
-
name
-
The name of the policy.
-
pattern
-
The regular expression, which when matches on a given resources causes the policy to apply.
-
definition
-
The definition of the policy, as a JSON term. In most shells you are very likely to need to quote this.
-
priority
-
The priority of the policy as an integer. Higher numbers indicate greater precedence. The default is 0.
-
apply-to
-
Which types of object this policy should apply to. Possible values are:
The default is
all ..
For example, this command sets the policy 「federate-me」 in the default virtual host so that built-in exchanges are federated:
rabbitmqctl set_policy federate-me ^amq. '{"federation-upstream-set":"all"}'
-
clear_policy [
-p vhost] name
-
Clears a policy.
-
name
-
The name of the policy being cleared.
For example, this command clears the 「federate-me」 policy in the default virtual host:
rabbitmqctl clear_policy federate-me
-
list_policies [
-p vhost]
-
Lists all policies for a virtual host.
For example, this command lists all policies in the default virtual host:
rabbitmqctl list_policies
-
set_operator_policy [
-p vhost] [--priority priority] [--apply-to apply-to] name pattern definition
-
Sets an operator policy that overrides a subset of arguments in user policies. Arguments are identical to those of
set_policy.
Supported arguments are:
- expires
- message-ttl
- max-length
- max-length-bytes
-
clear_operator_policy [
-p vhost] name
-
Clears an operator policy. Arguments are identical to those of
clear_policy.
-
list_operator_policies [
-p vhost]
-
Lists operator policy overrides for a virtual host. Arguments are identical to those of
list_policies.
It is possible to enforce certain limits on virtual hosts.
-
set_vhost_limits [
-p vhost] definition
-
Sets virtual host limits.
-
definition
-
The definition of the limits, as a JSON term. In most shells you are very likely to need to quote this.
Recognised limits are:
- max-connections
- max-queues
Use a negative value to specify "no limit".
For example, this command limits the max number of concurrent connections in vhost 「qa_env」 to 64:
rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": 64}'
This command limits the max number of queues in vhost 「qa_env」 to 256:
rabbitmqctl set_vhost_limits -p qa_env '{"max-queues": 256}'
This command clears the max number of connections limit in vhost 「qa_env」:
rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": -1}'
This command disables client connections in vhost 「qa_env」:
rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": 0}'
-
clear_vhost_limits [
-p vhost]
-
Clears virtual host limits.
For example, this command clears vhost limits in vhost 「qa_env」:
rabbitmqctl clear_vhost_limits -p qa_env
-
list_vhost_limits [
-p vhost] [--global]
-
Displays configured virtual host limits.
-
--global
-
Show limits for all vhosts. Suppresses the
-p parameter.
The server status queries interrogate the server and return a list of results with tab-delimited columns. Some queries ( list_queues, list_exchanges, list_bindings and list_consumers) accept an optional vhost parameter. This parameter, if present, must be specified immediately after the query.
The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is 「/」.
-
list_queues [
-p vhost] [--offline | --online | --local] [queueinfoitem ...]
-
Returns queue details. Queue details of the 「/」 virtual host are returned if the
-p flag is absent. The
-p flag can be used to override this default.
Displayed queues can be filtered by their status or location using one of the following mutually exclusive options:
-
--offline
-
List only those durable queues that are not currently available (more specifically, their master node isn't).
-
--online
-
List queues that are currently available (their master node is).
-
--local
-
List only those queues whose master process is located on the current node.
The
queueinfoitem parameter is used to indicate which queue information items to include in the results. The column order in the results will match the order of the parameters.
queueinfoitem can take any value from the list that follows:
-
name
-
The name of the queue with non-ASCII characters escaped as in C.
-
durable
-
Whether or not the queue survives server restarts.
-
auto_delete
-
Whether the queue will be deleted automatically when no longer used.
-
arguments
-
Queue arguments.
-
policy
-
Policy name applying to the queue.
-
pid
-
Id of the Erlang process associated with the queue.
-
owner_pid
-
Id of the Erlang process representing the connection which is the exclusive owner of the queue. Empty if the queue is non-exclusive.
-
exclusive
-
True if queue is exclusive (i.e. has owner_pid), false otherwise.
-
exclusive_consumer_pid
-
Id of the Erlang process representing the channel of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.
-
exclusive_consumer_tag
-
Consumer tag of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.
-
messages_ready
-
Number of messages ready to be delivered to clients.
-
messages_unacknowledged
-
Number of messages delivered to clients but not yet acknowledged.
-
messages
-
Sum of ready and unacknowledged messages (queue depth).
-
messages_ready_ram
-
Number of messages from messages_ready which are resident in ram.
-
messages_unacknowledged_ram
-
Number of messages from messages_unacknowledged which are resident in ram.
-
messages_ram
-
Total number of messages which are resident in ram.
-
messages_persistent
-
Total number of persistent messages in the queue (will always be 0 for transient queues).
-
message_bytes
-
Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.
-
message_bytes_ready
-
Like
message_bytes but counting only those messages ready to be delivered to clients.
-
message_bytes_unacknowledged
-
Like
message_bytes but counting only those messages delivered to clients but not yet acknowledged.
-
message_bytes_ram
-
Like
message_bytes but counting only those messages which are in RAM.
-
message_bytes_persistent
-
Like
message_bytes but counting only those messages which are persistent.
-
head_message_timestamp
-
The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.
-
disk_reads
-
Total number of times messages have been read from disk by this queue since it started.
-
disk_writes
-
Total number of times messages have been written to disk by this queue since it started.
-
consumers
-
Number of consumers.
-
consumer_utilisation
-
Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.
-
memory
-
Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.
-
slave_pids
-
If the queue is mirrored, this gives the IDs of the current slaves.
-
synchronised_slave_pids
-
If the queue is mirrored, this gives the IDs of the current slaves which are synchronised with the master - i.e. those which could take over from the master without message loss.
-
state
-
The state of the queue. Normally 「running」, but may be 「{syncing,
message_count}」 if the queue is synchronising.
Queues which are located on cluster nodes that are currently down will be shown with a status of 「down」 (and most other
queueinfoitem will be unavailable).
If no
queueinfoitem are specified then queue name and depth are displayed.
For example, this command displays the depth and number of consumers for each queue of the virtual host named 「/myvhost」
rabbitmqctl list_queues -p /myvhost messages consumers
-
list_exchanges [
-p vhost] [exchangeinfoitem ...]
-
Returns exchange details. Exchange details of the 「/」 virtual host are returned if the
-p flag is absent. The
-p flag can be used to override this default.
The
exchangeinfoitem parameter is used to indicate which exchange information items to include in the results. The column order in the results will match the order of the parameters.
exchangeinfoitem can take any value from the list that follows:
-
name
-
The name of the exchange with non-ASCII characters escaped as in C.
-
type
-
The exchange type, such as:
- direct
- topic
- headers
- fanout
-
durable
-
Whether or not the exchange survives server restarts.
-
auto_delete
-
Whether the exchange will be deleted automatically when no longer used.
-
internal
-
Whether the exchange is internal, i.e. cannot be directly published to by a client.
-
arguments
-
Exchange arguments.
-
policy
-
Policy name for applying to the exchange.
If no
exchangeinfoitem are specified then exchange name and type are displayed.
For example, this command displays the name and type for each exchange of the virtual host named 「/myvhost」:
rabbitmqctl list_exchanges -p /myvhost name type
-
list_bindings [
-p vhost] [bindinginfoitem ...]
-
Returns binding details. By default the bindings for the 「/」 virtual host are returned. The
-p flag can be used to override this default.
The
bindinginfoitem parameter is used to indicate which binding information items to include in the results. The column order in the results will match the order of the parameters.
bindinginfoitem can take any value from the list that follows:
-
source_name
-
The name of the source of messages to which the binding is attached. With non-ASCII characters escaped as in C.
-
source_kind
-
The kind of the source of messages to which the binding is attached. Currently always exchange. With non-ASCII characters escaped as in C.
-
destination_name
-
The name of the destination of messages to which the binding is attached. With non-ASCII characters escaped as in C.
-
destination_kind
-
The kind of the destination of messages to which the binding is attached. With non-ASCII characters escaped as in C.
-
routing_key
-
The binding's routing key, with non-ASCII characters escaped as in C.
-
arguments
-
The binding's arguments.
If no
bindinginfoitem are specified then all above items are displayed.
For example, this command displays the exchange name and queue name of the bindings in the virtual host named 「/myvhost」
rabbitmqctl list_bindings -p /myvhost exchange_name queue_name
-
list_connections [
connectioninfoitem ...]
-
Returns TCP/IP connection statistics.
The
connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters.
connectioninfoitem can take any value from the list that follows:
-
pid
-
Id of the Erlang process associated with the connection.
-
name
-
Readable name for the connection.
-
port
-
Server port.
-
host
-
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.
-
peer_port
-
Peer port.
-
peer_host
-
Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
-
ssl
-
Boolean indicating whether the connection is secured with SSL.
-
ssl_protocol
-
SSL protocol (e.g. 「tlsv1」).
-
ssl_key_exchange
-
SSL key exchange algorithm (e.g. 「rsa」).
-
ssl_cipher
-
SSL cipher algorithm (e.g. 「aes_256_cbc」).
-
ssl_hash
-
SSL hash function (e.g. 「sha」).
-
peer_cert_subject
-
The subject of the peer's SSL certificate, in RFC4514 form.
-
peer_cert_issuer
-
The issuer of the peer's SSL certificate, in RFC4514 form.
-
peer_cert_validity
-
The period for which the peer's SSL certificate is valid.
-
state
-
Connection state; one of:
- starting
- tuning
- opening
- running
- flow
- blocking
- blocked
- closing
- closed
-
channels
-
Number of channels using the connection.
-
protocol
-
Version of the AMQP protocol in use; currently one of:
Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.
-
auth_mechanism
-
SASL authentication mechanism used, such as 「PLAIN」.
-
user
-
Username associated with the connection.
-
vhost
-
Virtual host name with non-ASCII characters escaped as in C.
-
timeout
-
Connection timeout / negotiated heartbeat interval, in seconds.
-
frame_max
-
Maximum frame size (bytes).
-
channel_max
-
Maximum number of channels on this connection.
-
client_properties
-
Informational properties transmitted by the client during connection establishment.
-
recv_oct
-
Octets received.
-
recv_cnt
-
Packets received.
-
send_oct
-
Octets send.
-
send_cnt
-
Packets sent.
-
send_pend
-
Send queue size.
-
connected_at
-
Date and time this connection was established, as timestamp.
If no
connectioninfoitem are specified then user, peer host, peer port, time since flow control and memory block state are displayed.
For example, this command displays the send queue size and server port for each connection:
rabbitmqctl list_connections send_pend port
-
list_channels [
channelinfoitem ...]
-
Returns information on all current channels, the logical containers executing most AMQP commands. This includes channels that are part of ordinary AMQP connections, and channels created by various plug-ins and other extensions.
The
channelinfoitem parameter is used to indicate which channel information items to include in the results. The column order in the results will match the order of the parameters.
channelinfoitem can take any value from the list that follows:
-
pid
-
Id of the Erlang process associated with the connection.
-
connection
-
Id of the Erlang process associated with the connection to which the channel belongs.
-
name
-
Readable name for the channel.
-
number
-
The number of the channel, which uniquely identifies it within a connection.
-
user
-
Username associated with the channel.
-
vhost
-
Virtual host in which the channel operates.
-
transactional
-
True if the channel is in transactional mode, false otherwise.
-
confirm
-
True if the channel is in confirm mode, false otherwise.
-
consumer_count
-
Number of logical AMQP consumers retrieving messages via the channel.
-
messages_unacknowledged
-
Number of messages delivered via this channel but not yet acknowledged.
-
messages_uncommitted
-
Number of messages received in an as yet uncommitted transaction.
-
acks_uncommitted
-
Number of acknowledgements received in an as yet uncommitted transaction.
-
messages_unconfirmed
-
Number of published messages not yet confirmed. On channels not in confirm mode, this remains 0.
-
prefetch_count
-
QoS prefetch limit for new consumers, 0 if unlimited.
-
global_prefetch_count
-
QoS prefetch limit for the entire channel, 0 if unlimited.
If no
channelinfoitem are specified then pid, user, consumer_count, and messages_unacknowledged are assumed.
For example, this command displays the connection process and count of unacknowledged messages for each channel:
rabbitmqctl list_channels connection messages_unacknowledged
-
list_consumers [
-p vhost]
-
Lists consumers, i.e. subscriptions to a queue´s message stream. Each line printed shows, separated by tab characters, the name of the queue subscribed to, the id of the channel process via which the subscription was created and is managed, the consumer tag which uniquely identifies the subscription within a channel, a boolean indicating whether acknowledgements are expected for messages delivered to this consumer, an integer indicating the prefetch limit (with 0 meaning 「none」), and any arguments for this consumer.
-
status
-
Displays broker status information such as the running applications on the current Erlang node, RabbitMQ and Erlang versions, OS name, memory and file descriptor statistics. (See the
cluster_status command to find out which nodes are clustered and running.)
For example, this command displays information about the RabbitMQ broker:
rabbitmqctl status
-
node_health_check
-
Health check of the RabbitMQ node. Verifies the rabbit application is running, list_queues and list_channels return, and alarms are not set.
For example, this command performs a health check on the RabbitMQ node:
rabbitmqctl node_health_check -n rabbit@stringer
-
environment
-
Displays the name and value of each variable in the application environment for each running application.
-
report
-
Generate a server status report containing a concatenation of all server status information for support purposes. The output should be redirected to a file when accompanying a support request.
For example, this command creates a server report which may be attached to a support request email:
rabbitmqctl report > server_report.txt
-
eval
expr
-
Evaluate an arbitrary Erlang expression.
For example, this command returns the name of the node to which
rabbitmqctl has connected:
rabbitmqctl eval 「node().」
-
close_connection
connectionpid
explanation
-
-
connectionpid
-
Id of the Erlang process associated with the connection to close.
-
explanation
-
Explanation string.
Instructs the broker to close the connection associated with the Erlang process id
connectionpid (see also the
list_connections command), passing the
explanation string to the connected client as part of the AMQP connection shutdown protocol.
For example, this command instructs the RabbitMQ broker to close the connection associated with the Erlang process id 「<rabbit@tanto.4262.0>」, passing the explanation 「go away」 to the connected client:
rabbitmqctl close_connection 「<rabbit@tanto.4262.0>」 「go away」
-
close_all_connections [
-p vhost] [--global] [--per-connection-delay delay] [--limit limit] explanation
-
-
-p
vhost
-
The name of the virtual host for which connections should be closed. Ignored when
--global is specified.
-
--global
-
If connections should be close for all vhosts. Overrides
-p
-
--per-connection-delay
delay
-
Time in milliseconds to wait after each connection closing.
-
--limit
limit
-
Number of connection to close. Only works per vhost. Ignored when
--global is specified.
-
explanation
-
Explanation string.
Instructs the broker to close all connections for the specified vhost or entire RabbitMQ node.
For example, this command instructs the RabbitMQ broker to close 10 connections on 「qa_env」 vhost, passing the explanation 「Please close」:
rabbitmqctl close_all_connections -p qa_env --limit 10 'Please close'
This command instructs broker to close all connections to the node:
rabbitmqctl close_all_connections --global
-
trace_on [
-p vhost]
-
-
vhost
-
The name of the virtual host for which to start tracing.
Starts tracing. Note that the trace state is not persistent; it will revert to being off if the server is restarted.
-
trace_off [
-p vhost]
-
-
vhost
-
The name of the virtual host for which to stop tracing.
Stops tracing.
-
set_vm_memory_high_watermark
fraction
-
-
fraction
-
The new memory threshold fraction at which flow control is triggered, as a floating point number greater than or equal to 0.
-
set_vm_memory_high_watermark absolute
memory_limit
-
-
memory_limit
-
The new memory limit at which flow control is triggered, expressed in bytes as an integer number greater than or equal to 0 or as a string with memory units (e.g. 512M or 1G). Available units are:
-
k,
kiB
-
kibibytes (2^10 bytes)
-
M,
MiB
-
mebibytes (2^20 bytes)
-
G,
GiB
-
gibibytes (2^30 bytes)
-
kB
-
kilobytes (10^3 bytes)
-
MB
-
megabytes (10^6 bytes)
-
GB
-
gigabytes (10^9 bytes)
-
set_disk_free_limit
disk_limit
-
-
disk_limit
-
Lower bound limit as an integer in bytes or a string with memory units (see vm_memory_high_watermark), e.g. 512M or 1G. Once free disk space reaches the limit, a disk alarm will be set.
-
set_disk_free_limit mem_relative
fraction
-
-
fraction
-
Limit relative to the total amount available RAM as a non-negative floating point number. Values lower than 1.0 can be dangerous and should be used carefully.
-
encode
value
passphrase [
--cipher cipher] [--hash hash] [--iterations iterations]
-
-
value
passphrase
-
Value to encrypt and passphrase.
For example:
rabbitmqctl encode '<<"guest">>' mypassphrase
-
--cipher
cipher
--hash
hash
--iterations
iterations
-
Options to specify the encryption settings. They can be used independently.
For example:
rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase
-
decode
value
passphrase [
--cipher cipher] [--hash hash] [--iterations iterations]
-
-
value
passphrase
-
Value to decrypt (as produced by the encode command) and passphrase.
For example:
rabbitmqctl decode '{encrypted, <<"...">>}' mypassphrase
-
--cipher
cipher
--hash
hash
--iterations
iterations
-
Options to specify the decryption settings. They can be used independently.
For example:
rabbitmqctl decode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '{encrypted,<<"...">>} mypassphrase
-
list_hashes
-
Lists hash functions supported by encoding commands.
For example, this command instructs the RabbitMQ broker to list all hash functions supported by encoding commands:
rabbitmqctl list_hashes
-
list_ciphers
-
Lists cipher suites supported by encoding commands.
For example, this command instructs the RabbitMQ broker to list all cipher suites supported by encoding commands:
rabbitmqctl list_ciphers
RabbitMQ plugins can extend rabbitmqctl tool to add new commands when enabled. Currently available commands can be found in rabbitmqctl help output. Following commands are added by RabbitMQ plugins, available in default distribution:
-
shovel_status
-
Prints a list of configured shovels
-
delete_shovel [
-p vhost] name
-
Instructs the RabbitMQ node to delete the configured shovel by
name.
-
federation_status [
--only-down]
-
Prints a list of federation links.
-
--only-down
-
Only list federation links which are not running.
-
restart_federation_link
link_id
-
Instructs the RabbitMQ node to restart the federation link with specified
link_id.
-
list_amqp10_connections [
amqp10_connectioninfoitem ...]
-
Similar to the
list_connections command, but returns fields which make sense for AMQP-1.0 connections.
amqp10_connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters.
amqp10_connectioninfoitem can take any value from the list that follows:
-
pid
-
Id of the Erlang process associated with the connection.
-
auth_mechanism
-
SASL authentication mechanism used, such as 「PLAIN」.
-
host
-
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.
-
frame_max
-
Maximum frame size (bytes).
-
timeout
-
Connection timeout / negotiated heartbeat interval, in seconds.
-
user
-
Username associated with the connection.
-
state
-
Connection state; one of:
- starting
- waiting_amqp0100
- securing
- running
- blocking
- blocked
- closing
- closed
-
recv_oct
-
Octets received.
-
recv_cnt
-
Packets received.
-
send_oct
-
Octets send.
-
send_cnt
-
Packets sent.
-
ssl
-
Boolean indicating whether the connection is secured with SSL.
-
ssl_protocol
-
SSL protocol (e.g. 「tlsv1」).
-
ssl_key_exchange
-
SSL key exchange algorithm (e.g. 「rsa」).
-
ssl_cipher
-
SSL cipher algorithm (e.g. 「aes_256_cbc」).
-
ssl_hash
-
SSL hash function (e.g. 「sha」).
-
peer_cert_subject
-
The subject of the peer's SSL certificate, in RFC4514 form.
-
peer_cert_issuer
-
The issuer of the peer's SSL certificate, in RFC4514 form.
-
peer_cert_validity
-
The period for which the peer's SSL certificate is valid.
-
node
-
The node name of the RabbitMQ node to which connection is established.
-
list_mqtt_connections [
mqtt_connectioninfoitem]
-
Similar to the
list_connections command, but returns fields which make sense for MQTT connections.
mqtt_connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters.
mqtt_connectioninfoitem can take any value from the list that follows:
-
host
-
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.
-
port
-
Server port.
-
peer_host
-
Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
-
peer_port
-
Peer port.
-
protocol
-
MQTT protocol version, which can be on of the following:
- {'MQTT', N/A}
- {'MQTT', 3.1.0}
- {'MQTT', 3.1.1}
-
channels
-
Number of channels using the connection.
-
channel_max
-
Maximum number of channels on this connection.
-
frame_max
-
Maximum frame size (bytes).
-
client_properties
-
Informational properties transmitted by the client during connection establishment.
-
ssl
-
Boolean indicating whether the connection is secured with SSL.
-
ssl_protocol
-
SSL protocol (e.g. 「tlsv1」).
-
ssl_key_exchange
-
SSL key exchange algorithm (e.g. 「rsa」).
-
ssl_cipher
-
SSL cipher algorithm (e.g. 「aes_256_cbc」).
-
ssl_hash
-
SSL hash function (e.g. 「sha」).
-
conn_name
-
Readable name for the connection.
-
connection_state
-
Connection state; one of:
-
connection
-
Id of the Erlang process associated with the internal amqp direct connection.
-
consumer_tags
-
A tuple of consumer tags for QOS0 and QOS1.
-
message_id
-
The last Packet ID sent in a control message.
-
client_id
-
MQTT client identifier for the connection.
-
clean_sess
-
MQTT clean session flag.
-
will_msg
-
MQTT Will message sent in CONNECT frame.
-
exchange
-
Exchange to route MQTT messages configured in rabbitmq_mqtt application environment.
-
ssl_login_name
-
SSL peer cert auth name
-
retainer_pid
-
Id of the Erlang process associated with retain storage for the connection.
-
user
-
Username associated with the connection.
-
vhost
-
Virtual host name with non-ASCII characters escaped as in C.
-
list_stomp_connections [
stomp_connectioninfoitem]
-
Similar to the
list_connections command, but returns fields which make sense for STOMP connections.
stomp_connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters.
stomp_connectioninfoitem can take any value from the list that follows:
-
conn_name
-
Readable name for the connection.
-
connection
-
Id of the Erlang process associated with the internal amqp direct connection.
-
connection_state
-
Connection state; one of:
-
session_id
-
STOMP protocol session identifier
-
channel
-
AMQP channel associated with the connection
-
version
-
Negotiated STOMP protocol version for the connection.
-
implicit_connect
-
Indicates if the connection was established using implicit connect (without CONNECT frame)
-
auth_login
-
Effective username for the connection.
-
auth_mechanism
-
STOMP authorization mechanism. Can be one of:
-
port
-
Server port.
-
host
-
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
-
peer_port
-
Peer port.
-
peer_host
-
Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
-
protocol
-
STOMP protocol version, which can be on of the following:
- {'STOMP', 0}
- {'STOMP', 1}
- {'STOMP', 2}
-
channels
-
Number of channels using the connection.
-
channel_max
-
Maximum number of channels on this connection.
-
frame_max
-
Maximum frame size (bytes).
-
client_properties
-
Informational properties transmitted by the client during connection
-
ssl
-
Boolean indicating whether the connection is secured with SSL.
-
ssl_protocol
-
SSL protocol (e.g. 「tlsv1」).
-
ssl_key_exchange
-
SSL key exchange algorithm (e.g. 「rsa」).
-
ssl_cipher
-
SSL cipher algorithm (e.g. 「aes_256_cbc」).
-
ssl_hash
-
SSL hash function (e.g. 「sha」).
-
reset_stats_db [
--all]
-
Reset management stats database for the RabbitMQ node.
-
--all
-
Reset stats database for all nodes in the cluster.
rabbitmq-env.conf(5), rabbitmq-echopid(8), rabbitmq-plugins(8), rabbitmq-server(8), rabbitmq-service(8)
The RabbitMQ Team <info@rabbitmq.com>
【參考】
rabbitmq 啓動報錯 - 糉先生 - 博客園 https://www.cnblogs.com/straycats/p/7719933.html
【測試代碼】
Asynchronous publisher example — pika 0.12.0 documentation https://pika.readthedocs.io/en/stable/examples/asynchronous_publisher_example.html
# -*- coding: utf-8 -*-
import logging
import pika
import json
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
class ExamplePublisher(object):
"""This is an example publisher that will handle unexpected interactions
with RabbitMQ such as channel and connection closures.
If RabbitMQ closes the connection, it will reopen it. You should
look at the output, as there are limited reasons why the connection may
be closed, which usually are tied to permission related issues or
socket timeouts.
It uses delivery confirmations and illustrates one way to keep track of
messages that have been sent and if they've been confirmed by RabbitMQ.
"""
EXCHANGE = 'messageTEST'
EXCHANGE_TYPE = 'topic'
PUBLISH_INTERVAL = 1
QUEUE = 'textTEST'
ROUTING_KEY = 'example.text'
def __init__(self, amqp_url):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
:param str amqp_url: The URL for connecting to RabbitMQ
"""
self._connection = None
self._channel = None
self._deliveries = None
self._acked = None
self._nacked = None
self._message_number = None
self._stopping = False
self._url = amqp_url
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika. If you want the reconnection to work, make
sure you set stop_ioloop_on_close to False, which is not the default
behavior of this adapter.
:rtype: pika.SelectConnection
"""
LOGGER.info('Connecting to %s', self._url)
return pika.SelectConnection(pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_close_callback=self.on_connection_closed,
stop_ioloop_on_close=False)
def on_connection_open(self, unused_connection):
"""This method is called by pika once the connection to RabbitMQ has
been established. It passes the handle to the connection object in
case we need it, but in this case, we'll just mark it unused.
:type unused_connection: pika.SelectConnection
"""
LOGGER.info('Connection opened')
self.open_channel()
def on_connection_closed(self, connection, reply_code, reply_text):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.
:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
"""
self._channel = None
if self._stopping:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
self._connection.add_timeout(5, self._connection.ioloop.stop)
def open_channel(self):
"""This method will open a new channel with RabbitMQ by issuing the
Channel.Open RPC command. When RabbitMQ confirms the channel is open
by sending the Channel.OpenOK RPC reply, the on_channel_open method
will be invoked.
"""
LOGGER.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel):
"""This method is invoked by pika when the channel has been opened.
The channel object is passed in so we can make use of it.
Since the channel is now open, we'll declare the exchange to use.
:param pika.channel.Channel channel: The channel object
"""
LOGGER.info('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
self.setup_exchange(self.EXCHANGE)
def add_on_channel_close_callback(self):
"""This method tells pika to call the on_channel_closed method if
RabbitMQ unexpectedly closes the channel.
"""
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reply_code, reply_text):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.
:param pika.channel.Channel channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
"""
LOGGER.warning('Channel was closed: (%s) %s', reply_code, reply_text)
self._channel = None
if not self._stopping:
self._connection.close()
def setup_exchange(self, exchange_name):
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
command. When it is complete, the on_exchange_declareok method will
be invoked by pika.
:param str|unicode exchange_name: The name of the exchange to declare
"""
LOGGER.info('Declaring exchange %s', exchange_name)
self._channel.exchange_declare(self.on_exchange_declareok,
exchange_name,
self.EXCHANGE_TYPE)
def on_exchange_declareok(self, unused_frame):
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
command.
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
"""
LOGGER.info('Exchange declared')
self.setup_queue(self.QUEUE)
def setup_queue(self, queue_name):
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
command. When it is complete, the on_queue_declareok method will
be invoked by pika.
:param str|unicode queue_name: The name of the queue to declare.
"""
LOGGER.info('Declaring queue %s', queue_name)
self._channel.queue_declare(self.on_queue_declareok, queue_name)
def on_queue_declareok(self, method_frame):
"""Method invoked by pika when the Queue.Declare RPC call made in
setup_queue has completed. In this method we will bind the queue
and exchange together with the routing key by issuing the Queue.Bind
RPC command. When this command is complete, the on_bindok method will
be invoked by pika.
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
"""
LOGGER.info('Binding %s to %s with %s',
self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
self._channel.queue_bind(self.on_bindok, self.QUEUE,
self.EXCHANGE, self.ROUTING_KEY)
def on_bindok(self, unused_frame):
"""This method is invoked by pika when it receives the Queue.BindOk
response from RabbitMQ. Since we know we're now setup and bound, it's
time to start publishing."""
LOGGER.info('Queue bound')
self.start_publishing()
def start_publishing(self):
"""This method will enable delivery confirmations and schedule the
first message to be sent to RabbitMQ
"""
LOGGER.info('Issuing consumer related RPC commands')
self.enable_delivery_confirmations()
self.schedule_next_message()
def enable_delivery_confirmations(self):
"""Send the Confirm.Select RPC method to RabbitMQ to enable delivery
confirmations on the channel. The only way to turn this off is to close
the channel and create a new one.
When the message is confirmed from RabbitMQ, the
on_delivery_confirmation method will be invoked passing in a Basic.Ack
or Basic.Nack method from RabbitMQ that will indicate which messages it
is confirming or rejecting.
"""
LOGGER.info('Issuing Confirm.Select RPC command')
self._channel.confirm_delivery(self.on_delivery_confirmation)
def on_delivery_confirmation(self, method_frame):
"""Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
command, passing in either a Basic.Ack or Basic.Nack frame with
the delivery tag of the message that was published. The delivery tag
is an integer counter indicating the message number that was sent
on the channel via Basic.Publish. Here we're just doing house keeping
to keep track of stats and remove message numbers that we expect
a delivery confirmation of from the list used to keep track of messages
that are pending confirmation.
:param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
"""
confirmation_type = method_frame.method.NAME.split('.')[1].lower()
LOGGER.info('Received %s for delivery tag: %i',
confirmation_type,
method_frame.method.delivery_tag)
if confirmation_type == 'ack':
self._acked += 1
elif confirmation_type == 'nack':
self._nacked += 1
self._deliveries.remove(method_frame.method.delivery_tag)
LOGGER.info('Published %i messages, %i have yet to be confirmed, '
'%i were acked and %i were nacked',
self._message_number, len(self._deliveries),
self._acked, self._nacked)
def schedule_next_message(self):
"""If we are not closing our connection to RabbitMQ, schedule another
message to be delivered in PUBLISH_INTERVAL seconds.
"""
LOGGER.info('Scheduling next message for %0.1f seconds',
self.PUBLISH_INTERVAL)
self._connection.add_timeout(self.PUBLISH_INTERVAL,
self.publish_message)
def publish_message(self):
"""If the class is not stopping, publish a message to RabbitMQ,
appending a list of deliveries with the message number that was sent.
This list will be used to check for delivery confirmations in the
on_delivery_confirmations method.
Once the message has been sent, schedule another message to be sent.
The main reason I put scheduling in was just so you can get a good idea
of how the process is flowing by slowing down and speeding up the
delivery intervals by changing the PUBLISH_INTERVAL constant in the
class.
"""
if self._channel is None or not self._channel.is_open:
return
hdrs = {u'مفتاح': u' قيمة',
u'鍵': u'值',
u'キー': u'値'}
properties = pika.BasicProperties(app_id='example-publisher',
content_type='application/json',
headers=hdrs)
message = u'مفتاح قيمة 鍵 值 キー 値'
self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
json.dumps(message, ensure_ascii=False),
properties)
self._message_number += 1
self._deliveries.append(self._message_number)
LOGGER.info('Published message # %i', self._message_number)
self.schedule_next_message()
def run(self):
"""Run the example code by connecting and then starting the IOLoop.
"""
while not self._stopping:
self._connection = None
self._deliveries = []
self._acked = 0
self._nacked = 0
self._message_number = 0
try:
self._connection = self.connect()
self._connection.ioloop.start()
except KeyboardInterrupt:
self.stop()
if (self._connection is not None and
not self._connection.is_closed):
# Finish closing
self._connection.ioloop.start()
LOGGER.info('Stopped')
def stop(self):
"""Stop the example by closing the channel and connection. We
set a flag here so that we stop scheduling new messages to be
published. The IOLoop is started because this method is
invoked by the Try/Catch below when KeyboardInterrupt is caught.
Starting the IOLoop again will allow the publisher to cleanly
disconnect from RabbitMQ.
"""
LOGGER.info('Stopping')
self._stopping = True
self.close_channel()
self.close_connection()
def close_channel(self):
"""Invoke this command to close the channel with RabbitMQ by sending
the Channel.Close RPC command.
"""
if self._channel is not None:
LOGGER.info('Closing the channel')
self._channel.close()
def close_connection(self):
"""This method closes the connection to RabbitMQ."""
if self._connection is not None:
LOGGER.info('Closing connection')
self._connection.close()
def main():
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
# Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
# example = ExamplePublisher('amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat_interval=3600')
rmqAddr, rmqPort, rmqQueue = '101.201.41.72', 5672, 'video_article'
rmqU, rmqP = 'liaohy', 'liaohy'
C = 'amqp://{}:{}@{}:{}/%2F?connection_attempts=3&heartbeat_interval=3600'.format(rmqU, rmqP, rmqAddr, rmqPort)
example = ExamplePublisher(C)
example.run()
if __name__ == '__main__':
main()
Why does the queue memory grow and shrink when publishing/consuming?
http://www.rabbitmq.com/memory-use.html#queue-memory-usage http://www.rabbitmq.com/memory-use.html#queue-memory-usage
AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣的開發語言等條件的限制。Erlang中的實現有 RabbitMQ等。
-
中文名
-
高級消息隊列協議
-
外文名
-
Advanced Message Queuing Protocol
-
屬 性
-
應用層標準協議
-
應用領域
-
計算機
-
AMQP協議
第1章 概述
1.3. 摘要
1.3.1. 什麼是AMQP
高級消息隊列協議使得聽從該規範的客戶端應用和消息中間件服務器的全功能互操做成爲可能。
1.3.2. 爲何要用AMQP
咱們的目標是實現一種在全行業普遍使用的標準消息中間件技術,以便下降企業和系統集成的開銷,而且向大衆提供工業級的集成服務。
咱們的宗旨是經過AMQP,讓消息中間件的能力最終被網絡自己所具備,而且經過消息中間件的普遍使用發展出一系列有用的應用程序。
1.3.3. AMQP的範圍
爲了徹底實現消息中間件的互操做性,須要充分定義網絡協議和消息代理服務的功能語義。
所以,AMQP定義網絡協議和代理服務以下:
一套肯定的消息交換功能,也就是「高級消息交換協議模型」。AMQP模型包括一套用於路由和存儲消息的功能模塊,以及一套在這些模塊之間交換消息的規則。
一個網絡線級協議(數據傳輸格式),客戶端應用能夠經過這個協議與消息代理和它實現的AMQP模型進行交互通訊。
能夠只實現AMQP協議規範中的的部分語義,可是咱們相信明確的描述這些語義有助於理解這個協議。
1.3.4.1. AMQP模型
咱們須要明確的定義服務器的語義,由於全部服務器實現都應該保持這些語義的一致性,不然就沒法進行互操做。
所以AMQP模型描述了一套模塊化的組件以及這些組件之間進行鏈接的標準規則。
在服務器中,三個主要功能模塊鏈接成一個處理鏈完成預期的功能:
「exchange」接收發布應用程序發送的消息,並根據必定的規則將這些消息路由到「消息隊列」。
「message queue」存儲消息,直到這些消息被消費者安全處理完爲止。
「binding」定義了exchange和message queue之間的關聯,提供路由規則。
使用這個模型咱們能夠很容易的模擬出存儲轉發隊列和主題訂閱這些典型的消息中間件概念。
一個AMQP服務器相似於郵件服務器,exchange相似於消息傳輸代理(email裏的概念),message queue相似於郵箱。Binding定義了每個傳輸代理中的消息路由表,發佈者將消息發給特定的傳輸代理,而後傳輸代理將這些消息路由到郵箱中,消費者從這些郵箱中取出消息。
在之前的中間件系統的應用場景中,發佈者直接將消息發送給郵箱或者郵件列表。
區別就在於用戶能夠控制message queue與exchage的鏈接規則,這能夠作不少有趣的事情,好比定義一條規則:「將全部包含這樣這樣的消息頭的消息都複製一份再發送到消息隊列中」。
AMQP模型有如下目標:
支持金融服務領域的語義要求。
支持金融服務領域所要求的性能要求。
可以很方便的擴展新的消息路由和隊列。
經過AMQP協議(AMQP和AMQP Protocol的是總體和部分的關係),服務器應用能夠經過編程的方式來實現具體的功能語義。
簡單而靈活。
1.3.4.2. AMQP協議
AMQP協議是一個二進制協議,擁有一些現代特色:多信道、協商式、異步、安全、跨平臺、中立、高效。
AMQP一般被劃分爲三層:
模型層定義了一套命令(按功能分類),客戶端應用能夠利用這些命令來實現它的業務功能。
會話層負責將命令從客戶端應用傳遞給服務器,再將服務器的應答傳遞給客戶端應用,會話層爲這個傳遞過程提供可靠性、同步機制和錯誤處理。
傳輸層提供幀處理、信道複用、錯誤檢測和數據表示。
實現者能夠將傳輸層替換成任意傳輸協議,只要不改變AMQP協議中與客戶端應用程序相關的功能。實現者還可使用其餘高層協議中的會話層。
AMQP模型的設計由如下幾個需求所驅動:
保證聽從AMQP規範的服務器實現之間可以進行互操做。
爲服務質量提供顯示控制。
支持全部消息中間件的功能:消息交換、文件傳輸、流傳輸、遠程進程調用等。
兼容已有的消息API規範(好比Sun公司的JMS規範)。
造成一致和明確的命名。
經過AMQP協議能夠完整的配置服務器線路(TODO:server wiring是啥意思?答:服務器鏈接)。
使用命令符號能夠很容易的映射成應用級別的API。
明肯定義每個操做只作一件事情。
AMQP傳輸層的設計由如下幾個主要的需求所驅動,這些需求不分前後次序:
使用可以快速打包解包的二進制編碼來保證數據的緊湊性。
可以處理任意大小的消息。
容許零拷貝數據傳輸(好比遠程DMA)。
一個鏈接支持多個會話。
保證會話可以從網絡錯誤、服務器失效中恢復。
爲了長期存在,沒有隱含的內置限制(TODO:To be long-lived,with no significant in-built limitations)。
異步傳輸消息。
可以很容易的處理新的和變化的需求。
高版本的AMQP規範可以兼容低版本的規範。
使用強斷言模型來保證應用程序的可修復性。
保持編程語言的中立性。
適宜使用代碼生成工具生成協議處理模塊。
咱們支持各類消息交換的體系結構:
存儲轉發(多個消息發送者,單個消息接收者)。
分佈式事務(多個消息發送者,多個消息接收者)。
發佈訂閱(多個消息發送者,多個消息接收者)。
基於內容的路由(多個消息發送者,多個消息接收者)。
文件傳輸隊列(多個消息發送者,多個消息接收者)。
點對點鏈接(單個消息發送者,單個消息接收者)。
本文檔分紅兩個部分:
「概念」部分將對AMQP的概念作一個簡單的介紹,描述AMQP怎麼工做,以及AMQP的用途。
「標準」部分將對AMQP的模型層、會話層的每一個組成部分作精確的定義,還將定義AMQP在網絡上傳輸的二進制消息結構。
咱們用IETF RFC2119中的術語定義:必須、沒必要、應該、不該該和能夠(詳見參考資料
[1] )。
當咱們討論聽從AMQP規範的服務器的具體行爲時,咱們使用術語「服務器」來表示這些服務器。
當咱們討論聽從AMQP規範的客戶端應用的具體行爲時,咱們使用術語「客戶端」來表示這些客戶端應用。
咱們使用「端點」來表示「服務器或者客戶端」。
除非另有說明,全部數字都是十進制的。
協議中的常量都用大寫字母的名字來表示。AMQP的實現若是須要在代碼或者文檔中定義和使用這些常量,必須用這些名字來表示。
屬性名、命令或者控制參數,以及幀字段都用小寫字母的名字來表示。AMQP的實現必須在代碼或者文檔中與之保持一致。
1.5. 約定
1.5.1. 定義
1.5.2. 版本號
AMQP版本用兩個版本號表示——主版本號和次版本號。咱們約定版本由主版本號後面加小數點再加上次版本號組成(好比1-3表示主版本號爲1,次版本號爲3)。
主版本號和次版本號能夠用0到255以內的全部值。
主版本號保持不變,次版本號遞增。當AMQP工做組提高主版本號時,次版本號將被設置爲0。所以,有可能出現這樣的版本序列:1-2,1-3,1-4,2-0,2-1……
一旦本協議發佈以後(主版本號大於1),應儘可能防止次版本號遞增到9。不過在發佈以前(版本0-x),因爲會對本協議進行頻繁的修訂,能夠不遵照這條約定。
一旦本協議發佈以後(主版本號大於1),同一個主版本不一樣次版本的實現必須向後兼容。而在發佈以前,這些次版本的實現不須要兼容。
大於或者等於99的主版本號用於測試和開發目的。
AMQP模型(AMQP Model):一個由關鍵實體和語義表示的邏輯框架,聽從AMQP規範的服務器必須提供這些實體和語義。爲了實現本規範中定義的語義,客戶端能夠發送命令來控制AMQP服務器。
鏈接(Connection):一個網絡鏈接,好比TCP/IP套接字鏈接。
會話(Session):端點之間的命名對話。在一個會話上下文中,保證「剛好傳遞一次」。
信道(Channel):多路複用鏈接中的一條獨立的雙向數據流通道。爲會話提供物理傳輸介質。
客戶端(Client):AMQP鏈接或者會話的發起者。AMQP是非對稱的,客戶端生產和消費消息,服務器存儲和路由這些消息。
服務器(Server):接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。也稱爲「消息代理」。
端點(Peer):AMQP對話的任意一方。一個AMQP鏈接包括兩個端點(一個是客戶端,一個是服務器)。
搭檔(Partner):當描述兩個端點之間的交互過程時,使用術語「搭檔」來表示「另外一個」端點的簡記法。好比咱們定義端點A和端點B,當它們進行通訊時,端點B是端點A的搭檔,端點A是端點B的搭檔。
片斷集(Assembly):段的有序集合,造成一個邏輯工做單元。
段(Segment):幀的有序集合,造成片斷集中一個完整子單元。
幀(Frame):AMQP傳輸的一個原子單元。一個幀是一個段中的任意分片。
控制(Control):單向指令,AMQP規範假設這些指令的傳輸是不可靠的。
命令(Command):須要確認的指令,AMQP規範規定這些指令的傳輸是可靠的。
異常(Exception):在執行一個或者多個命令時可能發生的錯誤狀態。
類(Class):一批用來描述某種特定功能的AMQP命令或者控制。
消息頭(Header):描述消息數據屬性的一種特殊段。
消息體(Body):包含應用程序數據的一種特殊段。消息體段對於服務器來講徹底透明——服務器不能查看或者修改消息體。
消息內容(Content):包含在消息體段中的的消息數據。
交換器(Exchange):服務器中的實體,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
交換器類型(Exchange Type):基於不一樣路由語義的交換器類。
消息隊列(Message Queue):一個命名實體,用來保存消息直到發送給消費者。
綁定器(Binding):消息隊列和交換器之間的關聯。
綁定器關鍵字(Binding Key):綁定的名稱。一些交換器類型可能使用這個名稱做爲定義綁定器路由行爲的模式。
路由關鍵字(Routing Key):一個消息頭,交換器能夠用這個消息頭決定如何路由某條消息。
持久存儲(Durable):一種服務器資源,當服務器重啓時,保存的消息數據不會丟失。
臨時存儲(Transient):一種服務器資源,當服務器重啓時,保存的消息數據會丟失。
持久化(Persistent):服務器將消息保存在可靠磁盤存儲中,當服務器重啓時,消息不會丟失。
非持久化(Non-Persistent):服務器將消息保存在內存中,當服務器重啓時,消息可能丟失。
消費者(Consumer):一個從消息隊列中請求消息的客戶端應用程序。
生產者(Producer):一個向交換器發佈消息的客戶端應用程序。
虛擬主機(Virtual Host):一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。客戶端應用程序在登陸到服務器以後,能夠選擇一個虛擬主機。
下面這些術語在AMQP規範的上下文中沒有特別的意義:
主題:一般指發佈消息;AMQP規範用一種或多種交換器來實現主題。
服務:一般等同於服務器。AMQP規範使用「服務器」這個術語來兼容IETF的標準術語,而且明確了協議中每一個部分的角色(兩方也多是AMQP服務)。
消息代理:等同於服務器。AMQP規範使用術語「客戶端」和「服務器」來兼容IETF的標準術語。
RabbitMQ libraries
RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open, general-purpose protocol for messaging. There are a number of clients for RabbitMQ in many different languages. In this tutorial series we're going to use Pika 1.0.0, which is the Python client recommended by the RabbitMQ team.
In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. But let's not get dragged down by the details ‒ you can read more about exchanges in the third part of this tutorial. All we need to know now is how to use a default exchange identified by an empty string. This exchange is special ‒ it allows us to specify exactly to which queue the message should go. The queue name needs to be specified in the routing_key parameter:
channel.basic_publish(exchange='',
routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'")
Before exiting the program we need to make sure the network buffers were flushed and our message was actually delivered to RabbitMQ. We can do it by gently closing the connection.
connection.close()
消息不能直接發送至隊列,須要經過exchange。
初始默認的exchange "" Exchange: (AMQP default)
publish/subscrib 發佈訂閱
同一個消息多個消費者場景
https://www.rabbitmq.com/tutorials/tutorial-three-python.html
What This Tutorial Focuses On
In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".
To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.
In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.
Essentially, published log messages are going to be broadcast to all the receivers.
發佈者不知道隊列queue ,只知道交易所 exchange,exchange決定發至哪一個隊列
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of that type, and call it logs:
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.
數據多寫 交易所類型
exchange_type='fanout'
注意
Add binding from this exchange
交易所exchange能夠綁定交易所或隊列queue
建立順序
NOT_FOUND - no queue 'queue_19_ehr_analysis_log' in vhost '/'
Close
並行/發 建立 exchange queue
以後作綁定
Exchange: exchange_19_ehr_analysis_log
This exchange
⇓
'''
exchange_19_ehr_analysis_log
Details
Type direct
Parameters
durable: true
Policy
注意雙寫/多寫Type能夠改成 fanout
queue_19_ehr_analysis_log 配置
Parameters
durable: true
Policy
Exclusive owner None
路由關係
From Routing key Arguments
(Default exchange binding)
exchange_19_ehr_analysis_log
routing_key_19_ehr_analysis_log
⇓
This queue
'''
#!/usr/bin/env python
# coding: utf-8
import pika
import time
RABBITMQ_DEV_SERVER_ANALYSI = '6.7.9.65'
username, password = "guest", "guest"
host, port = RABBITMQ_DEV_SERVER_ANALYSI, 5672
virtual_host = None
"""A credentials object for the default authentication methodology with
RabbitMQ.
If you do not pass in credentials to the ConnectionParameters object, it
will create credentials for 'guest' with the password of 'guest'.
If you pass True to erase_on_connect the credentials will not be stored
in memory after the Connection attempt has been made.
:param str username: The username to authenticate with
:param str password: The password to authenticate with
:param bool erase_on_connect: erase credentials on connect.
"""
mq_user = pika.PlainCredentials(username=username, password=password)
"""Connection parameters object that is passed into the connection adapter
upon construction.
:param str host: Hostname or IP Address to connect to
:param int port: TCP port to connect to
:param str virtual_host: RabbitMQ virtual host to use
:param pika.credentials.Credentials credentials: auth credentials
:param int channel_max: Maximum number of channels to allow
:param int frame_max: The maximum byte size for an AMQP frame
:param int heartbeat_interval: How often to send heartbeats
:param bool ssl: Enable SSL
:param dict ssl_options: Arguments passed to ssl.wrap_socket as
:param int connection_attempts: Maximum number of retry attempts
:param int|float retry_delay: Time to wait in seconds, before the next
:param int|float socket_timeout: Use for high latency networks
:param str locale: Set the locale value
:param bool backpressure_detection: Toggle backpressure detection
"""
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=host,
port=port,
virtual_host=virtual_host,
credentials=mq_user
))
channel = connection.channel()
analysi_task = {"queue": 'queue_19_ehr_analysis_log', 'exchange': 'exchange_19_ehr_analysis_log',
'routing_key': 'routing_key_19_ehr_analysis_log'} # 日誌接口系統配置
'''
exchange_19_ehr_analysis_log
Details
Type direct
Parameters
durable: true
Policy
注意雙寫/多寫Type能夠改成 fanout
queue_19_ehr_analysis_log 配置
Parameters
durable: true
Policy
Exclusive owner None
路由關係
From Routing key Arguments
(Default exchange binding)
exchange_19_ehr_analysis_log
routing_key_19_ehr_analysis_log
⇓
This queue
'''
exchange, routing_key, queue = analysi_task["exchange"], analysi_task["routing_key"], analysi_task["queue"]
"""Declare queue, create if needed. This method creates or checks a
queue. When creating a new queue the client can specify various
properties that control the durability of the queue and its contents,
and the level of sharing for the queue.
Leave the queue name empty for a auto-named queue in RabbitMQ
:param method callback: The method to call on Queue.DeclareOk
:param queue: The queue name
:type queue: str or unicode
:param bool passive: Only check to see if the queue exists
:param bool durable: Survive reboots of the broker
:param bool exclusive: Only allow access by the current connection
:param bool auto_delete: Delete after consumer cancels or disconnects
:param bool nowait: Do not wait for a Queue.DeclareOk
:param dict arguments: Custom key/value arguments for the queue
"""
msgBody = "HelloWorld{}-UCAN".format(time.ctime())
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msgBody)
channel.basic_publish(exchange="", routing_key=routing_key, body=msgBody)
'''
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
發佈者不知道隊列queue
'''
connection.close()
if False:
# https://www.rabbitmq.com/uri-spec.html
BROKER_URL = 'amqp://guest:guest@%s:5672/' % RABBITMQ_DEV_SERVER_ANALYSI # 配置 rabbitmq
params = pika.URLParameters(BROKER_URL)
params.socket_timeout = 5
connection = pika.BlockingConnection(params)
msgBody = "HW----HelloWorld{}-UCAN--".format(time.ctime())
channel = connection.channel()
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msgBody)
connection.close()

requeue yes 不從隊列刪除