yml 配置 html
server: port: 8154 tomcat: uri-encoding: UTF-8 # max-http-post-size: # accept-count: # 線程數達到最大時,接受排隊的請求個數,默認值爲100 # max-connections: # max-threads: #最大線程數,即同時處理的任務個數,默認值爲200 servlet: context-path: /demo # session: # timeout: 800 #(秒) logging: config: classpath:dev-logback.xml spring: profiles: active: dev http: encoding: charset: UTF-8 enabled: true force: true messages: encoding: UTF-8 thymeleaf: cache: true encoding: UTF-8 suffix: .html servlet: content-type: text/html rabbitmq: host: xxxx port: xxxx username: xxxx password: xxxx virtual-host: / publisher-confirms: true # publisher-returns: false # listener: # #type: simple # direct: # prefetch: 5 # 一次請求中預處理的消息數量 # acknowledge-mode: auto # spring 自動應答,若是拋出異常將回滾 # simple: # acknowledge-mode: auto # spring 自動應答,若是拋出異常將回滾 # prefetch: 5 # 一次請求中預處理的消息數量,一次讀取的數量 # concurrency: 5 # 消費端最小併發數,線程池初始數量 # max-concurrency: 10 # 消費端最大併發數,線程池最大數量 demo: test-queue-name: canaan-test-queue
java conifg java
@Configuration @EnableRabbit public class RabbitMQConfigurer { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfigurer.class); @Autowired private ConnectionFactory connectionFactory; /** * 默認的線程池 * * @return */ @Bean @Primary @Qualifier("rabbitExecutor") public Executor rabbitExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(3);/*核心線程數*/ executor.setMaxPoolSize(20);/*最大線程數*/ executor.setQueueCapacity(30000);/*隊列大小*/ executor.setKeepAliveSeconds(60);/* 某線程空閒超過1分鐘,就回收該線程*/ //executor.setAllowCoreThreadTimeOut(true); // KeepAliveSeconds 設置也做用於【核心線程數】 executor.setThreadNamePrefix("rabbitExecutor-"); //executor.setThreadFactory((r) -> { // LOGGER.info("-------:run"); // return new Thread(r); //}); //executor.setThreadFactory(traceableThreadFactory); executor.initialize(); return executor; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { //必須是prototype類型 RabbitTemplate template = new RabbitTemplate(this.connectionFactory); template.setEncoding("utf-8"); template.setTaskExecutor(this.rabbitExecutor()); //doSendAndReceive 時用到 template.setMessageConverter(new Jackson2JsonMessageConverter()); //template.setReceiveTimeout(); //template.setReplyTimeout(); return template; } /** * 【交易事件】交換機 * * @author Canaan * @date 2019/4/24 21:04 */ @Bean public TopicExchange testExchange() { return new TopicExchange("canaan-test-exchange", false, true); } @Bean public Queue testQueue() { return new Queue("canaan-test-queue", false, false, true); } @Bean public Binding noticeReconciliationBinding() { return BindingBuilder.bind(testQueue()).to(testExchange()).with("test.#"); } @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
消息生產spring
@Component public class RabbitPub implements RabbitTemplate.ConfirmCallback { private final RabbitTemplate rabbitTemplate; @Autowired private TopicExchange testExchange; @Autowired public RabbitPub(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } public void pub() throws InterruptedException { for (int i = 0; i < 20; i++) { String uuid = UUID.randomUUID().toString().replace("-", ""); MyMsg myMsg = new MyMsg(); myMsg.setCode("currentTime:" + LocalDateTime.now().toString()); myMsg.setText(String.valueOf(i)); final CorrelationData correlationId = new CorrelationData(uuid); this.rabbitTemplate.convertAndSend(this.testExchange.getName(), "test." + uuid.substring(0, 3), myMsg, correlationId); //TimeUnit.SECONDS.sleep(1); } } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息成功消費"); } else { System.out.println("消息消費失敗:" + cause); } //System.out.println("RabbitPub== correlationData - " + correlationData + " ack - " + ack + " cause - " + cause); } }
消息消費tomcat
/** * rabbit 訂閱交易事件 * <br> * <p> * channel.basicAck(tag, multiple); * <p> * channel.basicNack(tag, multiple, requeue); * <p> * channel.basicReject(envelope.getDeliveryTag(), false); * <p> * channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); */ //@RabbitListener(queues = "canaan-test-queue") //queue name //@RabbitListener(queues = "#{'canaan-test-queue'}") //SpEL queue name //@RabbitListener(queues = "#{testQueue}") //SpEL queue bean //@RabbitListener(queues = "${demo.test-queue-name}") //property-placeholder keys queue name @Component @RabbitListener(queues = "#{testQueue}") public class RabbitSubI { // String payload , Channel channel ,Message message // import org.springframework.amqp.core.Message; // @Header //@Headers Map<String,Object> headers @RabbitHandler public void process(@Payload MyMsg myMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { //System.out.println("RabbitSubI object handle...."); System.out.println(myMsg); TimeUnit.SECONDS.sleep(3); } /** * 根據不一樣的內容,來處理不一樣的消息 * * @author Canaan * @date 2019/6/25 11:20 */ @RabbitHandler public void process(@Payload String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { System.out.println("RabbitSubI string handle...."); System.out.println(payload); } /** * 根據不一樣的內容,來處理不一樣的消息 * * @author Canaan * @date 2019/6/25 11:20 */ @RabbitHandler public void processMessage2(@Payload byte[] message) { System.out.println("RabbitSubI byte handle...."); System.out.println(new String(message)); } }