/** * Copyright © Cisco Systems, Inc. */ package com.cisco.cmse.csra.cluster.rabbitmq; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author sanlli * */ @Configuration public abstract class AbstractRabbitConfiguration { private static final String Reload_Policy_Exchange = "Reload_Policy_Exchange"; @Value("${rabbitmq_addresses}") private String addresses; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(addresses); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public AmqpAdmin amqpAdmin(){ return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate amqpTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setMessageConverter(jsonMessageConverter()); template.setExchange(Reload_Policy_Exchange); return template; } @Bean public MessageConverter jsonMessageConverter() { return new JsonMessageConverter(); } @Bean public TopicExchange reloadPolicyExchange() { TopicExchange exchange = new TopicExchange(Reload_Policy_Exchange, true, false); return exchange; } }
/** * Copyright © Cisco Systems, Inc. */ package com.cisco.cmse.csra.cluster.rabbitmq; import java.net.InetAddress; import java.net.UnknownHostException; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import com.cisco.cmse.csra.cluster.message.ReloadNotificationMessageListener; /** * @author sanlli * */ @Configuration public class RabbitClientConfiguration extends AbstractRabbitConfiguration { @Bean public Queue reloadPolicyQueue(){ String locahost = "0.0.0.0"; try { locahost = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { try { locahost = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e1) { locahost = "0.0.0.0"; } } return new Queue(locahost, true, false, false); } @Bean public Binding reloadPolicyBinding() { return BindingBuilder.bind(reloadPolicyQueue()).to(reloadPolicyExchange()).with("reload.#"); } @Bean public TaskExecutor taskExecutor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setMaxPoolSize(10); taskExecutor.setCorePoolSize(10); taskExecutor.setQueueCapacity(1500); return taskExecutor; } @Bean public ReloadNotificationMessageListener reloadNotificationMessageListener(){ return new ReloadNotificationMessageListener(); } @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setTaskExecutor(taskExecutor()); container.setQueues(reloadPolicyQueue()); container.setMessageListener(reloadNotificationMessageListener()); return container; } }