Spring Boot RabbitMQ 使用

yml 配置

# Spring Boot application configuration: embedded server, logging,
# Thymeleaf/encoding defaults and the RabbitMQ connection used by the demo.
server:
  port: 8154
  tomcat:
    uri-encoding: UTF-8
    #    max-http-post-size:
    #    accept-count:   # number of requests queued once all threads are busy; default 100
    #    max-connections:
    #    max-threads:  # maximum number of threads, i.e. tasks handled concurrently; default 200
  servlet:
    context-path: /demo
#    session:
#      timeout: 800 # (seconds)

logging:
  config: classpath:dev-logback.xml

spring:
  profiles:
    active: dev
  http:
    encoding:
      charset: UTF-8
      enabled: true
      force: true
  messages:
    encoding: UTF-8
  thymeleaf:
    cache: true
    encoding: UTF-8
    suffix: .html
    servlet:
      content-type: text/html
  rabbitmq:
    # host/port/username/password below are "xxxx" placeholders; replace with real broker settings.
    host: xxxx
    port: xxxx
    username: xxxx
    password: xxxx
    virtual-host: /
    # Publisher confirms are switched on; RabbitPub registers the ConfirmCallback that receives them.
    publisher-confirms: true
#    publisher-returns: false
#    listener:
#      #type: simple
#      direct:
#        prefetch: 5 # number of messages prefetched per request
#        acknowledge-mode: auto  # Spring acknowledges automatically; rolls back if an exception is thrown
#      simple:
#        acknowledge-mode: auto  # Spring acknowledges automatically; rolls back if an exception is thrown
#        prefetch: 5 # number of messages prefetched per request (read in one go)
#        concurrency: 5 # minimum number of concurrent consumers (initial thread-pool size)
#        max-concurrency: 10 # maximum number of concurrent consumers (maximum thread-pool size)

# Demo-specific properties.
demo:
  # Same name as the queue declared by RabbitMQConfigurer.testQueue();
  # a listener can reference it as ${demo.test-queue-name}.
  test-queue-name: canaan-test-queue

Java config

/**
 * Spring configuration for the RabbitMQ demo: declares the task executor,
 * the {@link RabbitTemplate}, the test exchange/queue/binding and the
 * listener container factory used by {@code @RabbitListener} endpoints.
 */
@Configuration
@EnableRabbit
public class RabbitMQConfigurer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfigurer.class);

    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * Default thread pool, exposed as the primary {@link Executor} bean.
     *
     * @return an initialized {@link ThreadPoolTaskExecutor}
     */
    @Bean
    @Primary
    @Qualifier("rabbitExecutor")
    public Executor rabbitExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("rabbitExecutor-");
        // Core pool size.
        pool.setCorePoolSize(3);
        // Maximum pool size.
        pool.setMaxPoolSize(20);
        // Capacity of the task queue.
        pool.setQueueCapacity(30000);
        // Threads idle for more than one minute are reclaimed.
        // (Calling setAllowCoreThreadTimeOut(true) would apply this to core threads as well.)
        pool.setKeepAliveSeconds(60);
        pool.initialize();
        return pool;
    }

    /**
     * Creates a JSON-converting {@link RabbitTemplate}.
     *
     * <p>Author's note: this bean must be prototype-scoped.
     * NOTE(review): presumably because each template accepts a single
     * confirm callback (see {@code RabbitPub}) — confirm.
     *
     * @return a new template instance per injection point
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate amqpTemplate = new RabbitTemplate(this.connectionFactory);
        amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        // Author's note: the executor is used by doSendAndReceive.
        amqpTemplate.setTaskExecutor(this.rabbitExecutor());
        amqpTemplate.setEncoding("utf-8");
        // Receive/reply timeouts are left at their defaults.
        return amqpTemplate;
    }

    /**
     * Topic exchange for the demo (the original note labels it the
     * "trade event" exchange).
     *
     * @author Canaan
     * @date 2019/4/24 21:04
     */
    @Bean
    public TopicExchange testExchange() {
        return new TopicExchange("canaan-test-exchange", false, true);
    }

    /**
     * Queue consumed by the demo listener.
     *
     * @return the {@code canaan-test-queue} queue definition
     */
    @Bean
    public Queue testQueue() {
        return new Queue("canaan-test-queue", false, false, true);
    }

    /**
     * Binds {@link #testQueue()} to {@link #testExchange()} for every
     * routing key matching {@code test.#}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding noticeReconciliationBinding() {
        Queue queue = testQueue();
        TopicExchange exchange = testExchange();
        return BindingBuilder.bind(queue).to(exchange).with("test.#");
    }

    /**
     * Listener container factory that converts message bodies with Jackson.
     *
     * @param connectionFactory the broker connection factory to listen on
     * @return the configured container factory
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

}

消息生產

/**
 * Demo publisher: sends a batch of {@code MyMsg} payloads to the test topic
 * exchange and prints the broker's publisher-confirm result for each one.
 */
