分佈式消息通訊ActiveMQ原理-發送策略-筆記

持久化消息和非持久化消息的發送策略java

  • 消息同步發送和異步發送
    • ActiveMQ支持同步、異步兩種發送模式將消息發送到broker上。
    • 同步發送過程當中,發送者發送一條消息會阻塞直到broker反饋一個確認消息,表示消息已經被broker處理。
      • 這個機制提供了消息的安全性保障,可是因爲是阻塞的操做,會影響到客戶端消息發送的性能
    • 異步發送的過程當中,發送者不須要等待broker提供反饋,因此性能相對較高。可是可能會出現消息丟失的狀況。
      • 因此使用異步發送的前提是在某些狀況下容許出現數據丟失的狀況。
    • 默認狀況下,非持久化消息是異步發送的,持久化消息而且是在非事務模式下是同步發送的。
    • 可是在開啓事務的狀況下,消息都是異步發送。因爲異步發送的效率會比同步發送性能更高。
    • 因此在發送持久化消息的時候,儘可能去開啓事務會話。
    • 除了持久化消息和非持久化消息的同步和異步特性之外,咱們還能夠經過如下幾種方式來設置異步發送
    • ConnectionFactory connectionFactory=
          new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?jms.useAsyncSend=true");
      ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
      ((ActiveMQConnection)connection).setUseAsyncSend(true);

       

ProducerWindowSize的含義apache

  • producer每發送一個消息,統計一下發送的字節數,當字節數達到ProducerWindowSize值時,須要等待broker的確認,才能繼續發送。
  • 主要用來約束在異步發送時producer端容許積壓的(還沒有ACK)的消息的大小,且只對異步發送有意義。
  • 每次發送消息以後,都將會致使memoryUsage大小增長(+message.size),當broker返回producerAck時,memoryUsage尺寸減小(producerAck.size,此size表示先前發送消息的大小)。
  • 能夠經過以下2種方式設置:
    • 在brokerUrl中設置: "tcp://localhost:61616?jms.producerWindowSize=1048576",這種設置將會對全部的producer生效。
    •  在destinationUri中設置: "test-queue?producer.windowSize=1048576",此參數只會對使用此Destination實例的producer有效,將會覆蓋brokerUrl中的producerWindowSize值。
      • 注意:此值越大,意味着消耗Client端的內存就越大。

消息發送的源碼分析緩存

  • 以producer.send爲入口
public void send(Destination destination, Message message, int deliveryMode, int priority, long
            timeToLive, AsyncCallback onComplete) throws JMSException {
        checkClosed(); //檢查session的狀態,若是session以關閉則拋異常
        if (destination == null) {
            if (info.getDestination() == null) {
                throw new UnsupportedOperationException("A destination must be specified.");
            }
            throw new InvalidDestinationException("Don't understand null destinations");
        }
        ActiveMQDestination dest;
        if (destination.equals(info.getDestination())) {//檢查destination的類型,若是符合要求,就轉變爲
            ActiveMQDestination
                    dest = (ActiveMQDestination) destination;
        } else if (info.getDestination() == null) {
            dest = ActiveMQDestination.transform(destination);
        } else {
            throw new UnsupportedOperationException("This producer can only send messages to: " +
                    this.info.getDestination().getPhysicalName());
        }
        if (dest == null) {
            throw new JMSException("No destination specified");
        }
        if (transformer != null) {
            Message transformedMessage = transformer.producerTransform(session, this, message);
            if (transformedMessage != null) {
                message = transformedMessage;
            }
        }
        if (producerWindow != null) {//若是發送窗口大小不爲空,則判斷髮送窗口的大小決定是否阻塞
            try {
                producerWindow.waitForSpace();
            } catch (InterruptedException e) {
                throw new JMSException("Send aborted due to thread interrupt.");
            }
        }
        //發送消息到broker的topic
        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,
                sendTimeout, onComplete);
        stats.onMessage();
    }

ActiveMQSession的send方法安全

protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {

        checkClosed();
        if (destination.isTemporary() && connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        }
        synchronized (sendMutex) {
            //互斥鎖,若是一個session的多個producer發送消息到這裏,會保證消息發送的有序性
            // tell the Broker we are about to start a new transaction
            doStartTransaction();//告訴broker開始一個新事務,只有事務型會話中才會開啓
            TransactionId txid = transactionContext.getTransactionId();//從事務上下文中獲取事務id
            long sequenceNumber = producer.getMessageSequence();

            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
            message.setJMSDeliveryMode(deliveryMode);//在JMS協議頭中設置是否持久化標識
            long expiration = 0L;//計算消息過時時間
            if (!producer.getDisableMessageTimestamp()) {
                long timeStamp = System.currentTimeMillis();
                message.setJMSTimestamp(timeStamp);
                if (timeToLive > 0) {
                    expiration = timeToLive + timeStamp;
                }
            }
            message.setJMSExpiration(expiration);//設置消息過時時間
            message.setJMSPriority(priority);//設置消息的優先級
            message.setJMSRedelivered(false);//設置消息爲非重發
             
             //將不一樣的消息格式統一轉化爲ActiveMQMessage
            // transform to our own message format here
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
            msg.setDestination(destination);//設置目的地
            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));//生成並設置消息id

            // Set the message id.
            if (msg != message) {//若是消息是通過轉化的,則更新原來的消息id和目的地
                message.setJMSMessageID(msg.getMessageId().toString());
                // Make sure the JMS destination is set on the foreign messages too.
                message.setJMSDestination(destination);
            }
            //clear the brokerPath in case we are re-sending this message
            msg.setBrokerPath(null);

            msg.setTransactionId(txid);
            if (connection.isCopyMessageOnSend()) {
                msg = (ActiveMQMessage)msg.copy();
            }
            msg.setConnection(connection);
            msg.onSend();//把消息屬性和消息體都設置爲只讀,防止被修改
            msg.setProducerId(msg.getMessageId().getProducerId());
            if (LOG.isTraceEnabled()) {
                LOG.trace(getSessionId() + " sending message: " + msg);
            }

//若是onComplete沒有設置,且發送超時時間小於0,且消息不須要反饋,且鏈接器不是同步發送模式,且消息非持久化或者鏈接器是異步發送模式
//或者存在事務id的狀況下,走異步發送,不然走同步發送
            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                this.connection.asyncSendPacket(msg);
                if (producerWindow != null) {
                    // Since we defer lots of the marshaling till we hit the
                    // wire, this might not
                    // provide and accurate size. We may change over to doing
                    // more aggressive marshaling,
                    // to get more accurate sizes.. this is more important once
                    // users start using producer window
                    // flow control.
                    int size = msg.getSize();
                    producerWindow.increaseUsage(size);
                }
            } else {
                if (sendTimeout > 0 && onComplete==null) {
                    this.connection.syncSendPacket(msg,sendTimeout);
                }else {
                    this.connection.syncSendPacket(msg, onComplete);
                }
            }

        }
    }

ActiveMQConnection. doAsyncSendPacketsession

public void asyncSendPacket(Command command) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {
            doAsyncSendPacket(command);
        }
    }

    private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }
  • 這個地方問題來了,this.transport是什麼東西?在哪裏實例化的?按照之前看源碼的慣例來看,它確定不是一個單純的對象。
  • 按照以往咱們看源碼的經驗來看,必定是在建立鏈接的過程當中初始化的。

transport的實例化過程異步

  • 從connection=connectionFactory.createConnection();這行代碼做爲入口,
  • 一直跟蹤到ActiveMQConnectionFactory. createActiveMQConnection這個方法中。
  • 代碼以下:
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        }
        ActiveMQConnection connection = null;
        try {
            Transport transport = createTransport();
            connection = createActiveMQConnection(transport, factoryStats);

            connection.setUserName(userName);
            connection.setPassword(password);

            configureConnection(connection);

            transport.start();

            if (clientID != null) {
                connection.setDefaultClientID(clientID);
            }

            return connection;
        } catch (JMSException e) {
            // Clean up!
            try {
                connection.close();
            } catch (Throwable ignore) {
            }
            throw e;
        } catch (Exception e) {
            // Clean up!
            try {
                connection.close();
            } catch (Throwable ignore) {
            }
            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
        }
    }

createTransportsocket

  • 調用ActiveMQConnectionFactory.createTransport方法,去建立一個transport對象。
    1. 構建一個URI
    2. 根據URL去建立一個鏈接TransportFactory.connect
    3. 默認使用的是tcp的協議
