ActiveMQ支持同步、異步兩種發送模式將消息發送到broker上。同步發送過程當中,發送者發送一條消息會阻塞直到broker反饋一個確認消息,表示消息已經被broker處理。這個機制提供了消息的安全性保障,可是因爲是阻塞的操做,會影響到客戶端消息發送的性能。異步發送的過程當中,發送者不須要等待broker提供反饋,因此性能相對較高。可是可能會出現消息丟失的狀況。因此使用異步發送的前提是在某些狀況下容許出現數據丟失的狀況。java
默認狀況下,非持久化消息是異步發送的,持久化消息而且是在非事務模式下是同步發送的。可是在開啓事務的狀況下,消息都是異步發送。因爲異步發送的效率會比同步發送性能更高。因此在發送持久化消息的時候,儘可能去開啓事務會話。除了持久化消息和非持久化消息的同步和異步特性之外,咱們還能夠經過如下幾種方式來設置異步發送:apache
1.ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?jms.useAsyncSend=true"); 2.((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true); 3.((ActiveMQConnection)connection).setUseAsyncSend(true);
以producer.send爲入口 進入的是ActiveMQSession 實現:緩存
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { checkClosed(); //檢查session的狀態,若是session關閉則拋異常 if (destination == null) { if (info.getDestination() == null) { throw new UnsupportedOperationException("A destination must be specified."); } throw new InvalidDestinationException("Don't understand null destinations"); } //檢查destination的類型,若是符合要求,就轉變爲ActiveMQDestination ActiveMQDestination dest; if (destination.equals(info.getDestination())) { dest = (ActiveMQDestination)destination; } else if (info.getDestination() == null) { dest = ActiveMQDestination.transform(destination); } else { throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); } if (dest == null) { throw new JMSException("No destination specified"); } if (transformer != null) { Message transformedMessage = transformer.producerTransform(session, this, message); if (transformedMessage != null) { message = transformedMessage; } } //若是發送窗口大小不爲空,則判斷髮送窗口的大小決定是否阻塞 if (producerWindow != null) { try { producerWindow.waitForSpace(); } catch (InterruptedException e) { throw new JMSException("Send aborted due to thread interrupt."); } } //發送消息到broker的topic this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete); stats.onMessage(); }
ActiveMQSession的send方法,this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete):安全
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { checkClosed(); if (destination.isTemporary() && connection.isDeleted(destination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); } //互斥鎖,若是一個session的多個producer發送消息到這裏,會保證消息發送的有序性 synchronized (sendMutex) { // tell the Broker we are about to start a new transaction doStartTransaction();//告訴broker開始一個新事務,只有事務型會話中才會開啓 TransactionId txid = transactionContext.getTransactionId();//從事務上下文中獲取事務id long sequenceNumber = producer.getMessageSequence(); //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 message.setJMSDeliveryMode(deliveryMode); //在JMS協議頭中設置是否持久化標識 long expiration = 0L;//計算消息過時時間 if (!producer.getDisableMessageTimestamp()) { long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } message.setJMSExpiration(expiration);//設置消息過時時間 message.setJMSPriority(priority);//設置消息的優先級 message.setJMSRedelivered(false);;//設置消息爲非重發 // transform to our own message format here ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); msg.setDestination(destination); msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); // Set the message id. if (msg != message) {//若是消息是通過轉化的,則更新原來的消息id和目的地 message.setJMSMessageID(msg.getMessageId().toString()); // Make sure the JMS destination is set on the foreign messages too. message.setJMSDestination(destination); } //clear the brokerPath in case we are re-sending this message msg.setBrokerPath(null); msg.setTransactionId(txid); if (connection.isCopyMessageOnSend()) { msg = (ActiveMQMessage)msg.copy(); } msg.setConnection(connection); msg.onSend();//把消息屬性和消息體都設置爲只讀,防止被修改 msg.setProducerId(msg.getMessageId().getProducerId()); if (LOG.isTraceEnabled()) { LOG.trace(getSessionId() + " sending message: " + msg); } //若是onComplete沒有設置(這裏傳進來就是null),且發送超時時間小於0,且消息不須要反饋,且鏈接器不是同步發送模式,且消息非持久化或者鏈接器是異步發送模式 //或者存在事務id的狀況下,走異步發送,不然走同步發送 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { this.connection.asyncSendPacket(msg); if (producerWindow != null) { int size = msg.getSize();//異步發送的狀況下,須要設置producerWindow的大小 producerWindow.increaseUsage(size); } } else { if (sendTimeout > 0 && onComplete==null) { this.connection.syncSendPacket(msg,sendTimeout);//帶超時時間的同步發送 }else { this.connection.syncSendPacket(msg, onComplete);//帶回調的同步發送 } } } }
咱們從上面的代碼能夠看到,在執行發送操做以前須要把消息作一個轉化,而且將咱們設置的一些屬性注入導指定的屬性中,咱們先來看看異步發送,會發現異步發送的時候涉及到producerWindowSize的大小:session
ProducerWindowSize的含義異步
producer每發送一個消息,統計一下發送的字節數,當字節數達到ProducerWindowSize值時,須要等待broker的確認,才能繼續發送。async
主要用來約束在異步發送時producer端容許積壓的(還沒有ACK)的消息的大小,且只對異步發送有意義。每次發送消息以後,都將會致使memoryUsage大小增長(+message.size),當broker返回producerAck時,memoryUsage尺寸減小(producerAck.size,此size表示先前發送消息的大小)。tcp
能夠經過以下2種方式設置:
Ø 在brokerUrl中設置: "tcp://localhost:61616?jms.producerWindowSize=1048576",這種設置將會對全部的producer生效。
Ø 在destinationUri中設置: "myQueue?producer.windowSize=1048576",此參數只會對使用此Destination實例的producer生效,將會覆蓋brokerUrl中的producerWindowSize值。
注意:此值越大,意味着消耗Client端的內存就越大。性能
接下去咱們進入異步發送流程,看看消息是怎麼異步發送的this.connection.asyncSendPacket(msg):ui
private void doAsyncSendPacket(Command command) throws JMSException { try { this.transport.oneway(command); } catch (IOException e) { throw JMSExceptionSupport.create(e); } }
這裏的 Command 其實就是以前一步所轉化的message ,而且通過一系列的屬性注入。由於ActiveMQMessage 繼承了 baseCommand ,該類實現了 Command 。因此能夠轉化,而後咱們發現 oneway 方法又不少的實現,都是基於 transport ,那麼咱們就須要來看看這個 transport 是什麼。這裏咱們把代碼往前翻並無發現他的初始化,按照咱們以往的思路,這裏就會在初始化鏈接的時候進行初始化該對象:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.254.135:61616"); Connection connection= connectionFactory.createConnection();
這裏進入 ActiveMQConnectionFactory 的 createConnection方法會來到:
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { if (brokerURL == null) { throw new ConfigurationException("brokerURL not set."); } ActiveMQConnection connection = null; try {// 果真發現了這個東東的初始化 Transport transport = createTransport(); // 建立鏈接 connection = createActiveMQConnection(transport, factoryStats); // 設置用戶密碼 connection.setUserName(userName); connection.setPassword(password); // 對鏈接作包裝 configureConnection(connection); // 啓動一個後臺傳輸線程 transport.start(); // 設置客戶端消費的id if (clientID != null) { connection.setDefaultClientID(clientID); } return connection; } ...... }
這裏咱們發現了 Transport transport = createTransport(); 這就是他的初始化:咱們能夠發現
protected Transport createTransport() throws JMSException { try { URI connectBrokerUL = brokerURL; String scheme = brokerURL.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + brokerURL + "]"); } if (scheme.equals("auto")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp")); } else if (scheme.equals("auto+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl")); } else if (scheme.equals("auto+nio")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio")); } else if (scheme.equals("auto+nio+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl")); } return TransportFactory.connect(connectBrokerUL); } catch (Exception e) { throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); } }
這裏有點相似於基於URL驅動的意思,這裏進來先是構建一個 URI ,根據URL去建立一個鏈接TransportFactory.connect,會發現默認使用的是tcp的協議。這裏因爲咱們在建立鏈接的時候就已經指定了tcp因此這裏的判斷都沒用,直接進入建立鏈接TransportFactory.connect(connectBrokerUL):
public static Transport connect(URI location) throws Exception { TransportFactory tf = findTransportFactory(location); return tf.doConnect(location); }
這裏作鏈接須要建立一個 tf 對象。這就要看看findTransportFactory(location) :
public static TransportFactory findTransportFactory(URI location) throws IOException { String scheme = location.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + location + "]"); } TransportFactory tf = TRANSPORT_FACTORYS.get(scheme); if (tf == null) { // Try to load if from a META-INF property. try { tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); TRANSPORT_FACTORYS.put(scheme, tf); } catch (Throwable e) { throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); } } return tf; }
不難理解以上的 代碼是根據 scheme經過TRANSPORT_FACTORYS 這個map 來建立的 TransportFactory ,若是獲取不到,就會經過TRANSPORT_FACTORY_FINDER 去獲取一個實例。TRANSPORT_FACTORY_FINDER 這個FINDER是什麼東西呢? 咱們看看他的初始化:
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
咱們經過源碼中指定路徑如下的東西:
這有點相似於 java 中SPI規範的意思。咱們能夠看看 tcp 其中的內容:
class=org.apache.activemq.transport.tcp.TcpTransportFactory
這裏是鍵值對的方式,上述獲取實例的代碼中其實就是獲取一個 TcpTransportFactory 實例,那麼咱們就知道tf.doConnect(location) 是哪一個實現類作的,就是TcpTransportFactory,可是咱們點開一看並未發現 TcpTransportFactory實現,這就說明該類使用的是父類裏面的方法,這裏就是TransportFactory 類:
public Transport doConnect(URI location) throws Exception { try { Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); if( !options.containsKey("wireFormat.host") ) { options.put("wireFormat.host", location.getHost()); } WireFormat wf = createWireFormat(options); //建立一個Transport 這裏纔是咱們要找的真相 Transport transport = createTransport(location, wf); //配置configure,這個裏面是對Transport作鏈路包裝,思想相似於dubbo的cluster Transport rc = configure(transport, wf, options); //remove auto IntrospectionSupport.extractProperties(options, "auto."); if (!options.isEmpty()) { throw new IllegalArgumentException("Invalid connect parameters: " + options); } return rc; } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } }
咱們進入 createTransport(location, wf) 方法,這裏是使用Tcp子類的實現。會發現裏面建立了一個 Sokect 鏈接 ,這就是準備後來進行發送的Sokect。而後這裏返回的 Transport 就是 TcpTransport .接下去就是對這個 transport 進行包裝 configure(transport, wf, options):
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { //組裝一個複合的transport,這裏會包裝兩層,一個是IactivityMonitor.另外一個是WireFormatNegotiator transport = compositeConfigure(transport, wf, options); //再作一層包裝,MutexTransport transport = new MutexTransport(transport); //包裝ResponseCorrelator transport = new ResponseCorrelator(transport); return transport; }
到目前爲止,這個transport實際上就是一個調用鏈了,他的鏈結構爲ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport()))每一層包裝表示什麼意思呢?
ResponseCorrelator 用於實現異步請求。
MutexTransport 實現寫鎖,表示同一時間只容許發送一個請求
WireFormatNegotiator 實現了客戶端鏈接broker的時候先發送數據解析相關的協議信息,好比解析版本號,是否使用緩存等
InactivityMonitor 用於實現鏈接成功成功後的心跳檢查機制,客戶端每10s發送一次心跳信息。服務端每30s讀取一次心跳信息。
經過這層層的分析,咱們回到 ActiveMQConnection 發送消息的doAsyncSendPacket 方法:
private void doAsyncSendPacket(Command command) throws JMSException { try { this.transport.oneway(command); } catch (IOException e) { throw JMSExceptionSupport.create(e); } }
這裏的 oneway(command)方法會前後經歷上述調用鏈的處理最後調用到 TcpTransport 的oneway(command) ,咱們一步一步來看看都作了些什麼:
ResponseCorrelator.oneway(command):裏面就設置了兩個屬性
public void oneway(Object o) throws IOException { Command command = (Command)o; //對前面的對象作一個強轉,組裝一些信息 command.setCommandId(sequenceGenerator.getNextSequenceId()); command.setResponseRequired(false); next.oneway(command); }
MutexTransport.oneway(command):
public void oneway(Object command) throws IOException { writeLock.lock();// 經過 ReentrantLock作加鎖 try { next.oneway(command); } finally { writeLock.unlock(); } }
WireFormatNegotiator.oneway(command):這個裏面調用了父類的 oneway ,父類是 TransportFilter 類
public void oneway(Object command) throws IOException { boolean wasInterrupted = Thread.interrupted(); try { if (readyCountDownLatch.getCount() > 0 && !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) { throw new IOException("Wire format negotiation timeout: peer did not send his wire format."); } } catch (InterruptedException e) { InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted waiting for wire format negotiation"); interruptedIOException.initCause(e); try { onException(interruptedIOException); } finally { Thread.currentThread().interrupt(); wasInterrupted = false; } throw interruptedIOException; } finally { if (wasInterrupted) { Thread.currentThread().interrupt(); } } super.oneway(command); //裏面沒作什麼事情進入下一個調用鏈 }
從WireFormatNegotiator的父類TransportFilter進入下一個調用鏈應該調用的是InactivityMonitor.oneway(command),但是並未發現又該類實現,因此這裏進入InactivityMonitor 的父類AbstractInactivityMonitor:
public void oneway(Object o) throws IOException { // To prevent the inactivity monitor from sending a message while we // are performing a send we take a read lock. The inactivity monitor // sends its Heart-beat commands under a write lock. This means that // the MutexTransport is still responsible for synchronizing sends sendLock.readLock().lock();//獲取發送讀鎖 鎖定 inSend.set(true);//設置屬性 try { doOnewaySend(o);//經過這個邏輯進入下一個調用鏈 } finally { commandSent.set(true); inSend.set(false); sendLock.readLock().unlock(); } }
在doOnewaySend 裏面的next.oneway(command) 方法最終調用 TcpTransport 的實現:
public void oneway(Object command) throws IOException { checkStarted(); //進行格式化內容 經過Sokct 發送 wireFormat.marshal(command, dataOut); // 流的刷新 dataOut.flush(); }
最後經過Sokect進行數據的傳輸。這樣子異步發送的流程就結束了。下面來走一下同步的流程:經過this.connection.syncSendPacket() 進入同步發送流程。
public Response syncSendPacket(Command command, int timeout) throws JMSException { if (isClosed()) { throw new ConnectionClosedException(); } else { try {// 進行發送,阻塞獲取結果 Response response = (Response)(timeout > 0 ? this.transport.request(command, timeout) : this.transport.request(command)); if (response.isException()) { ExceptionResponse er = (ExceptionResponse)response; if (er.getException() instanceof JMSException) { throw (JMSException)er.getException(); } 。。。。。。。。。 return response; } catch (IOException e) { throw JMSExceptionSupport.create(e); } } }
這裏的 transport 跟異步發送過程當中的transport時同樣的,即 ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport())) 一個調用鏈,進入ResponseCorrelator 的實現:
public Object request(Object command, int timeout) throws IOException { FutureResponse response = asyncRequest(command, null); return response.getResult(timeout); }
從這個方法咱們能夠獲得的信息時,在發送的時候採用的是 asyncRequest 方法,意思是異步請求,可是在下行採用 response.getResult(timeout) 去同步阻塞的方式去獲取結果:
public Response getResult(int timeout) throws IOException { final boolean wasInterrupted = Thread.interrupted(); try { Response result = responseSlot.poll(timeout, TimeUnit.MILLISECONDS); ......... }
這裏會從 ArrayBlockingQueue 去 阻塞的處理請求。其實這裏的同步發送實質上採用的不阻塞發送,阻塞的去等待broker 的反饋結果。
最後整理一下這個發送流程圖