ActiveMQ消息的發送原理

持久化消息和非持久化消息的發送策略:消息同步發送和異步發送

  ActiveMQ支持同步、異步兩種發送模式將消息發送到broker上。同步發送過程當中,發送者發送一條消息會阻塞直到broker反饋一個確認消息,表示消息已經被broker處理。這個機制提供了消息的安全性保障,可是因爲是阻塞的操做,會影響到客戶端消息發送的性能。異步發送的過程當中,發送者不須要等待broker提供反饋,因此性能相對較高。可是可能會出現消息丟失的狀況。因此使用異步發送的前提是在某些狀況下容許出現數據丟失的狀況。java

  默認狀況下,非持久化消息是異步發送的,持久化消息而且是在非事務模式下是同步發送的。可是在開啓事務的狀況下,消息都是異步發送。因爲異步發送的效率會比同步發送性能更高。因此在發送持久化消息的時候,儘可能去開啓事務會話。除了持久化消息和非持久化消息的同步和異步特性之外,咱們還能夠經過如下幾種方式來設置異步發送:apache

1.ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?jms.useAsyncSend=true");
2.((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
3.((ActiveMQConnection)connection).setUseAsyncSend(true);

消息發送的源碼:

  以producer.send爲入口 進入的是ActiveMQSession 實現:緩存

public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
        checkClosed(); //檢查session的狀態,若是session關閉則拋異常
        if (destination == null) {
            if (info.getDestination() == null) {
                throw new UnsupportedOperationException("A destination must be specified.");
            }
            throw new InvalidDestinationException("Don't understand null destinations");
        }
        //檢查destination的類型,若是符合要求,就轉變爲ActiveMQDestination
        ActiveMQDestination dest;
        if (destination.equals(info.getDestination())) {
            dest = (ActiveMQDestination)destination;
        } else if (info.getDestination() == null) {
            dest = ActiveMQDestination.transform(destination);
        } else {
            throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
        }
        if (dest == null) {
            throw new JMSException("No destination specified");
        }

        if (transformer != null) {
            Message transformedMessage = transformer.producerTransform(session, this, message);
            if (transformedMessage != null) {
                message = transformedMessage;
            }
        }
        //若是發送窗口大小不爲空,則判斷髮送窗口的大小決定是否阻塞
        if (producerWindow != null) {
            try {
                producerWindow.waitForSpace();
            } catch (InterruptedException e) {
                throw new JMSException("Send aborted due to thread interrupt.");
            }
        }
        //發送消息到broker的topic
        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);

        stats.onMessage();
    }

  ActiveMQSession的send方法,this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete):安全

protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {

        checkClosed();
        if (destination.isTemporary() && connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        }
        //互斥鎖,若是一個session的多個producer發送消息到這裏,會保證消息發送的有序性
        synchronized (sendMutex) {
            // tell the Broker we are about to start a new transaction
            doStartTransaction();//告訴broker開始一個新事務,只有事務型會話中才會開啓
            TransactionId txid = transactionContext.getTransactionId();//從事務上下文中獲取事務id
            long sequenceNumber = producer.getMessageSequence();

            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
            message.setJMSDeliveryMode(deliveryMode); //在JMS協議頭中設置是否持久化標識
            long expiration = 0L;//計算消息過時時間
            if (!producer.getDisableMessageTimestamp()) {
                long timeStamp = System.currentTimeMillis();
                message.setJMSTimestamp(timeStamp);
                if (timeToLive > 0) {
                    expiration = timeToLive + timeStamp;
                }
            }
            message.setJMSExpiration(expiration);//設置消息過時時間
            message.setJMSPriority(priority);//設置消息的優先級
            message.setJMSRedelivered(false);;//設置消息爲非重發

            // transform to our own message format here
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
            msg.setDestination(destination);
            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));

            // Set the message id.
            if (msg != message) {//若是消息是通過轉化的,則更新原來的消息id和目的地
                message.setJMSMessageID(msg.getMessageId().toString());
                // Make sure the JMS destination is set on the foreign messages too.
                message.setJMSDestination(destination);
            }
            //clear the brokerPath in case we are re-sending this message
            msg.setBrokerPath(null);

            msg.setTransactionId(txid);
            if (connection.isCopyMessageOnSend()) {
                msg = (ActiveMQMessage)msg.copy();
            }
            msg.setConnection(connection);
            msg.onSend();//把消息屬性和消息體都設置爲只讀,防止被修改
            msg.setProducerId(msg.getMessageId().getProducerId());
            if (LOG.isTraceEnabled()) {
                LOG.trace(getSessionId() + " sending message: " + msg);
            }
            //若是onComplete沒有設置(這裏傳進來就是null),且發送超時時間小於0,且消息不須要反饋,且鏈接器不是同步發送模式,且消息非持久化或者鏈接器是異步發送模式
            //或者存在事務id的狀況下,走異步發送,不然走同步發送
            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                this.connection.asyncSendPacket(msg);
                if (producerWindow != null) {
                    int size = msg.getSize();//異步發送的狀況下,須要設置producerWindow的大小
                    producerWindow.increaseUsage(size);
                }
            } else {
                if (sendTimeout > 0 && onComplete==null) {
                    this.connection.syncSendPacket(msg,sendTimeout);//帶超時時間的同步發送
                }else {
                    this.connection.syncSendPacket(msg, onComplete);//帶回調的同步發送
                }
            }

        }
    }

  咱們從上面的代碼能夠看到,在執行發送操做以前須要把消息作一個轉化,而且將咱們設置的一些屬性注入導指定的屬性中,咱們先來看看異步發送,會發現異步發送的時候涉及到producerWindowSize的大小:session

