1.1 什麼是中間件?java
因爲業務、機構和技術是不斷變化的,所以爲其服務的軟件系統必須適應這樣的變化。在合併、添加服務或擴展可用服務以後,公司可能無力負擔從新建立信息系統所需的成本。正是在這個關鍵時刻,才須要集成新組件或者儘量高效地擴展示有組件。要集成異類組件,最方便的方法不是將它們從新建立爲同類元素,而是提供一個容許它們進行通訊(不考慮它們之間的差別)的層。該層被稱做中間件。
1.2 中間件的分類mysql
消息隊列中間件是分佈式系統中重要的組件,主要解決應用耦合、異步消息、流量削鋒等問題。實現高性能、高可用、可伸縮和最終一致性架構。是大型分佈式系統不可缺乏的中間件。
3.1 什麼是jms?web
JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
3.2 JMS 消息傳送模式sql
3.3 JMS 消息傳送對象數據庫
JMS 消息傳送的對象在編程域中基本保持不變:鏈接工廠、鏈接、會話、生成方、使用方、消息和目的地。
MQ全稱爲Message Queue,消息隊列(MQ)是正確而又完整的 JMS 實現,消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過寫和檢索出入列隊的針對應用程序的數據(消息)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。
場景說明:新用戶註冊發放100積分,180元新手大禮包,激活會員卡,傳統的作法有兩種:串行方式,並行方式。
以上兩種方式,很容易發現同步處理的狀況下都會涉及到非主業務的其餘操做,其實註冊的的主流程不該該受其餘事件影響,經過消息隊列的方式,能夠把後續的處理流程進行異步處理能夠大大提升響應速度。
場景說明:企業中常常出現企業合做如:本公司的驢粉卡與電信合做,新開卡的用戶從電信端推送到我方,除了相對應的福利外,首先判斷是否註冊本公司帳戶, 沒有給予註冊,可是新用戶的相對應權益須要對等的發放。
缺點:apache
1.與其餘系統過分耦合
2.短信發放或優惠券發放失敗,影響主業務編程
優勢:瀏覽器
1.註冊完成而後將消息寫入隊列返回成功。
2.發放權益業務不影響主業務,實現解耦。緩存
場景說明:秒殺活動對稀缺或者特價的商品進行定時定量售賣,吸引成大量的消費者進行搶購,但又只有少部分消費者能夠下單成功。 所以,秒殺活動將在較短期內產生比平時大數十倍,上百倍的頁面訪問流量和下單請求流量。
ActiveMQ是Apache軟件基金下的一個開源軟件,它遵循JMS1.1規範(Java Message Service),是消息驅動中間件軟件(MOM)。它爲企業消息傳遞提供高可用,出色性能,可擴展,穩定和安全保障。
P2P (點對點)消息域使用 queue 做爲 Destination,消息能夠被同步或異步的發送和接收,每一個消息只會給一個 Consumer 傳送一次。
Pub/Sub(發佈/訂閱,Publish/Subscribe)消息域使用 topic 做爲 Destination,發佈者向 topic 發送消息,訂閱者註冊接收來自 topic 的消息。發送到 topic 的任何消息都將自動傳遞給全部訂閱者。接收方式(同步和異步)與 P2P 域相同。
消息生產者安全
//建立session會話 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.187.13:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //建立一個消息隊列 session.createQueue("jms.test.topic")--P2P模式 Destination destination = session.createTopic("jms.test.topic"); //建立消息生產者 MessageProducer producer = session.createProducer(destination); //消息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 0; i < messageNum; i++) { producer.send(session.createTextMessage("Message Producer:" + i)); } //提交會話 session.commit();
消息消費者
//建立session會話 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.187.13:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //建立一個消息隊列 session.createQueue("jms.test.topic")--P2P模式 Destination destination = session.createTopic("jms.test.topic"); //建立消息消費者 MessageConsumer consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(); if (message != null){ System.out.println("Message Consumer:"+message.getText()); }else { break; } } session.commit();
ActiveMQ 5.3 版本起的默認存儲方式。KahaDB存儲是一個基於文件的快速存儲消息,設計目標是易於使用且儘量快。它使用基於文件的消息數據庫意味着沒有第三方數據庫的先決條件。
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="32mb"/> </persistenceAdapter> </broker>
MQ存儲使用戶可以快速啓動和運行,由於它不依賴於第三方數據庫。AMQ 消息存儲庫是可靠持久性和高性能索引的事務日誌組合,當消息吞吐量是應用程序的主要需求時,該存儲是最佳選擇。但由於它爲每一個索引使用兩個分開的文件,而且每一個 Destination 都有一個索引,因此當你打算在代理中使用數千個隊列的時候,不該該使用它。
<persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/kahadb" syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100" maxFileLength="10mb" /> </persistenceAdapter>
選擇關係型數據庫,一般的緣由是企業已經具有了管理關係型數據的專長,可是它在性能上絕對不優於上述消息存儲實現。事實是,許多企業使用關係數據庫做爲存儲,是由於他們更願意充分利用這些數據庫資源。
<beans> <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> </broker> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://127.0.0.1/jms?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>
內存消息存儲器將全部持久消息保存在內存中。在僅存儲有限數量 Message 的狀況下,內存消息存儲會頗有用,由於 Message 一般會被快速消耗。在 activema.xml 中將 broker 元素上的 persistent 屬性設置爲 false 便可。
<broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://192.168.187.13:61616"/> </transportConnectors> </broker>
-rw-rw-r--. 1 lvmama01 lvmama01 32M 5月 18 09:47 db-1.log -rw-rw-r--. 1 lvmama01 lvmama01 32K 5月 18 09:47 db.data -rw-rw-r--. 1 lvmama01 lvmama01 33K 5月 18 09:47 db.redo -rw-rw-r--. 1 lvmama01 lvmama01 0 5月 16 19:31 lock
能夠看出,上面directory一共有四個文件:
①db.data
它是消息的索引文件。本質上是B-Tree的實現,使用B-Tree做爲索引指向db-*.log裏面存儲的消息。
②db.redo
主要用來進行消息恢復。
③db-*.log 存儲消息的內容。對於一個消息而言,不只僅有消息自己的數據(message data),並且還有(Destinations、訂閱關係、事務...),data log以日誌形式存儲消息,並且新的數據老是以APPEND的方式追加到日誌文件末尾。所以,消息的存儲是很快的。好比,對於持久化消息,Producer把消息發送給Broker,Broker先把消息存儲到磁盤中(enableJournalDiskSyncs配置選項),而後再向Producer返回Acknowledge。Append方式在必定程度上減小了Broker向Producer返回Acknowledge的時間。
④lock文件
KahaDB內部分爲:data logs, 按照Message ID高度優化的索引,memory message cache。
①在內存(cache)中的那部分B-Tree是Metadata Cache
經過將索引緩存到內存中,能夠加快查詢的速度(quick retrival of message data)。可是須要定時將 Metadata Cache 與 Metadata Store同步。這個同步過程就稱爲:check point。由checkpointInterval選項 決定每隔多久時間進行一次checkpoint操做。
②BTree Indexes則是保存在磁盤上的,稱爲Metadata Store,它對應於文件db.data,它就是對Data Logs以B樹的形式索引。
有了它,Broker(消息服務器)能夠快速地重啓恢復,由於它是消息的索引,根據它就能恢復出每條消息的location。若是Metadata Store被損壞,則只能掃描整個Data Logs來重建B樹了。
③Data Logs則對應於文件 db-*.log,默認是32MB
Data Logs以日誌形式存儲消息,它是生產者生產的數據的真正載體。
④Redo Log則對應於文件 db.redo,redo log的原理用到了「Double Write」。
簡要記錄下本身的理解:由於磁盤的頁大小與操做系統的頁大小不同,磁盤的頁大小通常是16KB,而OS的頁大小是4KB。而數據寫入磁盤是以磁盤頁大小爲單位進行的,即一次寫一個磁盤頁大小,這就須要4個OS的頁大小(4*4=16)。若是在寫入過程當中出現故障(忽然斷電)就會致使只寫入了一部分數據(partial page write)
而採用了「Double Write」以後,將數據寫入磁盤時,先寫到一個Recovery Buffer中,而後再寫到真正的目的文件中。在ActiveMQ的源碼PageFile.java中有相應的實現。
public void unload() throws IOException { //load時建立writeFile(db.data)和 recoveryFile(db.redo) writeFile = new RecoverableRandomAccessFile(file, "rw", false); ........ if (enableRecoveryFile) { recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw"); } } private void writeBatch() throws IOException { ....... //將數據寫入磁盤時,先寫到一個Recovery Buffer中(db.data) for (PageWrite w : batch) { try { checksum.update(w.getDiskBound(), 0, pageSize); } catch (Throwable t) { throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t); } recoveryFile.writeLong(w.page.getPageId()); recoveryFile.write(w.getDiskBound(), 0, pageSize); } ....... //寫入真正的目的文件中(db.redo) for (PageWrite w : batch) { writeFile.seek(toOffset(w.page.getPageId())); writeFile.write(w.getDiskBound(), 0, pageSize); w.done(); } }
1.默認的單機部署(kahadb)
略......
2.共享存儲主從模式(基於數據庫)
3.共享存儲主從模式(基於文件系統)
4.基於zookeeper的主從(levelDB Master/Slave詳細說明)
第一步:zookeeper集羣搭建
server.1=lvmama01:2888:3888 server.2=lvmama02:2888:3888 server.3=lvmama03:3888:3888
第二步:activemq集羣搭建修改activemq.xml文件:
<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:0" //zookeeper集羣地址 zkAddress="192.168.187.11:2181,192.168.187.12:2181,192.168.187.13:2181" //本地ip hostname="192.168.187.11" sync="local_disk" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>
第三步:分別啓動三臺activemq(仔細查看日誌):
1.啓動第一臺機器(lvmama01:192.168.187.11)
2.啓動第二臺機器(lvmama02:192.168.187.12)
3.第三臺啓動同第二臺
第三步:查看是否啓動成功(沒成功能夠查看activemq.log日誌)
啓動成功後經過zkCli.sh能夠看到已建立leveldb-stores以下:
第四步:經過流量器訪問web管理頁面(注意只有master機器能夠訪問)
第五步:測試
String userName = ActiveMQConnectionFactory.DEFAULT_USER; String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD; String brokerURL = "failover:(tcp://192.168.187.11:61616,tcp://192.168.187.12:61616,tcp://192.168.187.13:61616)?Randomize=false"; //2. 經過ConnectionFactory創建一個Connection鏈接,而且調用start方法開啓 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, brokerURL); Connection connection = connectionFactory.createConnection(); connection.start(); //3. 經過Connection建立Session,用於接收消息[第一個參數:是否啓用事務;第二個參數:設置簽收模式] Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //4. 經過Session建立Destination對象 Destination destination = session.createQueue("cluster-queue"); //5. 經過Session建立發送或接受對象 MessageProducer messageProducer = session.createProducer(null);
運行結果(此時發送的目標爲192.168.187.11):
Connected to the target VM, address: '127.0.0.1:12266', transport: 'socket' INFO | Successfully connected to tcp://192.168.187.11:61616 生產者:Hello MQ:1 生產者:Hello MQ:2 生產者:Hello MQ:3 生產者:Hello MQ:4 生產者:Hello MQ:5 生產者:Hello MQ:6 生產者:Hello MQ:7 生產者:Hello MQ:8 生產者:Hello MQ:9
此時將activemq master服務中止,集羣自動從新選舉 lvmama02(192.168.187.12)成爲Master
咱們再試運行測試用例發現消息任然能夠發送,只不過發送的目標變爲192.168.187.12
Connected to the target VM, address: '127.0.0.1:12400', transport: 'socket' INFO | Successfully connected to tcp://192.168.187.12:61616 生產者:Hello MQ:1 生產者:Hello MQ:2 生產者:Hello MQ:3 生產者:Hello MQ:4 生產者:Hello MQ:5 生產者:Hello MQ:6 生產者:Hello MQ:7 生產者:Hello MQ:8 生產者:Hello MQ:9
1.安裝Jmeter測試工具,參考
2.新建jndi.properties到jmeter/bin目錄下
//ActiveMQ jar包中init所需的類名 java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory //ActiveMQ的地址 java.naming.provider.url = tcp://127.0.0.1:61616 //鏈接工廠名稱 connectionFactoryNames = connectionFactory //p2p 隊列名稱 queue.MyQueue = example.MyQueue topic.MyTopic = example.MyTopic
3.把配置文件打到ApacheJMeter.jar 中 在jmeter/bin目錄下運行
jar uf ApacheJMeter.jar jndi.properties
4.下載Activemq,並加activemq-all-5.15.3.jar添加到Jmeter/lib下
5.配置Jmeter測試p2p模式
6.進行測試(單線程+60s+10000條消息)
可能因爲機器緣由,測試結果差距蠻大 ^_^