剛剛用了,記錄下來,之後忘了,方便可以快速想起來。java
首先說明,因爲RabbitMQ服務端非JAVA,C++語言,固然也就看不懂,因此本文的理解都是過於主觀的。ubuntu
推薦最好的安裝方式:去官網,去官網,去官網,重要的事情說三遍。tomcat
我通常的操做流程是:用google右上角翻譯網頁,而後看個大概意思,而後再顯示原網頁,一個單詞單詞的看。ide
仍是總結一下Ubuntu,RabbitMQ安裝步驟(依次執行下面四條命令就ok了):函數
1,echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list 2,wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - 3,sudo apt-get update 4,sudo apt-get install rabbitmq-server
服務端配置,基本上不須要配置就能知足大多數需求。官網如是說,且相信他一次 ui
2.1 界面管理插件的安裝google
先說一下ubuntu目錄下面的日誌目錄,與腳本目錄,通常這是咱們最關心的目錄:spa
1,日誌目錄: /var/log/robbitmq-server 能夠經過/etc/logrotate.d/rabbitmq-server進行配置
2,腳本目錄:/usr/lib/rabbitmq/bin/插件
而後進入到腳本目錄也就是(cd /usr/lib/rabbitmq/bin/),執行以下命令翻譯
rabbitmq-plugins enable rabbitmq_management
這樣管理插件算是裝好了。
2.2 用戶管理
經過http://localhost:15672登陸會發現(默認用戶guest,密碼也是guest),認證失敗(登陸失敗去日誌文件查找緣由)。
而後天然須要受權了,給一個受權的命令demo,詳情自行腦補:
#username就是用戶名,能夠隨便取,pwd就是你要設置的密碼 rabbitmqctl add_user username pwd #administrator爲用戶的角色,與tomcat那種管理員配置有點像 rabbitmqctl set_user_tags username administrator #授予權限 /表明vhost主機根目錄,後面的*j就是讀寫之類的權限 rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
受權以後,講道理就能登陸了,能夠看到下面這樣的界面:
先體驗一下整個消息投遞過程:
3.1 RabbitMQ的核心:
核心官網有介紹,說的connecnton,channel之類的,到底怎麼樣,who care?
整體來看,咱們關注業務實現是:1)消息怎麼投遞的。2)消費者怎麼消費消息。3)消息是不是可靠投遞。4)消息投遞方式。5)消息的生命週期。6)消息隊列生命週期
3.2 消息是怎麼投遞的?(記住一點,生產者消息投遞都是面向交換機的)
RabbitMQ 是面向交換機投遞消息的。交換機可能綁定有許多隊列,交換機如何將消息投遞給這些隊列呢?
首先說一下面向交換機的設計的優點:1)這明顯藉助了數據鏈路層那個交換機的設計思想。除了層級分明之外,還能從分提升鏈路利用率(可能有點抽像)。
2)從代碼層面來看:若是沒有交換機,你至少得維護一個十分龐大的路由表,而後從路由表正確投遞消息,有了交互機,這裏路
由表就會被拆分到多個交換機裏面,效果沒必要多說。
3)而後就是高度的解耦,不一樣的交換機可有不一樣的路由規則,要是沒有交換機。。。。。。
在RabbitMQ,交換機有4種投遞方式,就是枚舉類BuiltinExchangeType的4個枚舉變量:
DIRECT:會將全部消息先取消息的ROUTE_KEY,而後投遞到與ROUTE_KEY綁定的隊列裏面(if(msg.routekey.equals(queue.routekey)))。
FANOUT:此種模式下,根本不檢查消息的ROUTE_KEY,直接投送到交換機所擁有的全部隊列裏面。
TOPIC,HEADERS自行看一下官網怎麼說的,不想碼字了^_^||
總結起來就一個函數就把消息發出去了:channel.basicPublish(excange_name,route_key,false,bs,"test".getBytes());能夠去官網查一下這個API
3.3 消費者怎麼消費消息(記住一點,消費者消費消息是面向消息隊列的,這與生成者有點不同)
還不是就是TCP長鏈接心跳的那些事,就是這麼一個API:channel.basicConsume(QUEUE_AUTODELETE, true, consumer);consumer是Consumer類的一個實例,
你直接去處理回調接口就ok了
3.4 消息傳遞是否可靠
很明顯是可靠的,除非你將消息隊列,聲明成非持久模式,這事你又重啓了機器。這會丟失消息的。還有就是他有應答機制,你能夠經過設置消費者消費消息的模式,
去手動應答。channel.basicConsume(?,autoACk,?)的autoAck參數設置
3.5 消息的生命週期
一旦受到消費者應答,標識消息已被消費,則消息被回收掉。
3.6 隊列生命週期
channel.queueDeclare(QUEUE_NAME,false,false,true,null);
第二個參數設置爲true,會將消息持久化到磁盤,第四個參數設置爲true表示沒有消息而且沒有鏈接則刪除改隊列,詳情能夠查一下API
4.1 生產者代碼:
自行導入相關依賴包或相關依賴
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("username"); factory.setPort(5672);//注意這裏的端口與管理插件的端口不同 factory.setPassword("pwd"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明一個dirent模式的交換機 channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true); //聲明一個非持久化自動刪除的隊列 channel.queueDeclare("queue_name",false,false,true,null);//若是該隊列不在被使用就刪除他 zhe //將綁定到改交換機 channel.queueBind("queue_name","exchange_name","route_key"); //聲明一個消息頭部 Map<String,Object> header=new HashMap<>(); AMQP.BasicProperties.Builder b= new AMQP.BasicProperties.Builder(); header.put("charset","utf-8"); b.headers(header); AMQP.BasicProperties bp=b.build(); //將消息發出去 channel.basicPublish("exchange_name","route_key",false,bp,"test3".getBytes());
4.2 消費者代碼
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("username"); factory.setPort(5672);//注意這裏的端口與管理插件的端口不同 factory.setPassword("pwd"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明一個dirent模式的交換機 channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true); //聲明一個非持久化自動刪除的隊列 channel.queueDeclare("queue_name",false,false,true,null);//若是該隊列不在被使用就刪除他 zhe //將綁定到改交換機 channel.queueBind("queue_name","exchange_name","route_key"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume("queue_name", true, consumer);