咱們常常但願維持隊列中的消息,按必定次序轉發給消息者。然而當有多個JMS Session和消息消費者實例的從同一個隊列中獲取消息的時候,就不能保證消息順序處理。由於消息被多個不一樣線程併發處理着。
在ActiveMQ4.x中能夠採用Exclusive Consumer或者Exclusive Queues,避免這種狀況,Broker會從消息隊列中,一次發送消息給一個消息消費者來保證順序。
A. 當在接收信息的時候有一個或者多個備份接收消息者和一個獨佔消息者的同時接收時候,不管二者建立前後,在接收的時候,均爲獨佔消息者接收。
B. 當在接收信息的時候,有多個獨佔消費者的時候,只有一個獨佔消費者能夠接收到消息。
C. 當有多個備份消息者和多個獨佔消費者的時候,當全部的獨佔消費者均close的時候,只有一個備份消費者接到到消息。
備註:備份消費者爲不帶任何參數的消費者。
12.1.1選擇一個獨佔的message consumer
對於應用來講,那些重要的order ,或者,你須要確保這裏僅僅只有一個message consumer對於queen,activemq提供了一個客戶端選項來確保只有一個active message consumer來處理message
activemq meaasge broker也會在queen上選擇一個consumer來處理消息,這樣的好處就是容許broker來選擇,即便consumer失敗或者中止了,而後另一個message consumer可以被選擇成爲
active的
若是你混合了標準consumer和exclusive consumer在同一個queen上 ,the activemq將會僅僅選擇exclusive的其中一個consumer,,若是全部的exclusive consumer都變爲inactive那麼就會選擇
標準的consumer,而後queen的消費將會變爲正常的傳輸模式,
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
12.1.2利用exclusive consumer來提供分佈式鎖的功能
一般你用message從外部資源來廣播數據,若是你想構建一個冗餘的, 即便你有一個實例閱讀和廣播changedate失敗了【改變數據庫記錄,在文件裏面的內容用逗號分隔】,另外一個實例都將要接管,一般你依靠鎖住資源【行鎖或者文件鎖】來確保
僅僅只有一個程序可以acess data而且廣播over topic ,可是當你不想利用數據庫,或者想要運行一個程序跨越一個機器(不能用分佈式鎖),而後你就只能用獨佔consumer來建立一個分佈式鎖
爲了可以使用獨佔consumer來建立分佈式鎖,咱們須要我沒得producer訂閱獨佔的queen, 若是message producer接收到queen,他就便激活了, 而且可以 訂閱實時的feed和把實時數據變爲jms message
this.connection = this.factory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = this.session.createQueue(this.queueName + "?consumer.exclusive=true");
Message message = this.session.createMessage();
MessageProducer producer = this.session.createProducer(destination);
producer.send(message);
MessageConsumer consumer = this.session.createConsumer(destination);
consumer.setMessageListener(this);
在這個代碼片中,咱們老是send a message到queen這一步老是被外部的管理程序執行的,注意到Session.CLIENT_ACKNOWLEDGE模式來消費這個消息,儘管咱們想要被通知咱們是獨佔的consumer, 所以咱們有鎖,咱們不想要remove,咱們
不想要remove這一條消息嗎, 若是咱們失敗了, 咱們的另外一個獨佔producer將會active
正在這個列子中咱們實現了MessageListener,若是咱們沒有active, 咱們將要call一個功能性方法start producing ,若是咱們是實時應用, 這個方法將要訂閱一個實時的而且轉換實時的data 進入jms message
public void onMessage(Message message) {
if (message != null && this.active==false) {
this.active=true;
startProducing();
}
}
12.2 message groups
所有的message 都將要轉向單一的message consumer,message也可以分組來給予單一的consumer, 一個message producer也能指定一個group,經過指定message header JMSXGroupId,
ActiveMQ將要確保所有相同的JmsxGroupID的message發送給相同的consumer
若是Activemq broker制定了consumer接受消息經過JmsxGroupID,那麼他就應該close掉,而後activemq broker將要選擇一個不一樣的message consumer來dispatch給不一樣的message
爲了建立一個group,你須要設置JmsGroupID string property在消息上
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("<foo>test</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
producer.send(message);
這個列子顯示了message producer已經被建立了, 而且設置好textmessage 屬於message group TEST_GROUP_A
message group利用正常的message consumer,所以沒有額外的工做須要group來消費message, 所有的工做都被message producer來定義一個group的消息屬於什麼, activemqbroker選擇一個
message consumer來處理所有的分組消息
activemqbroker對於group裏面的每一條消息都會添加一條sequeence no,[經過JMSXGroupSeq,從1開始]
可是從consumer視角來講,你不能假定你從一個新的group裏收到的第一條JMSXGroupSeq設置1, 若是一個存在的group close掉或者死掉以後, 任何消息route到這個group裏的都會分配給一個新的consumer
爲了幫助識別一個消息的consumer 從一個新的group裏收到消息,或者一個新的group歷來沒有被看見過, 一個boolean 參數叫作JMSXGroupFirstForConsumer被設置了對於第一個message, 你也可以覈對是否
他是爲第一條message設置的【對於新組】, 你也可以覈對消息是否被
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();
String groupId = message.getStringProperty("JMSXGroupId");
if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
// do processing for new group
}
The Activemq message代理容許 分配各類各樣的消息groups跨越多個consumer,可是若是這裏早已經有message等着dispatch, the message group典型的分配給第一個consumer,爲了確保一個基數de
的分佈式負載均衡,他可能考慮message broker等着開啓更多的messgae consumer , 爲了這樣作, 你不得不設置destination policy在active broker 配置裏面,設置好consumersBeforeDispatchStarts參數
用
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="5000">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
這個配置告訴ActiveMq broker, 都應該等着兩個consumer在dispatch以前, 另外咱們也能夠看到timeBeforeDispatchStarts參數5000ms來通知activemq broker若是兩個message consumer在5s
內沒有砸queen上獲得消息,利用messgae group添加最小化的active broker 就每一個消息group存儲routing 信息而言。這是明晰的關掉message group經過發送message從activemq broker 的JMSXGroupID
設置爲-1
Connection connection = new ActiveMQConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("<foo>close</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
message.setIntProperty("JMSXGroupSeq", -1);
producer.send(message);
12.3activemqstream
Activemq stream是一個高級的特點,他容許使用activemq來做爲Jave Io stream,activeMQ將要break 一個outputstream對於不一樣的data chunk而且send每個chunk經過activemq做爲jms message
一個相應的activemq jms inputstream應該用在consumer邊從新結合data chunk
若是你用queen 做爲streamd的destination,使用不止一個consumer 在queen上(或者一個獨佔的consumer)是很好的, 因爲group的這個特點【用一樣的groupid指向一個單一的consumer】,使用超過一個的producer可能會形成message排序order
的問題
利用jms的好處就是activemq 把breank stream 分爲了管理的塊【chunk】, 而且容許你在consumer端給合併, 所以這是容許你傳輸大文件用這個功能
爲了證實這個用stream
//source of our large data
FileInputStream in = new FileInputStream("largetextfile.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
OutputStream out = connection.createOutputStream(destination);
//now write the file on to ActiveMQ
byte[] buffer = new byte[1024];
while(true){
int bytesRead = in.read(buffer);
if (bytesRead==-1){
break;
}
out.write(buffer,0,bytesRead);
}
out.close();
在下面的這個例子中咱們建立了一個ActiveMQConnection而且建立了一個inputstream利用一個queen, 注意到咱們利用一個獨佔的consumer經過apend"?consumer.exclusive=true";
咱們確保僅僅一個consumer 可以閱讀到一個queen,咱們read InputStream而且經過FileOutputStream來重組file在硬盤上
你也可以使用topic, 儘管這個
//destination of our large data
FileOutputStream out = new FileOutputStream("copied.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//we want be be an exclusive consumer
String exclusiveQueueName= QUEUE_NAME + "?consumer.exclusive=true";
Queue destination = session.createQueue(exclusiveQueueName);
InputStream in = connection.createInputStream(destination);
//now write the file from ActiveMQ
byte[] buffer = new byte[1024];
while(true){
int bytesRead = in.read(buffer);
if (bytesRead==-1){
break;
}
out.write(buffer,0,bytesRead);
}
out.close();
}
12.4 Blob消息
activemq引進了blob來處理large message
本身處理中轉
若是本身處理文件的話,一個簡單方式是使用共享或ftp、dfs等方式,先把文件發送到一個你們均可以拿到的地方,而後發送message,payload或properties中包含文件的路徑信息。這樣,consumer拿到文件路徑後去指定的地方,按照給定的方式去獲取文件數據便可。
優點:這種方式能夠用來處理大數據,而且不須要client或broker在內存中持有文件數據自己,很是的節省資源。並且文件是經過額外的方式處理,跟ActiveMQ自己無關,因此符合jms協議、處理的效率也相對比較高。
劣勢:須要本身處理不少文件相關的操做。
BlobMessage對文件中轉的封裝
幸運的是,ActiveMQ把上面繁複的文件處理工做進行了封裝,屏蔽掉文件中轉的整個處理過程,使得咱們可使用相似jms規範的API來簡單操做文件傳輸。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com"));
producer.send(message);
consumer for blob::
FileOutputStream out = new FileOutputStream("blob.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
BlobMessage blobMessage = (BlobMessage) consumer.receive();
InputStream in = blobMessage.getInputStream();
// now write the file from ActiveMQ
byte[] buffer = new byte[1024];
while (true) {
int bytesRead = in.read(buffer);
if (bytesRead == -1) {
break;
}
out.write(buffer, 0, bytesRead);
}
out.close();
}
12.5網絡存活 或者代理失敗後的失效轉移協議
failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)
activemq將會隨機的選擇list中的其中一個用失效轉移協議,若是僅僅只有一個uri那麼客戶端將會隔斷時間查看是否broker available,你能夠利用TransportListener來監聽activemq的鏈接
public class ClientTransportListener implements TransportListener {
protected final Logger logger = LoggerFactory.getLogger(ClientTransportListener.class);
public void onCommand(Object o) {
logger.debug("onCommand檢測到服務端命令:{}", o);
}
public void onException(IOException error) {
logger.error("onException,與服務器鏈接發生錯誤......");
}
public void transportInterupted() {
logger.error("transportInterupted,與服務器鏈接發生中斷......");
IConnector connector = new Connector();
connector.reConnect();
}
public void transportResumed() {
logger.info("transportResumed,恢復與服務器鏈接....");
}
}
當你想要按照順序來啓動
failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)?random=false
若是個了段時間仍是連不上,the failover protocol將會增長一段總量來鏈接activemq broker,這個叫作指數退避算法Exponential Backoff默認的useExponentialBackoff是enable
參數 默認值 含義
initialReconnectDelay 10ms, 重連以前等待的時間(ms)
backOffMultiplier 1.5 增大等待時間的係數
maxReconnectDelay 30000 重連以前等待的最大時間(ms)
failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)?backOffMultiplier=2,initialReconnectDelay=1000
在maxInactivityDuration時間裏沒有鏈接上話就是invalidate
failover:(tcp://host1:61616?wireformat.maxInactivityDuration=0,tcp://host2:61616,ssl://host3:61616?wireformat.maxInactivityDuration=0)
默認的話activemq傳輸是持久化的,若是你使用非持久化的方式傳輸的話,爲了防止丟失你就要使用trackMessages=true
maxCachesize
backup=true,backupPoolSize=2
updateClusterClients
rebalanceClusterClients
updateClusterClientOnRemove
updateClusterFilter
12.6在future傳輸message
Property name type description
AMQ_SCHEDULED_DELAY long The time in milliseconds that a message will wait before being scheduled to
be delivered by the broker
AMQ_SCHEDULED_PERIOD long The time in milliseconds to wait after the start time to wait before scheduling
the message again
AMQ_SCHEDULED_REPEAT int The number of times to repeat scheduling a message for delivery
AMQ_SCHEDULED_CRON String Use a Cron entry to set the schedule
例如,有一個消息,原定在60秒-交付你須要設置amq_scheduled_delay
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long time = 60 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
producer.send(message);
你能夠設置一個消息,等待一個初始延遲,並重復傳送10次,等待10秒之間的每個從新交付
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
你也可使用cron調度信息,例如,若是你想要一個消息如期交付的每個小時,你就須要設置cron入口是0 * -例如
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);
cron調度優先使用消息延遲,然而,若是一個重複週期設置一個cron入門,ActiveMQ調度器將安排每次cron進入火災的消息傳遞。用一個例子來解釋更容易。
假設你想要一個消息,10次,一一秒的延遲之間的每個消息-你但願這個發生每小時-你會這樣作:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
producer.send(message)
算法