AMQP協議

定義

AMQP:Advanced Message Queuing Protocol,高級消息協議。服務器

RabbitMQ就是AMQP協議的erlang實現,AMQP的模型架構和RabbitMQ的模型架構是同樣的,生產者將消息送給交換器,交換器和隊列綁定。當生產者發送消息時所攜帶的RoutingKey與綁定時的BindingKey相匹配時,消息即被存入相應的隊列之中,消費者能夠訂閱相應的隊列來獲取消息。架構

AMQP協議自己包括三層:ide

  ❤ Module Layer:位於協議的最高層,主要定義了一些供客戶端調用的命令,客戶端能夠利用這些命令實現本身的業務邏輯。例如:客戶端可使用Queue.Declare命令聲明一個隊列或者使用Basic.Consum訂閱消費一個隊列中的消息。fetch

  ❤ Session Layer:位於中間層,主要負責將客戶端的命令發送給服務器,再將服務器的應答返回給客戶端,主要爲客戶端與服務器之間的通訊提供可靠性同步機制和錯誤處理。ui

  ❤ Transport Layer:位於最底層,主要傳輸二進制數據流,提供幀的處理、信道複用、錯誤檢測和數據表示等。spa

AMQP說到底仍是一個通訊協議,通訊協議都會涉及報文交互,從low-level層面舉例來講,AMQP自己是應用層的協議,其填充於TCP協議層的數據部分,而從high-level層面來講,AMQP是經過協議命令交互的。AMQP協議能夠看做是一系列結構化命令的集合,這裏的命令表明一種操做,相似於Http中的方法(GET、POST、PUT、DELETE等)。code

AMQP生產者流轉過程

爲了更好的說明AMQP協議命令的流轉過程,下面經過代碼的方式來解釋:blog

      //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("joe");
        factory.setPassword("123456");
        //建立鏈接
        Connection connection = factory.newConnection();
        //建立信道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        String message = "Hello Rabbitmq";
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

        //關閉資源
        channel.close();
        connection.close();

  當客戶端與Broker(RabbitMQ服務器)創建鏈接的時候,會調用factory.newConnection方法,這個方法會進一步封裝成Protocol Header的報文頭髮送給Broker,以此來通知Broker本次交互採用的是AMQP協議,緊接着Broker返回Connection.Start來創建鏈接,在鏈接的過程當中涉及Connection.Start/.Start-OK、Connection.Tune/.Tune-OK、Connection.Open/.Open-ok這6個命令的交互。隊列

  當客戶端調用connection.createChannel方法準備開啓信道的時候,其包裝Channel.Open命令發送給Broker,等待Channel.Open-Ok命令。資源

  當客戶端發送消息的時候,須要調用channel.basicPublish方法,對應的AMQP命令爲Basic.Publish,注意這個命令和前面涉及的命令略有不一樣,這個命令還包括了Content Header和Content Body。Content Header裏面包含的是消息體的屬性,例如:投遞模式、優先級等,而Content Body包含消息體的自己。

  當客戶端發送完消息須要關閉資源時,涉及Channel.Close/.Close-Ok與Connection.Close/.Close-Ok的命令交互。詳細的流轉過程以下圖2-10所示:

AMQP消費者流轉過程

仍是經過代碼的方式來了解流轉的過程:

Address[] addresses = new Address[]{new Address(IP_ADDRESS,PORT)};
        //鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("joe");
        factory.setPassword("123456");
        //創建鏈接
        Connection connection = factory.newConnection(addresses);
        final Channel channel = connection.createChannel();//建立信道
        channel.basicQos(64);
        //消費消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("receive message : " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,consumer);
        //關閉資源
        channel.close();
        connection.close();

  消費者客戶端一樣須要與Broker創建鏈接,與生產者客戶端同樣,協議交互一樣涉及Connection.Start/.Start-Ok、Connection.Tune/.Tune-Ok和Connection.Open/.Open-Ok等。

  緊接着就是在Connection之上創建channel,和以前的生產者同樣協議涉及Channel.Open/Open-Ok。

  若是在消費以前調用了Channel.basicQos(int prefetchCount)的方法來設置消費者客戶端最大能「保持」的未確認的消息數(即預取個數),那麼協議流轉就會涉及Basix.Qos/.Qos-Ok這兩個AMQP命令。

  在真正的消費以前,消費者客戶端須要向Broker發送Basic.Consume命令(即調用channel.basicConsume方法)將Channel設置爲接收模式,以後Broker回執Basic.Consume-Ok以告訴消費者客戶端準備好消費消息。緊接着Broker向消費者客戶端推送消息(Push),即Basic.Deliver命令,有意思的是這個和Basic.Publish命令同樣會攜帶Content Header 和Content Body。

  消費者接收到消息並正確消費後,向Broker發送確認,即Basic.Ack命令。

  在消費者中止消費的時候,主動關閉鏈接,這點和生產者是同樣的,涉及到Channel.Close/Channel-Ok和Connection.Close/.Close-Ok。

下圖2-11是消費者詳細的流轉過程:

 AMQP命令概覽

下圖是一些其餘的AMQP命令:

 參考:《RabbitMQ實戰指南》 朱忠華 編著;

相關文章
相關標籤/搜索