RabbitMQ能作啥javascript
場景一:支付的通知php
生產者:微信支付完成以後在其回調方法中調用一個服務接收消息,這個服務做爲生產者。java
消費者:消費者服務是一個不斷從隊列中獲取支付結果的應用,而後在app或者頁面展現。node
場景二:註冊的短信或者郵件通知python
生產者:註冊成功以後的回調中,發送註冊成功信息到隊列生產者。git
消費者:應用程序不斷的獲取隊列中的消息,獲取到就發送短信或郵件。github
一、安裝編程
sudo apt-get update sudo apt-get install rabbitmq-server
二、啓動服務、中止服務、查看服務狀態c#
sudo service rabbitmq-server start sudo service rabbitmq-server stop sudo service rabbitmq-server status
三、修改打開文件句柄限制ruby
打開/etc/default/rabbitmq-server,修改ulimit
四、支持的操做系統
五、支持的編程語言
六、java示例-生產者
private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");//由於兩個進程在同一個機器上 Connection connection = null; Channel channel = null; connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [Producer] Sent '" + message + "'"); channel.close(); connection.close(); }
java示例-消費者
private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [Consumer] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); }
七、C#示例生產者
using System; using System.Diagnostics; using System.Text; using System.Threading; using RabbitMQ.Client; namespace Producer { classProgram { staticvoidMain(string[] args) { Thread.Sleep(1000); var connectionFactory = new ConnectionFactory(); IConnection connection = connectionFactory.CreateConnection(); IModel channel = connection.CreateModel(); channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct); stringvalue = DoSomethingInteresting(); stringlogMessage = string.Format("{0}:{1}",TraceEventType.Information,value); byte[]message = Encoding.UTF8.GetBytes(logMessage); channel.BasicPublish("direct-exchange-example","",null,message); channel.Close(); connection.Close(); } staticstringDoSomethingInteresting() { returnGuid.NewGuid().ToString(); } } }
C#示例消費者
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace Consumer { classProgram { staticvoidMain(string[] args) { var connectionFactory = new ConnectionFactory(); IConnection connection = connectionFactory.CreateConnection(); IModel channel = connection.CreateModel(); channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct); channel.QueueDeclare("logs",false,false,true,null); channel.QueueBind("logs","direct-exchange-example",""); varconsumer = new QueueingBasicConsumer(channel); channel.BasicConsume("logs",true,consumer); vareventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); stringmessage = Encoding.UTF8.GetString(eventArgs.Body); Console.WriteLine(message); channel.Close(); connection.Close(); Console.ReadLine(); } } }
八、Go語言生產者
package main import ( "github.com/streadway/amqp" ) var conn *amqp.Connection var ch *amqp.Channel func main() { //設置鏈接地址和交換器隊列名稱 addr := "amqp://test:test@192.168.1.104:5672/" exname := "amqp.ex" quname := "amqp.qu" //建立鏈接 conn, _ = amqp.Dial(addr) //建立通道,注意,一個鏈接能夠有多個通道 ch, _ = conn.Channel() //建立一個交換器,參數fanout說明該交換器會將消息轉發到全部與之綁定的隊列中 ch.ExchangeDeclare(exname, "fanout", true, false, false, true, nil) //建立一個隊列,用來接受和消費信息,注意這裏的隊列名稱,消費者獲取數據須要該名稱 ch.QueueDeclare(quname, true, false, false, false, nil) //綁定交換器和隊列 ch.QueueBind(quname, "", exname, false, nil) //發送消息 ch.Publish(exname, quname, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte("hello,world"), }) //關閉鏈接 conn.Close() ch.Close() select {} }
Go語言消費者
package main import ( "bytes" "fmt" "github.com/streadway/amqp" ) var conn *amqp.Connection var ch *amqp.Channel func main() { addr := "amqp://test:test@192.168.1.104:5672/" quname := "amqp.qu" conn, _ = amqp.Dial(addr) ch, _ = conn.Channel() //接受消息 value, _ := ch.Consume(quname, "", true, false, false, false, nil) go func() { for v := range value { s := bytesToString(&(v.Body)) fmt.Println(*s) } }() conn.Close() ch.Close() select {} } func bytesToString(b *[]byte) *string { s := bytes.NewBuffer(*b) r := s.String() return &r }
九、PHP生產者
<?php //配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交換機名 //$q_name = 'q_linvo'; //無需隊列名 $k_route = 'key_1'; //路由key //建立鏈接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //建立交換機對象 $ex = new AMQPExchange($channel); $ex->setName($e_name); date_default_timezone_set("Asia/Shanghai"); //發送消息 //$channel->startTransaction(); //開始事務 for($i=0; $i<5; ++$i){ sleep(1);//休眠1秒 //消息內容 $message = "TEST MESSAGE!".date("h:i:sa"); echo "Send Message:".$ex->publish($message, $k_route)."\n"; } //$channel->commitTransaction(); //提交事務 $conn->disconnect(); ?>
PHP消費者
<?php //配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交換機名 $q_name = 'q_linvo'; //隊列名 $k_route = 'key_1'; //路由key //建立鏈接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //建立交換機 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."\n"; //建立隊列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 echo "Message Total:".$q->declare()."\n"; //綁定交換機與隊列,並指定路由鍵 echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; //阻塞模式接收消息 echo "Message:\n"; while(True){ $q->consume('processMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 } $conn->disconnect(); /** * 消費回調函數 * 處理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //處理消息 $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答 } ?>
參考:https://blog.csdn.net/calm_85/article/details/80848664https://www.shiyanlou.com/courses/630/learning/https://blog.csdn.net/pony_maggie/article/details/69781478https://blog.csdn.net/luoye4321/article/details/83722437