RabbitMQ學習筆記

1、rabbitmqctl

啓動rabbitmq:rabbitmqctl start_app

關閉rabbitmq:rabbitmqctl stop_app

格式化rabbitmq:rabbitmqctl reset(格式化之前需要先關閉rabbitmq)

強制格式化rabbitmq:rabbitmqctl force_reset

 

2、Exchange

1,Direct(直連)

通過routing key將消息發送到指定的queue

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DirectConsumer
{
    /// <summary>
    /// Demo consumer for a direct exchange: binds queue "001" to exchange "direct"
    /// with routing key "000" and prints the body of every message it receives.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, queue and binding, starts consuming,
        /// and keeps running until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Dispose the bus on exit so the broker connection is released
            // instead of being abandoned when the process ends.
            using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("direct", ExchangeType.Direct);
                var que = bus.QueueDeclare("001"); // "001" is the queue name
                bus.Bind(ex, que, "000");          // "000" is the routing key

                // Decode each body as UTF-8 text and print it.
                bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                {
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Got message: '{0}'", message);
                }));

                // Keep the consumer alive until a key is pressed.
                Console.ReadKey();
            }
        }
    }
}
DirectConsumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DirectProduce
{
    /// <summary>
    /// Demo producer for a direct exchange: publishes one string message to
    /// exchange "direct" with routing key "000".
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, publishes a single message and disposes the bus.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // The using statement guarantees the bus is disposed even if
            // declaring the exchange or publishing throws.
            using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("direct", ExchangeType.Direct);

                var message = new Message<string>("sad");

                // "000" is the routing key.
                bus.Publish<string>(ex, "000", false, message);
            }
        }
    }
}
DirectProduce

 

2,Fanout(廣播)

 

 

使用這種類型的Exchange,會忽略routing key的存在,直接將message廣播到所有綁定的Queue中。

適用場景:

                第一:玩家在玩大型在線遊戲的時候,可以用它來廣播重大消息。這讓我想到電影《微微一笑很傾城》中,有款遊戲需要在世界上公佈玩家重大消息,也許這個就是用MQ實現的。這讓我不禁佩服肖奈,人家在大學的時候就知道RabbitMQ的這種特性了。

                第二:體育新聞實時更新到手機客戶端。

                第三:羣聊功能,廣播消息給當前羣聊中的所有人。

 

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FanoutConsumer
{
    /// <summary>
    /// Demo consumer for a fanout exchange: binds queue "directQueue" to exchange
    /// "fanout" and prints the body of every message it receives.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, queue and binding, starts consuming,
        /// and keeps running until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Dispose the bus on exit so the broker connection is released.
            using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout);
                var que = bus.QueueDeclare("directQueue"); // "directQueue" is the queue name
                bus.Bind(ex, que, string.Empty);           // fanout does not need a routing key

                // Decode each body as UTF-8 text and print it.
                bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                {
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Got message: '{0}'", message);
                }));

                // Keep the consumer alive until a key is pressed.
                Console.ReadKey();
            }
        }
    }
}
FanoutConsumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FanoutConsumer2
{
    /// <summary>
    /// Second demo consumer for the fanout exchange: binds queue "directQueue2"
    /// to exchange "fanout" and prints the body of every message it receives.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, queue and binding, starts consuming,
        /// and keeps running until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Dispose the bus on exit so the broker connection is released.
            using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout);
                var que = bus.QueueDeclare("directQueue2"); // "directQueue2" is the queue name
                bus.Bind(ex, que, string.Empty);            // fanout does not need a routing key

                // Decode each body as UTF-8 text and print it.
                bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                {
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Got message: '{0}'", message);
                }));

                // Keep the consumer alive until a key is pressed.
                Console.ReadKey();
            }
        }
    }
}
FanoutConsumer2
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FanoutProduce
{
    /// <summary>
    /// Demo producer for a fanout exchange: publishes one string message to
    /// exchange "fanout" with an empty routing key.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, publishes a single message and disposes the bus.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // The using statement guarantees the bus is disposed even if
            // declaring the exchange or publishing throws.
            using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout);
                var message = new Message<string>("sad");

                // Fanout does not need a routing key.
                bus.Publish<string>(ex, string.Empty, false, message);
            }
        }
    }
}
FanoutProduce

 

3,Topic(主題)

Topic Exchange會將message的routing key與綁定時指定的模式(binding key,支持通配符 * 和 #)進行匹配,並把message發送到一個或者多個匹配的Queue中

使用場景:

               新聞的分類更新

               同一任務多個工作者協調完成

               同一問題需要特定人員知曉

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using EasyNetQ;

namespace TopicConsumer
{
    /// <summary>
    /// Demo subscriber using the EasyNetQ high-level API: registers two string
    /// subscriptions with ids "*.com" and "*.cn" and prints every message received.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Creates the bus, registers both subscriptions and keeps running until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Dispose the bus on exit so the broker connection is released.
            using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter"))
            {
                // Correct usage, per the original notes:
                // c => { c.WithTopic("*.cn"); } sets the routing key. Without it the routing key is "#".
                // In the call below "*.com" is only the queue name.
                //bus.Subscribe<string>("*.com", r => Console.WriteLine(r),c=> { c.WithTopic("*.cn"); });

                // The subscriptionId is the queue name;
                // subscriptionId + exchange type together are unique (per the original notes).
                // NOTE(review): no WithTopic is passed here, so by the note above both
                // subscriptions bind with "#" rather than filtering by topic - confirm this is intended.
                bus.Subscribe<string>("*.com", r => Console.WriteLine(r));
                bus.Subscribe<string>("*.cn", r => Console.WriteLine(r));

                // Keep the subscriptions alive until a key is pressed.
                Console.ReadKey();
            }
        }
    }
}
TopicConsumer
using EasyNetQ;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TopicProduce
{
    /// <summary>
    /// Demo publisher using the EasyNetQ high-level API: publishes one string
    /// message with topic "www.oyunkeji.com".
    /// </summary>
    class Program
    {
        /// <summary>
        /// Creates the bus, publishes a single message and disposes the bus.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // The using statement guarantees the bus is disposed even if publishing throws.
            using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter"))
            {
                bus.Publish<string>("你好", "www.oyunkeji.com");

                // Alternative form that sets the topic through the configuration callback:
                //bus.Publish<string>("你好", c => c.WithTopic("www.oyunkeji.com"));
            }
        }
    }
}
TopicProduce

 

4,Headers(頭信息)

 

它是根據Message的一些頭部信息來分發過濾Message,忽略routing key的屬性,如果綁定時指定的Header信息和message消息的頭信息相匹配,那麼這條消息就匹配上了

x-match的頭部必須設置:

當x-match的值設置爲all時,header信息必須全部滿足纔會匹配上

當x-match的值設置爲any時,header信息滿足其中任意一個就會匹配上

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HeadersConsumer
{
    /// <summary>
    /// Demo consumer for a headers exchange: binds queue "headersQueue" to exchange
    /// "headers" with x-match=all on the "username" and "password" headers, and
    /// prints the body of every message it receives.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, queue and header binding, starts consuming,
        /// and keeps running until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Dispose the bus on exit so the broker connection is released.
            using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("headers", ExchangeType.Header);
                var que = bus.QueueDeclare("headersQueue"); // "headersQueue" is the queue name

                // A headers binding does not need a routing key; matching is driven by
                // the binding arguments. "x-match" = "all" requires every listed header to match.
                bus.Bind(ex, que, string.Empty, new Dictionary<string, object>() {
                    { "x-match","all"},
                    { "username","hunter"},
                    { "password","hunter"}
                });

                // Decode each body as UTF-8 text and print it.
                bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                {
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Got message: '{0}'", message);
                }));

                // Keep the consumer alive until a key is pressed.
                Console.ReadKey();
            }
        }
    }
}
HeadersConsumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HeadersProduce
{
    /// <summary>
    /// Demo producer for a headers exchange: publishes one UTF-8 message to exchange
    /// "headers" carrying "username" and "password" headers.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, publishes a single message with headers and disposes the bus.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // The using statement guarantees the bus is disposed even if
            // declaring the exchange or publishing throws.
            using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("headers", ExchangeType.Header);

                // Message headers that the consumer's binding arguments are matched against.
                var properties = new MessageProperties();
                properties.Headers.Add("username", "hunter");
                properties.Headers.Add("password", "hunter");

                // A headers exchange does not need a routing key.
                bus.Publish(ex, string.Empty, false, properties, Encoding.UTF8.GetBytes("你好"));
            }
        }
    }
}
HeadersProduce

 

案例下載:https://pan.baidu.com/s/1gVBO3qLl9Dw5tIhIpETvkw

 

3、Arguments

1,Message TTL(x-message-ttl)

發佈到隊列的消息在被丟棄之前可以存活多長時間(毫秒)。

①針對隊列中的所有消息

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo consumer for a queue-level message TTL: declares queue "001" with a
    /// 5000 ms per-queue message TTL, binds it to exchange "directText" with
    /// routing key "000", and prints the body of every message it receives.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, the TTL queue and the binding, starts consuming,
        /// and keeps running until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Dispose the bus on exit so the broker connection is released.
            using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);

                // "001" is the queue name.
                // Messages in queue "001" that are not consumed within 5 seconds are removed automatically.
                var que = bus.QueueDeclare("001", perQueueMessageTtl: 5000);
                bus.Bind(ex, que, "000"); // "000" is the routing key

                // Decode each body as UTF-8 text and print it.
                bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                {
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Got message: '{0}'", message);
                }));

                // Keep the consumer alive until a key is pressed.
                Console.ReadKey();
            }
        }
    }
}
consumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    /// <summary>
    /// Demo producer for the queue-level TTL example: publishes one string message
    /// to exchange "directText" with routing key "000".
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, publishes a single message and disposes the bus.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // The using statement guarantees the bus is disposed even if
            // declaring the exchange or publishing throws.
            using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);
                var message = new Message<string>("你好");

                // "000" is the routing key.
                bus.Publish<string>(ex, "000", false, message);
            }
        }
    }
}
produce

②指定某個消息

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo consumer for the per-message TTL example: declares queue "001" without
    /// any queue-level TTL, binds it to exchange "directText" with routing key "000",
    /// and prints the body of every message it receives.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, queue and binding, starts consuming,
        /// and keeps running until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Dispose the bus on exit so the broker connection is released.
            using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);

                // "001" is the queue name. No TTL is configured on the queue here.
                var que = bus.QueueDeclare("001");
                bus.Bind(ex, que, "000"); // "000" is the routing key

                // Decode each body as UTF-8 text and print it.
                bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                {
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Got message: '{0}'", message);
                }));

                // Keep the consumer alive until a key is pressed.
                Console.ReadKey();
            }
        }
    }
}
consumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    /// <summary>
    /// Demo producer for a per-message TTL: publishes one UTF-8 message to exchange
    /// "directText" with routing key "000" and an expiration of 5000 ms.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange, publishes a single expiring message and disposes the bus.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // The using statement guarantees the bus is disposed even if
            // declaring the exchange or publishing throws.
            using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced)
            {
                var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);

                var properties = new MessageProperties();
                properties.Expiration = "5000"; // unit: milliseconds

                // "000" is the routing key.
                bus.Publish(ex, "000", false, properties, Encoding.UTF8.GetBytes("你好"));
            }
        }
    }
}
produce

2,Auto expire(x-expires)

 queue在指定的時間未被訪問,就會被刪除(毫秒)。

 

 

3,Max length(x-max-length)

限定隊列的最大長度(消息條數)。

 

 

4,Max length bytes(x-max-length-bytes)

限定隊列的最大佔用空間大小

 

 

5,Overflow behaviour(x-overflow)

 設置隊列溢出行爲。這決定了在到達隊列的最大長度時消息會發生什麼情況。有效值是 drop-head(刪除頭)或者 reject-publish(拒絕發佈) 。

 

6,Dead letter exchange/Dead letter routing key(x-dead-letter-exchange/x-dead-letter-routing-key)

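以下三種情況下,message會成爲死信(dead letter):

①queue中的message過期(超過TTL);

②message被消費端拒絕(basic.reject或basic.nack),並且requeue爲false;

③queue已達到最大長度。

這三種情況通常會drop這些message。

Dead letter exchange:有時候我們不希望message被drop掉,而是走到另外一個隊列中,又或者是保存起來,這時可以指定一個exchange來接收這些message

Dead letter routing key:轉發死信時指定的routing key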

 

8,Maximum priority

(x-max-priority)

定義隊列支持的最大消息優先級

 

9,Lazy mode(x-queue-mode)

將隊列設置爲惰性(lazy)模式,在磁盤上保留盡可能多的消息以減少RAM使用量; 如果未設置,隊列將保留內存中的緩存以盡可能快地傳遞消息。

 

10,Master locator (x-queue-master-locator)

設置隊列主節點的定位策略,即在集羣上聲明隊列時,用於確定隊列主節點位於哪個節點的規則。

4、高可靠消息隊列

1,消費端的確認

①自動確認

message出隊列的時候就自動確認

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    /// <summary>
    /// Demo consumer with automatic acknowledgement (RabbitMQ.Client): declares
    /// exchange "exchange1" and queue "queue1", binds them with routing key "queue1",
    /// and prints every message body received with autoAck enabled.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Sets up the topology, starts an auto-ack consumer and keeps running until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            // Create the connection and the channel; both are disposed on exit
            // so they are not leaked when the program ends.
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Create the exchange.
                channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null);

                // Create the queue.
                channel.QueueDeclare("queue1", true, false, false, null);

                // Bind the queue to the exchange.
                channel.QueueBind("queue1", "exchange1", "queue1", null);

                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (send, e) =>
                {
                    Console.WriteLine(Encoding.UTF8.GetString(e.Body));
                };

                // autoAck = true: messages are acknowledged automatically.
                channel.BasicConsume("queue1", true, consumer);

                // Keep the consumer alive until a key is pressed.
                Console.ReadKey();
            }
        }
    }
}
Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Produce
{
    /// <summary>
    /// Demo producer (RabbitMQ.Client): publishes the numbers 0-9 as UTF-8 text
    /// to exchange "exchange1" with routing key "queue1".
    /// </summary>
    class Program
    {
        /// <summary>
        /// Publishes ten messages, releases the channel and connection,
        /// then waits for a key press.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            // Create the connection and the channel. The using statements dispose
            // both (the original never disposed the connection) even if publishing throws.
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                for (int i = 0; i < 10; i++)
                {
                    channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                }
            }

            Console.WriteLine("發佈完畢");

            Console.ReadKey();
        }
    }
}
Produce

②手動確認

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    /// <summary>
    /// Demo consumer with manual acknowledgement (RabbitMQ.Client): pulls a single
    /// message from "queue1" with BasicGet (autoAck off), prints it and rejects it
    /// without requeueing.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Sets up the topology, fetches at most one message, rejects it,
        /// and waits for a key press.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            // Create the connection and the channel; both are disposed on exit.
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Create the exchange.
                channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null);

                // Create the queue.
                channel.QueueDeclare("queue1", true, false, false, null);

                // Bind the queue to the exchange.
                channel.QueueBind("queue1", "exchange1", "queue1", null);

                var result = channel.BasicGet("queue1", false);

                // BasicGet yields null when the queue has no message available;
                // dereferencing it unconditionally would throw NullReferenceException.
                if (result == null)
                {
                    Console.WriteLine("queue1 is empty; nothing to reject.");
                }
                else
                {
                    Console.WriteLine(Encoding.UTF8.GetString(result.Body));

                    // Reject the message.
                    // requeue: true puts it back on the queue, false discards it.
                    channel.BasicReject(result.DeliveryTag, false);
                }

                // BasicRecover asks the broker to redeliver unacknowledged messages.
                // With true the messages go back to the queue and may be received by another
                // consumer; with false they are redelivered only to the current consumer.
                //channel.BasicRecover(true);

                Console.ReadKey();
            }
        }
    }
}
Consumer

 

2,發佈端的確認

發佈端的確認有事務和confirm兩種機制,其中事務的性能消耗最大,confirm其次

①confirm機制

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Produce
{
    /// <summary>
    /// Demo producer using publisher confirms (RabbitMQ.Client): publishes 10000
    /// messages to exchange "exchange1" with routing key "queue1" and prints
    /// whether the broker confirmed all of them.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Enables confirm mode, publishes the batch, waits for the confirms,
        /// releases the channel and connection, then waits for a key press.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            // Create the connection and the channel. The using statements dispose
            // both even if publishing or waiting for confirms throws.
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Put the channel into confirm mode before publishing.
                channel.ConfirmSelect();
                for (int i = 0; i < 10000; i++)
                {
                    channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                }

                // Wait for the outstanding confirms and print the result.
                var isallPublish = channel.WaitForConfirms();
                Console.WriteLine(isallPublish);
            }

            Console.WriteLine("發佈完畢");

            Console.ReadKey();
        }
    }
}
Produce

② 事務機制

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Produce
{
    /// <summary>
    /// Demo producer using an AMQP transaction (RabbitMQ.Client): publishes 10000
    /// messages to exchange "exchange1" with routing key "queue1" inside one
    /// transaction and rolls it back if anything fails.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Publishes the batch transactionally, reports the outcome,
        /// releases the channel and connection, then waits for a key press.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            bool committed = false;

            // Create the connection and the channel. The using statements dispose
            // both even if the rollback itself throws.
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                try
                {
                    channel.TxSelect();
                    for (int i = 0; i < 10000; i++)
                    {
                        channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                    }
                    channel.TxCommit();
                    committed = true;
                }
                catch (Exception ex)
                {
                    // Report the failure before rolling back so the cause is not
                    // lost (the original swallowed the exception silently).
                    Console.Error.WriteLine("Publish failed, rolling back: {0}", ex);
                    channel.TxRollback();
                }
            }

            // Only claim completion when the transaction was actually committed.
            if (committed)
            {
                Console.WriteLine("發佈完畢");
            }

            Console.ReadKey();
        }
    }
}
Produce

 

5、Consumer消費問題

Consumer消費時,不管你是否確認,消息都會一股腦全部打入到你的consumer中去,導致consumer端內存暴漲(EasyNetQ的Subscribe不會出現這種情況)

解決方法:

 ①EventingBasicConsumer + QoS

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using EasyNetQ;

namespace Consumer
{
    /// <summary>
    /// Demo consumer that limits prefetch with BasicQos (RabbitMQ.Client): consumes
    /// "queue1" with manual acknowledgement and a prefetch count of 1, so the broker
    /// does not push the whole queue to this consumer at once.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Sets up the topology, applies the QoS limit, starts a manual-ack consumer
        /// and keeps running until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            // Create the connection and the channel; both are disposed on exit.
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Create the exchange.
                channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null);

                // Create the queue.
                channel.QueueDeclare("queue1", true, false, false, null);

                // Bind the queue to the exchange.
                channel.QueueBind("queue1", "exchange1", "queue1", null);

                // prefetchSize: prefetch size (0 here); prefetchCount: number of messages to prefetch (1 here).
                channel.BasicQos(0, 1, false);

                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (send, e) =>
                {
                    Console.WriteLine(Encoding.UTF8.GetString(e.Body));

                    channel.BasicAck(e.DeliveryTag, false); // acknowledge delivery

                    // Simulates a very slow consumer (1,000,000 ms).
                    // NOTE(review): the ack is sent before the simulated work; confirm
                    // whether it should be sent after processing instead.
                    Thread.Sleep(1000000);
                };

                // autoAck = false: messages must be acknowledged manually (see BasicAck above).
                channel.BasicConsume("queue1", false, consumer);

                // Keep the consumer alive until a key is pressed.
                // (The original called ReadKey twice, requiring two key presses to exit.)
                Console.ReadKey();
            }
        }
    }
}
Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Produce
{
    /// <summary>
    /// Demo producer feeding the QoS consumer example (RabbitMQ.Client): publishes
    /// 10000 messages to exchange "exchange1" with routing key "queue1" inside one
    /// AMQP transaction and rolls it back if anything fails.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Publishes the batch transactionally, reports the outcome,
        /// releases the channel and connection, then waits for a key press.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            bool committed = false;

            // Create the connection and the channel. The using statements dispose
            // both even if the rollback itself throws.
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                try
                {
                    channel.TxSelect();
                    for (int i = 0; i < 10000; i++)
                    {
                        channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                    }
                    channel.TxCommit();
                    committed = true;
                }
                catch (Exception ex)
                {
                    // Report the failure before rolling back so the cause is not
                    // lost (the original swallowed the exception silently).
                    Console.Error.WriteLine("Publish failed, rolling back: {0}", ex);
                    channel.TxRollback();
                }
            }

            // Only claim completion when the transaction was actually committed.
            if (committed)
            {
                Console.WriteLine("發佈完畢");
            }

            Console.ReadKey();
        }
    }
}
Produce
相關文章
相關標籤/搜索