ProducerWindowSize的含義異步

  producer每發送一個消息,統計一下發送的字節數,當字節數達到ProducerWindowSize值時,須要等待broker的確認,才能繼續發送。async

  主要用來約束在異步發送時producer端容許積壓的(還沒有ACK)的消息的大小,且只對異步發送有意義。每次發送消息以後,都將會致使memoryUsage大小增長(+message.size),當broker返回producerAck時,memoryUsage尺寸減小(producerAck.size,此size表示先前發送消息的大小)。tcp

能夠經過以下2種方式設置:
Ø 在brokerUrl中設置: "tcp://localhost:61616?jms.producerWindowSize=1048576",這種設置將會對全部的producer生效。
Ø 在destinationUri中設置: "myQueue?producer.windowSize=1048576",此參數只會對使用此Destination實例的producer生效,將會覆蓋brokerUrl中的producerWindowSize值。
注意:此值越大,意味着消耗Client端的內存就越大。性能

  接下去咱們進入異步發送流程,看看消息是怎麼異步發送的this.connection.asyncSendPacket(msg):ui

private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }

  這裏的 Command 其實就是以前一步所轉化的message ,而且通過一系列的屬性注入。由於ActiveMQMessage 繼承了 baseCommand ,該類實現了 Command 。因此能夠轉化,而後咱們發現 oneway 方法又不少的實現,都是基於 transport ,那麼咱們就須要來看看這個 transport 是什麼。這裏咱們把代碼往前翻並無發現他的初始化,按照咱們以往的思路,這裏就會在初始化鏈接的時候進行初始化該對象:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.254.135:61616");
Connection connection= connectionFactory.createConnection();

  這裏進入 ActiveMQConnectionFactory 的 createConnection方法會來到:

protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        }
        ActiveMQConnection connection = null;
        try {// 果真發現了這個東東的初始化
            Transport transport = createTransport();
            // 建立鏈接
            connection = createActiveMQConnection(transport, factoryStats);
            // 設置用戶密碼
            connection.setUserName(userName);
            connection.setPassword(password);
            // 對鏈接作包裝
            configureConnection(connection);
            // 啓動一個後臺傳輸線程
            transport.start();
            // 設置客戶端消費的id
            if (clientID != null) {
                connection.setDefaultClientID(clientID);
            }

            return connection;
        } ......
   }

  這裏咱們發現了 Transport transport = createTransport(); 這就是他的初始化:咱們能夠發現