@Component
public class RabbitPub implements RabbitTemplate.ConfirmCallback {

    /** Number of messages published by one {@link #pub()} call. */
    private static final int MESSAGE_COUNT = 20;

    /** Routing-key prefix; matches the {@code test.#} binding of the test queue. */
    private static final String ROUTING_KEY_PREFIX = "test.";

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange testExchange;

    /**
     * Stores the template and registers this bean as its confirm callback.
     *
     * @param rabbitTemplate template used for publishing
     */
    @Autowired
    public RabbitPub(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Publishes {@value #MESSAGE_COUNT} messages, each with a fresh UUID as
     * correlation id and a routing key of {@code "test." + first 3 UUID chars}.
     *
     * @throws InterruptedException kept for interface compatibility; the
     *                              per-message sleep is currently disabled
     */
    public void pub() throws InterruptedException {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String uuid = UUID.randomUUID().toString().replace("-", "");

            MyMsg myMsg = new MyMsg();
            myMsg.setCode("currentTime:" + LocalDateTime.now());
            myMsg.setText(String.valueOf(i));

            CorrelationData correlationId = new CorrelationData(uuid);
            String routingKey = ROUTING_KEY_PREFIX + uuid.substring(0, 3);

            this.rabbitTemplate.convertAndSend(this.testExchange.getName(), routingKey, myMsg, correlationId);

            //TimeUnit.SECONDS.sleep(1);
        }
    }

    /**
     * Publisher-confirm callback.
     *
     * <p>An ack only means the broker accepted the published message; it says
     * nothing about whether a consumer has processed it, so the printed text
     * reports broker confirmation rather than consumption.
     *
     * @param correlationData correlation data supplied when sending; may be {@code null}
     * @param ack             {@code true} if the broker confirmed the message
     * @param cause           optional failure reason when {@code ack} is {@code false}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = (correlationData == null) ? null : correlationData.getId();
        if (ack) {
            System.out.println("消息已被 broker 確認, correlationId:" + id);
        } else {
            System.out.println("消息未被 broker 確認, correlationId:" + id + ", 原因:" + cause);
        }
    }

}

 

消息消費

/**
 * RabbitMQ subscriber for the test queue (the original note calls it the
 * "trade event" subscription). Incoming messages are dispatched to one of the
 * {@code @RabbitHandler} methods according to the converted payload type.
 * <br>
 * <p>
 * Channel calls available for manual acknowledgement, for reference:
 * <p>
 * channel.basicAck(tag, multiple);
 * <p>
 * channel.basicNack(tag, multiple, requeue);
 * <p>
 * channel.basicReject(envelope.getDeliveryTag(), false);
 * <p>
 * channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
 */
// Alternative ways to name the queue:
//@RabbitListener(queues = "canaan-test-queue")          //queue name
//@RabbitListener(queues = "#{'canaan-test-queue'}")    //SpEL queue name
//@RabbitListener(queues = "#{testQueue}")              //SpEL queue bean
//@RabbitListener(queues = "${demo.test-queue-name}")     //property-placeholder keys  queue name
@Component
@RabbitListener(queues = "#{testQueue}")
public class RabbitSubI {


    // Other parameters a handler may declare:
    // String payload , Channel channel ,Message message
    // import org.springframework.amqp.core.Message;
    // @Header
    //@Headers Map<String,Object> headers

    /**
     * Handles payloads converted to {@code MyMsg}: prints the message, then
     * sleeps for three seconds.
     *
     * @param myMsg   converted message payload
     * @param channel channel the message arrived on (unused here)
     * @param tag     delivery tag of the message (unused here)
     * @throws Exception if the sleep is interrupted
     */
    @RabbitHandler
    public void process(@Payload MyMsg myMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        //System.out.println("RabbitSubI object handle....");
        System.out.println(myMsg);
        // NOTE(review): presumably simulates slow processing for the demo — confirm.
        TimeUnit.SECONDS.sleep(3);
    }


    /**
     * Handles {@code String} payloads; different payload types are processed
     * by different handlers.
     *
     * @param payload text payload
     * @param channel channel the message arrived on (unused here)
     * @param tag     delivery tag of the message (unused here)
     * @author Canaan
     * @date 2019/6/25 11:20
     */
    @RabbitHandler
    public void process(@Payload String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println("RabbitSubI string handle....");
        System.out.println(payload);
    }

    /**
     * Handles raw {@code byte[]} payloads; different payload types are
     * processed by different handlers.
     *
     * @param message raw message body, decoded as UTF-8 for printing
     * @author Canaan
     * @date 2019/6/25 11:20
     */
    @RabbitHandler
    public void processMessage2(@Payload byte[] message) {
        System.out.println("RabbitSubI byte handle....");
        // Decode explicitly as UTF-8 (the encoding the RabbitTemplate is configured
        // with) instead of relying on the platform default charset.
        System.out.println(new String(message, java.nio.charset.StandardCharsets.UTF_8));
    }


}
相關文章
相關標籤/搜索