protected Transport createTransport() throws JMSException {
        try {
            URI connectBrokerUL = brokerURL;
            String scheme = brokerURL.getScheme();
            if (scheme == null) {
                throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
            }
            if (scheme.equals("auto")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
            } else if (scheme.equals("auto+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
            } else if (scheme.equals("auto+nio")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
            } else if (scheme.equals("auto+nio+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
            }

            return TransportFactory.connect(connectBrokerUL);
        } catch (Exception e) {
            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
        }
    }

TransportFactory.connectasync

public static Transport connect(URI location) throws Exception {
        TransportFactory tf = findTransportFactory(location);
        return tf.doConnect(location);
    }

TransportFactory. findTransportFactorytcp

  1. 從TRANSPORT_FACTORYS這個Map集合中,根據scheme去得到一個TransportFactory指定的實例對象
  2. 若是Map集合中不存在,則經過TRANSPORT_FACTORY_FINDER去找一個而且構建實例

這個地方又有點相似於咱們以前所學過的SPI的思想吧?ide

  • 他會從METAINF/services/org/apache/activemq/transport/ 這個路徑下,
  • 根據URI組裝的scheme去找到匹配的class對象而且實例化,
  • 因此根據tcp爲key去對應的路徑下能夠找到TcpTransportFactory
    • class=org.apache.activemq.transport.tcp.TcpTransportFactory

 

public static TransportFactory findTransportFactory(URI location) throws IOException {
        String scheme = location.getScheme();
        if (scheme == null) {
            throw new IOException("Transport not scheme specified: [" + location + "]");
        }
        TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
        if (tf == null) {
            // Try to load if from a META-INF property.
            try {
                tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
                TRANSPORT_FACTORYS.put(scheme, tf);
            } catch (Throwable e) {
                throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
            }
        }
        return tf;
    }

調用TransportFactory.doConnect去構建一個鏈接

public Transport doConnect(URI location) throws Exception {
        try {
            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
            if( !options.containsKey("wireFormat.host") ) {
                options.put("wireFormat.host", location.getHost());
            }
            WireFormat wf = createWireFormat(options);
            Transport transport = createTransport(location, wf);//建立一個Transport,建立一個socket鏈接 -> 終於找到真相了
            Transport rc = configure(transport, wf, options);options);//配置configure,這個裏面是對Transport作鏈路包裝
            //remove auto
            IntrospectionSupport.extractProperties(options, "auto.");

            if (!options.isEmpty()) {
                throw new IllegalArgumentException("Invalid connect parameters: " + options);
            }
            return rc;
        } catch (URISyntaxException e) {
            throw IOExceptionSupport.create(e);
        }
    }

configure

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
 //組裝一個複合的transport,這裏會包裝兩層,一個是IactivityMonitor.另外一個是WireFormatNegotiator
 transport = compositeConfigure(transport, wf, options);
 transport = new MutexTransport(transport); //再作一層包裝,MutexTransport
 transport = new ResponseCorrelator(transport); //包裝ResponseCorrelator
 return transport;
}

到目前爲止,這個transport實際上就是一個調用鏈了,他的鏈結構爲

  • ResponseCorrelator(MutexTransport(WireFormatNegotiator(InactivityMonitor(TcpTransport()))

每一層包裝表示什麼意思呢?

  • ResponseCorrelator 用於實現異步請求。
  • MutexTransport 實現寫鎖,表示同一時間只容許發送一個請求
  • WireFormatNegotiator 實現了客戶端鏈接broker的時候先發送數據解析相關的協議信息,好比解析版本號,是否使用緩存等
  • InactivityMonitor 用於實現鏈接成功成功後的心跳檢查機制,客戶端每10s發送一次心跳信息。服務端每30s讀取一次心跳信息。

同步發送和異步發送的區別

  • 在ResponseCorrelator的request方法中,須要經過response.getResult去得到broker的反饋,不然會阻塞
public Object request(Object command, int timeout) throws IOException {
   FutureResponse response = asyncRequest(command, null);
   return response.getResult(timeout); // 從future方法阻塞等待返回
}

持久化消息和非持久化消息的存儲原理

  • 正常狀況下,非持久化消息是存儲在內存中的,持久化消息是存儲在文件中的。
  • 可以存儲的最大消息數據在${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage節點
  • SystemUsage配置設置了一些系統內存和硬盤容量:
<systemUsage>
    <systemUsage>
      <memoryUsage>
        <!-- 該子標記設置整個ActiveMQ節點的「可用內存限制」。這個值不能超過ActiveMQ自己設置的最大內存大小。其中的
        percentOfJvmHeap屬性表示百分比。佔用70%的堆內存-->
        <memoryUsage percentOfJvmHeap="70"/>
      </memoryUsage>
      <storeUsage>
        <!-- 該標記設置整個ActiveMQ節點,用於存儲「持久化消息」的「可用磁盤空間」。該子標記的limit屬性必需要進行設置-->
        <storeUsage limit="100 gb"/>
      </storeUsage>
      <tempUsage>
        <!-- 一旦ActiveMQ服務節點存儲的消息達到了memoryUsage的限制,非持久化消息就會被轉儲到 temp store區域,雖然
        咱們說過非持久化消息不進行持久化存儲,可是ActiveMQ爲了防止「數據洪峯」出現時非持久化消息大量堆積導致內存耗
        盡的狀況出現,仍是會將非持久化消息寫入到磁盤的臨時區域——temp store。這個子標記就是爲了設置這個temp
        store區域的「可用磁盤空間限制」-->
        <tempUsage limit="50 gb"/>
      </tempUsage>
    </systemUsage>
  </systemUsage>
  • 從上面的配置咱們須要get到一個結論,當非持久化消息堆積到必定程度的時候,也就是內存超過指定的設置閥值時,
    • ActiveMQ會將內存中的非持久化消息寫入到臨時文件,以便騰出內存。
    • 可是它和持久化消息的區別是,重啓以後,持久化消息會從文件中恢復,非持久化的臨時文件會直接刪除
相關文章
相關標籤/搜索