protected Transport createTransport() throws JMSException {
        try {
            URI connectBrokerUL = brokerURL;
            String scheme = brokerURL.getScheme();
            if (scheme == null) {
                throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
            }
            if (scheme.equals("auto")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
            } else if (scheme.equals("auto+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
            } else if (scheme.equals("auto+nio")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
            } else if (scheme.equals("auto+nio+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
            }

            return TransportFactory.connect(connectBrokerUL);
        } catch (Exception e) {
            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
        }
    }

   這裏有點相似於基於URL驅動的意思,這裏進來先是構建一個 URI ,根據URL去建立一個鏈接TransportFactory.connect,會發現默認使用的是tcp的協議。這裏因爲咱們在建立鏈接的時候就已經指定了tcp因此這裏的判斷都沒用,直接進入建立鏈接TransportFactory.connect(connectBrokerUL):

public static Transport connect(URI location) throws Exception {
        TransportFactory tf = findTransportFactory(location);
        return tf.doConnect(location);
    }

  這裏作鏈接須要建立一個 tf 對象。這就要看看findTransportFactory(location) :

public static TransportFactory findTransportFactory(URI location) throws IOException {
        String scheme = location.getScheme();
        if (scheme == null) {
            throw new IOException("Transport not scheme specified: [" + location + "]");
        }
        TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
        if (tf == null) {
            // Try to load if from a META-INF property.
            try {
                tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
                TRANSPORT_FACTORYS.put(scheme, tf);
            } catch (Throwable e) {
                throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
            }
        }
        return tf;
    }

  不難理解以上的 代碼是根據 scheme經過TRANSPORT_FACTORYS 這個map 來建立的 TransportFactory ,若是獲取不到,就會經過TRANSPORT_FACTORY_FINDER 去獲取一個實例。TRANSPORT_FACTORY_FINDER 這個FINDER是什麼東西呢? 咱們看看他的初始化:

private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");

  咱們經過源碼中指定路徑如下的東西:

 

   這有點相似於 java 中SPI規範的意思。咱們能夠看看 tcp 其中的內容:

class=org.apache.activemq.transport.tcp.TcpTransportFactory

  這裏是鍵值對的方式,上述獲取實例的代碼中其實就是獲取一個 TcpTransportFactory 實例,那麼咱們就知道tf.doConnect(location)  是哪一個實現類作的,就是TcpTransportFactory,可是咱們點開一看並未發現 TcpTransportFactory實現,這就說明該類使用的是父類裏面的方法,這裏就是TransportFactory 類:

public Transport doConnect(URI location) throws Exception {
        try {
            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
            if( !options.containsKey("wireFormat.host") ) {
                options.put("wireFormat.host", location.getHost());
            }
            WireFormat wf = createWireFormat(options);
            //建立一個Transport 這裏纔是咱們要找的真相
            Transport transport = createTransport(location, wf);
            //配置configure,這個裏面是對Transport作鏈路包裝,思想相似於dubbo的cluster
            Transport rc = configure(transport, wf, options);
            //remove auto
            IntrospectionSupport.extractProperties(options, "auto.");

            if (!options.isEmpty()) {
                throw new IllegalArgumentException("Invalid connect parameters: " + options);
            }
            return rc;
        } catch (URISyntaxException e) {
            throw IOExceptionSupport.create(e);
        }
    }

  咱們進入 createTransport(location, wf) 方法,這裏是使用Tcp子類的實現。會發現裏面建立了一個 Sokect 鏈接 ,這就是準備後來進行發送的Sokect。而後這裏返回的 Transport 就是 TcpTransport .接下去就是對這個 transport 進行包裝 configure(transport, wf, options):

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
//組裝一個複合的transport,這裏會包裝兩層,一個是IactivityMonitor.另外一個是WireFormatNegotiator
        transport = compositeConfigure(transport, wf, options);
        //再作一層包裝,MutexTransport
        transport = new MutexTransport(transport);
        //包裝ResponseCorrelator
        transport = new ResponseCorrelator(transport);
        return transport;
    }

  到目前爲止,這個transport實際上就是一個調用鏈了,他的鏈結構爲ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport()))每一層包裝表示什麼意思呢?

  ResponseCorrelator 用於實現異步請求。
  MutexTransport 實現寫鎖,表示同一時間只容許發送一個請求
  WireFormatNegotiator 實現了客戶端鏈接broker的時候先發送數據解析相關的協議信息,好比解析版本號,是否使用緩存等
  InactivityMonitor 用於實現鏈接成功成功後的心跳檢查機制,客戶端每10s發送一次心跳信息。服務端每30s讀取一次心跳信息。

  經過這層層的分析,咱們回到 ActiveMQConnection 發送消息的doAsyncSendPacket 方法:

private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }

  這裏的 oneway(command)方法會前後經歷上述調用鏈的處理最後調用到 TcpTransport  的oneway(command) ,咱們一步一步來看看都作了些什麼:

  ResponseCorrelator.oneway(command):裏面就設置了兩個屬性

public void oneway(Object o) throws IOException {
        Command command = (Command)o; //對前面的對象作一個強轉,組裝一些信息
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(false);
        next.oneway(command);
    }

  MutexTransport.oneway(command):

 public void oneway(Object command) throws IOException {
        writeLock.lock();// 經過 ReentrantLock作加鎖
        try {
            next.oneway(command);
        } finally {
            writeLock.unlock();
        }
    }

  WireFormatNegotiator.oneway(command):這個裏面調用了父類的 oneway ,父類是 TransportFilter 類

public void oneway(Object command) throws IOException {
        boolean wasInterrupted = Thread.interrupted();
        try {
            if (readyCountDownLatch.getCount() > 0 && !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
                throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
            }
        } catch (InterruptedException e) {
            InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted waiting for wire format negotiation");
            interruptedIOException.initCause(e);
            try {
                onException(interruptedIOException);
            } finally {
                Thread.currentThread().interrupt();
                wasInterrupted = false;
            }
            throw interruptedIOException;
        }  finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
        super.oneway(command); //裏面沒作什麼事情進入下一個調用鏈
    }

  從WireFormatNegotiator的父類TransportFilter進入下一個調用鏈應該調用的是InactivityMonitor.oneway(command),但是並未發現又該類實現,因此這裏進入InactivityMonitor 的父類AbstractInactivityMonitor:

public void oneway(Object o) throws IOException {
        // To prevent the inactivity monitor from sending a message while we
        // are performing a send we take a read lock. The inactivity monitor
        // sends its Heart-beat commands under a write lock. This means that
        // the MutexTransport is still responsible for synchronizing sends
        sendLock.readLock().lock();//獲取發送讀鎖 鎖定
        inSend.set(true);//設置屬性
        try {
            doOnewaySend(o);//經過這個邏輯進入下一個調用鏈
        } finally {
            commandSent.set(true);
            inSend.set(false);
            sendLock.readLock().unlock();
        }
    }

  在doOnewaySend 裏面的next.oneway(command) 方法最終調用 TcpTransport 的實現:

public void oneway(Object command) throws IOException {
        checkStarted();
        //進行格式化內容 經過Sokct 發送
        wireFormat.marshal(command, dataOut);
        // 流的刷新
        dataOut.flush();
}

  最後經過Sokect進行數據的傳輸。這樣子異步發送的流程就結束了。下面來走一下同步的流程:經過this.connection.syncSendPacket() 進入同步發送流程。

public Response syncSendPacket(Command command, int timeout) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {

            try {// 進行發送,阻塞獲取結果
                Response response = (Response)(timeout > 0
                        ? this.transport.request(command, timeout)
                        : this.transport.request(command));
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse)response;
                    if (er.getException() instanceof JMSException) {
                        throw (JMSException)er.getException();
                    }
                 。。。。。。。。。
                return response;
            } catch (IOException e) {
                throw JMSExceptionSupport.create(e);
            }
        }
    }

  這裏的 transport 跟異步發送過程當中的transport時同樣的,即 ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport())) 一個調用鏈,進入ResponseCorrelator 的實現:

public Object request(Object command, int timeout) throws IOException {
        FutureResponse response = asyncRequest(command, null);
        return response.getResult(timeout);
    }

  從這個方法咱們能夠獲得的信息時,在發送的時候採用的是  asyncRequest 方法,意思是異步請求,可是在下行採用  response.getResult(timeout) 去同步阻塞的方式去獲取結果:

public Response getResult(int timeout) throws IOException {
        final boolean wasInterrupted = Thread.interrupted();
        try {
            Response result = responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
              .........            
    }

  這裏會從 ArrayBlockingQueue 去 阻塞的處理請求。其實這裏的同步發送實質上採用的不阻塞發送,阻塞的去等待broker 的反饋結果。

  最後整理一下這個發送流程圖

相關文章
相關標籤/搜索