SpringBoot中使用RabbitMQ(三) RPC

RPC是什麼?

Remote Procedure Call:遠程過程調用,一次遠程過程調用的流程即客戶端發送一個請求到服務端,服務端根據請求信息進行處理後返回響應信息,客戶端收到響應信息後結束。bash

RabbitMQ 使用RPC

發送消息後會收到回覆dom

  • 同步阻塞(在一條消息接受到回覆前不能發送其餘消息)
  • 異步(能夠不用等待回覆,繼續發送別的消息)

同步實例

下文使用的隊列和交換器在SpringBoot中使用RabbitMQ(二)已有聲明異步

  • 生產者
@Component
public class SyncSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String content = "I am sync msg!";
        System.out.println("########### send : " + content);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
       
        Object response = rabbitTemplate.convertSendAndReceive("directExchange", "info-msg", content, correlationData);
        System.out.println("########### response : " + response);
    }
   
}

複製代碼
  • 消費者
@Component
@RabbitListener(queues = "infoMsgQueue")
public class SyncReceiver {

    @RabbitHandler
    public String process(String message) throws InterruptedException {
        System.out.println("########### SyncReceiver Receive :" + message);
        Thread.sleep(1000*3);
        return "copy";
    }
}
複製代碼
  • 測試
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class SyncTest {
    @Autowired
    private SyncSender syncSender;

    @Test
    public void send() {
        for (int i=0 ; i< 10;i++){
            syncSender.send();
        }
    }

}

結果:接收到回覆後才發送下一條消息
########### send : I am sync msg!
########### SyncReceiver Receive :I am sync msg!
########### response : copy
########### send : I am sync msg!
########### SyncReceiver Receive :I am sync msg!
########### response : copy
########### send : I am sync msg!
########### SyncReceiver Receive :I am sync msg!
########### response : copy
複製代碼

rabbitTemplate 有個超時時間,默認5秒。5秒內生產者收不到回覆會拋出異常,能夠同步rabbitTemplate.setReplyTimeout()來設置。async

異步實例

異步中使用AsyncRabbitTemplateide

  • 配置AsyncRabbitTemplate SpringBoot 沒有默認的AsyncRabbitTemplate注入,因此這裏須要本身配置
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
    return new AsyncRabbitTemplate(rabbitTemplate);
}
複製代碼
  • 生產者
@Component
public class AsyncSender {
    @Autowired
    private AsyncRabbitTemplate rabbitTemplate;

    public void send(){
        String content = "I am async msg!";
        System.out.println("########### send : " + content);
    
        AsyncRabbitTemplate.RabbitConverterFuture<Object> future = rabbitTemplate.convertSendAndReceive("directExchange", "warn-msg", content);
        future.addCallback(new ListenableFutureCallback<Object>() {
            @Override
            public void onFailure(Throwable throwable) {
            }
            @Override
            public void onSuccess(Object o) {
                System.out.println("aaa : " + o);
            }
        });
    }
}
複製代碼
  • 消費者
@RabbitListener(queues = "warnMsgQueue")
@Component
public class AsyncReceiver {
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @RabbitHandler
    public String process(String message) throws InterruptedException {
        System.out.println("########### SyncReceiver Receive :" + message);
        Thread.sleep(1000*1);
        return "hao";
    }
}
複製代碼
  • 測試
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class AsyncTest {
    @Autowired
    private AsyncSender asyncSender;

    @Test
    public void send() {
        for (int i=0 ; i< 10;i++){
            asyncSender.send();
        }
    }
}

結果:能夠多條消息不用等待回覆才發送下一條
########### send : I am async msg!
########### send : I am async msg!
########### send : I am async msg!
########### send : I am async msg!
########### SyncReceiver Receive :I am async msg!
aaa : hao
########### SyncReceiver Receive :I am async msg!
aaa : hao
########### SyncReceiver Receive :I am async msg!
aaa : hao
########### SyncReceiver Receive :I am async msg!
aaa : hao
複製代碼
相關文章
相關標籤/搜索