rabbitmq高併發RPC調用,你Get到了嗎?

微信公衆號:跟着老萬學java
關注可瞭解更多的編程技巧。問題或建議,請公衆號留言;html

今天給你們介紹下rabbitmq中很重要的一個功能,RPC調用。java

RPC,即Remote Procedure Call的簡稱,也就是遠程過程調用,是一種經過網絡從遠程計算機上請求服務,而不須要了解底層網絡的技術。好比兩臺服務器上的A和B兩個應用,須要進行服務接口的相互調用,咱們就可使用RPC實現。好比常見的Java RMI、WebService、Dubbo均可以
實現RPC調用。git

rabbitmq實現的RPC調用主要是簡單,不用管各類複雜的網絡協議,客戶端發送消息,消費者消費消息,反饋消息到回覆隊列Reply中,而後客戶端獲取反饋的結果。github

1、原理

流程說明:
一、對於一個RPC請求,客戶端發送一條消息,該消息具備兩個重要屬性:replyTo(設置爲僅爲該請求建立的匿名互斥隊列,答覆隊列)和correlationId(設置爲每一個請求的惟一值)。web

二、該請求被髮送到rpc_queue隊列。spring

三、RPC工做程序(消息消費者)會監聽該隊列的消息。監聽到有新的消息後,會根據消息執行響應的邏輯,而後將結果返回到消息中攜帶的replyTo指定的答覆隊列中。編程

四、客戶端(消息生產者)等待答覆隊列中的數據,出現出現後,它會檢查correlationId屬性是否一致,若是匹配,則將響應結果返回給應用程序。服務器

2、rpc的三種調用方式

以後官網就針對使用Spring AMQP實現RPC調用給出了一個簡單的 Tut6Server.java示例,但真心太簡單,只能做爲入門的參考demo。
以後分析經過查看rabbitTemplate.sendAndReceive()方法的源碼,Spring AMQP支持3中RPC調用實現。
分別是:微信

一、doSendAndReceiveWithDirect   直接反饋
二、doSendAndReceiveWithFixed   使用固定隊列答覆
三、doSendAndReceiveWithTemporary   使用臨時隊列答覆網絡

根據源碼,對這三種方式的排序不難看出,對三者的推薦順序爲:
doSendAndReceiveWithDirect 》  doSendAndReceiveWithFixed》doSendAndReceiveWithTemporary  
直接反饋無疑是最快最資源消耗最少的,固定隊列會聲明指定的的隊列用來接收答覆,
而使用臨時隊列來接收答覆是最消耗資源,性能也是最差的,由於隊列的聲明,創建,銷燬會消耗大。

    @Nullable
    protected Message doSendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) {
        if (!this.evaluatedFastReplyTo) {
            synchronized(this) {
                if (!this.evaluatedFastReplyTo) {
                    this.evaluateFastReplyTo();
                }
            }
        }

        if (this.usingFastReplyTo && this.useDirectReplyToContainer) {
            return this.doSendAndReceiveWithDirect(exchange, routingKey, message, correlationData);
        } else {
            return this.replyAddress != null && !this.usingFastReplyTo ? this.doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData) : this.doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData);
        }
    }

3、代碼實戰

添加依賴:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

生產者代碼:

/**
 * @program: rabbitmq
 * @description: 交換器常量
 * @author: laowan
 * @create: 2019-06-13 17:36
 **/

@Getter
public enum ExchangeEnum {

    DIRECT_EXCHANGE("direct"),

    FIXED_EXCHANGE("fixed"),

    TMP_EXCHANGE("tmp");


    private String value;

    ExchangeEnum(String value) {
        this.value = value;
    }
}
/**
 * @program: rabbitmq
 * @description: 隊列枚舉
 * @author: laowan
 * @create: 2019-06-13 17:37
 **/

@Getter
public enum QueueEnum {

    //direct模式
    DIRECT_REQUEST("direct.request""direct"),

    //固定隊列應答模式
    FIXED_REQUEST("fixed.request""fixed"),
    FIXED_RESPONSE("fixed.response"""),

    //臨時模式  消息發送到的隊列
    TMP_REQUEST("tmp.request""tmp")
   ;

    /**
     * 隊列名稱
     */

    private String name;
    /**
     * 隊列路由鍵
     */

    private String routingKey;

    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}
/**
 * @program: rpc-parent
 * @description: direct   rpc請求模式
 * @author: laowan
 * @create: 2020-04-09 18:05
 **/

@Configuration
@Slf4j
public class DirectReplyConfig {
    /**
     * 注意bean的名稱是由方法名決定的,因此不能重複
     * @return
     */

