RabbitMq的基本認識和配置(一)

RabbitMq的基本認識和配置

什麼是消息中間件

  • 消息: 在應用之間傳遞的數據c++

  • 消息中間件:Message Queue Middleware,簡稱MQ,是指利用高效可靠的消息機制進行與平臺無關的數據交流,並基於數通訊機制來進行分佈式系統的集成。經過提供的消息傳遞和消息排隊模型,它能夠在分佈式環境下拓張進程間的通訊.vim

  • 傳遞的模式: 1.點對點(P2P,Point-to-Point)安全

2.發佈訂閱模式(Pub/Sub)app

  • 消息中間件適用的方向: 適用於可靠的數據傳輸的分佈式環境。可以在不一樣的平臺下通訊,屏蔽各個平臺以及協議之間的特性,實現應用程序的連接,而且在任什麼時候刻能夠將消息進行傳送或者存儲轉發,這比遠程過程調用更加進步。

消息中間件的做用

  • 解耦:在項目的啓動之初來預測未來會碰到什麼需求是極其困難的。消息中間件在處理過程當中間插入一個隱含的,基於數據的接口層,兩邊的處理過程都要實現這一個接口,這容許你獨立的拓展或修改兩邊的處理過程,只要確保他們遵循一樣的接口約束便可。異步

  • 冗餘(存儲):有些狀況下,處理數據的過程會失敗。消息中間件能夠把數據進行持久化直到他們已經被徹底處理,經過這一方式規避了數據丟失的風險。在把一個消息從消息中間件中刪除以前,須要你的處理系統明確地指出這個消息被處理完成,從而確保你地數據被安全地保存知道你使用完畢。分佈式

  • 擴展性:消息中間件解耦了應用地處理過程,因此提供了消息入隊和處理地效率是很容易地,只須要另外增長處理過程便可,不須要改變代碼,也不須要調節參數。ide

  • 削峯:在訪問量劇增地狀況下,應用仍然須要繼續發揮做用,可是這個月地突發流量的狀況的不常見。.net

  • 可恢復性:當系統一部分的組件失效時,不會影響到整個系統。消息中間件下降了進程間的耦合度。線程

  • 順序保證:在大所屬使用場景下,數據處理順序很重要,大部分消息中間件支持必定程度上的順序性。unix

  • 緩衝:在任何重要的系統中,都會存在須要不一樣處理時間的元素。消息中間件經過一個緩衝層來幫助任務最高效率的執行,寫入消息中間件的處理會盡量快速。

  • 異步通訊:在不少時候應用不想也不須要當即處理消息。消息中間件提供了異步通訊機制,容許應用把一些消息放入消息中間件中,但並不當即處理它,在以後須要的時候再慢慢處理。

Rabbitmq安裝

安裝erlang

由於RabbitMQ採用Erlang語言編寫的,因此須要配置erlang的環境,配置以下:

1.安裝gcc和openssl模塊

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

2.安裝

yum -y install ncurses-devel

3.指定編譯後程序的路徑

./configure --prefix=/usr/erlang

4.編譯安裝

make

make install

5.配置環境變量

配置好以後使用source /etc/profile將該配置文件生效,生效以後輸入erl回車能夠看到:

安裝完成erlang的環境以後,須要安裝的就是rabiitmq了。

1.這裏我下載的時rabbitmq-server-generic-unix-3.7.9.tar.xz,使用xz -d rabbitmq-server-generic-unix-3.7.9.tar.xz,tar -xvf rabbitmq-server-generic-unix-3.7.9.tar解壓。

2.配置環境變量

vim /etc/profile

3.使用守護線程的模式啓動rabbitmq

rabbitmq-server -detached

4.添加用戶 rabbitmqctl add_user root root123 rabbitmqctl set_permissions -p / root "." "." ".*" rabbitmqctl set_user_tags root administrator

5.雖然已經啓動了rabbitmq,可是後臺管理尚未打開,須要使用下面的命令打開後臺管理。

rabbitmqctl start_app

rabbitmq-plugins enable rabbitmq_management

此時仍是有問題,須要關閉防火牆,systemctl stop firewalld.service

此時訪問http://192.168.123.124:15672 登陸後能夠看到以下的界面:

到此爲止咱們就已經對rabbitmq有了基本認識,也能作些基本安裝配置。

簡單的例子:

1.導入依賴:

<dependencies>
		<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>5.6.0</version>
		</dependency>
	</dependencies>

消息生產者:

public class DemoProducer {
		private static final String EXCHANGE_NAME = "exchange_demo";
		private static final String ROUNTING_KEY = "routingkey_demo";
		private static final String QUEUE_NAME = "queue_demo";
		private static final String IP_ADDRESS = "192.168.124.129";
		private static final int PORT = 5672;

		public static void main(String[] args) throws IOException, TimeoutException {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(IP_ADDRESS);
			factory.setPort(PORT);
			factory.setUsername("root");
			factory.setPassword("root@123");
			//建立連接
			Connection connection = factory.newConnection();
			//建立信道
			Channel channel = connection.createChannel();
			channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
			//建立一個持久化,非排他的,非自動刪除的的隊列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			//將交換器與隊列經過路由鍵綁定
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUNTING_KEY);
			//發送一條持久化的消息:hello world;
			String message = "hello world";
			channel.basicPublish(EXCHANGE_NAME, ROUNTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
			//關閉資源
			channel.close();
			connection.close();
		}
	}

消息消費者:

public class ConsumerDemo {
		private static final String EXCHANGE_NAME = "exchange_demo";
		private static final String ROUNTING_KEY = "routingkey_demo";
		private static final String QUEUE_NAME = "queue_demo";
		private static final String IP_ADDRESS = "192.168.124.129";
		private static final int PORT = 5672;

		public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
			Address[] addresses = new Address[]{
					new Address(IP_ADDRESS, PORT)
			};
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(IP_ADDRESS);
			factory.setPort(PORT);
			factory.setUsername("root");
			factory.setPassword("root@123");
			//這裏的連接方式和生產者的連接連接方式有所不一樣,須要區別對待
			Connection connection = factory.newConnection(addresses);
			//建立信道
			final Channel channel = connection.createChannel();
			//設置客戶端最多接收爲被ack的消息個數
			channel.basicQos(64);
			Consumer consumer = new DefaultConsumer(channel) {
				[@Override](https://my.oschina.net/u/1162528)
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
					System.out.println("recv message:" + new String(body));
					try {
						TimeUnit.SECONDS.sleep(1);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			};
			channel.basicConsume(QUEUE_NAME, consumer);
			TimeUnit.SECONDS.sleep(5);
			channel.close();
			connection.close();
		}
	}

執行結果:

相關文章
相關標籤/搜索