RabbitMQ入門

1、簡述html

  a) RabbitMQ是mq的一種,目前全球來說社區是最活躍的,有問題一查就有。之前寫過騰訊的CMQ,社區文檔真的少啊。java

  b) RabbitMQ用處通常爲啦異步處理,而且可實現一個業務同步處理,速度、效率明顯提升。如註冊,那麼你想給客戶發短信與郵件,那麼只須要把這兩個消息放入MQ中就可。spring

   還能夠實現項目間的解耦通信等。windows

  c) 配上在網上扒的原理圖:springboot

  

  d) 分類異步

    我感受能夠給其分爲兩大類,經過有沒有路由的維度分Queue(隊列)與Exchange(路由)。通常開發場景都用Exchange。socket

    Queue可一對一,可一對多,消息均攤。maven

    Exchange有幾種分別爲:fanout,direct,topic,headers。ide

類型名稱 類型描述
fanout 廣播路由,把全部發送到該Exchange的消息路由到全部與它綁定的Queue中
direct Routing Key==Binding Key
topic 模糊路由經過規則匹配
headers Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。

2、安裝RabbitMQ(windows)spring-boot

  a) 安裝Erlang

      首先,您須要安裝支持的 Windows 版Erlang。下載並運行Erlang for Windows 安裝程序。下載地址http://www.erlang.org/downloads

       

  b) RabbitMQ的下載安裝

   下載地址http://www.rabbitmq.com/install-windows.html

     

  c) erl環境變量配置    ERLANG_HOME=d:\javasoft\erl7.1

  

   Path中加入%ERLANG_HOME%\bin;

  

  測試erl配置是否正確,開始-運行-cmd,輸入erl,顯示以下,證實配置正確

  

  d) RabbitMQ環境變量配置   都同樣先配置環境變量再加入Path中

   RABBITMQ_SERVER=C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3

  

  在Path中加入%RABBITMQ_SERVER%\sbin;

  

  e) 激活rabbitmq_management

  在sbin目錄下輸入   rabbitmq-plugins enable rabbitmq_management

  或在CMD中鍵入以下命令 :  

  "C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
  

  f) 啓動RebbitMQ

  啓動命令: net start RabbitMQ
  中止命令: net stop RabbitMQ
  g) RabbitMQ測試
  測試地址 http://localhost:15672/
  默認的用戶名:guest
  默認的密碼爲:guest

 

3、編寫例子(springboot) 例子來自於官網  https://www.rabbitmq.com/tutorials/tutorial-one-java.html

  在pom文件中加入maven依賴

  <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

  a) 發送端(生產者)

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SendMQ {
    private final static String QUEUE_NAME = "Hello";

    public static void main(String[] args) throws IOException, Exception {
        // connection是socket鏈接的抽象,而且爲咱們管理協議版本協商(protocol version negotiation),
        // 認證(authentication )等等事情。這裏咱們要鏈接的消息代理在本地,所以咱們將host設爲「localhost」。
        // 若是咱們想鏈接其餘機器上的代理,只須要將這裏改成特定的主機名或IP地址。
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672); //默認端口號
        factory.setUsername("guest");//默認用戶名
        factory.setPassword("guest");//默認密碼
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 接下來,咱們建立一個channel,絕大部分API方法須要經過調用它來完成。
        // 發送以前,咱們必須聲明消息要發往哪一個隊列,而後咱們能夠向隊列發一條消息:
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello world";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}

  b) 接受端(消費者)

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import org.junit.Test;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RecvMQ {
    private final static String QUEUE_NAME = "Hello";

    public static void main(String[] args) throws IOException, Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

  很少說運行看結果。

   感謝網上的奉獻者,在這裏致謝。

相關文章
相關標籤/搜索