RabbitMq入門級教程之徹底掌握5種開發方案

一、安裝

    windows安裝rabbitmqhtml

    mac安裝rabbitmq 須要注意的是,mac安裝rabbitmq,啓動的時候命令前,須要加 sudo,否則會報錯誤 spring

二、rabbitmq 開發概念詞

    2.1 Producer(生產者)apache

    2.2 Consumer(消費者)windows

    2.3 Exchange(交換機)服務器

    2.4 Queue(隊列)函數

    2.5 rountingKey(交換機與隊列之間的關係)工具

三、RabbitMQ開發方案

官網的6中模式,能夠點開這個網址,顯示6中模式,第6中模式RPC遠程調用咱們不須要用該模式,因此咱們只要關注前五種就能夠了。spa

接下來咱們就直接簡單教學rabbitmq的簡單使用線程

3.0 代碼講解

/**
 * 聲明隊列,五個參數列表,若是直接使用默認channel.queueDeclare("queue"),那麼其餘參數都會自動默認設置屬性,因此通常咱們幾乎都默認它
 * String queue  隊列名稱
 * boolean durable 隊列是須要持久化,意思就是rabbitmq重啓的時候,若是不是持久化,那麼該隊列就會消失,默認true
 * boolean exclusive 若是你想建立一個只有本身可見的隊列,不容許其它用戶訪問RabbitMQ容許你將一個Queue聲明成爲排他性的,只對首次聲明它的鏈接(Connection)可見,會在其鏈接斷開的時候自動刪除。因此咱們開發中通常不須要此操做,默認false
 * boolean autoDelete 消息是須要持久化,意思就是rabbitmq重啓的時候,若是爲true,那麼關閉期間接受的消息就會自動消失,默認false
 * Map<String, Object> arguments 這個參數咱們只會在使用延遲隊列中才會用到,就是延遲隊列的相關配置等屬性
 * 方法channel.queueDelete("queue")等同於channel.queueDeclare("queue",true,false,false,null);
 */

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
 * 綁定隊列到交換機 
 * String queue 隊列名
 * String exchange 交換機名
 * String routingKey 綁定關係 如 大頭兒子,小頭爸爸 他們的rountingKey就是父子
 */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

3.1 pom.xml

版本隨意,本博客任何版本高版本也行。xml

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>
<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.7</version>
</dependency>
<dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-lang3</artifactId>
   <version>3.3.2</version>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit</artifactId>
   <version>1.4.0.RELEASE</version>
</dependency>

3.2 工具類準備

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定義鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //設置帳號信息,用戶名、密碼、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 經過工程獲取鏈接
        Connection connection = factory.newConnection();
        return connection;
    }
}

3.3 簡單模式(simple)

理解:該rabbit服務器只有一個單一的隊列

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 從鏈接中建立通道
        Channel channel = connection.createChannel();
        // 聲明(建立)隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息內容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //關閉通道和鏈接
        channel.close();
        connection.close();
    }
}
public class Recv {

    private final static String QUEUE_NAME = "mujiutian_queue";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        //建立channel
        Channel channel = connection.createChannel();
        //建立隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //監聽隊列,發送消息
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 獲取消息,該線程一直進行
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("獲取到消息"+message);
        }
    }
}

先打開rece的main函數,這樣就能夠執行send發送消息。

3.4 工做模式(work)

理解:rabbit服務器有一個exchange(交換機),該交換機下有兩個隊列,總共發送了100條消息,A隊列效率高能夠搶到80條消息給消費者,B隊列只能搶到20條消息給發送者,他們的總和是100條,爲工做模式。

先執行,先執行消費者main函數,在看結果圖:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            // 消息內容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(10);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("收到消息'" + message + "'");
            // 返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

如圖:

// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);

跟這個有關係,這也就是咱們所說的工做模式

3.5 訂閱模式(Publish/Subribe)

理解:就是羣發,該交換機下的全部隊列,都會接受相同的全部交換機發來的消息,相似於qq羣同樣

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息內容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.6 路由模式(Routing)

理解就是:你是人,若是性別是女,請進女廁,是男性,請進男廁,如同,一個交換機下面的多個隊列,根據rountingKey判斷該接受的消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息內容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.7 通配符模式(Topics)

理解就是:路徑aas.# A隊列接受這種全部路徑,B隊列接受路徑下的全部隊列aab.#

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息內容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_work";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_work2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
相關文章
相關標籤/搜索