消息持久性對於可靠消息傳遞來講應該是一種比較好的方法,有了消息持久化,即便發送者和接受者不是同時在線或者消息中心在發送者發送消息後宕機了,在消息 中心從新啓動後仍然能夠將消息發送出去,若是把這種持久化和ReliableMessaging結合起來應該是很好的保證了消息的可靠傳送。java
消息持久性的原理很簡單,就是在發送者將消息發送出去後,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,而後試圖將消息發送 給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。消息中心啓動之後首先要檢查制定的存儲位置,若是有未發送成功的消息,則須要把消息發送出去。mysql
ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。程序員
一、AMQsql
AMQ是一種文件存儲形式,它具備寫入速度快和容易恢復的特色。消息存儲在一個個文件中,文件的默認大小爲32M,若是一條消息的大小超過了 32M,那麼這個值必須設置大一點。當一個存儲文件中的消息已經所有被消費,那麼這個文件將被標識爲可刪除,在下一個清除階段,這個文件被刪除。AMQ適用於ActiveMQ5.3以前的版本。默認配置以下:數據庫
1
2
3
|
<
persistenceAdapter
>
<
amqPersistenceAdapter
directory
=
"activemq-data"
maxFileLength
=
"32mb"
/>
</
persistenceAdapter
>
|
屬性以下:apache
屬性名稱緩存 |
默認值服務器 |
描述session |
directory |
activemq-data |
消息文件和日誌的存儲目錄 |
useNIO |
true |
使用NIO協議存儲消息 |
syncOnWrite |
false |
同步寫到磁盤,這個選項對性能影響很是大 |
maxFileLength |
32Mb |
一個消息文件的大小 |
persistentIndex |
true |
消息索引的持久化,若是爲false,那麼索引保存在內存中 |
maxCheckpointMessageAddSize |
4kb |
一個事務容許的最大消息量 |
cleanupInterval |
30000 |
清除操做週期,單位ms |
indexBinSize |
1024 |
索引文件緩存頁面數,缺省爲1024,當amq擴充或者縮減存儲時,會鎖定整個broker,致使必定時間的阻塞,因此這個值應該調整到比較大,可是代碼中實現會動態伸縮,調整效果並不理想。 |
indexKeySize |
96 |
索引key的大小,key是消息ID |
indexPageSize |
16kb |
索引的頁大小 |
directoryArchive |
archive |
存儲被歸檔的消息文件目錄 |
archiveDataLogs |
false |
當爲true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
二、KahaDB
KahaDB是基於文件的本地數據庫儲存形式,雖然沒有AMQ的速度快,可是它具備強擴展性,恢復的時間比AMQ短,從5.4版本以後KahaDB作爲默認的持久化方式。默認配置以下:
KahaDB的屬性以下:
屬性名稱 |
默認值 |
描述 |
directory |
activemq-data |
消息文件和日誌的存儲目錄 |
indexWriteBatchSize |
1000 |
一批索引的大小,當要更新的索引量到達這個值時,更新到消息文件中 |
indexCacheSize |
10000 |
內存中,索引的頁大小 |
enableIndexWriteAsync |
false |
索引是否異步寫到消息文件中 |
journalMaxFileLength |
32mb |
一個消息文件的大小 |
enableJournalDiskSyncs |
true |
是否講非事務的消息同步寫入到磁盤 |
cleanupInterval |
30000 |
清除操做週期,單位ms |
checkpointInterval |
5000 |
索引寫入到消息文件的週期,單位ms |
ignoreMissingJournalfiles |
false |
忽略丟失的消息文件,false,當丟失了消息文件,啓動異常 |
checkForCorruptJournalFiles |
false |
檢查消息文件是否損壞,true,檢查發現損壞會嘗試修復 |
checksumJournalFiles |
false |
產生一個checksum,以便可以檢測journal文件是否損壞。 |
5.4版本以後有效的屬性: |
||
archiveDataLogs |
false |
當爲true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
directoryArchive |
null |
存儲被歸檔的消息文件目錄 |
databaseLockedWaitDelay |
10000 |
在使用負載時,等待得到文件鎖的延遲時間,單位ms |
maxAsyncJobs |
10000 |
同個生產者產生等待寫入的異步消息最大量 |
concurrentStoreAndDispatchTopics |
false |
當寫入消息的時候,是否轉發主題消息 |
concurrentStoreAndDispatchQueues |
true |
當寫入消息的時候,是否轉發隊列消息 |
5.6版本以後有效的屬性: |
||
archiveCorruptedIndex |
false |
是否歸檔錯誤的索引 |
每一個KahaDB的實例均可以配置單獨的適配器,若是沒有目標隊列提交給filteredKahaDB,那麼意味着對全部的隊列有效。若是一個隊列沒有對應的適配器,那麼將會拋出一個異常。配置以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
<
persistenceAdapter
>
<
mKahaDBdirectory
=
"${activemq.base}/data/kahadb"
>
<
filteredPersistenceAdapters
>
<!-- match all queues -->
<
filteredKahaDBqueue
=">">
<
persistenceAdapter
>
<
kahaDBjournalMaxFileLength
=
"32mb"
/>
</
persistenceAdapter
>
</
filteredKahaDB
>
<!-- match all destinations -->
<
filteredKahaDB
>
<
persistenceAdapter
>
<
kahaDBenableJournalDiskSyncs
=
"false"
/>
</
persistenceAdapter
>
</
filteredKahaDB
>
</
filteredPersistenceAdapters
>
</
mKahaDB
>
</
persistenceAdapter
>
|
若是filteredKahaDB的perDestination屬性設置爲true,那麼匹配的目標隊列將會獲得本身對應的KahaDB實例。配置以下:
1
2
3
4
5
6
7
8
9
10
11
12
|
<
persistenceAdapter
>
<
mKahaDBdirectory
=
"${activemq.base}/data/kahadb"
>
<
filteredPersistenceAdapters
>
<!-- kahaDB per destinations -->
<
filteredKahaDB
perDestination
=
"true"
>
<
persistenceAdapter
>
<
kahaDBjournalMaxFileLength
=
"32mb"
/>
</
persistenceAdapter
>
</
filteredKahaDB
>
</
filteredPersistenceAdapters
>
</
mKahaDB
>
</
persistenceAdapter
>
|
三、JDBC
能夠將消息存儲到數據庫中,例如:Mysql、SQL Server、Oracle、DB2。
配置JDBC適配器:
1
2
3
|
<
persistenceAdapter
>
<
jdbcPersistenceAdapterdataSource
=
"#mysql-ds"
createTablesOnStartup
=
"false"
/>
</
persistenceAdapter
>
|
dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啓動的時候建立數據表,默認值是true,這樣每次啓動都會去建立數據表了,通常是第一次啓動的時候設置爲true,以後改爲false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
Mysql持久化bean:
<
bean
id
=
"mysql-ds"
class
=
"org.apache.commons.dbcp.BasicDataSource"
destroy-method
=
"close"
>
<
property
name
=
"driverClassName"
value
=
"com.mysql.jdbc.Driver"
/>
<
property
name
=
"url"
value
=
"jdbc:mysql://localhost/activemq?relaxAutoCommit=true"
/>
<
property
name
=
"username"
value
=
"activemq"
/>
<
property
name
=
"password"
value
=
"activemq"
/>
<
property
name
=
"poolPreparedStatements"
value
=
"true"
/>
</
bean
>
SQL Server持久化bean:
<
bean
id
=
"mssql-ds"
class
=
"net.sourceforge.jtds.jdbcx.JtdsDataSource"
destroy-method
=
"close"
>
<
property
name
=
"serverName"
value
=
"SERVERNAME"
/>
<
property
name
=
"portNumber"
value
=
"PORTNUMBER"
/>
<
property
name
=
"databaseName"
value
=
"DATABASENAME"
/>
<
property
name
=
"user"
value
=
"USER"
/>
<
property
name
=
"password"
value
=
"PASSWORD"
/>
</
bean
>
Oracle持久化bean:
<
bean
id
=
"oracle-ds"
class
=
"org.apache.commons.dbcp.BasicDataSource"
destroy-method
=
"close"
>
<
property
name
=
"driverClassName"
value
=
"oracle.jdbc.driver.OracleDriver"
/>
<
property
name
=
"url"
value
=
"jdbc:oracle:thin:@10.53.132.47:1521:activemq"
/>
<
property
name
=
"username"
value
=
"activemq"
/>
<
property
name
=
"password"
value
=
"activemq"
/>
<
property
name
=
"maxActive"
value
=
"200"
/>
<
property
name
=
"poolPreparedStatements"
value
=
"true"
/>
</
bean
>
DB2持久化bean:
<
bean
id
=
"db2-ds"
class
=
"org.apache.commons.dbcp.BasicDataSource"
destroy-method
=
"close"
>
<
property
name
=
"driverClassName"
value
=
"com.ibm.db2.jcc.DB2Driver"
/>
<
property
name
=
"url"
value
=
"jdbc:db2://hndb02.bf.ctc.com:50002/activemq"
/>
<
property
name
=
"username"
value
=
"activemq"
/>
<
property
name
=
"password"
value
=
"activemq"
/>
<
property
name
=
"maxActive"
value
=
"200"
/>
<
property
name
=
"poolPreparedStatements"
value
=
"true"
/>
</
bean
>
|
四、LevelDB
這種文件系統是從ActiveMQ5.8以後引進的,它和KahaDB很是類似,也是基於文件的本地數據庫儲存形式,可是它提供比KahaDB更快的持久性。與KahaDB不一樣的是,它不是使用傳統的B-樹來實現對日誌數據的提早寫,而是使用基於索引的LevelDB。
默認配置以下:
1
2
3
|
<
persistenceAdapter
>
<
levelDBdirectory
=
"activemq-data"
/>
</
persistenceAdapter
>
|
屬性以下:
屬性名稱 |
默認值 |
描述 |
directory |
"LevelDB" |
數據文件的存儲目錄 |
readThreads |
10 |
系統容許的併發讀線程數量 |
sync |
true |
同步寫到磁盤 |
logSize |
104857600 (100 MB) |
日誌文件大小的最大值 |
logWriteBufferSize |
4194304 (4 MB) |
日誌數據寫入文件系統的最大緩存值 |
verifyChecksums |
false |
是否對從文件系統中讀取的數據進行校驗 |
paranoidChecks |
false |
儘快對系統內部發生的存儲錯誤進行標記 |
indexFactory |
org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory |
在建立LevelDB索引時使用 |
indexMaxOpenFiles |
1000 |
可供索引使用的打開文件的數量 |
indexBlockRestartInterval |
16 |
Number keys between restart points for delta encoding of keys. |
indexWriteBufferSize |
6291456 (6 MB) |
內存中索引數據的最大值 |
indexBlockSize |
4096 (4 K) |
每一個數據塊的索引數據大小 |
indexCacheSize |
268435456 (256 MB) |
使用緩存索引塊容許的最大內存 |
indexCompression |
snappy |
適用於索引塊的壓縮類型 |
logCompression |
none |
適用於日誌記錄的壓縮類型 |
五、 下面詳細介紹一下如何將消息持久化到Mysql數據庫中
Ø 須要將mysql的驅動包放置到ActiveMQ的lib目錄下
Ø 修改activeMQ的配置文件:
1
2
3
|
<
persistenceAdapter
>
<
jdbcPersistenceAdapter
dataDirectory
=
"${activemq.base}/data"
dataSource
=
"#mysql-ds"
createTablesOnStartup
=
"false"
/>
</
persistenceAdapter
>
|
在配置文件中的broker節點外增長:
1
2
3
4
5
6
7
8
|
<
beanid
=
"mysql-ds"
class
=
"org.apache.commons.dbcp.BasicDataSource"
destroy-method
=
"close"
>
<
propertyname
=
"driverClassName"
value
=
"com.mysql.jdbc.Driver"
/>
<
property
name
=
"url"
value
=
"jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"
/>
<
property
name
=
"username"
value
=
"root"
/>
<
property
name
=
"password"
value
=
"root"
/>
<
property
name
=
"maxActive"
value
=
"200"
/>
<
propertyname
=
"poolPreparedStatements"
value
=
"true"
/>
</
bean
>
|
從配置中能夠看出數據庫的名稱是activemq,須要手動在MySql中創建這個數據庫。
而後從新啓動activeMQ,會發現activemq多了三張表:
1:activemq_acks
2:activemq_lock
3:activemq_msgs
Ø 點到點類型
Sender類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.DeliveryMode;
import
javax.jms.Destination;
import
javax.jms.JMSException;
import
javax.jms.MessageProducer;
import
javax.jms.Session;
import
javax.jms.TextMessage;
import
org.apache.activemq.ActiveMQConnection;
import
org.apache.activemq.ActiveMQConnectionFactory;
public
class
Sender {
private
static
final
int
SEND_NUMBER =
2000
;
public
static
void
main(String[] args) {
// ConnectionFactory :鏈接工廠,JMS用它建立鏈接
ConnectionFactory connectionFactory;
// Connection :JMS客戶端到JMS Provider的鏈接
Connection connection =
null
;
// Session:一個發送或接收消息的線程
Session session;
// Destination :消息的目的地;消息發送給誰.
Destination destination;
// MessageProducer:消息發送者
MessageProducer producer;
// TextMessage message;
// 構造ConnectionFactory實例對象,此處採用ActiveMq的實現
connectionFactory =
new
ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
try
{
// 構造從工廠獲得鏈接對象
connection = connectionFactory.createConnection();
//啓動
connection.start();
//獲取操做鏈接
session = connection.createSession(
false
, Session.AUTO_ACKNOWLEDGE);
//獲取session,FirstQueue是一個服務器的queue destination = session.createQueue("FirstQueue");
// 獲得消息生成者【發送者】
producer = session.createProducer(destination);
//設置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//構造消息
sendMessage(session, producer);
//session.commit();
connection.close();
}
catch
(Exception e){
e.printStackTrace();
}
finally
{
if
(
null
!= connection){
try
{
connection.close();
}
catch
(JMSException e) {
// TODO Auto-generatedcatch block
e.printStackTrace();
}
}
}
}
public
static
void
sendMessage(Session session, MessageProducer producer)
throws
Exception{
for
(
int
i=
1
; i<=SEND_NUMBER; i++){
TextMessage message = session.createTextMessage(
"ActiveMQ發送消息"
+i);
System.out.println(
"發送消息:ActiveMQ發送的消息"
+i);
producer.send(message);
}
}
}
|
Receiver類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.Destination;
import
javax.jms.MessageConsumer;
import
javax.jms.Session;
import
javax.jms.TextMessage;
import
org.apache.activemq.ActiveMQConnection;
import
org.apache.activemq.ActiveMQConnectionFactory;
public
class
Receiver {
public
static
void
main(String[] args) {
// ConnectionFactory :鏈接工廠,JMS用它建立鏈接
ConnectionFactory connectionFactory;
// Connection :JMS客戶端到JMS Provider的鏈接
Connection connection =
null
;
// Session:一個發送或接收消息的線程
Session session;
// Destination :消息的目的地;消息發送給誰.
Destination destination;
// 消費者,消息接收者
MessageConsumer consumer;
connectionFactory = newActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
try
{
//獲得鏈接對象
connection =connectionFactory.createConnection();
// 啓動
connection.start();
// 獲取操做鏈接
session = connection.createSession(
false
,
Session.AUTO_ACKNOWLEDGE);
// 建立Queue
destination = session.createQueue(
"FirstQueue"
);
consumer =session.createConsumer(destination);
while
(
true
){
//設置接收者接收消息的時間,爲了便於測試,這裏定爲100s
TextMessagemessage = (TextMessage)consumer.receive(
100000
);
if
(
null
!= message){
System.out.println(
"收到消息"
+message.getText());
}
else
break
;
}
}
catch
(Exception e){
e.printStackTrace();
}
finally
{
try
{
if
(
null
!= connection)
connection.close();
}
catch
(Throwable ignore) {
}
}
}
}
|
測試:
測試一:
A、 先運行Sender類,待運行完畢後,運行Receiver類
B、 在此過程當中activemq數據庫的activemq_msgs表中沒有數據
C、 再次運行Receiver,消費不到任何信息
測試二:
A、 先運行Sender類
B、 重啓電腦
C、 運行Receiver類,無任何信息被消費
測試三:
A、 把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改成producer.setDeliveryMode(DeliveryMode.PERSISTENT);
B、 先運行Sender類,待運行完畢後,運行Receiver類
C、 在此過程當中activemq數據庫的activemq_msgs表中有數據生成,運行完Receiver類後,數據清除
測試四:
A、 把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改成producer.setDeliveryMode(DeliveryMode.PERSISTENT);
B、 運行Sender類
C、 重啓電腦
D、 運行Receiver類,有消息被消費
結論:
經過以上測試,能夠發現,在P2P類型中當DeliveryMode設置爲NON_PERSISTENCE時,消息被保存在內存中,而當 DeliveryMode設置爲PERSISTENCE時,消息保存在broker的相應的文件或者數據庫中。並且P2P中消息一旦被Consumer消 費就從broker中刪除。
Ø 發佈/訂閱類型
Sender類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.DeliveryMode;
import
javax.jms.Destination;
import
javax.jms.JMSException;
import
javax.jms.MessageProducer;
import
javax.jms.Session;
import
javax.jms.TextMessage;
import
javax.jms.Topic;
import
org.apache.activemq.ActiveMQConnection;
import
org.apache.activemq.ActiveMQConnectionFactory;
public
class
Sender {
private
static
final
int
SEND_NUMBER =
100
;
public
static
void
main(String[] args) {
// ConnectionFactory :鏈接工廠,JMS用它建立鏈接
ConnectionFactory connectionFactory;
// Connection :JMS客戶端到JMS Provider的鏈接
Connection connection =
null
;
// Session:一個發送或接收消息的線程
Session session;
// MessageProducer:消息發送者
MessageProducer producer;
// TextMessage message;
// 構造ConnectionFactory實例對象,此處採用ActiveMq的實現
connectionFactory =
new
ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
try
{
//獲得鏈接對象
connection = connectionFactory.createConnection();
//啓動
connection.start();
//獲取操做鏈接
session = connection.createSession(
false
, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(
"MQ_test"
);
// 獲得消息生成者【發送者】
producer = session.createProducer(topic);
//設置持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//構造消息
sendMessage(session, producer);
//session.commit();
connection.close();
}
catch
(Exception e){
e.printStackTrace();
}
finally
{
if
(
null
!= connection){
try
{
connection.close();
}
catch
(JMSException e) {
// TODO Auto-generatedcatch block
e.printStackTrace();
}
}
}
}
public
static
void
sendMessage(Session session, MessageProducer producer)
throws
Exception{
for
(
int
i=
1
; i<=SEND_NUMBER; i++){
TextMessage message = session.createTextMessage(
"ActiveMQ發送消息"
+i);
System.out.println(
"發送消息:ActiveMQ發送的消息"
+i);
producer.send(message);
}
}
}
|
Receiver類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.Destination;
import
javax.jms.MessageConsumer;
import
javax.jms.Session;
import
javax.jms.TextMessage;
import
javax.jms.Topic;
import
org.apache.activemq.ActiveMQConnection;
import
org.apache.activemq.ActiveMQConnectionFactory;
public
class
Receiver {
public
static
void
main(String[] args) {
// ConnectionFactory :鏈接工廠,JMS用它建立鏈接
ConnectionFactory connectionFactory;
// Connection :JMS客戶端到JMS Provider的鏈接
Connection connection =
null
;
// Session:一個發送或接收消息的線程
Session session;
// 消費者,消息接收者
MessageConsumer consumer;
connectionFactory = newActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
try
{
// 構造從工廠獲得鏈接對象
connection =connectionFactory.createConnection();
connection.setClientID(
"clientID001"
);
// 啓動
connection.start();
// 獲取操做鏈接
session = connection.createSession(
false
,
Session.AUTO_ACKNOWLEDGE);
// 獲取session
Topic topic = session.createTopic(
"MQ_test"
);
// 獲得消息生成者【發送者】
consumer = session.createDurableSubscriber(topic,
"MQ_sub"
);
while
(
true
){
//設置接收者接收消息的時間,爲了便於測試,這裏誰定爲100s
TextMessagemessage = (TextMessage)consumer.receive(
100000
);
if
(
null
!= message){
System.out.println(
"收到消息"
+message.getText());
}
else
break
;
}
}
catch
(Exception e){
e.printStackTrace();
}
finally
{
try
{
if
(
null
!= connection)
connection.close();
}
catch
(Throwable ignore) {
}
}
}
}
|
測試:
測試一:
A、先啓動Sender類
B、再啓動Receiver類
C、結果無任何記錄被訂閱
測試二:
A、先啓動Receiver類,讓Receiver在相關主題上進行訂閱
B、中止Receiver類,再啓動Sender類
C、待Sender類運行完成後,再啓動Receiver類
D、結果發現相應主題的信息被訂閱
Ø