Consumer的高級特性linux
獨有消費者:web
Consumer消息的異步分發:spring
Consumer的優先級:windows
管理持久化消息:
tomcat
持久化發送端:服務器
持久化接收端:session
運行程序,從界面上查看訂閱者,20秒以後再刷新界面就能夠看到這個不活躍的訂閱者就會消失:多線程
消息分組:併發
前面講了使用queue消息的時候,若是消息隊列中有1,2,3,共3個消息,此時若是是併發的線程來拿消息進行消費,會出現消息順序不一致的狀況,爲了防止這種狀況使用獨有消費者,可是會有性能上的問題,因此考慮使用消息分組。框架
發送端:
接收端:
運行接收端和發送端,從運行結果中能夠看的出來,雖然有兩個接收端,可是由於設置了分組,因此發送的全部消息只會給一個消費者。如今修改一下發送端,咱們讓發送端發給不一樣的組,那這些消息就會根據組的不一樣被不一樣的消費者消費,看運行的結果:
因此使用消息分組,一個組的消息就會給一個消費者,這一個組的消息不會被分發給不一樣的消費者。不一樣的組的消息會發給不一樣的消費者。
實際上這個組用完就關閉了,因此這裏的關閉其實是沒有什麼用的。
消息選擇器(消息過濾):
使用上一節講的消息分組的發送端和接收端。修改發送端:
修改接收端:
由於使用了過濾條件,因此運行程序以後能夠看的出來是沒有信息的。雖然消費者不接受不知足本身條件的消息,可是消息仍是在Activemq隊列中的,若是如今修改一下接收端,直接只運行接收端,就能夠接收到(原來的消息還在隊列中):
從新傳遞消息策略:
如今來測試一下默認的重傳次數,用之前寫過的一個發送端:
先把接收端的session.commit註釋掉。而後運行程序,經過測試能夠發現,咱們運行一次發送端,運行n次接收端,由於把接收端的session.commit註釋了,因此開啓了不少次的接收端仍是能夠接收到消息,這是由於接收端的session.commit註釋了,因此即便消費端消費了消息,在消息隊列中仍是不會清除掉已經消費的消息,仍是會存在,因此能夠一直接收到消息:
可是消費端一直消費到第7次之後的時候就不能再接受了,由於默認的重傳次數是6,超過7就不能接受了:
從web界面中能夠看到那個DLQ隊列:
如今來測試使用消息重傳策略。修改消費端,設置的重傳次數是3:
如今從新來測試,啓動消費端,啓動生產端,消費者一共接受了4次消息,再次啓動消費者的時候就接受不到了,由於已經達到重傳次數了。觀察web界面中的死消息的隊列的時候就能夠看到多的消息了:
能夠自動刪除過時消息:
固然也能夠爲每個topic或者queue的目的地設置重傳策略:
慢消費者處理:
在配置文件中能夠看到默認是1000:
監控和管理Broker
第一種方式:這種方式就是咱們常說的web界面:
第二種方式:這是ActiveMQ5.9之後可使用的:
這裏有不少能夠查詢或者使用的功能:
集成ActiveMQ和Tomcat
就是說把ActiveMQ嵌入到Tomcat中去跑,而不是做爲一個單獨的程序應用運行。其實通常的狀況下是不但願二者結合使用的,由於單獨用ActiveMQ能夠很方便的作修改和擴展,可是若是有要使用ActiveMQ和tomcat結合的需求的時候能夠用。結合以後就但願ActiveMQ是儘可能不要常常變化的了。如今來操做一下,修改裏面的web.xml內容:
在tomcat中加入spring和xbean的包。下面的除了default包之外的spring和xbean的包都是須要加入到Linux上的tomcat中的,因此這些包在windows上指定一個位置處放好:
經過下面的命令將windows上的spring和xbean的包傳入到linux的tomcat上(選中的是裏面的jar包):
ActiveMQ的使用場景
4.做爲RPC的替代:就是遠程過程的調用。
若是A,B在同一個虛擬機中那麼他們能夠隨便的通訊,可是若是不在同一個機器上,就要經過RPC框架。ActiveMQ就至關於RPC。好比A要調用B的方法,那麼A就給ActiveMQ發一個消息,B就會執行相應的方法,而後A給ActiveMQ發消息說要獲得返回結果,而後B就把結果告訴A。
ActiveMQ的優化
ActiveMQ結合業務
通常就是適合於高併發量的,插入修改等複雜業務邏輯的時候使用,若是是高併發的訪問適合用memcache這些。上面的例子:若是生成訂單,若是同時有很大的併發量要求生成訂單,一個服務器壓力是很大的,那麼每一個客戶那邊反應就會很慢,因此使用ActiveMQ,將客戶的生成訂單的請求放在隊列中,能夠很快的放進去,而後若是併發量很大的時候能夠有多個消費者從隊列中拿請求作處理。因此這個時候對於客戶來講反應是很快的,真正處理業務邏輯的就是一個個的消費者了。
那ActiveMQ如何咱們的應用進行結合呢?
通常咱們使用這種異步的組件的時候仍是用第二種方式,方便之後擴展或其餘的使用。第二種方式就是單獨的啓動ActiveMQ。就是將service中的一堆的處理邏輯做爲Consumer。而後還要考慮用queueu仍是用topic,這裏咱們用的是queue。固然咱們一個service中有不少的業務處理邏輯,因此能夠搞多個消費者的線程一塊兒處理。
如今有一個業務,接收消費者的Id,下訂單:
如今使用ActiveMQ的消息:
service的方法就能夠直接使用ActiveMQ消息了:
如今來寫消費端。這裏面就是當時具體的service中的業務邏輯,由於下面具體的業務邏輯中須要的uuid是從發送端發送過來的,因此接收端要首先接受一下:
消費者的一端能夠用多線程的方式,若是有須要共享的資源就要加上鎖,注意加鎖是爲哪一個加鎖,不能隨便使用this。
因此爲了想在web應用啓動的時候接收端就建立好,有兩種方式,第一種是把接收端作成Lister(就是和配置Spring的監聽器相似,在web.xml中把service配置好),另外一種方式是讓web容器啓動的時候就把spring相關的service都建立好,這樣就把接收端也建立好了。
如今咱們來試試第二種方式,讓咱們的接收端實現監聽器,而且加上註解,讓web容器建立的時候spring就把這個接收端建立,好:
修改接收端,去掉session的關閉:
不關閉鏈接這樣服務器就是一直開着的,只要有消息就會進行接受。咱們這裏就是把接收端作成一個服務了,是爲了方便起見,固然也能夠根據實際的業務須要作修改。
第一種方式:將接收端做爲監聽器
實現監聽器接口,就要實現初始化和銷燬的方法,初始化作的就是讓接收端打開鏈接:
而後在web應用的web.xml中把監聽器配置上,名字要對應,注意:咱們的接收端的Listener要放在spring的監聽器的後面,由於只有spring建立成功了,才能建立它管理的接收端和service:
如今分析一下。咱們一共有3個部分,業務的service,發送端和接收端,咱們把接收端作成了監聽器,不用咱們手動啓動了。
由於咱們的接收端作成監聽器了,每次啓動web容器的時候都會啓動監聽器,因此接收端就是一直運行的,若是咱們把接收端的監聽器先去掉,先不讓她啓動,看一下業務的速度:
這樣咱們就只管發消息,這樣速度明顯就快了。根據具體的業務具體處理。