.net core RabbitMQ 消息隊列

上篇咱們說到erlang的安裝,如今有了基礎前提,就能夠繼續安裝RabbitMQ了! 這裏我選用的RabbitMQ版本是: PS:這個RabbitMQ版本是要對應前面erlang版本,因此前面咱們安裝的版本是20.3,由於最大支持21.X版本的erlang才能安裝 rabbitmq-server-3.7.10html

1.安裝RabbitMQ download 下載完執行exe文件,安裝到本身選用的目錄,並配置環境變量windows

rabbitmq的基本操做:瀏覽器

  • 啓動:rabbitmq-server -detached
  • 關閉:rabbitmqctl stop
  • 啓動:rabbitmqctl status

2.配置rabbitmq網頁管理插件 以管理員運行命令提示啓用插件:fetch

rabbitmq-plugins enable rabbitmq_management

打開瀏覽器頁面:http://localhost:15672 能夠看到 spa

默認登錄爲:guest/guest.net

3.開啓rabbitMQ遠程訪問插件

  • 添加用戶,用戶名:XRom 密碼:XRom123
rabbitmqctl add_user XRom XRom123
  • 添加權限
rabbitmqctl set_permissions -p "/" XRom ".*" ".*" ".*"
  • 修改用戶角色
rabbitmqctl set_user_tags XRom administrator

而後就能夠遠程訪問了,能夠用新增的用戶登陸RabbitMQ code

4.Producer與Exchangeorm

  • Producer 消息的生產者,也就是建立消息的對象
  • Exchange 消息的接受者,也就是用來接收消息的對象,Exchange接收到消息後將消息按照規則發送到與他綁定的Queue中。下面咱們來定義一個Producer與Exchange。

新建.net core 控制檯項目,並引入NuGet包 server

接下來能夠用代碼看實現效果:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQConsole
{
    class Program
    {
        /// <summary>
        ///  建立只讀鏈接對象
        /// </summary>
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "",//這裏寫本身電腦hostname,能夠經過命令提示符,直接輸入hostname查詢
            Port = 5672,
            UserName = "XRom",
            Password = "XRom123",
            VirtualHost = "/"
        };

        static void Main(string[] args)
        {
            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";

            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);//建立change2
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);//建立queue2
                    channel.QueueBind(queue, exchange, route);//將queue2綁定到change2

                    #region 發送消息
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true; //持久化
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbitmq!!"));
                    #endregion

                    #region 消費消息
                    //while (true)
                    //{
                    //    var message = channel.BasicGet(queue, true);  //第二個參數說明自動釋放消息,如爲false需手動釋放消息
                    //    if (message != null)
                    //    {
                    //        var msgBody = Encoding.UTF8.GetString(message.Body);
                    //        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    //    }
                    //    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                    //}
                    #endregion

                    #region 讓失敗的消息回到隊列中
                    //while (true)
                    //{
                    //    var message = channel.BasicGet(queue, false);
                    //    if (message != null)
                    //    {
                    //        var msgBody = Encoding.UTF8.GetString(message.Body);
                    //        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    //        Console.WriteLine(message.DeliveryTag);   //當前消息被處理的次序數
                    //    if (1 == 1)
                    //            channel.BasicReject(message.DeliveryTag, true);
                    //    }

                    //    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                    //}
                    #endregion

                    #region 監聽消息
                    //channel.BasicQos(prefetchSize: 0, prefetchCount: 20, global: false);  //一次接受10條消息,不然rabbit會把全部的消息一次性推到client,會增大client的負荷
                    //EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    //consumer.Received += (model, ea) =>
                    //{
                    //    Byte[] body = ea.Body;
                    //    String message = Encoding.UTF8.GetString(body);
                    //    Console.WriteLine(message + Thread.CurrentThread.ManagedThreadId);
                    //    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    //};

                    //channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                    //Console.ReadLine();
                    #endregion
                }
            }





        }
    }
}

相關文章
相關標籤/搜索