    @Bean
    public Queue directRequest() {
        return new Queue(QueueEnum.DIRECT_REQUEST.getName(), true);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(ExchangeEnum.DIRECT_EXCHANGE.getValue());
    }

    @Bean
    public Binding directBinding() {
        return BindingBuilder.bind(directRequest()).to(directExchange()).with(QueueEnum.DIRECT_REQUEST.getRoutingKey());
    }


    /**
     * 當進行多個主題隊列消費時,最好對每一個單獨定義RabbitTemplate,以便將各自的參數分別控制
     * @param connectionFactory
     * @return
     */

    @Bean
    public RabbitTemplate directRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());

        //這一步很是關鍵
        template.setUseTemporaryReplyQueues(false);
        template.setReplyAddress("amq.rabbitmq.reply-to");
       // template.expectedQueueNames();
        template.setUserCorrelationId(true);

        //設置請求超時時間爲10s
        template.setReplyTimeout(10000);
        return template;
    }

}

DirectProducer 生產者代碼

@Component
@Slf4j
public class DirectProducer {
    @Autowired
    private RabbitTemplate directRabbitTemplate;

    public String sendAndReceive(String request) throws TimeoutException {
        log.info("請求報文:{}" , request);
        //請求結果
        String result = null;
        //設置消息惟一id
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //直接發送message對象
        MessageProperties messageProperties = new MessageProperties();
        //過時時間10秒,也是爲了減小消息擠壓的可能
        messageProperties.setExpiration("10000");
        messageProperties.setCorrelationId(correlationId.getId());
        Message message = new Message(request.getBytes(), messageProperties);

        StopWatch stopWatch = new StopWatch();
        stopWatch.start("direct模式下rpc請求耗時");
        Message response = directRabbitTemplate.sendAndReceive(ExchangeEnum.DIRECT_EXCHANGE.getValue(), QueueEnum.DIRECT_REQUEST.getRoutingKey(), message, correlationId);
        stopWatch.stop();
        log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());

        if (response != null) {
            result = new String(response.getBody());
            log.info("請求成功,返回的結果爲:{}" , result);
        }else{
            log.error("請求超時");
            //爲了方便jmeter測試,這裏拋出異常
            throw  new TimeoutException("請求超時");
        }
        return result;

    }
}

4、Fixed  reply-to模式

Fixed 配置類

/**
 * @program: rpc-parent
 * @description: Fixed   rpc請求模式
 * @author: wanli
 * @create: 2020-04-09 18:05
 **/

@Configuration
@Slf4j
public class FixedReplyConfig {
    @Bean
    public Queue fixedRequest() {
        return new Queue(QueueEnum.FIXED_REQUEST.getName(), true);
    }

    @Bean
    public DirectExchange fixedExchange() {
        return new DirectExchange(ExchangeEnum.FIXED_EXCHANGE.getValue());
    }

    @Bean
    public Binding fixedBinding() {
        return BindingBuilder.bind(fixedRequest()).to(fixedExchange()).with(QueueEnum.FIXED_REQUEST.getRoutingKey());
    }

    /**
     * 注意,固定模式指定的應答隊列  exclusive排他屬性設置爲true,且能自動刪除
     * @return
     */

    @Bean
    public Queue fixedResponseQueue() {
        return new Queue(QueueEnum.FIXED_RESPONSE.getName(),false,true,true,new HashMap<>());
    }


    @Bean
    public RabbitTemplate fixedRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());

        //設置固定的Reply 地址
        template.setUseTemporaryReplyQueues(false);
        template.setReplyAddress(QueueEnum.FIXED_RESPONSE.getName());
        template.expectedQueueNames();
        template.setUserCorrelationId(true);

        //設置請求超時時間爲10s
        template.setReplyTimeout(10000);
        return template;
    }


    @Bean
    public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //這一步很是重要,固定隊列模式要,必定要主動設置  SimpleMessageListenerContainer監聽容器,監聽應答隊列
        container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName());
        container.setMessageListener(fixedRabbitTemplate(connectionFactory));
        container.setConcurrentConsumers(100);
        container.setConcurrentConsumers(100);
        container.setPrefetchCount(250);
        return container;
    }

}

FixedProducer生產者

@Component
@Slf4j
public class FixedProducer {
    @Autowired
    private RabbitTemplate fixedRabbitTemplate;

