mac安裝rabbitmq 須要注意的是,mac安裝rabbitmq,啓動的時候命令前,須要加 sudo,否則會報錯誤 spring
2.1 Producer(生產者)apache
2.2 Consumer(消費者)windows
2.3 Exchange(交換機)服務器
2.4 Queue(隊列)函數
2.5 rountingKey(交換機與隊列之間的關係)工具
官網的6中模式,能夠點開這個網址,顯示6中模式,第6中模式RPC遠程調用咱們不須要用該模式,因此咱們只要關注前五種就能夠了。spa
接下來咱們就直接簡單教學rabbitmq的簡單使用線程
/** * 聲明隊列,五個參數列表,若是直接使用默認channel.queueDeclare("queue"),那麼其餘參數都會自動默認設置屬性,因此通常咱們幾乎都默認它 * String queue 隊列名稱 * boolean durable 隊列是須要持久化,意思就是rabbitmq重啓的時候,若是不是持久化,那麼該隊列就會消失,默認true * boolean exclusive 若是你想建立一個只有本身可見的隊列,不容許其它用戶訪問RabbitMQ容許你將一個Queue聲明成爲排他性的,只對首次聲明它的鏈接(Connection)可見,會在其鏈接斷開的時候自動刪除。因此咱們開發中通常不須要此操做,默認false * boolean autoDelete 消息是須要持久化,意思就是rabbitmq重啓的時候,若是爲true,那麼關閉期間接受的消息就會自動消失,默認false * Map<String, Object> arguments 這個參數咱們只會在使用延遲隊列中才會用到,就是延遲隊列的相關配置等屬性 * 方法channel.queueDelete("queue")等同於channel.queueDeclare("queue",true,false,false,null); */ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** * 綁定隊列到交換機 * String queue 隊列名 * String exchange 交換機名 * String routingKey 綁定關係 如 大頭兒子,小頭爸爸 他們的rountingKey就是父子 */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
版本隨意,本博客任何版本高版本也行。xml
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.0.RELEASE</version> </dependency>
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; public class ConnectionUtil { public static Connection getConnection() throws Exception { //定義鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("localhost"); //端口 factory.setPort(5672); //設置帳號信息,用戶名、密碼、vhost factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); // 經過工程獲取鏈接 Connection connection = factory.newConnection(); return connection; } }
理解:該rabbit服務器只有一個單一的隊列
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從鏈接中建立通道 Channel channel = connection.createChannel(); // 聲明(建立)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //關閉通道和鏈接 channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "mujiutian_queue"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); //建立channel Channel channel = connection.createChannel(); //建立隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //監聽隊列,發送消息 channel.basicConsume(QUEUE_NAME, true, consumer); // 獲取消息,該線程一直進行 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取到消息"+message); } } }
先打開rece的main函數,這樣就能夠執行send發送消息。
理解:rabbit服務器有一個exchange(交換機),該交換機下有兩個隊列,總共發送了100條消息,A隊列效率高能夠搶到80條消息給消費者,B隊列只能搶到20條消息給發送者,他們的總和是100條,爲工做模式。
先執行,先執行消費者main函數,在看結果圖:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { // 消息內容 String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(10); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("收到消息'" + message + "'"); // 返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
如圖:
// 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1);
跟這個有關係,這也就是咱們所說的工做模式
理解:就是羣發,該交換機下的全部隊列,都會接受相同的全部交換機發來的消息,相似於qq羣同樣
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息內容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
理解就是:你是人,若是性別是女,請進女廁,是男性,請進男廁,如同,一個交換機下面的多個隊列,根據rountingKey判斷該接受的消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息內容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
理解就是:路徑aas.# A隊列接受這種全部路徑,B隊列接受路徑下的全部隊列aab.#
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息內容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_topic_work"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_topic_work2"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }