AMQP:Advanced Message Queuing Protocol,高級消息協議。服務器
RabbitMQ就是AMQP協議的erlang實現,AMQP的模型架構和RabbitMQ的模型架構是同樣的,生產者將消息送給交換器,交換器和隊列綁定。當生產者發送消息時所攜帶的RoutingKey與綁定時的BindingKey相匹配時,消息即被存入相應的隊列之中,消費者能夠訂閱相應的隊列來獲取消息。架構
AMQP協議自己包括三層:ide
❤ Module Layer:位於協議的最高層,主要定義了一些供客戶端調用的命令,客戶端能夠利用這些命令實現本身的業務邏輯。例如:客戶端可使用Queue.Declare命令聲明一個隊列或者使用Basic.Consum訂閱消費一個隊列中的消息。fetch
❤ Session Layer:位於中間層,主要負責將客戶端的命令發送給服務器,再將服務器的應答返回給客戶端,主要爲客戶端與服務器之間的通訊提供可靠性同步機制和錯誤處理。ui
❤ Transport Layer:位於最底層,主要傳輸二進制數據流,提供幀的處理、信道複用、錯誤檢測和數據表示等。spa
AMQP說到底仍是一個通訊協議,通訊協議都會涉及報文交互,從low-level層面舉例來講,AMQP自己是應用層的協議,其填充於TCP協議層的數據部分,而從high-level層面來講,AMQP是經過協議命令交互的。AMQP協議能夠看做是一系列結構化命令的集合,這裏的命令表明一種操做,相似於Http中的方法(GET、POST、PUT、DELETE等)。code
爲了更好的說明AMQP協議命令的流轉過程,下面經過代碼的方式來解釋:blog
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("joe"); factory.setPassword("123456"); //建立鏈接 Connection connection = factory.newConnection(); //建立信道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY); String message = "Hello Rabbitmq"; channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); //關閉資源 channel.close(); connection.close();
當客戶端與Broker(RabbitMQ服務器)創建鏈接的時候,會調用factory.newConnection方法,這個方法會進一步封裝成Protocol Header的報文頭髮送給Broker,以此來通知Broker本次交互採用的是AMQP協議,緊接着Broker返回Connection.Start來創建鏈接,在鏈接的過程當中涉及Connection.Start/.Start-OK、Connection.Tune/.Tune-OK、Connection.Open/.Open-ok這6個命令的交互。隊列
當客戶端調用connection.createChannel方法準備開啓信道的時候,其包裝Channel.Open命令發送給Broker,等待Channel.Open-Ok命令。資源
當客戶端發送消息的時候,須要調用channel.basicPublish方法,對應的AMQP命令爲Basic.Publish,注意這個命令和前面涉及的命令略有不一樣,這個命令還包括了Content Header和Content Body。Content Header裏面包含的是消息體的屬性,例如:投遞模式、優先級等,而Content Body包含消息體的自己。
當客戶端發送完消息須要關閉資源時,涉及Channel.Close/.Close-Ok與Connection.Close/.Close-Ok的命令交互。詳細的流轉過程以下圖2-10所示:
仍是經過代碼的方式來了解流轉的過程:
Address[] addresses = new Address[]{new Address(IP_ADDRESS,PORT)}; //鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("joe"); factory.setPassword("123456"); //創建鏈接 Connection connection = factory.newConnection(addresses); final Channel channel = connection.createChannel();//建立信道 channel.basicQos(64); //消費消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("receive message : " + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,consumer); //關閉資源 channel.close(); connection.close();
消費者客戶端一樣須要與Broker創建鏈接,與生產者客戶端同樣,協議交互一樣涉及Connection.Start/.Start-Ok、Connection.Tune/.Tune-Ok和Connection.Open/.Open-Ok等。
緊接着就是在Connection之上創建channel,和以前的生產者同樣協議涉及Channel.Open/Open-Ok。
若是在消費以前調用了Channel.basicQos(int prefetchCount)的方法來設置消費者客戶端最大能「保持」的未確認的消息數(即預取個數),那麼協議流轉就會涉及Basix.Qos/.Qos-Ok這兩個AMQP命令。
在真正的消費以前,消費者客戶端須要向Broker發送Basic.Consume命令(即調用channel.basicConsume方法)將Channel設置爲接收模式,以後Broker回執Basic.Consume-Ok以告訴消費者客戶端準備好消費消息。緊接着Broker向消費者客戶端推送消息(Push),即Basic.Deliver命令,有意思的是這個和Basic.Publish命令同樣會攜帶Content Header 和Content Body。
消費者接收到消息並正確消費後,向Broker發送確認,即Basic.Ack命令。
在消費者中止消費的時候,主動關閉鏈接,這點和生產者是同樣的,涉及到Channel.Close/Channel-Ok和Connection.Close/.Close-Ok。
下圖2-11是消費者詳細的流轉過程:
下圖是一些其餘的AMQP命令:
參考:《RabbitMQ實戰指南》 朱忠華 編著;