Declare and bind a queue dynamically with RabbitMQ

/**
 * Copyright © Cisco Systems, Inc. 
 */
package com.cisco.cmse.csra.cluster.rabbitmq;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @author sanlli
 *
 */
@Configuration
public abstract class AbstractRabbitConfiguration {
	
	private static final String Reload_Policy_Exchange = "Reload_Policy_Exchange";

	@Value("${rabbitmq_addresses}")
	private String addresses;
	
	@Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory(addresses);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
	
	@Bean
	public AmqpAdmin amqpAdmin(){
		return new RabbitAdmin(connectionFactory());
	}

    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        template.setExchange(Reload_Policy_Exchange);
        return template;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new JsonMessageConverter();
    }

    @Bean
    public TopicExchange reloadPolicyExchange() {
		TopicExchange exchange = new TopicExchange(Reload_Policy_Exchange, true, false);
        return exchange;
    }
   
}

/**
 * Copyright © Cisco Systems, Inc. 
 */
package com.cisco.cmse.csra.cluster.rabbitmq;

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.cisco.cmse.csra.cluster.message.ReloadNotificationMessageListener;

/**
 * @author sanlli
 *
 */
@Configuration
public class RabbitClientConfiguration extends AbstractRabbitConfiguration {

	@Bean
    public Queue reloadPolicyQueue(){
    	String locahost = "0.0.0.0";
		try {
			locahost = InetAddress.getLocalHost().getHostAddress();
		} catch (UnknownHostException e) {
			try {
				locahost = InetAddress.getLocalHost().getHostName();
			} catch (UnknownHostException e1) {
				locahost = "0.0.0.0";
			}
		}
		return new Queue(locahost, true, false, false);
    }
    
    @Bean
    public Binding reloadPolicyBinding() {
        return BindingBuilder.bind(reloadPolicyQueue()).to(reloadPolicyExchange()).with("reload.#");
    }
    
    @Bean
    public TaskExecutor taskExecutor(){
    	ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    	taskExecutor.setMaxPoolSize(10);
    	taskExecutor.setCorePoolSize(10);
    	taskExecutor.setQueueCapacity(1500);
    	return taskExecutor;
    }
    
    @Bean
    public ReloadNotificationMessageListener reloadNotificationMessageListener(){
    	return new ReloadNotificationMessageListener();
    }
    
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setTaskExecutor(taskExecutor());
        container.setQueues(reloadPolicyQueue());
        container.setMessageListener(reloadNotificationMessageListener());
        return container;
    }
    
}
相關文章
相關標籤/搜索