org.eclipse.paho.client.mqttv3
Test environment: 2.2 GHz Intel Core i7, macOS.
Publish performance (note: use a single-threaded MqttClient):
10,000 messages: 341 ms
40,000 messages: 1,163 ms
50,000 messages: 1,450 ms
100,000 messages: 2,700 ms
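To make the figures above easier to compare, here is a small sketch that converts each run into messages per second. The class and method names (`PublishThroughput`, `perSecond`) are made up for illustration; only the message counts and timings come from the measurements above.

```java
class PublishThroughput {
    // Messages per second for a run of `count` messages that took `millis` ms.
    static double perSecond(int count, long millis) {
        return count * 1000.0 / millis;
    }

    public static void main(String[] args) {
        // Counts and timings copied from the measurements listed above.
        int[] counts = {10_000, 40_000, 50_000, 100_000};
        long[] millis = {341, 1163, 1450, 2700};
        for (int i = 0; i < counts.length; i++) {
            System.out.printf("%d messages in %d ms = %.0f msg/s%n",
                    counts[i], millis[i], perSecond(counts[i], millis[i]));
        }
    }
}
```

All four runs land in roughly the 29,000 to 37,000 messages per second range, so single-threaded throughput stays fairly flat as the batch grows.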
Multi-threaded MqttClient: the MQTT (32202) "Too many publishes in progress" problem
[15:07:21]: publish failed, message: aaaa
Too many publishes in progress (32202)
    at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:496)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:132)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:156)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1027)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:399)
    at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:171)
    at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:161)
    at io.communet.ichater.emq.sub.MqttSendMsgEventSubscribe.onEvent(MqttSendMsgEventSubscribe.java:28)
    at java.lang.reflect.Method.invoke(Native Method)
    at java.lang.reflect.Method.invoke(Method.java:372)
    at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:507)
    at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:501)
    at org.greenrobot.eventbus.AsyncPoster.run(AsyncPoster.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
    at java.lang.Thread.run(Thread.java:818)
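One fix is to call options.setMaxInflight(1000), which raises maxInflight, the limit that actualInFlight is checked against.
I ran into this error after switching to EventBus. The earlier version, which used a Handler running on its own dedicated thread, had no problem. Investigation showed that EventBus runs the subscriber on separate worker threads (note AsyncPoster and ThreadPoolExecutor in the stack trace), whereas a Handler processes everything on a single thread.
So when a large number of messages are sent, EventBus publishes them at almost the same moment, and that triggers this error.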
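One way to get the single-thread behaviour of the Handler back while still receiving events on many threads is to funnel every publish through a single-threaded executor. The sketch below is only an illustration under stated assumptions: `SerialPublisher` and `Sender` are hypothetical names, and `Sender` stands in for whatever actually publishes (for example a wrapper around the blocking `MqttClient.publish`). It only keeps the number of in-flight messages low if the sender blocks until each publish completes.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SerialPublisher {
    /** Hypothetical stand-in for the real publish call (not a Paho type). */
    interface Sender {
        void send(String topic, String payload) throws Exception;
    }

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Sender sender;

    SerialPublisher(Sender sender) {
        this.sender = sender;
    }

    /** Safe to call from any thread; the send itself always runs on the one worker thread. */
    void publish(String topic, String payload) {
        worker.execute(() -> {
            try {
                sender.send(topic, payload);
            } catch (Exception e) {
                System.err.println("publish failed: " + e);
            }
        });
    }

    /** Stops accepting work and waits for the queued sends to finish. */
    boolean shutdown(long timeoutMillis) throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        Set<String> sendingThreads = ConcurrentHashMap.newKeySet();
        SerialPublisher publisher = new SerialPublisher(
                (topic, payload) -> sendingThreads.add(Thread.currentThread().getName()));

        // Simulate EventBus-style callers: several threads publishing at once.
        Thread[] callers = new Thread[8];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(() -> publisher.publish("demo/topic", "aaaa"));
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
        publisher.shutdown(5_000);
        System.out.println("threads that actually sent: " + sendingThreads.size());
    }
}
```

The callers never touch the client directly, so a burst of events turns into a queue of sends rather than a burst of simultaneous publishes.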
Following the stack trace leads to the place where the error is thrown:
    if (actualInFlight >= this.maxInflight) {
        //@TRACE 613= sending {0} msgs at max inflight window
        log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});

        throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
    }
Here, actualInFlight is incremented as follows:
    // processed until the inflight window has space.
    if (actualInFlight < this.maxInflight) {
        // The in flight window is not full so process the
        // first message in the queue
        result = (MqttWireMessage)pendingMessages.elementAt(0);
        pendingMessages.removeElementAt(0);
        actualInFlight++;

        //@TRACE 623=+1 actualInFlight={0}
        log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)});
    }
Each time a message is taken out of pendingMessages, actualInFlight is incremented by 1. maxInflight is configurable; its default value is 10.
    public class ClientState {
        ...
        volatile private Vector pendingMessages;
        ...
    }
In ClientState:
    public void send(MqttWireMessage message, MqttToken token) throws MqttException {
        ...
        if (message instanceof MqttPublish) {
            synchronized (queueLock) {
                if (actualInFlight >= this.maxInflight) {
                    //@TRACE 613= sending {0} msgs at max inflight window
                    log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});

                    throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
                }

                MqttMessage innerMessage = ((MqttPublish) message).getMessage();
                //@TRACE 628=pending publish key={0} qos={1} message={2}
                log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});

                switch(innerMessage.getQos()) {
                    case 2:
                        outboundQoS2.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                    case 1:
                        outboundQoS1.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                }
                tokenStore.saveToken(token, message);
                pendingMessages.addElement(message);
                queueLock.notifyAll();
            }
        } else {
            ...
        }
    }
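Note that when a message is added to pendingMessages there is no check on its QoS level, so every publish, including QoS 0, is subject to the same in-flight window check.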
    private void decrementInFlight() {
        final String methodName = "decrementInFlight";
        synchronized (queueLock) {
            actualInFlight--;
            //@TRACE 646=-1 actualInFlight={0}
            log.fine(CLASS_NAME,methodName,"646",new Object[]{new Integer(actualInFlight)});

            if (!checkQuiesceLock()) {
                queueLock.notifyAll();
            }
        }
    }
When the acknowledgement for a message is received, actualInFlight is decremented by 1.
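Putting the three pieces together (the check in send, the increment on dequeue, the decrement on acknowledgement), the window can be modelled in a few lines. This is a deliberately simplified sketch, not Paho's ClientState: the class `InflightWindow`, its method names, and the use of IllegalStateException in place of MqttException are all assumptions made so that the example runs without the library.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified model of the in-flight window; NOT Paho's actual ClientState. */
class InflightWindow {
    private final int maxInflight;
    private int actualInFlight = 0;
    private final Queue<String> pendingMessages = new ArrayDeque<>();

    InflightWindow(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    /** Mirrors the check in send(): reject when the window is already full. */
    synchronized void send(String message) {
        if (actualInFlight >= maxInflight) {
            // Stand-in for MqttException(REASON_CODE_MAX_INFLIGHT).
            throw new IllegalStateException("Too many publishes in progress (32202)");
        }
        pendingMessages.add(message);
    }

    /** Mirrors the dequeue step: take the next message only if the window has space. */
    synchronized String dequeue() {
        if (actualInFlight < maxInflight && !pendingMessages.isEmpty()) {
            actualInFlight++;
            return pendingMessages.remove();
        }
        return null;
    }

    /** Mirrors decrementInFlight(): an acknowledgement frees one slot. */
    synchronized void acknowledge() {
        if (actualInFlight > 0) {
            actualInFlight--;
        }
    }

    synchronized int inFlight() {
        return actualInFlight;
    }

    public static void main(String[] args) {
        InflightWindow window = new InflightWindow(10); // 10 is the default maxInflight
        for (int i = 0; i < 10; i++) {
            window.send("msg-" + i);
            window.dequeue();
        }
        try {
            window.send("msg-10"); // window is full and nothing has been acknowledged
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        window.acknowledge();      // one acknowledgement frees one slot
        window.send("msg-10");     // now accepted
        System.out.println("in flight: " + window.inFlight());
    }
}
```

With the default window of 10, the eleventh publish is rejected until at least one earlier message has been acknowledged, which is the situation a burst of concurrent EventBus callers runs into.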