In ActiveMQ 4.x flow control was implemented using TCP flow control. The underlying network connection of throttled consumers was suspended to enforce flow control limits. This strategy is very efficient but can lead to deadlocks if there are multiple producers and consumers sharing the same connection.java
在ActiveMQ 4.x版本中,流量控制是用TCP流量控制實現的。受節制的消費者底層的網絡鏈接被掛起,以強制進行流量控制限制。這個策略很是高效,可是若是有多個生產者和消費者共享同一個鏈接的時候,可能會致使死鎖。apache
As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection. By 'flow control' we mean that if the broker detects that the memory limit for the destination, or the temp- or file-store limits for the broker, have been exceeded, then the flow of messages can be slowed down. The producer will be either blocked until resources are available or will receive a JMSException: this behaviour is configurable and described in the section below on <systemUsage>.網絡
It's worth noting that the default <systemUsage> settings will cause the producer to block when the memoryLimit or <systemUsage> limits are reached: this blocking behaviour is sometimes misinterpreted as a 'hung producer', when in fact the producer is simply diligently waiting until space is available.less
ActiveMQConnectionFactory connctionFactory = ...connctionFactory.setProducerWindowSize(1024000);
The ProducerWindowSize is the maximum number of bytes of data that a producer will transmit to a broker before waiting for acknowledgment messages from the broker that it has accepted the previously sent messages.async
Alternatively, if you're sending non-persisted messages (which are by default sent async), and want to be informed if the queue or topic's memory limit has been breached, then you can simply configure the connection factory to 'alwaysSyncSend'. While this is going to be slower, it will ensure that your message producer is informed immediately of memory issues.學習
If you like, you can disable flow control for specific JMS queues and topics on the broker by setting the producerFlowControl flag to false on the appropriate destination policy in the Broker configuration - e.g.
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>" producerFlowControl="false"/></policyEntries> </policyMap></destinationPolicy>
see Broker Configuration.
詳情請看 代理設置 部分。
Note that, since the introduction of the new file cursor in ActiveMQ 5.x, non-persisted messages are shunted into the temporary file store to reduce the amount of memory used for non-persistent messaging. As a result, you may find that a queue's memoryLimit is never reached, as the cursor doesn't use very much memory. If you really do want to keep all your non-persistent messages in memory, and stop producers when the limit is reached, you should configure the <vmQueueCursor>.
注意,自從ActiveMQ 5.x中引入新的文件遊標以後,非持久化消息被分流到了臨時文件存儲中,以此來減小非持久化消息傳送使用的內存總量。結果就是,你可能會發現一個隊列的內存限制永遠達不到,由於遊標不須要使用太多的內存。若是你真的想把全部的非持久化消息存放在內存中,並在達到內存限制的時候停掉生產者,你須要配置<vmQueueCursor>。
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> <pendingQueuePolicy> <vmQueueCursor/></pendingQueuePolicy></policyEntry>
The fragment above will ensure that all non-persistent queue messages are kept in memory, with each queue having a limit of 1Mb.
If you are sending a persistent message (so that a response of the OpenWire Message is expected then the broker will send the producer aProducerAck message. This informs the producer that the previous sending window has been processed, so that it can now send another window. Its kinda like consumer acks but in reverse.
若是你發送一條持久化消息(這樣就會有一個對 OpenWire 響應消息的指望),代理將會給生產者發送一個應答消息。該消息會通知生產者其先前的發送窗口已經被處理了,因此它如今能夠發送另外的窗口了。這有點像消費者應答,不過是反向的。
So a nice producer might wait for a producer ack before sending more data, to avoid flooding the broker (and forcing the broker to block the entire connection if a slow consumer occurs). To see how this works in source code, check out the ActiveMQMessageProducer code.
因此,一個友好的生產者能夠再發送更多的數據以前,等待生產者應答,以此來避免對代理的衝擊(而且若是出現了一個比較慢的消費者,強制代理阻塞整個鏈接)。若是你想知道這部分的源代碼是怎麼實現的,能夠看一下 ActiveMQMessageProducer 的代碼。
Though a client can ignore the producer ACKs altogether and the broker should just stall the transport if it has to for slow consumer handling; though this does mean it'll stall the entire connection.
An alternative to the indefinite blocking of the send() operation when no space is free on the broker is to instead configure that an exception to be thrown on the client-side. By configuring the sendFailIfNoSpace property to true, the broker will cause the send() operation to fail with ajavax.jms.ResourceAllocationException, which will propagate to the client. Below is an example of this configuration:
應對代理空間不足,而致使不肯定的阻塞 send()操做的一種替代方案,就是將其配置成客戶端拋出的一個異常。經過將sendFailIfNoSpace屬性設置爲true,代理將會引發send()方法失敗,並拋出javax.jms.ResourceAllocationException異常,傳播到客戶端。下面是一個配置的示例:
<systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage></systemUsage>
The advantage of this property is that the client can catch the javax.jms.ResourceAllocationException, wait a bit and retry the send() operation instead of just hanging indefinitely.
Starting in version 5.3.1 the sendFailIfNoSpaceAfterTimeout property has been added. This property causes the send() operation to fail with an exception on the client-side, but only after waiting the given amount of time. If space on the broker is still not freed after the configured amount of time, only then does the send() operation fail with an exception to the client-side. Below is an example:
從5.3.1版本以後,sendFailIfNoSpaceAfterTimeout 屬性被加了進來。這個屬性一樣致使send()方法失敗,並在客戶端拋出異常,但僅當等待了指定時間以後才觸發。若是在配置的等待時間過去以後,代理上的空間仍然沒有被釋放,僅當這個時候send()方法纔會失敗,而且在客戶端拋出異常。下面是一個示例:
<systemUsage> <systemUsage sendFailIfNoSpaceAfterTimeout="3000"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage></systemUsage>
The timeout is defined in milliseconds so the example above waits for three seconds before failing the send() operation with an exception to the client-side. The advantage of this property is that it will block for the configured amount of time instead of failing immediately or blocking indefinitely. This property offers not only an improvement on the broker-side, but also an improvement for the client so it can catch the exception, wait a bit and retry the send() operation.
A common requirement is to disable flow control so that message dispatching continues until all available disk is used up by pending messages (whether persistent or non persistent messaging is configured). To do this enable Message Cursors.
一個廣泛的需求是使流量控制無效,使得消息分發可以持續,直到全部可用的磁盤被掛起(pending)的消息耗盡(不管是持久化的仍是配置了非持久化的)。要這樣作,你可使用消息遊標(Message Cursors)。
You can also slow down producers via some attributes on the <systemUsage> element. Take a look at the following example:
<systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="64 mb" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb" /> </storeUsage> <tempUsage> <tempUsage limit="10 gb" /> </tempUsage> </systemUsage></systemUsage>
You can set limits of memory for NON_PERSISTENT messages, disk storage for PERSITENT messages and total usage for temporary messages, the broker will use before it slowdown producers. Using the default settings shown above, the broker will block the send() call until some messages are consumed and space becomes available on the broker. The default values are shown above, you will probably need to increase these values for your environment.
你能夠爲非持久化的消息(NON_PERSISTENT messages)設置內存限制,爲持久化消息(PERSISTENT messages)設置磁盤空間,以及爲臨時消息設置總的空間,代理將在減慢生產者以前使用這些空間。使用上述的默認設置,代理將會一直阻塞sen()方法的調用,直至一些消息被消費,而且代理有了可用空間。默認值如上例所述,你可能須要根據你的環境增長這些值。