持久化消息和非持久化消息的發送策略java
ConnectionFactory connectionFactory= new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?jms.useAsyncSend=true"); ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true); ((ActiveMQConnection)connection).setUseAsyncSend(true);
ProducerWindowSize的含義apache
消息發送的源碼分析緩存
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { checkClosed(); //檢查session的狀態,若是session以關閉則拋異常 if (destination == null) { if (info.getDestination() == null) { throw new UnsupportedOperationException("A destination must be specified."); } throw new InvalidDestinationException("Don't understand null destinations"); } ActiveMQDestination dest; if (destination.equals(info.getDestination())) {//檢查destination的類型,若是符合要求,就轉變爲 ActiveMQDestination dest = (ActiveMQDestination) destination; } else if (info.getDestination() == null) { dest = ActiveMQDestination.transform(destination); } else { throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); } if (dest == null) { throw new JMSException("No destination specified"); } if (transformer != null) { Message transformedMessage = transformer.producerTransform(session, this, message); if (transformedMessage != null) { message = transformedMessage; } } if (producerWindow != null) {//若是發送窗口大小不爲空,則判斷髮送窗口的大小決定是否阻塞 try { producerWindow.waitForSpace(); } catch (InterruptedException e) { throw new JMSException("Send aborted due to thread interrupt."); } } //發送消息到broker的topic this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete); stats.onMessage(); }
ActiveMQSession的send方法安全
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { checkClosed(); if (destination.isTemporary() && connection.isDeleted(destination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); } synchronized (sendMutex) { //互斥鎖,若是一個session的多個producer發送消息到這裏,會保證消息發送的有序性 // tell the Broker we are about to start a new transaction doStartTransaction();//告訴broker開始一個新事務,只有事務型會話中才會開啓 TransactionId txid = transactionContext.getTransactionId();//從事務上下文中獲取事務id long sequenceNumber = producer.getMessageSequence(); //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 message.setJMSDeliveryMode(deliveryMode);//在JMS協議頭中設置是否持久化標識 long expiration = 0L;//計算消息過時時間 if (!producer.getDisableMessageTimestamp()) { long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } message.setJMSExpiration(expiration);//設置消息過時時間 message.setJMSPriority(priority);//設置消息的優先級 message.setJMSRedelivered(false);//設置消息爲非重發 //將不一樣的消息格式統一轉化爲ActiveMQMessage // transform to our own message format here ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); msg.setDestination(destination);//設置目的地 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));//生成並設置消息id // Set the message id. if (msg != message) {//若是消息是通過轉化的,則更新原來的消息id和目的地 message.setJMSMessageID(msg.getMessageId().toString()); // Make sure the JMS destination is set on the foreign messages too. message.setJMSDestination(destination); } //clear the brokerPath in case we are re-sending this message msg.setBrokerPath(null); msg.setTransactionId(txid); if (connection.isCopyMessageOnSend()) { msg = (ActiveMQMessage)msg.copy(); } msg.setConnection(connection); msg.onSend();//把消息屬性和消息體都設置爲只讀,防止被修改 msg.setProducerId(msg.getMessageId().getProducerId()); if (LOG.isTraceEnabled()) { LOG.trace(getSessionId() + " sending message: " + msg); } //若是onComplete沒有設置,且發送超時時間小於0,且消息不須要反饋,且鏈接器不是同步發送模式,且消息非持久化或者鏈接器是異步發送模式 //或者存在事務id的狀況下,走異步發送,不然走同步發送 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { this.connection.asyncSendPacket(msg); if (producerWindow != null) { // Since we defer lots of the marshaling till we hit the // wire, this might not // provide and accurate size. We may change over to doing // more aggressive marshaling, // to get more accurate sizes.. this is more important once // users start using producer window // flow control. int size = msg.getSize(); producerWindow.increaseUsage(size); } } else { if (sendTimeout > 0 && onComplete==null) { this.connection.syncSendPacket(msg,sendTimeout); }else { this.connection.syncSendPacket(msg, onComplete); } } } }
ActiveMQConnection. doAsyncSendPacketsession
public void asyncSendPacket(Command command) throws JMSException { if (isClosed()) { throw new ConnectionClosedException(); } else { doAsyncSendPacket(command); } } private void doAsyncSendPacket(Command command) throws JMSException { try { this.transport.oneway(command); } catch (IOException e) { throw JMSExceptionSupport.create(e); } }
transport的實例化過程異步
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { if (brokerURL == null) { throw new ConfigurationException("brokerURL not set."); } ActiveMQConnection connection = null; try { Transport transport = createTransport(); connection = createActiveMQConnection(transport, factoryStats); connection.setUserName(userName); connection.setPassword(password); configureConnection(connection); transport.start(); if (clientID != null) { connection.setDefaultClientID(clientID); } return connection; } catch (JMSException e) { // Clean up! try { connection.close(); } catch (Throwable ignore) { } throw e; } catch (Exception e) { // Clean up! try { connection.close(); } catch (Throwable ignore) { } throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); } }
createTransportsocket
protected Transport createTransport() throws JMSException { try { URI connectBrokerUL = brokerURL; String scheme = brokerURL.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + brokerURL + "]"); } if (scheme.equals("auto")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp")); } else if (scheme.equals("auto+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl")); } else if (scheme.equals("auto+nio")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio")); } else if (scheme.equals("auto+nio+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl")); } return TransportFactory.connect(connectBrokerUL); } catch (Exception e) { throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); } }
TransportFactory.connectasync
public static Transport connect(URI location) throws Exception { TransportFactory tf = findTransportFactory(location); return tf.doConnect(location); }
TransportFactory. findTransportFactorytcp
這個地方又有點相似於咱們以前所學過的SPI的思想吧?ide
class=org.apache.activemq.transport.tcp.TcpTransportFactory
public static TransportFactory findTransportFactory(URI location) throws IOException { String scheme = location.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + location + "]"); } TransportFactory tf = TRANSPORT_FACTORYS.get(scheme); if (tf == null) { // Try to load if from a META-INF property. try { tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); TRANSPORT_FACTORYS.put(scheme, tf); } catch (Throwable e) { throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); } } return tf; }
調用TransportFactory.doConnect去構建一個鏈接
public Transport doConnect(URI location) throws Exception { try { Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); if( !options.containsKey("wireFormat.host") ) { options.put("wireFormat.host", location.getHost()); } WireFormat wf = createWireFormat(options); Transport transport = createTransport(location, wf);//建立一個Transport,建立一個socket鏈接 -> 終於找到真相了 Transport rc = configure(transport, wf, options);options);//配置configure,這個裏面是對Transport作鏈路包裝 //remove auto IntrospectionSupport.extractProperties(options, "auto."); if (!options.isEmpty()) { throw new IllegalArgumentException("Invalid connect parameters: " + options); } return rc; } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } }
configure
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { //組裝一個複合的transport,這裏會包裝兩層,一個是IactivityMonitor.另外一個是WireFormatNegotiator transport = compositeConfigure(transport, wf, options); transport = new MutexTransport(transport); //再作一層包裝,MutexTransport transport = new ResponseCorrelator(transport); //包裝ResponseCorrelator return transport; }
到目前爲止,這個transport實際上就是一個調用鏈了,他的鏈結構爲
每一層包裝表示什麼意思呢?
同步發送和異步發送的區別
public Object request(Object command, int timeout) throws IOException { FutureResponse response = asyncRequest(command, null); return response.getResult(timeout); // 從future方法阻塞等待返回 }
持久化消息和非持久化消息的存儲原理
<systemUsage> <systemUsage> <memoryUsage> <!-- 該子標記設置整個ActiveMQ節點的「可用內存限制」。這個值不能超過ActiveMQ自己設置的最大內存大小。其中的 percentOfJvmHeap屬性表示百分比。佔用70%的堆內存--> <memoryUsage percentOfJvmHeap="70"/> </memoryUsage> <storeUsage> <!-- 該標記設置整個ActiveMQ節點,用於存儲「持久化消息」的「可用磁盤空間」。該子標記的limit屬性必需要進行設置--> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <!-- 一旦ActiveMQ服務節點存儲的消息達到了memoryUsage的限制,非持久化消息就會被轉儲到 temp store區域,雖然 咱們說過非持久化消息不進行持久化存儲,可是ActiveMQ爲了防止「數據洪峯」出現時非持久化消息大量堆積導致內存耗 盡的狀況出現,仍是會將非持久化消息寫入到磁盤的臨時區域——temp store。這個子標記就是爲了設置這個temp store區域的「可用磁盤空間限制」--> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage>