目前經常使用的消息隊列組建無非就是MSMQ和ActiveMQ,至於他們的異同,這裏不想作過多的比較。簡單來講,MSMQ內置於微軟操做系統之中,在部署上包含一個隱性條件:Server須要是微軟操做系統。(對於這點我並去調研過MSMQ是否能夠部署在非微軟系統,好比:Linux,只是拍腦殼想了想,感受上是不能夠)。對於ActiveMQ,微軟系統和Linux都是能夠部署的。從功能方面來講,通常最經常使用的就是:消息的收/發,感受差別不大。從性能上來講,通常的說法是ActiveMQ略高。在穩定性上,我的感受MSMQ更好。若是這兩種經常使用隊列都用過的同窗,應該來講最大的差別在於:MSMQ若是要訪問遠程隊列(好比機器A上的程序訪問機器B上的隊列),會比較噁心。在數據量比較大的狀況之下,通常來講隊列服務器會專門的一臺或者多臺(多臺的話,用程序去作熱備+負載比較方便,也不須要額外的硬件成本。簡單來講作法能夠這樣:消息發送的時候隨機向着多臺隊列服務器上發送消息;接受的時候開多個線程去分別監聽;熱備方面,能夠維護一個帶狀態的隊列鏈接池,若是消息收發失敗那麼將狀態置爲不可用,而後起一個線程去定時監測壞的鏈接是否可用,這個過程通常狀況下能夠不用加鎖,爲何,你們根據各自須要去取捨吧)。最近搞完了短彩信的網關鏈接服務,這兩種隊列我均使用了。大體的過程是這樣的:上層應用若是要發端彩信,那麼將消息直接發送至ActiveMQ(目前用的就是上面說的多臺熱備+負載,由於實際中下行量很是大5千萬條/天以上),而後端彩信網關鏈接服務部署多套,每套均依賴本機的MSMQ。爲何呢?用ActiveMQ的緣由是:上層應用程序和網關鏈接服務彼此獨立,消息須要跨機訪問。用MSMQ的緣由是:ActiveMQ中的數據是一條不分省的大隊列,網關鏈接服務須要按省流控,因此端彩信網關鏈接服務:首先把消息從ActiveMQ取出來,而後存至本機上的分省MSMQ,這樣作另外的一個好處就是:ActiveMQ不至於過多擠壓,他的數據會分攤到N臺短彩信網關鏈接服務所部署的機器上的MSMQ之中,也就說MSMQ能夠起到分攤數據和緩衝的做用。html
在以前的隨筆中,已經介紹過MSMQ,如今先介紹一下ActiveMQ一些配置,目前好像ActiveMQ配置上的介紹還比較少。如下是本身總結一些相關資料,貼出來給你們共享java
官方:node
From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store.apache
The AMQ Message Store which although faster than KahaDB - does not scales as well as KahaDB and recovery times take longer.後端
非官方:緩存
kaha文件系統實際上上是一個文件索引系統,有兩部分組成,一個是數據文件系統,由一個個獨立的文件組成,缺省文件大小是32M大(可配置),另一個是索引文件系統,記錄消息在數據文件中的位置信息以及數據文件中的空閒塊信息。數據文件是存儲到硬盤上的,索引文件是緩存在內存中的。因此這個存儲系統對大消息存儲有利,象咱們的memberId之類的文本消息,其實是浪費,索引比消息還大,哈。安全
我方分析:服務器
推薦: Amq持久方式session
理由:雖然官方推薦使用KahaDB持久方式,但其提到的優點:可伸縮性和恢復性較好,對於咱們實際的應用意義不大。從咱們本身的使用經驗來看,KahaDB持久方式,Data文件是一個大文件(感受文件過大後,形成隊列服務癱死的可能性會增大),從官網的相關配置(附錄1)也找不到哪裏能夠設置數據的文件的最大Size。)而Amq持久方式能夠設置Data文件最大Size,這樣能夠保證即時消息積壓不少,Data文件也不至於過大。併發
解決方法:
在創建鏈接的Uri中加入: wireFormat.maxInactivityDuration=0
參考資源:
http://jinguo.iteye.com/blog/243153
You can do the following to fix the issues:
1) Append max inactivity duration to your Uri in the format below: wireFormat.maxInactivityDuration=0
2) Use the same Uri at the client side as well as at the server side
Regards,
若是不這樣設置,對應的錯誤會出現:
2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
ActiveMQ的tcp url:tcp://localhost:61616後面要加入?wireFormat.maxInactivityDuration=0 這樣的參數,不然當一段時間沒有消息發送時會拋出 "Channel was inactive for too long"異常
解決方法:
1)關閉ActiveMqLog4j
打開:conf/log4j.properties
將:log4j.rootLogger=INFO, console, logfile
修改成:log4j.rootLogger=OFF
2)在創建鏈接的Uri中加入: maxInactivityDurationInitalDelay=30000
例如北京的測試環境鏈接Uri:
tcp://192.168.199.80:61616?wireFormat.maxInactivityDuration=0&maxInactivityDurationInitalDelay=30000&connection.AsyncSend=true
參考資源:
If you get exception like this,it can mean one of three things:
1. You're connecting to the port not used by ActiveMQ TCP transport
Make sure to check that you're connecting to the appropriate host:port
2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages
Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender
3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time
If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat.maxInactivityDurationInitalDelay property on the connection URL in your client.
For example
tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=30000
will use 30 sec timeout.(貌似有問題!!!)
解決方法:
1) 設置Java最大內存限制爲合適大小:
Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(默認是512)
2)Activemq.xml配置節:systemUsage/ systemUsage配置大小合適,而且特別注意:大於全部durable desitination設置的memoryUsage之和。
備註:
1)尖括號:「>」表明通配符
2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=全部durable desitination設置之和
3)SystemUsage配置設置了一些系統內存和硬盤容量,當系統消耗超過這些容量設置時,amq會「slow down producer」,仍是很重要的。
參考資料:
http://m.oschina.net/blog/26216
參考-- http://activemq.apache.org/javalangoutofmemory.html
對於MQ的內容實用是可管理和可配置的。首先須要判斷的是MQ的哪部分系統因內存不足而致使泄漏,是JVM,broker仍是消費者、生產者?
1、內存管理
JVM內存管理:
1. 用bin/activemq命令在獨立JVM中運行broker。用-Xmx和-Xss命令便可(activemq.bat文件中修改ACTIVEMQ_OPTS選項參數便可);
2. 默認狀況下,MQ用512M的JVM;
broker內存管理:
1. broker使用的內存並非由JVM的內存決定的。雖然受到JVM的限制,但broker確實獨立管理器內存;
2. systemUsage和destination的內存限制與broker內存息息相關;
3. MQ中內存的關係是:JVM->Broker->broker features;
4. 全部destination的內存總量不能超過broker的總內存;
消費者:
1. 因爲消息大小能夠配置,prefetch limit每每是致使內存溢出的主要緣由;
2. 減小prefetch limit的大小,會減小消費者內存中存儲的消息數量;
生產者:
1. 除非消息數量超過了broker資源的限制,不然生產者不會致使內存溢出;
2. 當內存溢出後,生產者會收到broker的阻塞信息提示;
2、其餘
將消息緩衝之硬盤:
1. 只有當消息在內存中存儲時,才容許消息的快速匹配與分發,而當消費者很慢或者離開時,內存可能會耗盡;
2. 當destination到達它的內存臨界值時,broker會用消息遊標來緩存非持久化的消息到硬盤。
3. 臨界值在broker中經過memoryUsage和systemUsage兩個屬性配置,請參考activemq.xml;
4. 對於緩慢的消費者,當還沒有耗盡內存或者轉變爲生產者併發控制模式前,這個特性容許生產者繼續發送消息到broker;
5. 當有多個destination的時候,默認的內存臨界值可能被打破,而這種狀況將消息緩存到硬盤就顯得頗有意義;
6. precentUsage配置:使用百分比來控制內存使用狀況;
多個線程:
1. 默認狀況下,MQ每一個destination都對應惟一的線程;
2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat文件中修改ACTIVEMQ_OPTS選項參數便可),用線程池來限制線程的數量,從而減小內存消耗;
大數據傳輸:
1. destination policies--maxPageSize:控制進入內存中的消息數量;lazyDispatch:增長控制使用當前消費者列表的預取值;
2. 使用blogMessage或者streamsMessage類型來進行大量文件的傳輸;
泄漏JMS資源:
1. 當session或者producer或者consumer大量存在而沒有關閉的時候;
2. 使用PooledConnectionFactory;
解決方法:
不採用failover鏈接
分析:
採用failover方式鏈接,若是所要鏈接的服務器或者Activemq服務宕了,那麼程序會一直處於等待狀態,不超時,不報錯。
property name |
default value |
Comments |
directory |
activemq-data |
the path to the directory to use to store the message store data and log files |
indexWriteBatchSize |
1000 |
number of indexes written in a batch |
indexCacheSize |
10000 |
number of index pages cached in memory |
enableIndexWriteAsync |
false |
if set, will asynchronously write indexes |
journalMaxFileLength |
32mb |
a hint to set the maximum size of the message data logs |
enableJournalDiskSyncs |
true |
ensure every non transactional journal write is followed by a disk sync (JMS durability requirement) |
cleanupInterval |
30000 |
time (ms) before checking for a discarding/moving message data logs that are no longer used |
checkpointInterval |
5000 |
time (ms) before checkpointing the journal |
ignoreMissingJournalfiles |
false |
If enabled, will ignore a missing message log file |
checkForCorruptJournalFiles |
false |
If enabled, will check for corrupted Journal files on startup and try and recover them |
checksumJournalFiles |
false |
create a checksum for a journal file - to enable checking for corrupted journals |
Available since version 5.4: |
||
archiveDataLogs |
false |
If enabled, will move a message data log to the archive directory instead of deleting it. |
directoryArchive |
null |
Define the directory to move data logs to when they all the messages they contain have been consumed. |
databaseLockedWaitDelay |
10000 |
time (ms) before trying to get acquire a the database lock (used by shared master/slave) |
maxAsyncJobs |
10000 |
the maximum number of asynchronous messages that will be queued awaiting storage (should be the same as the number of concurrent MessageProducers) |
concurrentStoreAndDispatchTopics |
false |
enable the dispatching of Topic messages to interested clients to happen concurrently with message storage |
concurrentStoreAndDispatchQueues |
true |
enable the dispatching of Queue messages to interested clients to happen concurrently with message storage |
property name |
default value |
Comments |
directory |
activemq-data |
the path to the directory to use to store the message store data and log files |
useNIO |
true |
use NIO to write messages to the data logs |
syncOnWrite |
false |
sync every write to disk |
maxFileLength |
32mb |
a hint to set the maximum size of the message data logs |
persistentIndex |
true |
use a persistent index for the message logs. If this is false, an in-memory structure is maintained |
maxCheckpointMessageAddSize |
4kb |
the maximum number of messages to keep in a transaction before automatically committing |
cleanupInterval |
30000 |
time (ms) before checking for a discarding/moving message data logs that are no longer used |
indexBinSize |
1024 |
default number of bins used by the index. The bigger the bin size - the better the relative performance of the index |
indexKeySize |
96 |
the size of the index key - the key is the message id |
indexPageSize |
16kb |
the size of the index page - the bigger the page - the better the write performance of the index |
directoryArchive |
archive |
the path to the directory to use to store discarded data logs |
archiveDataLogs |
false |
if true data logs are moved to the archive directory instead of being deleted |
property name |
default value |
Comments |
memoryUsage |
20M |
amq使用內存大小,照amq論壇上說,這個值應該大於全部durable desitination設置的 memoryUsage之和,不然會致使硬盤swap,影響性能。 |
storeUsage |
1G |
kaha數據存儲大小,若是設置不足,性能會降低到1個1個發 |
tempUsage |
100M |
非persistent的消息存儲在temp區域 |
http://activemq.apache.org/nms/configuring.html
Option Name |
Default |
Description |
transport.timeout |
-1 |
Time that a send operation blocks before failing. |
transport.initialReconnectDelay |
10 |
Time in Milliseconds that the transport waits before attempting to reconnect the first time. |
transport.maxReconnectDelay |
30000 |
The max time in Milliseconds that the transport will wait before attempting to reconnect. |
transport.backOffMultiplier |
2 |
The amount by which the reconnect delay will be multiplied by if useExponentialBackOff is enabled. |
transport.useExponentialBackOff |
true |
Should the delay between connection attempt grow on each try up to the max reconnect delay. |
transport.randomize |
true |
Should the Uri to connect to be chosen at random from the list of available Uris. |
transport.maxReconnectAttempts |
0 |
Maximum number of time the transport will attempt to reconnect before failing (0 means infinite retries) |
transport.startupMaxReconnectAttempts |
0 |
Maximum number of time the transport will attempt to reconnect before failing when there has never been a connection made. (0 means infinite retries) (included in NMS.ActiveMQ v1.5.0+) |
transport.reconnectDelay |
10 |
The delay in milliseconds that the transport waits before attempting a reconnection. |
transport.backup |
false |
Should the Failover transport maintain hot backups. |
transport.backupPoolSize |
1 |
If enabled, how many hot backup connections are made. |
transport.trackMessages |
false |
keep a cache of in-flight messages that will flushed to a broker on reconnect |
transport.maxCacheSize |
256 |
Number of messages that are cached if trackMessages is enabled. |
transport.updateURIsSupported |
true |
Update the list of known brokers based on BrokerInfo messages sent to the client. |
Option Name |
Default |
Description |
connection.AsyncSend |
false |
Are message sent Asynchronously. |
connection.AsyncClose |
true |
Should the close command be sent Asynchronously |
connection.AlwaysSyncSend |
false |
Causes all messages a Producer sends to be sent Asynchronously. |
connection.CopyMessageOnSend |
true |
Copies the Message objects a Producer sends so that the client can reuse Message objects without affecting an in-flight message. |
connection.ProducerWindowSize |
0 |
The ProducerWindowSize is the maximum number of bytes in memory that a producer will transmit to a broker before waiting for acknowledgement messages from the broker that it has accepted the previously sent messages. In other words, this how you configure the producer flow control window that is used for async sends where the client is responsible for managing memory usage. The default value of 0 means no flow control at the client. See also Producer Flow Control |
connection.useCompression |
false |
Should message bodies be compressed before being sent. |
connection.sendAcksAsync |
false |
Should message acks be sent asynchronously |
connection.messagePrioritySupported |
true |
Should messages be delivered to the client based on the value of the Message Priority header. |
connection.dispatchAsync |
false |
Should the broker dispatch messages asynchronously to the connection's consumers. |
Option Name |
Default |
Description |
wireFormat.stackTraceEnabled |
false |
Should the stack trace of exception that occur on the broker be sent to the client? Only used by openwire protocol. |
wireFormat.cacheEnabled |
false |
Should commonly repeated values be cached so that less marshalling occurs? Only used by openwire protocol. |
wireFormat.tcpNoDelayEnabled |
false |
Does not affect the wire format, but provides a hint to the peer that TCP nodelay should be enabled on the communications Socket. Only used by openwire protocol. |
wireFormat.sizePrefixDisabled |
false |
Should serialized messages include a payload length prefix? Only used by openwire protocol. |
wireFormat.tightEncodingEnabled |
false |
Should wire size be optimized over CPU usage? Only used by the openwire protocol. |
wireFormat.maxInactivityDuration |
30000 |
The maximum inactivity duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to appear to die, so we allow the broker to kill connections if they are inactive for a period of time. Use by some transports to enable a keep alive heart beat feature. Set to a value <= 0 to disable inactivity monitoring. |
maxInactivityDurationInitalDelay |
10000 |
The initial delay in starting the maximum inactivity checks (and, yes, the word 'Inital' is supposed to be misspelled like that) |
一、控制檯安全配置,打開conf/jetty.xml文件,找到
<bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="admin" />
<property name="authenticate" value="false" />
</bean>
將「false」改成「true」便可。用戶名和密碼存放在conf/jetty-realm.properties文件中。
二、生產者和消費者鏈接MQ須要密碼
打開conf/activemq.xml文件,在<broker>標籤裏的<systemUsage>標籤前加入:
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
注意必須在<systemUsage>標籤前,不然啓動ActiveMQ會報錯。
用戶名和密碼存放在conf/credentials.properties文件中