//durable = true 表明持久化 交換機和隊列都要爲true ,持久表明服務重啓,沒有處理的消息依然存在
//topic 根據不一樣的routkey 發送和接收信息spa
//fanout 廣播模式orm
//廣播模式,表明每一個消費者都會收到消息,每個收到的都是1,2,3,4,5,6blog
//輪詢模式,當兩個消費者時候,每一個消費者都會挨個接收消息 好比第一個接收1,2,3 第二個接收到的消息是2,4,6隊列
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms; namespace IOT_DeviceSocket { public partial class FormRabbmitMQ : Form { public FormRabbmitMQ() { InitializeComponent(); } string EXCHANGE_NAME = "EXCHANGE_NAME1111"; string queuename = "queuename1111"; private void button1_Click(object sender, EventArgs e) { //TOPIC發送(); 輪詢發送(); } private void button2_Click(object sender, EventArgs e) { //TOPIC接收(); 輪詢接收(); } public void 輪詢發送() { #region 輪詢 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; //隊列名稱 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //durable = true 表明持久化 交換機和隊列都要爲true //topic 輪詢模式 fanout 廣播模式 //輪詢模式,當兩個消費者時候,每一個消費者都會挨個接收消息 好比第一個接收1,2,3 第二個接收到的消息是2,4,6 //廣播模式,表明每一個消費者都會收到消息,每個收到的都是1,2,3,4,5,6 //申明交換機並指定交換機類型 能夠刪除,也能夠topic改成fanout模式 channel.ExchangeDeclare(EXCHANGE_NAME, "topic", true);//申明交換機並指定交換機類型 channel.QueueDeclare(queuename, true, false, false, null); //公平分發 //channel.BasicQos(0, 1, false); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //持久化 for (int i = 0; i < 1; i++) { var body = Encoding.UTF8.GetBytes(textBox1.Text); channel.BasicPublish("", queuename, properties, body); } } } #endregion } public void 輪詢接收() { for (int i = 0; i < 2; i++) { var s = i; #region 輪詢 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); var consumer = new EventingBasicConsumer(channel); //申明交換機並指定交換機類型 能夠刪除,也能夠topic改成fanout模式 channel.ExchangeDeclare(EXCHANGE_NAME, "topic", true); var queuenames = channel.QueueDeclare().QueueName; channel.QueueBind(queuenames, EXCHANGE_NAME, ""); consumer.Received += (model, ea) => { var body = ea.Body; try { //處理消息 textBox2.Invoke(new EventHandler(delegate { textBox2.Text += s.ToString() + ":" + Encoding.Default.GetString(body) + "\r\n"; })); } catch (Exception ex) { } finally { //確認完成 channel.BasicAck(ea.DeliveryTag, false); } }; //設置手動完成確認(noAck) channel.BasicConsume(queuename, false, consumer); #endregion } } public void 廣播發送() { #region 廣播 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; var EXCHANGE_NAME = "eee"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { // 聲明該channel是fanout類型 channel.ExchangeDeclare(EXCHANGE_NAME, "fanout"); // 將消息發送給exchange //channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); var body = Encoding.UTF8.GetBytes(textBox1.Text); channel.BasicPublish(EXCHANGE_NAME, "", null, body); } } #endregion } public void 廣播接收() { for (int i = 0; i < 2; i++) { var s = i; #region 廣播 // 建立鏈接和channel ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); channel.ExchangeDeclare(EXCHANGE_NAME, "fanout"); // 由RabbitMQ自行建立的臨時隊列,惟一且隨消費者的停止而自動刪除的隊列 String queueName = channel.QueueDeclare().QueueName; // binding channel.QueueBind(queueName, EXCHANGE_NAME, ""); var consumer = new EventingBasicConsumer(channel); // 指定隊列消費者 channel.BasicConsume(queueName, true, consumer); consumer.Received += (model, ea) => { var body = ea.Body; try { //處理消息 textBox2.Invoke(new EventHandler(delegate { textBox2.Text += s.ToString() + ":" + Encoding.Default.GetString(body) + "\r\n"; })); } catch (Exception ex) { } finally { } }; #endregion } } String exchangeName = "wytExchangeTopic"; String routeKeyName1 = "black.critical.high"; String routeKeyName2 = "red.critical.high"; String routeKeyName3 = "white.critical.high"; public void TOPIC發送() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; //隊列名稱 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //durable = true 表明持久化 交換機和隊列都要爲true //topic 輪詢模式 fanout 廣播模式 //輪詢模式,當兩個消費者時候,每一個消費者都會挨個接收消息 好比第一個接收1,2,3 第二個接收到的消息是2,4,6 //廣播模式,表明每一個消費者都會收到消息,每個收到的都是1,2,3,4,5,6 channel.ExchangeDeclare(exchangeName, "topic", true);//申明交換機並指定交換機類型 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //持久化 properties.Persistent = true; for (int i = 0; i < 1; i++) { //發送的消息 var body = Encoding.UTF8.GetBytes(textBox1.Text); //給相應的routingKey 的推送消息,模擬給三個不一樣的key發送同一個消息,也能夠給一個key發送消息 //消息推送routeKeyName1 channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName1, basicProperties: properties, body: body); ////消息推送routeKeyName2 //channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName2, basicProperties: properties, body: body); ////消息推送routeKeyName3 //channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName3, basicProperties: properties, body: body); } } } } public void TOPIC接收() { #region topic模式 根據routingkey for (int i = 0; i < 2; i++) { var s = i; var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); var consumer = new EventingBasicConsumer(channel); channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null); String queueName = channel.QueueDeclare().QueueName; //接收一種就綁定一種routeKey channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null); //channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName2, arguments: null); consumer.Received += (model, ea) => { var body = ea.Body; try { //處理消息 textBox2.Invoke(new EventHandler(delegate { textBox2.Text += s.ToString() + ":" + Encoding.Default.GetString(body) + "\r\n"; })); } catch (Exception ex) { } finally { //確認完成 channel.BasicAck(ea.DeliveryTag, false); } }; //設置手動完成確認(noAck) channel.BasicConsume(queueName, false, consumer); } #endregion } } }