MQ全稱爲Message Queue,即消息隊列,它是一種應用程序之間的通訊方法,消息隊列在分佈式系統開
發中應用很是普遍。開發中消息隊列一般有以下應用場景:一、任務異步處理。將不須要同步處理的而且耗時長的操做由消息隊列通知消息接收方進行異步處理。提升了應用程序的響應時間。二、應用程序解耦合MQ至關於一箇中介,生產方經過MQ與消費方交互,它將應用程序進行解耦合。市場上還有哪些消息隊列?ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。咱們主要介紹主流的消息中間件,瞭解每一個MQ的優缺點,能知曉什麼樣的場景下選用合適的MQ。html
ActiveMQ 是徹底基於 JMS 規範實現的一個消息中間件產品。 是 Apache 開源基金會研發的消息中間件。ActiveMQ主要應用在分佈式系統架構中,幫助構建高可用、 高性能、可伸縮的企業級面向消息服務的系統。java
Java 消息服務(Java Message Service)是 java 平臺中關於面向消息中間件的 API,用於在兩個應用程序之間,或者分佈式系統中發送消息,進行異步通訊。JMS 是一個與具體平臺無關的 API ,絕大多數 MOM(Message Oriented Middleware)(面向消息中間件)提供商都對 JMS 提供了支持。例如ActiveMQ就是其中一個實現。mysql
MOM 是面向消息的中間件,使用消息傳送提供者來協調消息傳送操做。MOM 須要提供 API 和管理工具。客戶端使用 api 調用,把消息發送到由提供者管理的目的地。在發送消息以後,客戶端會繼續執行其餘工做,而且在接收方收到這個消息確認以前,提供者一直保留該消息。web
咱們已經知道了 JMS 規範的目的是爲了使得 Java 應用程序可以訪問現有 MOM (消息中間件)系統,造成一套統一的標準規範,解決不一樣消息中間件之間的協做問題。在建立 JMS 規範時,設計者但願可以結合現有的消息傳送的精髓,好比說spring
1)鏈接工廠。鏈接工廠(ConnectionFactory)是由管理員建立,並綁定到JNDI樹中。客戶端使用JNDI查找鏈接工廠,而後利用鏈接工廠建立一個JMS鏈接。sql
2)JMS鏈接。JMS鏈接(Connection)表示JMS客戶端和服務器端之間的一個活動的鏈接,是由客戶端經過調用鏈接工廠的方法創建的。docker
3)JMS會話。JMS會話(Session)表示JMS客戶與JMS服務器之間的會話狀態。JMS會話創建在JMS鏈接上,表示客戶與服務器之間的一個會話線程。數據庫
4)JMS目的。JMS目的(Destination),又稱爲消息隊列,是實際的消息源。apache
5)JMS生產者和消費者。生產者(Message Producer)和消費者(Message Consumer)對象由Session對象建立,用於發送和接收消息。windows
6)JMS消息一般有兩種類型:
① 點對點(Point-to-Point)。在點對點的消息系統中,消息分發給一個單獨的使用者。點對點消息每每與隊列(javax.jms.Queue)相關聯。
② 發佈/訂閱(Publish/Subscribe)。發佈/訂閱消息系統支持一個事件驅動模型,消息生產者和消費者都參與消息的傳遞。生產者發佈事件,而使用者訂閱感興趣的事件,並使用事件。該類型消息通常與特定的主題(javax.jms.Topic)關聯。
windows安裝
下載地址:http://activemq.apache.org/activemq-5150-release.html
下載完成後解壓進入bin目錄 運行 activemq.bat。
若是你遇到以下問題,5672端口被佔用
能夠去修改activemq的conf目錄下的activemq.xml,把amqp的端口改成其餘的,這裏改爲了5673
再次啓動:
訪問地址:http://127.0.0.1:8161/admin/進入後臺頁面 初始帳號密碼 admin admin
Docker安裝ActiveMQ
docker run -d --name activemq -p 61616:61616 -p 8161:8161 webcenter/activemq
Springboot集成ActiveMQ
導入依賴
<dependencies> <!--Springboot--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.3.0.RELEASE</version> </dependency> <!--ActiveMq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> <version>1.5.0.RELEASE</version> </dependency> <!--消息隊列鏈接池--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.0</version> </dependency> </dependencies>
配置MQ
server: port: 8080 spring: activemq: broker-url: tcp://127.0.0.1:61616 user: admin password: admin close-timeout: 15s # 在考慮結束以前等待的時間 in-memory: true # 默認代理URL是否應該在內存中。若是指定了顯式代理,則忽略此值。 non-blocking-redelivery: false # 是否在回滾回滾消息以前中止消息傳遞。這意味着當啓用此命令時,消息順序不會被保留。 send-timeout: 0 # 等待消息發送響應的時間。設置爲0等待永遠。 queue-name: active.queue topic-name: active.topic.name.model # packages: # trust-all: true #不配置此項,會報錯 pool: enabled: true max-connections: 10 #鏈接池最大鏈接數 idle-timeout: 30000 #空閒的鏈接過時時間,默認爲30秒 # jms: # pub-sub-domain: true #默認狀況下activemq提供的是queue模式,若要使用topic模式須要配置下面配置 # 是否信任全部包 #spring.activemq.packages.trust-all= # 要信任的特定包的逗號分隔列表(當不信任全部包時) #spring.activemq.packages.trusted= # 當鏈接請求和池滿時是否阻塞。設置false會拋「JMSException異常」。 #spring.activemq.pool.block-if-full=true # 若是池仍然滿,則在拋出異常前阻塞時間。 #spring.activemq.pool.block-if-full-timeout=-1ms # 是否在啓動時建立鏈接。能夠在啓動時用於加熱池。 #spring.activemq.pool.create-connection-on-startup=true # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。 #spring.activemq.pool.enabled=false # 鏈接過時超時。 #spring.activemq.pool.expiry-timeout=0ms # 鏈接空閒超時 #spring.activemq.pool.idle-timeout=30s # 鏈接池最大鏈接數 #spring.activemq.pool.max-connections=1 # 每一個鏈接的有效會話的最大數目。 #spring.activemq.pool.maximum-active-session-per-connection=500 # 當有"JMSException"時嘗試從新鏈接 #spring.activemq.pool.reconnect-on-exception=true # 在空閒鏈接清除線程之間運行的時間。當爲負數時,沒有空閒鏈接驅逐線程運行。 #spring.activemq.pool.time-between-expiration-check=-1ms # 是否只使用一個MessageProducer #spring.activemq.pool.use-anonymous-producers=true
編寫配置類
/** * @author 原 * @date 2020/12/16 * @since 1.0 **/ @Configuration public class BeanConfig { @Value("${spring.activemq.broker-url}") private String brokerUrl; @Value("${spring.activemq.user}") private String username; @Value("${spring.activemq.topic-name}") private String password; @Value("${spring.activemq.queue-name}") private String queueName; @Value("${spring.activemq.topic-name}") private String topicName; @Bean(name = "queue") public Queue queue() { return new ActiveMQQueue(queueName); } @Bean(name = "topic") public Topic topic() { return new ActiveMQTopic(topicName); } @Bean public ConnectionFactory connectionFactory(){ return new ActiveMQConnectionFactory(username, password, brokerUrl); } @Bean public JmsMessagingTemplate jmsMessageTemplate(){ return new JmsMessagingTemplate(connectionFactory()); } /** * 在Queue模式中,對消息的監聽須要對containerFactory進行配置 * @param connectionFactory * @return */ @Bean("queueListener") public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } /** * 在Topic模式中,對消息的監聽須要對containerFactory進行配置 * @param connectionFactory * @return */ @Bean("topicListener") public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } }
編寫啓動類
/** * @author 原 * @date 2020/12/8 * @since 1.0 **/ @SpringBootApplication @EnableJms //開啓JMS支持 public class DemoApplication { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } /** * 應用啓動後,會執行該方法 * 會分別向queue和topic發送一條消息 */ @PostConstruct public void sendMsg(){ jmsMessagingTemplate.convertAndSend(queue,"queue-test"); jmsMessagingTemplate.convertAndSend(topic,"topic-test"); } }
查看activemq後臺
active.queue 爲隊列的名稱
Number Of Pending Messages 等待消費的消息數量 3是由於我本身發了3次
Messages Enqueued 已經進入隊列的消息數量
由於沒有消費者,消息一直沒有被消費。下面咱們編寫消費者代碼。
/** * @author 原 * @date 2020/12/16 * @since 1.0 **/ @Component public class QueueConsumerListener { @JmsListener(destination = "${spring.activemq.queue-name}",containerFactory = "queueListener") public void getQueue(String message){ System.out.println("接受queue:"+message); } @JmsListener(destination = "${spring.activemq.topic-name}",containerFactory = "topicListener") public void getTopic(String message){ System.out.println("接受topic:"+message); } }
在後臺發送一條消息
控制檯打印
發送topic消息
控制檯打印:
可是發現一個問題是,以前在沒有消費的時候,有3條queue和一條topic,可是當我啓動消費者時,queue的3條消息被消費了,topic確沒有。這是由於:
topic模式有普通訂閱和持久化訂閱
普通訂閱:在消費者啓動以前發送過來的消息,消費者啓動以後不會去消費;
持久化訂閱: 在消費者啓動以前發送過來的消息,消費者啓動以後會去消費;
ActiveMQ支持同步、異步兩種發送模式將消息發送到broker上。
同步發送過程當中,發送者發送一條消息會阻塞直到broker反饋一個確認消息,表示消息已經被broker處理。這個機
制提供了消息的安全性保障,可是因爲是阻塞的操做,會影響到客戶端消息發送的性能
異步發送的過程當中,發送者不須要等待broker提供反饋,因此性能相對較高。可是可能會出現消息丟失的狀況。所
以使用異步發送的前提是在某些狀況下容許出現數據丟失的狀況。
默認狀況下,非持久化消息是異步發送的,持久化消息而且是在非事務模式下是同步發送的。
可是在開啓事務的狀況下,消息都是異步發送。因爲異步發送的效率會比同步發送性能更高。因此在發送持久化消
息的時候,儘可能去開啓事務會話。
ProducerWindowSize的含義
producer每發送一個消息,統計一下發送的字節數,當字節數達到ProducerWindowSize值時,須要等待broker的確認,才能繼續發送。
代碼在:ActiveMQSession的1957行
主要用來約束在異步發送時producer端容許積壓的(還沒有ACK)的消息的大小,且只對異步發送有意義。每次發送消息以後,都將會致使memoryUsage大小增長(+message.size),當broker返回producerAck時,memoryUsage尺寸減小(producerAck.size,此size表示先前發送消息的大小)。
能夠經過以下2種方式設置:
Ø 在brokerUrl中設置: "tcp://localhost:61616?jms.producerWindowSize=1048576",這種設置將會對全部的
producer生效。
Ø 在destinationUri中設置: "test-queue?producer.windowSize=1048576",此參數只會對使用此Destination實例
的producer失效,將會覆蓋brokerUrl中的producerWindowSize值。
注意:此值越大,意味着消耗Client端的內存就越大。
ActiveMQMessageProducer.send(...)方法
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { checkClosed();//檢查session鏈接,若已關閉直接拋出異常 if (destination == null) {//校驗發送消息的目的地是否爲空,也就是必須制定queue或者topic信息 if (info.getDestination() == null) { throw new UnsupportedOperationException("A destination must be specified."); } throw new InvalidDestinationException("Don't understand null destinations"); } //這裏作的是封裝Destination ActiveMQDestination dest; if (destination.equals(info.getDestination())) { dest = (ActiveMQDestination)destination; } else if (info.getDestination() == null) { dest = ActiveMQDestination.transform(destination); } else { throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); } if (dest == null) { throw new JMSException("No destination specified"); } //封裝Message if (transformer != null) { Message transformedMessage = transformer.producerTransform(session, this, message); if (transformedMessage != null) { message = transformedMessage; } } //若是設置了producerWindow,則須要校驗producerWindow大小 if (producerWindow != null) { try { producerWindow.waitForSpace(); } catch (InterruptedException e) { throw new JMSException("Send aborted due to thread interrupt."); } } //發送消息 this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete); //作統計的 stats.onMessage(); }
ActiveMQSession的send方法
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { //校驗鏈接 checkClosed(); //校驗發送目標 if (destination.isTemporary() && connection.isDeleted(destination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); } //互斥鎖,若是一個session的多個producer發送消息到這裏,會保證消息發送的有序性 synchronized (sendMutex) { // tell the Broker we are about to start a new transaction doStartTransaction(); TransactionId txid = transactionContext.getTransactionId(); long sequenceNumber = producer.getMessageSequence(); //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 message.setJMSDeliveryMode(deliveryMode);//設置是否持久化 long expiration = 0L; if (!producer.getDisableMessageTimestamp()) { long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } message.setJMSExpiration(expiration);//消息過時時間 message.setJMSPriority(priority);//消息優先級 message.setJMSRedelivered(false);//是否重複發送 // transform to our own message format here 統一封裝 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); msg.setDestination(destination); //設置消息ID msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); // Set the message id. if (msg != message) {//若是消息是通過轉化的,則更新原來的消息id和目的地 message.setJMSMessageID(msg.getMessageId().toString()); // Make sure the JMS destination is set on the foreign messages too. message.setJMSDestination(destination); } //clear the brokerPath in case we are re-sending this message msg.setBrokerPath(null); msg.setTransactionId(txid); if (connection.isCopyMessageOnSend()) { msg = (ActiveMQMessage)msg.copy(); } msg.setConnection(connection); msg.onSend();//把消息屬性和消息體都設置爲只讀,防止被修改 msg.setProducerId(msg.getMessageId().getProducerId()); if (LOG.isTraceEnabled()) { LOG.trace(getSessionId() + " sending message: " + msg); } //若是onComplete沒有設置,且發送超時時間小於0,且消息不須要反饋,且鏈接器不是同步發送模式,且消息非持久化或者鏈接器是異步發送模式 //或者存在事務id的狀況下,走異步發送,不然走同步發送 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { this.connection.asyncSendPacket(msg); if (producerWindow != null) { // Since we defer lots of the marshaling till we hit the // wire, this might not // provide and accurate size. We may change over to doing // more aggressive marshaling, // to get more accurate sizes.. this is more important once // users start using producer window // flow control. int size = msg.getSize();//異步發送的狀況下,須要設置producerWindow的大小 producerWindow.increaseUsage(size); } } else { if (sendTimeout > 0 && onComplete==null) { this.connection.syncSendPacket(msg,sendTimeout);//帶超時時間的同步發送//帶回調的同步發送 }else { this.connection.syncSendPacket(msg, onComplete);//帶回調的同步發送 } } } }
看下異步發送的代碼ActiveMQConnection. asyncSendPacket()
/** * send a Packet through the Connection - for internal use only * * @param command * @throws JMSException */ public void asyncSendPacket(Command command) throws JMSException { if (isClosed()) { throw new ConnectionClosedException(); } else { doAsyncSendPacket(command); } } private void doAsyncSendPacket(Command command) throws JMSException { try { this.transport.oneway(command); } catch (IOException e) { throw JMSExceptionSupport.create(e); } }
再看看transport是個什麼東西?在哪裏實例化的?按照之前看源碼的慣例來看,它確定不是一個單純的對象。按照以往我看源碼的經驗來看,必定是在建立鏈接的過程當中初始化的。因此咱們定位到代碼
//從connection=connectionFactory.createConnection();這行代碼做爲入口,一直跟蹤ActiveMQConnectionFactory. createActiveMQConnection這個方法中。代碼以下 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { if (brokerURL == null) { throw new ConfigurationException("brokerURL not set."); } ActiveMQConnection connection = null; try { Transport transport = createTransport();//代碼往下看 connection = createActiveMQConnection(transport, factoryStats); connection.setUserName(userName); connection.setPassword(password); //省略後面的代碼 }
//這個方法就是實例化Transport的 1.構建Broker的URL 2.根據這個URL去建立一個連接TransportFactory.connect 默認使用的TCP鏈接 protected Transport createTransport() throws JMSException { try { URI connectBrokerUL = brokerURL; String scheme = brokerURL.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + brokerURL + "]"); } if (scheme.equals("auto")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp")); } else if (scheme.equals("auto+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl")); } else if (scheme.equals("auto+nio")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio")); } else if (scheme.equals("auto+nio+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl")); } return TransportFactory.connect(connectBrokerUL);//裏面的代碼繼續往下看 } catch (Exception e) { throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); } }
TransportFactory. findTransportFactory
//TransportFactory.connect(connectBrokerUL) public static Transport connect(URI location) throws Exception { TransportFactory tf = findTransportFactory(location); return tf.doConnect(location); } //findTransportFactory(location) public static TransportFactory findTransportFactory(URI location) throws IOException { String scheme = location.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + location + "]"); } TransportFactory tf = TRANSPORT_FACTORYS.get(scheme); if (tf == null) { // Try to load if from a META-INF property. try { tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); TRANSPORT_FACTORYS.put(scheme, tf); } catch (Throwable e) { throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); } } return tf; }
調用TransportFactory.doConnect去構建一個鏈接
public Transport doConnect(URI location) throws Exception { try { Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); if( !options.containsKey("wireFormat.host") ) { options.put("wireFormat.host", location.getHost()); } WireFormat wf = createWireFormat(options); Transport transport = createTransport(location, wf); Transport rc = configure(transport, wf, options); //remove auto IntrospectionSupport.extractProperties(options, "auto."); if (!options.isEmpty()) { throw new IllegalArgumentException("Invalid connect parameters: " + options); } return rc; } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } }
configure
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { //組裝一個複合的transport,這裏會包裝兩層,一個是IactivityMonitor.另外一個是WireFormatNegotiator transport = compositeConfigure(transport, wf, options); transport = new MutexTransport(transport);//再作一層包裝,MutexTransport transport = new ResponseCorrelator(transport);//包裝ResponseCorrelator return transport; }
到目前爲止,這個transport實際上就是一個調用鏈了,他的鏈結構爲
ResponseCorrelator(MutexT ransport(WireFormatNegotiator(IactivityMonitor(T cpT ransport()))
每一層包裝表示什麼意思呢?
ResponseCorrelator 用於實現異步請求。
MutexT ransport 實現寫鎖,表示同一時間只容許發送一個請求
WireFormatNegotiator 實現了客戶端鏈接broker的時候先發送數據解析相關的協議信息,好比解析版本號,是否
使用緩存等
InactivityMonitor 用於實現鏈接成功成功後的心跳檢查機制,客戶端每10s發送一次心跳信息。服務端每30s讀取
一次心跳信息。
同步發送和異步發送的區別
public Object request(Object command, int timeout) throws IOException { FutureResponse response = asyncRequest(command, null); return response.getResult(timeout); // 從future方法阻塞等待返回 }
正常狀況下,非持久化消息是存儲在內存中的,持久化消息是存儲在文件中的。可以存儲的最大消息數據在
${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage節點
SystemUsage配置設置了一些系統內存和硬盤容量
<systemUsage> <systemUsage> <memoryUsage> //該子標記設置整個ActiveMQ節點的「可用內存限制」。這個值不能超過ActiveMQ自己設置的最大內存大小。其中的 percentOfJvmHeap屬性表示百分比。佔用70%的堆內存 <memoryUsage percentOfJvmHeap="70" /> </memoryUsage> <storeUsage> //該標記設置整個ActiveMQ節點,用於存儲「持久化消息」的「可用磁盤空間」。該子標記的limit屬性必需要進行設置 <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> //一旦ActiveMQ服務節點存儲的消息達到了memoryUsage的限制,非持久化消息就會被轉儲到 temp store區域,雖然 咱們說過非持久化消息不進行持久化存儲,可是ActiveMQ爲了防止「數據洪峯」出現時非持久化消息大量堆積導致內存耗 盡的狀況出現,仍是會將非持久化消息寫入到磁盤的臨時區域——temp store。這個子標記就是爲了設置這個temp store區域的「可用磁盤空間限制」 <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage>
從上面的配置咱們須要get到一個結論,當非持久化消息堆積到必定程度的時候,也就是內存超過指定的設置閥值時,ActiveMQ會將內存中的非持久化消息寫入到臨時文件,以便騰出內存。可是它和持久化消息的區別是,重啓以後,持久化消息會從文件中恢復,非持久化的臨時文件會直接刪除
消息的持久化策略分析
消息持久性對於可靠消息傳遞來講是一種比較好的方法,即時發送者和接受者不是同時在線或者消息中心在發送者發送消息後宕機了,在消息中心重啓後仍然能夠將消息發送出去。消息持久性的原理很簡單,就是在發送消息出去後,消息中心首先將消息存儲在本地文件、內存或者遠程數據庫,而後把消息發送給接受者,發送成功後再把消息從存儲中刪除,失敗則繼續嘗試。接下來咱們來了解一下消息在broker上的持久化存儲實現方式
持久化存儲支持類型
ActiveMQ支持多種不一樣的持久化方式,主要有如下幾種,不過,不管使用哪一種持久化方式,消息的存儲邏輯都是一致的。
Ø KahaDB存儲(默認存儲方式)
Ø JDBC存儲
Ø Memory存儲
Ø LevelDB存儲
Ø JDBC With ActiveMQ Journal
KahaDB存儲
KahaDB是目前默認的存儲方式,可用於任何場景,提升了性能和恢復能力。消息存儲使用一個事務日誌和僅僅用一個索引文件來存儲它全部的地址。
KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。在Kaha中,數據被追加到data logs中。當再也不須要log文件中的數據的時候,log文件會被丟棄。
配置方式
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
KahaDB的存儲原理
在data/kahadb這個目錄下,會生成四個文件
Ø db.data 它是消息的索引文件,本質上是B-Tree(B樹),使用B-Tree做爲索引指向db-.log裏面存儲的消息
Ø db.redo 用來進行消息恢復
Ø db-.log 存儲消息內容。新的數據以APPEND的方式追加到日誌文件末尾。屬於順序寫入,所以消息存儲是比較
快的。默認是32M,達到閥值會自動遞增
Ø lock文件 鎖,表示當前得到kahadb讀寫權限的broker
JDBC存儲
使用JDBC持久化方式,數據庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。
ACTIVEMQ_MSGS 消息表,queue和topic都存在這個表中
ACTIVEMQ_ACKS 存儲持久訂閱的信息和最後一個持久訂閱接收的消息ID
ACTIVEMQ_LOCKS 鎖表,用來確保某一時刻,只能有一個ActiveMQ broker實例來訪問數據庫
JDBC存儲配置
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="# MySQL-DS " createTablesOnStartup="true" /> </persistenceAdapter>
dataSource指定持久化數據庫的bean,createT ablesOnStartup是否在啓動的時候建立數據表,默認值是true,這
樣每次啓動都會去建立數據表了,通常是第一次啓動的時候設置爲true,以後改爲false
Mysql持久化Bean配置
<bean id="Mysql-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.11.156:3306/activemq? relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> </bean>
LevelDB存儲
LevelDB持久化性能高於KahaDB,雖然目前默認的持久化方式仍然是KahaDB。而且,在ActiveMQ 5.9版本提供
了基於LevelDB和Zookeeper的數據複製方式,用於Master-slave方式的首選數據複製方案。
不過,據ActiveMQ官網對LevelDB的表述:LevelDB官方建議使用以及再也不支持,推薦使用的是KahaDB
<persistenceAdapter> <levelDBdirectory="activemq-data"/> </persistenceAdapter>
Memory 消息存儲
基於內存的消息存儲,內存消息存儲主要是存儲全部的持久化的消息在內存中。persistent=」false」,表示不設置持
久化存儲,直接存儲到內存中
<beans> <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://localhost:61635"/> </transportConnectors> </broker> </beans>
JDBC Message store with ActiveMQ Journal
這種方式克服了JDBC Store的不足,JDBC每次消息過來,都須要去寫庫和讀庫。
ActiveMQ Journal,使用高速緩存寫入技術,大大提升了性能。
當消費者的消費速度可以及時跟上生產者消息的生產速度時,journal文件可以大大減小須要寫入到DB中的消息。
舉個例子,生產者生產了1000條消息,這1000條消息會保存到journal文件,若是消費者的消費速度很快的狀況
下,在journal文件尚未同步到DB以前,消費者已經消費了90%的以上的消息,那麼這個時候只須要同步剩餘的
10%的消息到DB。
若是消費者的消費速度很慢,這個時候journal文件可使消息以批量方式寫到DB。
Ø 將原來的標籤註釋掉
Ø 添加以下標籤
<persistenceFactory> <journalPersistenceAdapterFactory dataSource="#Mysql-DS" dataDirectory="activemqdata"/> </persistenceFactory>
Ø 在服務端循環發送消息。能夠看到數據是延遲同步到數據庫的
咱們知道有兩種方法能夠接收消息,一種是使用同步阻塞的MessageConsumer#receive方法。另外一種是使用消息監聽器MessageListener。這裏須要注意的是,在同一個session下,這二者不能同時工做,也就是說不能針對不一樣消息採用不一樣的接收方式。不然會拋出異常。
至於爲何這麼作,最大的緣由仍是在事務性會話中,兩種消費模式的事務很差管控
消費流程圖
ActiveMQMessageConsumer.receive消費端同步接收消息的源碼入口
public Message receive() throws JMSException { checkClosed(); checkMessageListener(); //檢查receive和MessageListener是否同時配置在當前的會話中,同步消費不須要設置MessageListener 不然會報錯 sendPullCommand(0); //若是PrefetchSizeSize爲0而且unconsumerMessage爲空,則發起pull命令 MessageDispatch md = dequeue(-1); //從unconsumerMessage出隊列獲取消息 if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); //發送ack給到broker return createActiveMQMessage(md);//獲取消息並返回 }
sendPullCommand
發送pull命令從broker上獲取消息,前提是prefetchSize=0而且unconsumedMessages爲空。
unconsumedMessage表示未消費的消息,這裏面預讀取的消息大小爲prefetchSize的值
protected void sendPullCommand(long timeout) throws JMSException { clearDeliveredList(); if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { MessagePull messagePull = new MessagePull(); messagePull.configure(info); messagePull.setTimeout(timeout); session.asyncSendPacket(messagePull); //向服務端異步發送messagePull指令 } }
clearDeliveredList
在上面的sendPullCommand方法中,會先調用clearDeliveredList方法,主要用來清理已經分發的消息鏈表
deliveredMessages
deliveredMessages,存儲分發給消費者但還未應答的消息鏈表
Ø 若是session是事務的,則會遍歷deliveredMessage中的消息放入到previouslyDeliveredMessage中來作重發
Ø 若是session是非事務的,根據ACK的模式來選擇不一樣的應答操做
// async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again private void clearDeliveredList() { if (clearDeliveredList) { synchronized (deliveredMessages) { if (clearDeliveredList) { if (!deliveredMessages.isEmpty()) { if (session.isTransacted()) { if (previouslyDeliveredMessages == null) { previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId()); } for (MessageDispatch delivered : deliveredMessages) { previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); } LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size()); } else { if (session.isClientAcknowledge()) { LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); // allow redelivery if (!this.info.isBrowser()) { for (MessageDispatch md: deliveredMessages) { this.session.connection.rollbackDuplicate(this, md.getMessage()); } } } LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); deliveredMessages.clear(); pendingAck = null; } } clearDeliveredList = false; } } } }
dequeue
從unconsumedMessage中取出一個消息,在建立一個消費者時,就會爲這個消費者建立一個未消費的消息道,
這個通道分爲兩種,一種是簡單優先級隊列分發通道SimplePriorityMessageDispatchChannel ;另外一種是先進先
出的分發通道FifoMessageDispatchChannel.
至於爲何要存在這樣一個消息分發通道,你們能夠想象一下,若是消費者每次去消費完一個消息之後再broker拿一個消息,效率是比較低的。因此經過這樣的設計能夠容許session可以一次性將多條消息分發給一個消費者。
默認狀況下對於queue來講,prefetchSize的值是1000
beforeMessageIsConsumed
這裏面主要是作消息消費以前的一些準備工做,若是ACK類型不是DUPS_OK_ACKNOWLEDGE或者隊列模式(簡單來講就是除了T opic和DupAck這兩種狀況),全部的消息先放到deliveredMessages鏈表的開頭。而且若是當前是事務類型的會話,則判斷transactedIndividualAck,若是爲true,表示單條消息直接返回ack。
不然,調用ackLater,批量應答, client端在消費消息後暫且不發送ACK,而是把它緩存下來(pendingACK),等到這些消息的條數達到必定閥值時,只須要經過一個ACK指令把它們所有確認;這比對每條消息都逐個確認,在性能上要提升不少
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { md.setDeliverySequenceId(session.getNextDeliveryId()); lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); if (!isAutoAcknowledgeBatch()) { synchronized(deliveredMessages) { deliveredMessages.addFirst(md); } if (session.getTransacted()) { if (transactedIndividualAck) { immediateIndividualTransactedAck(md); } else { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } } }
afterMessageIsConsumed
這個方法的主要做用是執行應答操做,這裏面作如下幾個操做
Ø 若是消息過時,則返回消息過時的ack
Ø 若是是事務類型的會話,則不作任何處理
Ø 若是是AUTOACK或者(DUPS_OK_ACK且是隊列),而且是優化ack操做,則走批量確認ack
Ø 若是是DUPS_OK_ACK,則走ackLater邏輯
Ø 若是是CLIENT_ACK,則執行ackLater
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { if (unconsumedMessages.isClosed()) { return; } if (messageExpired) { acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); stats.getExpiredMessageCount().increment(); } else { stats.onMessage(); if (session.getTransacted()) { // Do nothing. } else if (isAutoAcknowledgeEach()) { if (deliveryingAcknowledgements.compareAndSet(false, true)) { synchronized (deliveredMessages) { if (!deliveredMessages.isEmpty()) { if (optimizeAcknowledge) { ackCounter++; // AMQ-3956 evaluate both expired and normal msgs as // otherwise consumer may get stalled if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); ackCounter = 0; session.sendAck(ack); optimizeAckTimestamp = System.currentTimeMillis(); } // AMQ-3956 - as further optimization send // ack for expired msgs when there are any. // This resets the deliveredCounter to 0 so that // we won't sent standard acks with every msg just // because the deliveredCounter just below // 0.5 * prefetch as used in ackLater() if (pendingAck != null && deliveredCounter > 0) { session.sendAck(pendingAck); pendingAck = null; deliveredCounter = 0; } } } else { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack!=null) { deliveredMessages.clear(); session.sendAck(ack); } } } } deliveryingAcknowledgements.set(false); } } else if (isAutoAcknowledgeBatch()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { boolean messageUnackedByConsumer = false; synchronized (deliveredMessages) { messageUnackedByConsumer = deliveredMessages.contains(md); } if (messageUnackedByConsumer) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } else { throw new IllegalStateException("Invalid session state."); } } }
ActiveMQ 採用消息推送方式,因此最適合的場景是默認消息均可在短期內被消費。數據量越大,查找和消費消息就越慢,消息積壓程度與消息速度成反比。
缺點
1.吞吐量低。因爲 ActiveMQ 須要創建索引,致使吞吐量降低。這是沒法克服的缺點,只要使用徹底符合 JMS 規範的消息中間件,就要接受這個級別的TPS。
2.無分片功能。這是一個功能缺失,JMS 並無規定消息中間件的集羣、分片機制。而因爲 ActiveMQ 是偉企業級開發設計的消息中間件,初衷並非爲了處理海量消息和高併發請求。若是一臺服務器不能承受更多消息,則須要橫向拆分。ActiveMQ 官方不提供分片機制,須要本身實現。
適用場景
對 TPS 要求比較低的系統,可使用 ActiveMQ 來實現,一方面比較簡單,可以快速上手開發,另外一方面可控性也比較好,還有比較好的監控機制和界面
不適用的場景
消息量巨大的場景。ActiveMQ 不支持消息自動分片機制,若是消息量巨大,致使一臺服務器不能處理所有消息,就須要本身開發消息分片功能。