1、rabbitmqctl緩存
啓動rabbitmq rabbitmqctl start_app app
關閉rabbitmq rabbitmqctl stop_app ide
格式化rabbitmq rabbitmqctl reset (格式化以前須要先關閉rabbitmq)性能
強制格式化rabbitmq rabbitmqctl force_reset fetch
2、ExChangespa
1,Direct (直連)3d
經過routingkey發送到指定的queuecode
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace DirectConsumer { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced; var ex= bus.ExchangeDeclare("direct", ExchangeType.Direct); var que= bus.QueueDeclare("001");//001爲queue的名稱 bus.Bind(ex, que, "000");//000爲routingkey bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() => { var message = Encoding.UTF8.GetString(body); Console.WriteLine("Got message: '{0}'", message); })); Console.ReadKey(); } } }
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace DirectProduce { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("direct", ExchangeType.Direct);a var message = new Message<string>("sad"); //000:爲routingkey bus.Publish<string>(ex, "000", false, message); bus.Dispose(); } } }
2,Fanout(廣播)blog
使用這種類型的Exchange,會忽略routing key的存在,直接將message廣播到全部的Queue中。rabbitmq
第一:大型玩家在玩在線遊戲的時候,能夠用它來廣播重大消息。這讓我想到電影微微一笑很傾城中,有款遊戲須要在世界上公佈玩家重大消息,也許這個就是用的MQ實現的。這讓我不由佩服肖奈,人家在大學的時候就知道RabbitMQ的這種特性了。
第二:體育新聞實時更新到手機客戶端。
第三:羣聊功能,廣播消息給當前羣聊中的全部人。
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace FanoutConsumer { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout); var que = bus.QueueDeclare("directQueue");//001爲queue的名稱 bus.Bind(ex, que, string.Empty);//Fanout不須要設置routingkey bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() => { var message = Encoding.UTF8.GetString(body); Console.WriteLine("Got message: '{0}'", message); })); Console.ReadKey(); } } }
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace FanoutConsumer2 { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout); var que = bus.QueueDeclare("directQueue2");//001爲queue的名稱 bus.Bind(ex, que, string.Empty);//Fanout不須要設置routingkey bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() => { var message = Encoding.UTF8.GetString(body); Console.WriteLine("Got message: '{0}'", message); })); Console.ReadKey(); } } }
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace FanoutProduce { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout); var message = new Message<string>("sad"); //Fanout不須要設置routingkey bus.Publish<string>(ex, string.Empty, false, message); bus.Dispose(); } } }
3,Topic(主題)
Topic Exchange是根據routing key和Exchange的類型將message發送到一個或者多個Queue中
新聞的分類更新
同一任務多個工做者協調完成
同一問題須要特定人員知曉
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using EasyNetQ; namespace TopicConsumer { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter"); //正確的使用方法: //c=> { c.WithTopic("*.cn"); }設置 Routing key。若是沒有這句,則Routing key爲# //*.com 只是queue的名稱 //bus.Subscribe<string>("*.com", r => Console.WriteLine(r),c=> { c.WithTopic("*.cn"); }); //subscriptionId是queue的名稱 //subscriptionId+exchangeType=惟一 bus.Subscribe<string>("*.com", r => Console.WriteLine(r)); bus.Subscribe<string>("*.cn", r => Console.WriteLine(r)); Console.ReadKey(); } } }
using EasyNetQ; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace TopicProduce { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter"); bus.Publish<string>("你好", "www.oyunkeji.com"); //bus.Publish<string>("你好", c => c.WithTopic("www.oyunkeji.com")); bus.Dispose(); } } }
4,Headers(頭信息)
它是根據Message的一些頭部信息來分發過濾Message,忽略routing key的屬性,若是Header信息和message消息的頭信息相匹配,那麼這條消息就匹配上了
x-match的頭部必須設置:
當x-match的值設置爲all時,header信息必須所有知足纔會匹配上
當x-match的值設置爲any時,header信息知足其中任意一個就會匹配上
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace HeadersConsumer { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("headers", ExchangeType.Header); var que = bus.QueueDeclare("headersQueue");//001爲queue的名稱 bus.Bind(ex, que, string.Empty,new Dictionary<string, object>() { { "x-match","all"}, { "username","hunter"}, { "password","hunter"} });//Header不須要設置routingkey bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() => { var message = Encoding.UTF8.GetString(body); Console.WriteLine("Got message: '{0}'", message); })); Console.ReadKey(); } } }
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace HeadersProduce { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("headers", ExchangeType.Header); var properties = new MessageProperties(); properties.Headers.Add("username", "hunter"); properties.Headers.Add("password", "hunter"); //Fanout不須要設置routingkey bus.Publish(ex, string.Empty, false, properties, Encoding.UTF8.GetBytes("你好")); bus.Dispose(); } } }
案例下載:https://pan.baidu.com/s/1gVBO3qLl9Dw5tIhIpETvkw
3、Arguments
1,Message TTL(x-message-ttl)
發佈到隊列的消息在丟棄以前能夠存活多長時間(毫秒)。
①針對隊列中的全部消息
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct); //001爲queue的名稱 //001隊列下的消息5秒鐘沒有被消費自動刪除 var que = bus.QueueDeclare("001",perQueueMessageTtl:5000); bus.Bind(ex, que, "000");//000爲routingkey bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() => { var message = Encoding.UTF8.GetString(body); Console.WriteLine("Got message: '{0}'", message); })); Console.ReadKey(); } } }
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections; using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; namespace ConsoleApp2 { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct); var properties = new MessageProperties(); var message = new Message<string>("你好"); //000:爲routingkey bus.Publish<string>(ex, "000", false, message); bus.Dispose(); } } }
②指定某個消息
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct); //001爲queue的名稱 //001隊列下的消息5秒鐘沒有被消費自動刪除 var que = bus.QueueDeclare("001"); bus.Bind(ex, que, "000");//000爲routingkey bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() => { var message = Encoding.UTF8.GetString(body); Console.WriteLine("Got message: '{0}'", message); })); Console.ReadKey(); } } }
using EasyNetQ; using EasyNetQ.Topology; using System; using System.Collections; using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; namespace ConsoleApp2 { class Program { static void Main(string[] args) { var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced; var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct); var properties = new MessageProperties(); properties.Expiration = "5000";//單位:毫秒 //000:爲routingkey bus.Publish(ex, "000", false, properties, Encoding.UTF8.GetBytes("你好")); bus.Dispose(); } } }
2,Auto expire(x-expires)
queue在指定的時間未被訪問,就會被刪除(毫秒)。
3,Max length(x-max-length)
限定隊列的最大長度,
4,Max length bytes(x-max-length-bytes)
限定隊列的最大佔用空間大小
5,Overflow behaviour(x-overflow)
設置隊列溢出行爲。這決定了在到達隊列的最大長度時消息會發生什麼狀況。有效值是 drop-head(刪除頭)或者 reject-publish(拒絕發佈) 。
6,Dead letter exchange/Dead letter routing key(x-dead-letter-exchange/x-dead-letter-routing-key)
queue中的message過時時間。
basicreject...basicnack等等。。。
這三種狀況通常會drop這些message。。。
Dead letter exchange:時候咱們不但願message被drop掉,而是走到另外一個隊列中,又或者是保存起來
Dead letter routing key:指定的routing key
8,Maximum priority
(x-max-priority)
定義消息的優先級
9,Lazy mode(x-queue-mode)
將隊列設置爲延遲模式,在磁盤上保留儘量多的消息以減小RAM使用量; 若是未設置,隊列將保留內存中的緩存以儘量快地傳遞消息。
10,Master locator (x-queue-master-locator)
將隊列設置爲主位置模式,肯定隊列主節點在節點集羣上聲明時所處的規則。
4、高可靠消息隊列
1,消費端的確認
①自動確認
message出隊列的時候就自動確認
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace Consumer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "hunter", Password = "hunter" }; //建立connection var connection = factory.CreateConnection(); //建立chanel var channel = connection.CreateModel(); //建立exchange channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null); //建立queue channel.QueueDeclare("queue1", true, false, false, null); //exchange綁定queue channel.QueueBind("queue1", "exchange1", "queue1", null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (send, e) => { Console.WriteLine(Encoding.UTF8.GetString(e.Body)); }; //autoAck 設置爲true:自動確認 channel.BasicConsume("queue1", true, consumer); Console.ReadKey(); } } }
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace Produce { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "hunter", Password = "hunter" }; //建立connection var connection = factory.CreateConnection(); //建立chanel var channel = connection.CreateModel(); for (int i = 0; i < 10; i++) { channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString())); } channel.Dispose(); Console.WriteLine("發佈完畢"); Console.ReadKey(); } } }
②手動確認
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace Consumer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "hunter", Password = "hunter" }; //建立connection var connection = factory.CreateConnection(); //建立chanel var channel = connection.CreateModel(); //建立exchange channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null); //建立queue channel.QueueDeclare("queue1", true, false, false, null); //exchange綁定queue channel.QueueBind("queue1", "exchange1", "queue1", null); var result = channel.BasicGet("queue1", false); Console.WriteLine(Encoding.UTF8.GetString(result.Body)); //拒絕掉 //requeue:true:從新放回隊列 false:直接丟棄 channel.BasicReject(result.DeliveryTag, false); //BasicRecover方法則是進行補發操做, //其中的參數若是爲true是把消息退回到queue可是有可能被其它的consumer接收到,設置爲false是隻補發給當前的consumer //channel.BasicRecover(true); Console.ReadKey(); } } }
2,發佈端的確認
其中事務的性能消耗最大,confirm其次
①confirm機制
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace Produce { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "hunter", Password = "hunter" }; //建立connection var connection = factory.CreateConnection(); //建立chanel var channel = connection.CreateModel(); channel.ConfirmSelect(); for (int i = 0; i < 10000; i++) { channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString())); } var isallPublish = channel.WaitForConfirms(); Console.WriteLine(isallPublish); channel.Dispose(); connection.Dispose(); Console.WriteLine("發佈完畢"); Console.ReadKey(); } } }
② 事物機制
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace Produce { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "hunter", Password = "hunter" }; //建立connection var connection = factory.CreateConnection(); //建立chanel var channel = connection.CreateModel(); try { channel.TxSelect(); for (int i = 0; i < 10000; i++) { channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString())); } channel.TxCommit(); } catch (Exception ex) { channel.TxRollback(); } channel.Dispose(); connection.Dispose(); Console.WriteLine("發佈完畢"); Console.ReadKey(); } } }
5、Consumer消費問題
Consumer消費時,無論你是否卻不確認,消息都會一股腦所有打入到你的consumer中去,致使consumer端內存暴漲(EasynetQ的Subscribe不會出現這種狀況)
解決方法:
①eventbasicconsumer+QOS
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Threading; using RabbitMQ.Client; using RabbitMQ.Client.Events; using EasyNetQ; namespace Consumer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "hunter", Password = "hunter" }; //建立connection var connection = factory.CreateConnection(); //建立chanel var channel = connection.CreateModel(); //建立exchange channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null); //建立queue channel.QueueDeclare("queue1", true, false, false, null); //exchange綁定queue channel.QueueBind("queue1", "exchange1", "queue1", null); //prefetchSize:預取大小 prefetchCount:預取數量 channel.BasicQos(0, 1, false); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (send, e) => { Console.WriteLine(Encoding.UTF8.GetString(e.Body)); channel.BasicAck(e.DeliveryTag, false);//確認送達 Thread.Sleep(1000000); }; //autoAck 設置爲true:自動確認 channel.BasicConsume("queue1", false, consumer); Console.ReadKey(); Console.ReadKey(); } } }
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace Produce { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "hunter", Password = "hunter" }; //建立connection var connection = factory.CreateConnection(); //建立chanel var channel = connection.CreateModel(); try { channel.TxSelect(); for (int i = 0; i < 10000; i++) { channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString())); } channel.TxCommit(); } catch (Exception ex) { channel.TxRollback(); } channel.Dispose(); connection.Dispose(); Console.WriteLine("發佈完畢"); Console.ReadKey(); } } }