本文主要研究一下artemis的SlowConsumerReaperRunnablejava
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerPolicy.javanode
public enum SlowConsumerPolicy { KILL, NOTIFY; public static SlowConsumerPolicy getType(int type) { switch (type) { case 0: return KILL; case 1: return NOTIFY; default: return null; } } }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javagit
private final class SlowConsumerReaperRunnable implements Runnable { private final SlowConsumerPolicy policy; private final float threshold; private final long checkPeriod; private SlowConsumerReaperRunnable(long checkPeriod, float threshold, SlowConsumerPolicy policy) { this.checkPeriod = checkPeriod; this.policy = policy; this.threshold = threshold; } @Override public void run() { float queueRate = getRate(); long queueMessages = getMessageCount(); if (logger.isDebugEnabled()) { logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second."); } if (consumers.size() == 0) { logger.debug("There are no consumers, no need to check slow consumer's rate"); return; } else { float queueThreshold = threshold * consumers.size(); if (queueRate < queueThreshold && queueMessages < queueThreshold) { if (logger.isDebugEnabled()) { logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer."); } return; } } for (ConsumerHolder consumerHolder : consumers) { Consumer consumer = consumerHolder.consumer(); if (consumer instanceof ServerConsumerImpl) { ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; float consumerRate = serverConsumer.getRate(); if (consumerRate < threshold) { RemotingConnection connection = null; ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer(); RemotingService remotingService = server.getRemotingService(); for (RemotingConnection potentialConnection : remotingService.getConnections()) { if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) { connection = potentialConnection; } } serverConsumer.fireSlowConsumer(); if (connection != null) { ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate); if (policy.equals(SlowConsumerPolicy.KILL)) { connection.killMessage(server.getNodeID()); remotingService.removeConnection(connection.getID()); connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())); } else if (policy.equals(SlowConsumerPolicy.NOTIFY)) { TypedProperties props = new TypedProperties(); props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, getConsumerCount()); props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address); props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(connection.getRemoteAddress())); if (connection.getID() != null) { props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(connection.getID().toString())); } props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, serverConsumer.getID()); props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(serverConsumer.getSessionID())); Notification notification = new Notification(null, CoreNotificationType.CONSUMER_SLOW, props); ManagementService managementService = ((PostOfficeImpl) postOffice).getServer().getManagementService(); try { managementService.sendNotification(notification); } catch (Exception e) { ActiveMQServerLogger.LOGGER.failedToSendSlowConsumerNotification(notification, e); } } } } } } } }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.javagithub
public class ServerConsumerImpl implements ServerConsumer, ReadyListener { //...... public void fireSlowConsumer() { if (slowConsumerListener != null) { slowConsumerListener.onSlowConsumer(this); } } //...... }
activemq-artemis-2.11.0/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.javaapache
class SlowConsumerDetection implements SlowConsumerDetectionListener { @Override public void onSlowConsumer(ServerConsumer consumer) { if (consumer.getProtocolData() != null && consumer.getProtocolData() instanceof AMQConsumer) { AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData(); ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(amqConsumer.getOpenwireDestination()); ActiveMQMessage advisoryMessage = new ActiveMQMessage(); try { advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString()); protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId(), null); } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn("Error during method invocation", e); } } } }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.javaide
public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection { //...... public void killMessage(SimpleString nodeID) { if (channelVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) { return; } Channel clientChannel = getChannel(1, -1); DisconnectConsumerWithKillMessage response = new DisconnectConsumerWithKillMessage(nodeID); clientChannel.send(response, -1); } public void fail(final ActiveMQException me, String scaleDownTargetNodeID) { synchronized (failLock) { if (destroyed) { return; } destroyed = true; } if (!(me instanceof ActiveMQRemoteDisconnectException)) { ActiveMQClientLogger.LOGGER.connectionFailureDetected(transportConnection.getRemoteAddress(), me.getMessage(), me.getType()); } try { transportConnection.forceClose(); } catch (Throwable e) { ActiveMQClientLogger.LOGGER.failedForceClose(e); } // Then call the listeners callFailureListeners(me, scaleDownTargetNodeID); callClosingListeners(); internalClose(); for (Channel channel : channels.values()) { channel.returnBlocking(me); } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.javapost
public class ManagementServiceImpl implements ManagementService { //...... public void sendNotification(final Notification notification) throws Exception { if (logger.isTraceEnabled()) { logger.trace("Sending Notification = " + notification + ", notificationEnabled=" + notificationsEnabled + " messagingServerControl=" + messagingServerControl); } // This needs to be synchronized since we need to ensure notifications are processed in strict sequence synchronized (this) { if (messagingServerControl != null && notificationsEnabled) { // We also need to synchronize on the post office notification lock // otherwise we can get notifications arriving in wrong order / missing // if a notification occurs at same time as sendQueueInfoToQueue is processed synchronized (postOffice.getNotificationLock()) { // First send to any local listeners for (NotificationListener listener : listeners) { try { listener.onNotification(notification); } catch (Exception e) { // Exception thrown from one listener should not stop execution of others ActiveMQServerLogger.LOGGER.errorCallingNotifListener(e); } } // start sending notification *messages* only when server has initialised // Note at backup initialisation we don't want to send notifications either // https://jira.jboss.org/jira/browse/HORNETQ-317 if (messagingServer == null || !messagingServer.isActive()) { if (logger.isDebugEnabled()) { logger.debug("ignoring message " + notification + " as the server is not initialized"); } return; } long messageID = storageManager.generateID(); Message notificationMessage = new CoreMessage(messageID, 512); // Notification messages are always durable so the user can choose whether to add a durable queue to // consume them in notificationMessage.setDurable(true); notificationMessage.setAddress(managementNotificationAddress); if (notification.getProperties() != null) { TypedProperties props = notification.getProperties(); props.forEach(notificationMessage::putObjectProperty); } notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString())); notificationMessage.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis()); if (notification.getUID() != null) { notificationMessage.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID())); } postOffice.route(notificationMessage, false); } } } } //...... }
SlowConsumerReaperRunnable實現了Runnable接口,其run方法會遍歷consumers,對於ServerConsumerImply在其consumerRate小於threshold時執行serverConsumer.fireSlowConsumer();以後對於connection不爲null的根據policy進行不一樣的處理,若爲SlowConsumerPolicy.KILL則執行connection.killMessage(server.getNodeID())、connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())),若爲SlowConsumerPolicy.NOTIFY則構建NotificationType爲CoreNotificationType.CONSUMER_SLOW的notification執行managementService.sendNotification(notification)this