首先甩官網:http://www.rabbitmq.com/
然後是.NET Client鏈接:http://www.rabbitmq.com/dotnet.html
GitHub倉庫:https://github.com/rabbitmq/rabbitmq-dotnet-client
下面直接進入正文,一共是兩個主題:消費者怎麼寫?生產者怎麼寫?
在dotnet core mvc中,消費者肯定不能通過API或者其他的東西啓動,理應是跟着程序一起啓動的.
所以...
在dotnet core 2.0以上版本,我們直接用 IHostedService 接口實現.
直接上代碼.
// RabbitListener.cs
// Base class: connects to RabbitMQ, declares and binds the queue, and starts consuming.
// Each concrete consumer sets RouteKey / QueueName and overrides Process.
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Test.Listener
{
    /// <summary>
    /// Hosted service that consumes messages from the "message" topic exchange.
    /// Subclasses supply <see cref="RouteKey"/>, <see cref="QueueName"/> and
    /// an implementation of <see cref="Process"/>.
    /// </summary>
    public class RabbitListener : IHostedService
    {
        private readonly IConnection connection;
        private readonly IModel channel;

        /// <summary>
        /// Opens the connection and channel using the broker settings in <paramref name="options"/>.
        /// A connection failure is logged to the console and leaves the listener unconnected;
        /// <see cref="Register"/> then fails with a descriptive error.
        /// </summary>
        public RabbitListener(IOptions<AppConfiguration> options)
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    // Project-specific settings; adapt these to your own configuration.
                    HostName = options.Value.RabbitHost,
                    UserName = options.Value.RabbitUserName,
                    Password = options.Value.RabbitPassword,
                    Port = options.Value.RabbitPort,
                };
                this.connection = factory.CreateConnection();
                this.channel = connection.CreateModel();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
            }
        }

        /// <summary>Starts consuming when the host starts.</summary>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }

        protected string RouteKey;
        protected string QueueName;

        /// <summary>
        /// Handles one message. Return true to acknowledge it; return false to reject it
        /// (the message is not requeued).
        /// </summary>
        /// <param name="message">Message body decoded as UTF-8.</param>
        public virtual bool Process(string message)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Declares the exchange and queue, binds them with <see cref="RouteKey"/> and
        /// starts a consumer that dispatches each delivery to <see cref="Process"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">The constructor failed to connect.</exception>
        public void Register()
        {
            if (channel == null)
            {
                // Fail with a clear message instead of a NullReferenceException further down.
                throw new InvalidOperationException(
                    "RabbitListener is not connected to RabbitMQ; see the init error written by the constructor.");
            }

            Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
            channel.ExchangeDeclare(exchange: "message", type: "topic");
            channel.QueueDeclare(queue: QueueName, exclusive: false);
            channel.QueueBind(queue: QueueName,
                              exchange: "message",
                              routingKey: RouteKey);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                bool handled;
                try
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    handled = Process(message);
                }
                catch (Exception ex)
                {
                    // A throwing handler must still settle the delivery, otherwise the
                    // message stays unacknowledged until the channel closes.
                    Console.WriteLine($"RabbitListener process error,ex:{ex.Message}");
                    handled = false;
                }

                if (handled)
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                else
                {
                    // Reject without requeue so an unprocessable message is not redelivered forever.
                    channel.BasicReject(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue: QueueName, consumer: consumer);
        }

        /// <summary>
        /// Closes the channel and connection. Safe to call more than once and
        /// safe to call when the constructor failed to connect.
        /// </summary>
        public void DeRegister()
        {
            if (channel != null && channel.IsOpen)
            {
                channel.Close();
            }

            if (connection != null && connection.IsOpen)
            {
                connection.Close();
            }
        }

        /// <summary>Stops consuming when the host shuts down.</summary>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            DeRegister();
            return Task.CompletedTask;
        }
    }
}

// ChapterLister.cs (separate file)
// A sample concrete consumer.
using System;
using System.Text;
using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

namespace Test.Listener
{
    /// <summary>
    /// Consumes "done.task" messages from the "lemonnovelapi.chapter" queue.
    /// </summary>
    public class ChapterLister : RabbitListener
    {
        private readonly ILogger<RabbitListener> _logger;

        // Process runs inside the consumer callback, outside any request scope, so
        // services cannot be injected directly; resolve them from a scope created
        // through IServiceProvider for each message.
        private readonly IServiceProvider _services;