    public String sendAndReceive(String request) throws TimeoutException {
        log.info("請求報文:{}" , request);
        //請求結果
        String result = null;
        //設置消息惟一id
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //直接發送message對象
        MessageProperties messageProperties = new MessageProperties();
        //過時時間10秒
        messageProperties.setExpiration("10000");
        messageProperties.setCorrelationId(correlationId.getId());
        Message message = new Message(request.getBytes(), messageProperties);

        StopWatch stopWatch = new StopWatch();
        stopWatch.start("fixed模式下rpc請求耗時");
        Message response = fixedRabbitTemplate.sendAndReceive(ExchangeEnum.FIXED_EXCHANGE.getValue(), QueueEnum.FIXED_REQUEST.getRoutingKey(), message, correlationId);
        stopWatch.stop();
        log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());

        if (response != null) {
            result = new String(response.getBody());
            log.info("請求成功,返回的結果爲:{}" , result);
        }else{
            //爲了方便jmeter測試,這裏拋出異常
            throw  new TimeoutException("請求超時");
        }

        return result;
    }
}

5、Temporary reply-to模式

/**
 * @program: rpc-parent
 * @description: Temporary應答模式
 * @author: laowan
 * @create: 2020-04-09 18:05
 **/

@Configuration
@Slf4j
public class TmpReplyConfig {
    @Bean
    public Queue tmpRequest() {
        return new Queue(QueueEnum.TMP_REQUEST.getName(), true);
    }

    @Bean
    public DirectExchange tmpExchange() {
        return new DirectExchange(ExchangeEnum.TMP_EXCHANGE.getValue());
    }

    @Bean
    public Binding tmpBinding() {
        return BindingBuilder.bind(tmpRequest()).to(tmpExchange()).with(QueueEnum.TMP_REQUEST.getRoutingKey());
    }



    @Bean
    public RabbitTemplate tmpRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());

        template.setUseTemporaryReplyQueues(true);
        template.setUserCorrelationId(true);

        //設置請求超時時間爲10s
        template.setReplyTimeout(10000);

        return template;
    }
}

TmpProducer生產者代碼

@Component
@Slf4j
public class TmpProducer {
    @Autowired
    private RabbitTemplate tmpRabbitTemplate;

    public String sendAndReceive(String request) throws TimeoutException {
        log.info("請求報文:{}" , request);
        //請求結果
        String result = null;
        //設置消息惟一id
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //直接發送message對象
        MessageProperties messageProperties = new MessageProperties();
        //過時時間10秒
        messageProperties.setExpiration("10000");
        messageProperties.setCorrelationId(correlationId.getId());
        Message message = new Message(request.getBytes(), messageProperties);

        StopWatch stopWatch = new StopWatch();
        stopWatch.start("tmp模式下rpc請求耗時");
        Message response = tmpRabbitTemplate.sendAndReceive(ExchangeEnum.TMP_EXCHANGE.getValue(), QueueEnum.TMP_REQUEST.getRoutingKey(), message, correlationId);

        stopWatch.stop();
        log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());

        if (response != null) {
            result = new String(response.getBody());
            log.info("請求成功,返回的結果爲:{}" , result);
        }else{
            log.error("請求超時");
            //爲了方便jmeter測試,這裏拋出異常
            throw  new TimeoutException("請求超時");
        }
        return result;
    }
}

生產者啓動類:

@SpringBootApplication
@RestController
public class ProducerApplication {

    @Autowired
    DirectProducer directProducer;

    @Autowired
    FixedProducer fixedProducer;

    @Autowired
    TmpProducer tmpProducer;



    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }



    @GetMapping("/direct")
    public String direct(String message) throws Exception {
        return directProducer.sendAndReceive(message);
    }

    @GetMapping("/fixed")
    public String fixed(String message) throws Exception {
        return fixedProducer.sendAndReceive(message);
    }

    @GetMapping("/tmp")
    public String tmp(String message) throws Exception {
        return tmpProducer.sendAndReceive(message);
    }
}

消費者基本相似,就附上DirectConsumer類的代碼:

/**
 * @program: rabbitmq
 * @description: direct消費者
 * @author: wanli
 * @create: 2019-06-13 18:01
 **/

@Component
@RabbitListener(queues = "direct.request")
@Slf4j
public class DirectConsumer {

    @RabbitHandler
    public String onMessage(byte[] message,
                            @Headers Map<String, Object> headers,
                            Channel channel)
 
{
        StopWatch stopWatch = new StopWatch("調用計時");
        stopWatch.start("rpc調用消費者耗時");
        String request = new String(message);
        String response = null;
        log.info("接收到的消息爲:" + request);

        //模擬請求耗時3s
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        response= this.sayHello(request);
        log.info("返回的消息爲:" + response);
        stopWatch.stop();
        log.info(stopWatch.getLastTaskName()+stopWatch.getTotalTimeMillis()+"ms");
        return response;
    }

