demo rabbitmq topic(主題模式),fanout(廣播模式),輪詢分發,確認接收Ack處理

//durable = true 表明持久化 交換機和隊列都要爲true ,持久表明服務重啓,沒有處理的消息依然存在
//topic 根據不一樣的routkey 發送和接收信息spa

//fanout 廣播模式orm

//廣播模式,表明每一個消費者都會收到消息,每個收到的都是1,2,3,4,5,6blog

//輪詢模式,當兩個消費者時候,每一個消費者都會挨個接收消息 好比第一個接收1,2,3 第二個接收到的消息是2,4,6隊列

 

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace IOT_DeviceSocket
{
	public partial class FormRabbmitMQ : Form
	{
		public FormRabbmitMQ()
		{
			InitializeComponent();
		}
		string EXCHANGE_NAME = "EXCHANGE_NAME1111";
		string queuename = "queuename1111";
		private void button1_Click(object sender, EventArgs e)
		{
			//TOPIC發送();
			輪詢發送();
		}

		private void button2_Click(object sender, EventArgs e)
		{
			//TOPIC接收();
			輪詢接收();
		}

		public void 輪詢發送()
		{
			#region 輪詢
			var factory = new ConnectionFactory();
			factory.HostName = "127.0.0.1";
			factory.UserName = "Teld";
			factory.Password = "Teld@Teld.cn";
			//隊列名稱

			using (var connection = factory.CreateConnection())
			{
				using (var channel = connection.CreateModel())
				{
					//durable = true 表明持久化 交換機和隊列都要爲true
					//topic 輪詢模式 fanout 廣播模式
					//輪詢模式,當兩個消費者時候,每一個消費者都會挨個接收消息 好比第一個接收1,2,3 第二個接收到的消息是2,4,6
					//廣播模式,表明每一個消費者都會收到消息,每個收到的都是1,2,3,4,5,6

					//申明交換機並指定交換機類型 能夠刪除,也能夠topic改成fanout模式
					channel.ExchangeDeclare(EXCHANGE_NAME, "topic", true);//申明交換機並指定交換機類型
					channel.QueueDeclare(queuename, true, false, false, null);
					//公平分發
					//channel.BasicQos(0, 1, false);
					var properties = channel.CreateBasicProperties();
					properties.DeliveryMode = 2; //持久化
					for (int i = 0; i < 1; i++)
					{
						var body = Encoding.UTF8.GetBytes(textBox1.Text);
						channel.BasicPublish("", queuename, properties, body);
					}

				}
			}
			#endregion
		}
		public void 輪詢接收()
		{
			for (int i = 0; i < 2; i++)
			{
				var s = i;
				#region 輪詢
				var factory = new ConnectionFactory();
				factory.HostName = "127.0.0.1";
				factory.UserName = "Teld";
				factory.Password = "Teld@Teld.cn";
				var connection = factory.CreateConnection();
				var channel = connection.CreateModel();
				var consumer = new EventingBasicConsumer(channel);
				//申明交換機並指定交換機類型 能夠刪除,也能夠topic改成fanout模式
				channel.ExchangeDeclare(EXCHANGE_NAME, "topic", true);
				var queuenames = channel.QueueDeclare().QueueName;
				channel.QueueBind(queuenames, EXCHANGE_NAME, "");
				consumer.Received += (model, ea) =>
				{
					var body = ea.Body;
					try
					{
						//處理消息
						textBox2.Invoke(new EventHandler(delegate
						{
							textBox2.Text += s.ToString() + ":" + Encoding.Default.GetString(body) + "\r\n";
						}));
					}
					catch (Exception ex)
					{

					}
					finally
					{
						//確認完成
						channel.BasicAck(ea.DeliveryTag, false);
					}
				};
				//設置手動完成確認(noAck)
				channel.BasicConsume(queuename, false, consumer);
				#endregion

			}
		}


		public void 廣播發送()
		{
			#region 廣播
			var factory = new ConnectionFactory();
			factory.HostName = "127.0.0.1";
			factory.UserName = "Teld";
			factory.Password = "Teld@Teld.cn";
			var EXCHANGE_NAME = "eee";
			using (var connection = factory.CreateConnection())
			{
				using (var channel = connection.CreateModel())
				{
					// 聲明該channel是fanout類型
					channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

					// 將消息發送給exchange
					//channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
					var body = Encoding.UTF8.GetBytes(textBox1.Text);
					channel.BasicPublish(EXCHANGE_NAME, "", null, body);
				}
			}
			#endregion
		}

		public void 廣播接收()
		{
			for (int i = 0; i < 2; i++)
			{
				var s = i;
				#region 廣播
				// 建立鏈接和channel
				ConnectionFactory factory = new ConnectionFactory();
				factory.HostName = "127.0.0.1";
				factory.UserName = "Teld";
				factory.Password = "Teld@Teld.cn";
				var connection = factory.CreateConnection();
				var channel = connection.CreateModel();
				channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");
				// 由RabbitMQ自行建立的臨時隊列,惟一且隨消費者的停止而自動刪除的隊列
				String queueName = channel.QueueDeclare().QueueName;
				// binding
				channel.QueueBind(queueName, EXCHANGE_NAME, "");

				var consumer = new EventingBasicConsumer(channel);
				// 指定隊列消費者
				channel.BasicConsume(queueName, true, consumer);
				consumer.Received += (model, ea) =>
				{
					var body = ea.Body;
					try
					{
						//處理消息
						textBox2.Invoke(new EventHandler(delegate
						{
							textBox2.Text += s.ToString() + ":" + Encoding.Default.GetString(body) + "\r\n";
						}));
					}
					catch (Exception ex)
					{

					}
					finally
					{

					}
				};
				#endregion
			}
		}



		String exchangeName = "wytExchangeTopic";
		String routeKeyName1 = "black.critical.high";
		String routeKeyName2 = "red.critical.high";
		String routeKeyName3 = "white.critical.high";

		public void TOPIC發送()
		{
			var factory = new ConnectionFactory();
			factory.HostName = "127.0.0.1";
			factory.UserName = "Teld";
			factory.Password = "Teld@Teld.cn";
			//隊列名稱

			using (var connection = factory.CreateConnection())
			{
				using (var channel = connection.CreateModel())
				{
					//durable = true 表明持久化 交換機和隊列都要爲true
					//topic 輪詢模式 fanout 廣播模式
					//輪詢模式,當兩個消費者時候,每一個消費者都會挨個接收消息 好比第一個接收1,2,3 第二個接收到的消息是2,4,6
					//廣播模式,表明每一個消費者都會收到消息,每個收到的都是1,2,3,4,5,6
					channel.ExchangeDeclare(exchangeName, "topic", true);//申明交換機並指定交換機類型
					var properties = channel.CreateBasicProperties();
					properties.DeliveryMode = 2; //持久化
					properties.Persistent = true;
					for (int i = 0; i < 1; i++)
					{
						//發送的消息
						var body = Encoding.UTF8.GetBytes(textBox1.Text);

						//給相應的routingKey 的推送消息,模擬給三個不一樣的key發送同一個消息,也能夠給一個key發送消息
						//消息推送routeKeyName1
						channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName1, basicProperties: properties, body: body);
						////消息推送routeKeyName2
						//channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName2, basicProperties: properties, body: body);
						////消息推送routeKeyName3
						//channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName3, basicProperties: properties, body: body);
					}

				}
			}
		}

		public void TOPIC接收()
		{
			#region   topic模式 根據routingkey

			for (int i = 0; i < 2; i++)
			{
				var s = i;

				var factory = new ConnectionFactory();
				factory.HostName = "127.0.0.1";
				factory.UserName = "Teld";
				factory.Password = "Teld@Teld.cn";

				var connection = factory.CreateConnection();
				var channel = connection.CreateModel();
				var consumer = new EventingBasicConsumer(channel);
				channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);

				String queueName = channel.QueueDeclare().QueueName;

				//接收一種就綁定一種routeKey
				channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null);
				//channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName2, arguments: null);

				consumer.Received += (model, ea) =>
				{
					var body = ea.Body;
					try
					{
					//處理消息
					textBox2.Invoke(new EventHandler(delegate
						{
							textBox2.Text += s.ToString() + ":" + Encoding.Default.GetString(body) + "\r\n";
						}));
					}
					catch (Exception ex)
					{

					}
					finally
					{
					//確認完成
					channel.BasicAck(ea.DeliveryTag, false);
					}
				};
				//設置手動完成確認(noAck)
				channel.BasicConsume(queueName, false, consumer);
			}
			#endregion



		}
	}
}
相關文章
相關標籤/搜索