        public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options,
            ILogger<RabbitListener> logger) : base(options)
        {
            base.RouteKey = "done.task";
            base.QueueName = "lemonnovelapi.chapter";
            _logger = logger;
            _services = services;
        }

        /// <summary>
        /// Parses the message as JSON and handles it inside a fresh DI scope.
        /// </summary>
        /// <returns>
        /// true when the message was handled; false when it is malformed, is a JSON
        /// null, or handling threw (the base class then rejects it).
        /// </returns>
        public override bool Process(string message)
        {
            try
            {
                // Parsing is inside the try because JToken.Parse throws on malformed JSON.
                var taskMessage = JToken.Parse(message);
                if (taskMessage == null || taskMessage.Type == JTokenType.Null)
                {
                    // Returning false rejects the message: it cannot be processed.
                    return false;
                }

                using (var scope = _services.CreateScope())
                {
                    // Placeholder: resolve the scoped service and do the real work here.
                    var xxxService = scope.ServiceProvider.GetRequiredService<XXXXService>();
                    return true;
                }
            }
            catch (Exception ex)
            {
                // One error entry carries the exception (with stack trace) and the payload.
                _logger.LogError(-1, ex, "Process fail,message:{Message}", message);
                return false;
            }
        }
    }
}
然後,記住....
注入到Startup.cs的時候,使用AddHostedService函數
services.AddHostedService<ChapterLister>();
消費者就這樣玩了.
生產者這邊其實更簡單.
using System;
using System.Net;
using Newtonsoft.Json.Linq;
using RestSharp;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Newtonsoft.Json;
using System.Text;

namespace Test.SDK
{
    /// <summary>
    /// Publishes JSON messages to the "message" topic exchange.
    /// Intended to be registered as a singleton; publishing is serialized internally.
    /// </summary>
    public class RabbitMQClient : IDisposable
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly ILogger _logger;

        // The RabbitMQ .NET client documents IModel as unsafe for concurrent publishing,
        // and a singleton is shared by concurrent requests, so publishes are serialized.
        private readonly object _publishLock = new object();

        /// <summary>
        /// Opens the connection and channel using the broker settings in <paramref name="options"/>
        /// and declares the exchange that <see cref="PushMessage"/> publishes to.
        /// A connection failure is logged and leaves the client unconnected;
        /// <see cref="PushMessage"/> then fails with a descriptive error.
        /// </summary>
        public RabbitMQClient(IOptions<AppConfiguration> options, ILogger<RabbitMQClient> logger)
        {
            _logger = logger;
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = options.Value.RabbitHost,
                    UserName = options.Value.RabbitUserName,
                    Password = options.Value.RabbitPassword,
                    Port = options.Value.RabbitPort,
                };
                _connection = factory.CreateConnection();
                _channel = _connection.CreateModel();

                // Declare the exchange with the same arguments the consumer uses, so
                // publishing works even before any consumer has started. Publishing to
                // a missing exchange makes the broker close the channel.
                _channel.ExchangeDeclare(exchange: "message", type: "topic");
            }
            catch (Exception ex)
            {
                _logger.LogError(-1, ex, "RabbitMQClient init fail");
            }
        }

        /// <summary>
        /// Serializes <paramref name="message"/> to JSON and publishes it to the
        /// "message" exchange under <paramref name="routingKey"/>.
        /// </summary>
        /// <param name="routingKey">Topic routing key for the message.</param>
        /// <param name="message">Object serialized with Json.NET as the message body.</param>
        /// <exception cref="InvalidOperationException">The constructor failed to connect.</exception>
        public virtual void PushMessage(string routingKey, object message)
        {
            _logger.LogInformation($"PushMessage,routingKey:{routingKey}");
            if (_channel == null)
            {
                // Fail with a clear message instead of a NullReferenceException.
                throw new InvalidOperationException(
                    "RabbitMQClient is not connected to RabbitMQ; see the logged init failure.");
            }

            string msgJson = JsonConvert.SerializeObject(message);
            var body = Encoding.UTF8.GetBytes(msgJson);

            lock (_publishLock)
            {
                _channel.BasicPublish(exchange: "message",
                                      routingKey: routingKey,
                                      basicProperties: null,
                                      body: body);
            }
        }

        /// <summary>
        /// Releases the channel and connection; the DI container calls this for a
        /// singleton registration at shutdown.
        /// </summary>
        public void Dispose()
        {
            _channel?.Dispose();
            _connection?.Dispose();
        }
    }
}
切記注入實例的時候用單例模式.
services.AddSingleton<RabbitMQClient, RabbitMQClient>();
全文完...