    public String sayHello(String name){
        return "hello " + name;
    }


}

6、壓測

經過對/direct,/fixed,/tmp三個接口使用JMeter壓測,線程數1000,時間1s,
屢次執行,比較發現:
direct和fixed的rpc方式調用的性能基本一致,差異不大,每分鐘3500左右的併發
而tmp方式併發能力會弱會弱不少,大概3000併發左右。
併發請求時能夠經過rabbitmq的管理界面明顯看到tmp方式高併發時生成了很是多的臨時隊列。
性能:direct>=fixed>tmp,與以前根據源碼和各自執行原理預期的執行性能基本一致

7、參數優化

生產者這邊,在fix模式下,須要配置對應的SimpleMessageListenerContainer監聽答覆隊列,能夠適當增長消費者的併發數,而且提升每次抓取的消息數。
而且設置acknowledge-mode=auto自動ack。

    @Bean
    public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //這一步很是重要,固定隊列模式要,必定要主動設置  SimpleMessageListenerContainer監聽容器,監聽應答隊列
        container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName());
        container.setMessageListener(fixedRabbitTemplate(connectionFactory));
        container.setConcurrentConsumers(100);
        container.setConcurrentConsumers(100);
        container.setPrefetchCount(250);
        return container;
    }

消費者這邊,必定要注意設置消費者每次抓取的數量,若是每一個消息消費比較耗時,一次抓取太多,就容易致使抓取的這一批消息被這個消費者串行消費的時候出現超時狀況。抓取太少,又會致使吞吐量下降。

這裏我設置的是10,通過壓測發如今高併發下,rpc響應出現延長,說明消費能力基本能知足。

#消費者的併發參數
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.concurrency=200
spring.rabbitmq.listener.simple.max-concurrency=500
#抓取參數很是關鍵,一次抓取的消息多了,消費速度一慢,就會形成響應延遲,抓取少了又會致使併發量低
spring.rabbitmq.listener.simple.prefetch=10
#能夠不須要反饋
spring.rabbitmq.listener.simple.acknowledge-mode=none

7、問題

這裏要吐槽一下,關於rabbitmq的RPC調用,網上的資料真到太少了,踩了很多坑。
坑一:

CORRECTION: The RabbitTemplate does not currently support Direct reply-to for sendAndReceive() operations; you can, however, specify a fixed reply queue (with a reply-listener). Or you can use rabbitTemplate.execute() with a ChannelCallback to consume the reply from that "queue" (and publish).
I have created a JIRA issue if you wish to track it.
1.4.1 and above now supports direct reply-to.

百度上找的資料太少,以後在google上找到上面的說明,大意是RabbitTemplate在sendAndReceive操做時不支持Direct reply-to調用
解決:
做爲老鳥一枚,這裏我就和他槓上了,恰恰不信這個邪,RabbitTemplate源碼中明明能夠搜索到'amq.rabbitmq.reply-to'相關判斷以及doSendAndReceiveWithDirect的定義,怎麼可能不支持?

坑二:

Broker does not support fast replies via 'amq.rabbitmq.reply-to'

Broker指的是咱們的rabbitmq的服務節點,不支持經過'amq.rabbitmq.reply-to'進行快速返回。

解決:
當前版本rabbitmq的Broker不支持經過'amq.rabbitmq.reply-to'進行快速返回,那麼就升級broker的版本。
3.3.5版本不支持建立amq.rabbitmq.reply-to虛擬隊列,那就升級到3.7.8版本。

坑三:

Caused by: java.lang.IllegalStateException: A listener container must not be provided when using direct reply-to

解決:
指定名爲「amq.rabbitmq.reply-to」的反饋地址後,不能再調用expectedQueueNames方法

        template.setUseTemporaryReplyQueues(false);
        template.setReplyAddress("amq.rabbitmq.reply-to");
       // template.expectedQueueNames();
        template.setUserCorrelationId(true);

坑四:
壓測過程當中,併發一高,就容易出現rpc調用超時的問題。

解決:
增長消費者的併發數,減少消費者每次抓取的消息數。

總結

有些東西,百度不會告訴你,要看官網;
有些東西,官網不會告訴你,要看源碼;
有些東西,源碼不會告訴你,只能根據原理實踐推敲;
最後,推敲不出來,能夠找老萬

git源碼地址:
https://github.com/StarlightWANLI/rabbitmq-rpc.git

更多精彩,關注我吧。

圖注:大少公衆號


本文分享自微信公衆號 - 跟着老萬學java(douzhe_2019)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索