對於MQ的瞭解估計也僅限於安裝了,有時候一遇到問題就摸不着頭腦,固然主要是由於功力不夠深啊。ide
固然,MQ堵的狀況基本上都出如今爬蟲跑了兩三天以後,可是因爲老的版本沒有問題,因而跟老的版本進行了個簡單的比對,結果發現原來某個屬性的值不一樣形成的。noAck爲true和false的時候,我簡單觀察了下MQ的狀態。好比:我發兩百萬的數據,noAck爲true時,mq隊列中的數據一會的就發送給了爬蟲;而noAck 爲false時,這些數據在MQ的隊列中至少要存兩小時,能夠想象當數據量過大的時候,就會形成MQ隊列中數據堆積的狀況了。。。。spa
@Overriderabbitmq
public void handleMessage(MessageHandler handler) throws Exception {隊列
if (handler == null) {get
return;it
}io
do {exception
Channel channel = connecter.getChannel();channel
if (channel == null) {next
break;
}
try {
//此處再改回true,爲false的狀況會形成mq消息堆積,可是不影響抓取
boolean noAck = true;
if (!noAck) {
channel.basicQos(100);
}
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(connecter.getQueueName(), noAck, consumer);
for (; runFlag;) {
try {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
log.trace("get message from rabbitmq " );
if (delivery != null) {
handler.onMessage(delivery.getBody(), delivery.getEnvelope().getRoutingKey());
}
if (!noAck) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
delivery = null;
} catch (InterruptedException e) {
log.warn("RabbitMQMessageReceiver Thread " + Thread.currentThread().getId() + " occer exception : ", e);
}
}
} finally {
if (channel != null) {
channel.abort();
}
}
} while (runFlag);
}