消息中間件-RabbitMq(搭建&消息分發)

消息中間件-RabbitMq(搭建&消息分發)

RabbitMq】是一個【AMQP】協議的實現。服務端使用的是Erlang語言進行編寫,那也就是說,咱們要運行它,就要安裝相關Erlang環境。前面說了AMQP最初是爲了解決金融行業的可用性問題,因此Rabbit在高可用方面表現不俗,而且在我看來他是這幾種中間件中最容易上手的一個。並且它在併發方面表現十分出色,能夠實現大概10w的吞吐量。他的特色是:【可靠性、消息集羣、高可用、插件機制(可讓它支持別的協議)、支持多語言客戶端、管理頁面 so on本篇主要聊聊如何安裝、使用、以及關於他的一些名詞方面的闡述。run。。html

安裝運行

  • 個人環境是CentOS7
  • http://www.rabbitmq.com/which-erlang.html 頁面查看安裝rabbitmq須要安裝erlang對應的版本,前面是Rabbit的版本,後面是Erlang的對它支持的版本。這裏先後要對應下載,版本必須符合他的要求,我這裏使用的就是第一個。
  • https://github.com/rabbitmq/erlang-rpm/releases 中複製對應的版本erlang下載地址
  • https://github.com/rabbitmq/rabbitmq-server/tags 中複製對應的版本rabbitmq的下載地址

  • 下載Erlang
    • wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v23.3.4.3/erlang-23.3.4.3-1.el7.x86_64.rpm
  • 安裝Erlang
    • sudo rpm -Uvh /home/download/erlang-23.3.4.3-1.el7.x86_64.rpm
  • 安裝socat
    • sudo yum install -y socat
  • 下載RabbitMQ
    •  wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-server-3.8.15-1.el7.noarch.rpm
  • 安裝RabbitMQ
    • 【sudo rpm -Uvh /home/download/rabbitmq-server-3.8.15-1.el7.noarch.rpm】

到目前爲止咱們的準備工做完畢,如下是一些啓動和關閉命令。git

中止服務】:sudo systemctl stop rabbitmq-server 【查詢狀態】:sudo systemctl status rabbitmq-server 【啓動】:sudo systemctl start rabbitmq-server 【設置開啓自啓】:sudo systemctl enable rabbitmq-server github

使用啓動命令啓動後,咱們查詢狀態發現狀態爲 dead,這是由於咱們要啓動他的插件 使用【rabbitmq-plugins list】能夠查詢全部他支持的插件,咱們這裏須要啓動服務器

【rabbitmq-plugins  enable rabbitmq_management】併發

 執行完成後使用【 cat /etc/rabbitmq/enabled_plugins】就能夠知道是否啓動插件成功,而後再次啓動發現啓動狀態就爲running,使用【netstat -nplt | grep 15672 】發現他的專用端口已經開啓,至此,安裝啓動完畢。這個時候就能夠對它進行訪問了(你的ip:15672),出現下面的圖,就證實搭建成功。這裏注意開放一下端口,不然別的機器沒法訪問:tcp

  • sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
  • sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
  • sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
  • sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent 

 然而咱們用他本身的gust是沒法login in 進去的,由於這個支持在搭建的服務器自己上訪問,那咱們就要建立本身的用戶,而且賦予相應的權限。ide

  • 【添加一個admin用戶】:rabbitmqctl add_user admin admin 
  • 【分配操做權限】:rabbitmqctl set_user_tags admin administrator
  • 【分配資源權限】:rabbitmqctl set_permissions -p / admin ".*" ".*" ".*

使用admin進行登陸,至此,能夠rabbitmq能夠正常使用ui

使用

添加相關依賴spa

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.5.1</version>
 </dependency>

生產一個消息插件

public class Producer {

    public static void main(String[] args) {
        // 一、建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 二、設置鏈接屬性
        factory.setHost("你的ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 三、從鏈接工廠獲取鏈接
            connection = factory.newConnection();

            // 四、從連接中建立通道
            channel = connection.createChannel();

            /**
             * 五、聲明(建立)隊列
             * 若是隊列不存在,纔會建立
             * RabbitMQ 不容許聲明兩個隊列名相同,屬性不一樣的隊列,不然會報錯
             *
             * queueDeclare參數說明:
             * @param queue 隊列名稱
             * @param durable 隊列是否持久化
             * @param exclusive 是否排他,便是否爲私有的,若是爲true,會對當前隊列加鎖,其它通道不能訪問,而且在鏈接關閉時會自動刪除,不受持久化和自動刪除的屬性控制
             * @param autoDelete 是否自動刪除,當最後一個消費者斷開鏈接以後是否自動刪除
             * @param arguments 隊列參數,設置隊列的有效期、消息最大長度、隊列中全部消息的生命週期等等
             */
            channel.queueDeclare("queue1", false, false, false, null);
            // 消息內容
            String message = "Hello World!";
            // 六、發送消息
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息已發送!");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 七、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 八、關閉鏈接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

 

生產後你去管理頁面查詢,會發現一個消息還未讀取。

消費一個消息(消費後再次查詢,發現ready中沒有東西了)

public class Consumer {

    public static void main(String[] args) {
        // 一、建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 二、設置鏈接屬性
        factory.setHost("你的ip");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 三、從鏈接工廠獲取鏈接
            connection = factory.newConnection("消費者");

            // 四、從連接中建立通道
            channel = connection.createChannel();

            channel.queueDeclare("queue1", false, false, false, null);

            // 六、定義收到消息後的回調
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                }
            };
            // 七、監聽隊列
            channel.basicConsume("queue1", true, callback, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                }
            });

            System.out.println("開始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 八、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 九、關閉鏈接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

至此,簡單使用結束。

使用客戶端:這裏首先建立兩個隊列,而後在交換機上模擬發送消息,以topic類型的交換機爲例,他會進行routing key的匹配,在發送消息的時候,把你的routing key 攜帶,便可匹配。

 

 

 

 一個topic類型的交換機的例子

/**
 * Topic--生產者
 *
 * 生產者將消息發送到topic類型的交換器上,和routing的用法相似,都是經過routingKey路由,但topic類型交換器的routingKey支持通配符
 */
public class Producer {

    public static void main(String[] args) {
        // 一、建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 二、設置鏈接屬性
        factory.setHost("你的ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 三、從鏈接工廠獲取鏈接
            connection = factory.newConnection("生產者");

            // 四、從連接中建立通道
            channel = connection.createChannel();

            // 路由關係以下:com.# --> queue-1     *.order.* ---> queue-2
            // 消息內容
            String message = "Hello A";
            // 發送消息到topic_test交換器上
            channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
            System.out.println("消息 " + message + " 已發送!");

            // 消息內容
            message = "Hello B";
            // 發送消息到topic_test交換器上
            channel.basicPublish("topic-exchange", "com.sms.create", null, message.getBytes());
            System.out.println("消息 " + message + " 已發送!");

            // 消息內容
            message = "Hello C";
            // 發送消息到topic_test交換器上
            channel.basicPublish("topic-exchange", "cn.order.create", null, message.getBytes());
            System.out.println("消息 " + message + " 已發送!");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 七、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 八、關閉鏈接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
/**
 * 路由--消費者
 *
 * 消費者經過一個臨時隊列和交換器綁定,接收發送到交換器上的消息
 */
public class Consumer {

    private static Runnable receive = () -> {
        // 一、建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 二、設置鏈接屬性
        factory.setHost("你的ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;
        final String queueName = Thread.currentThread().getName();

        try {
            // 三、從鏈接工廠獲取鏈接
            connection = factory.newConnection("消費者");

            // 四、從連接中建立通道
            channel = connection.createChannel();
            // 定義消息接收回調對象
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                }
            };
            // 監聽隊列
            channel.basicConsume(queueName, true, callback, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                }
            });

            System.out.println(queueName + " 開始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 八、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 九、關閉鏈接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(receive, "queue1").start();
        new Thread(receive, "queue-2").start();
    }

}
View Code

 

Rabbit名詞介紹

Blocker:一個rabbit服務器就是一個Blocker

虛擬主機(virtual host一個Blocker中能夠有多個虛擬機,每一個虛擬機相似於一個工做空間,每一個虛擬主機中的消息和其餘虛擬主機的消息不相互影響

connection:消費者和rabbit中間的鏈接,有了這個鏈接,雙方纔能通訊。

RoutingKey:消息被髮給交換機的時候,會攜帶它,這個是用來指定消息的路由規則(能夠爲空)

channel(信道):是在connection上創建的管道,一個connection上能夠創建多個channel,消息經過他們進行傳遞。

BindingKey:Exchange和Queue綁定的關係,Exchange接收到的消息會帶有RoutingKey這個字段。

交換機(exchanger):當rabbit接收到消息後,交換機對這些消息進行轉換,他的類型決定哪一個隊列中應該擁有這些消息,

交換機類型:

  • 【direct】:當發送消息的時候,咱們會在消息體上攜帶一個路由鍵【routekey】,若是消息體上你的路由鍵和隊列匹配則發送給對應的隊列。
  • 【fanout 】:發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。
  • 【headers】:根據發送的消息內容中的【headers】屬性進行匹配,當消息發送到RabbitMQ時會取到該消息的【headers】與Exchange綁定時指定的鍵值對進行匹配,若是匹配到,則對應隊列能夠接受到消息。
  • 【topic】:將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」會匹配一個或多個詞,好比【ok.#】--》【ok.1.1 or ok.1.1.2 so on】,只要隊列能夠匹配到,就能夠接受消息

隊列(queue):rabbit接收到的信息存儲在這裏,消費者也是從這裏獲取的消息。

binder: 隊列和交換機之間的綁定

AMQP(advanced message queuing protocol):

他是應用層協議的一個開放標準,爲面向消息的中間件協議。他分爲三層:

【底層協議層】:主要傳輸二進制數據流,

【中間層】:將客戶端的命令轉發給服務器,而後將服務器的回覆轉給客戶端。【將最高層的用戶層傳遞的信息轉化爲二進制,傳遞給底層。把底層的信息轉化爲客戶端能夠知道的語言。】

【最高層】:提供用戶調用的命令。

流轉流程

生產者:創建鏈接->開啓通道->發送消息->關閉資源

消費者:創建鏈接->開啓通道->接受消息->發送確認消息(告訴rabbit,rabbit修改消息狀態爲已經讀 and so on)->釋放資源

相關文章
相關標籤/搜索