消息隊列

消息隊列

前言:

說實話,最近仍是比較忙的,手上素材卻是一大把,可是大多隻是初步整理了。可是博客這種東西仍是要寫的,果真後面仍是要放低一下排版要求(擴展性的一些東西也少提一些)。java

簡介:

消息隊列這個東西,其實網上的資料仍是不少的。我就簡單說一些本身的認識與源代碼哈。spring

演變:

我是很喜歡瞭解技術演進的,由於演進的過程展示了前輩們的智慧。框架

最先的程序串行執行就不說了。異步

程序調用中的方法調用,每每調用方與被調用方都存在與同一內存空間(從Java角度說,都是在同一JVM中),因此方法調用的邏輯不會太複雜。簡單來講,就是調用方(Java中其實就是目標對象)將被調用方壓入Java虛擬機棧,從而執行(詳見JVM)。或者等我何時,把我有關JVM的筆記貼出來(嘿嘿)。分佈式

後來呢,就是出現了對非本地JVM方法調用的需求(舉個例子,我須要調用第三方的方法,若是每次都要雙方都寫一個專門的處理服務(在當時,也許接口更爲準確),比較麻煩),那麼就有了RPC與RMI的一個須要。那麼在Java中就出現了一個stub的技術,定義好後,相關方法就像調用本地同樣(詳見《Head First Java》相關章節)。固然了,這個時候已經有了中間件的概念了,因此也就有了CORBA等框架。談到中間件,感興趣的,能夠去查詢一下當時主流的中間件分類(如RPC,RMI,MOM,TPM,ORB)。ide

那麼到了如今呢,分佈式系統的通訊能夠按照同步與異步分爲兩大支柱。之因此這麼理解,是由於分佈式系統每每同步通訊與異步通訊都是須要的。簡單提一下,同步通訊業務邏輯相對簡單,實現快速,能夠實時得到迴應,但耦合度較高。異步通訊耦合度低,並能夠進行消息堆積,消峯,但沒法實時獲取迴應,業務邏輯複雜,從而提升系統複雜度(尤爲當一條業務線與多層異步邏輯)等。以後有機會,我會舉例細述。函數

固然了,在本篇中,只簡單談一下異步通訊的主流實現-消息隊列。學習

選擇:

選擇方面,我就很少說了,目前只用過RabbitMq,RocketMq,Kafka。網上有關消息隊列選擇的文章不少,很細緻,我就不贅述了。fetch

代碼實現:

這裏貼出來的都是實際生產代碼(若是內部版本也算的話,嘿嘿),因此若是有一些不是很熟悉的類,請查看import,是不是項目自身的類。或者也能夠直接詢問我。this

初步實現:

這裏的初步實現,是根據RabbitMq的原生方法進行編寫(詳細參考:《RabbitMQ實戰指南》第一章的兩個代碼清單及第二章的相關解釋)。

producer:​​

package com.renewable.gateway.rabbitmq.producer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    import com.renewable.gateway.pojo.Terminal;
    import com.renewable.gateway.util.JsonUtil;
    import com.renewable.gateway.util.PropertiesUtil;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import static com.renewable.gateway.common.constant.RabbitmqConstant.*;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component("TerminalProducer")
    public class TerminalProducer {
    
        private static final String IP_ADDRESS = PropertiesUtil.getProperty(RABBITMQ_HOST);
        private static final int PORT = Integer.parseInt(PropertiesUtil.getProperty(RABBITMQ_PORT));
        private static final String USER_NAME = PropertiesUtil.getProperty(RABBITMQ_USER_NAME);
        private static final String USER_PASSWORD = PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD);
    
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol";
    
        public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setUsername(USER_NAME);
            factory.setPassword(USER_PASSWORD);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null);
            channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null);
            channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY);
    
            String terminalStr = JsonUtil.obj2String(terminal);
            channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes());
    
            channel.close();
            connection.close();
        }
    }

consumer:

package com.renewable.gateway.rabbitmq.consumer;

    import com.rabbitmq.client.*;
    import com.renewable.gateway.common.GuavaCache;
    import com.renewable.gateway.common.ServerResponse;
    import com.renewable.gateway.pojo.Terminal;
    import com.renewable.gateway.service.ITerminalService;
    import com.renewable.gateway.util.JsonUtil;
    import com.renewable.gateway.util.PropertiesUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    import static com.renewable.gateway.common.constant.CacheConstant.TERMINAL_MAC;
    import static com.renewable.gateway.common.constant.RabbitmqConstant.*;

    /**
     * @Description:
     * @Author: jarry
     */
    @Component
    public class TerminalConsumer {
    
        @Autowired
        private ITerminalService iTerminalService;
    
        private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal";
        private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE = "queue-terminal-config-centcontrol2terminal";
        private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_ROUTINETYPE = "topic";
        private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_BINDINGKEY = "terminal.config.centcontrol2terminal";
    
        @PostConstruct
        public void messageOnTerminal() throws IOException, TimeoutException, InterruptedException {
            Address[] addresses = new Address[]{
                    new Address(PropertiesUtil.getProperty(RABBITMQ_HOST))
            };
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(PropertiesUtil.getProperty(RABBITMQ_USER_NAME));
            factory.setPassword(PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD));
    
            Connection connection = factory.newConnection(addresses);
            final Channel channel = connection.createChannel();
            channel.basicQos(64);   // 設置客戶端最多接收未ack的消息個數,避免客戶端被沖垮(經常使用於限流)
            Consumer consumer = new DefaultConsumer(channel) {
    
    
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    // 1.接收數據,並反序列化出對象
                    Terminal receiveTerminalConfig = JsonUtil.string2Obj(new String(body), Terminal.class);
    
                    // 2.驗證是不是該終端的消息的消息     // 避免ACK其餘終端的消息
                    if (receiveTerminalConfig.getMac() == GuavaCache.getKey(TERMINAL_MAC)) {
                        // 業務代碼
                        ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(receiveTerminalConfig);
                        if (response.isSuccess()) {
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                }
            };
            channel.basicConsume(TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE, consumer);
            // 等回調函數執行完畢後,關閉資源
            // 想了想仍是不關閉資源,保持一個監聽的狀態,從而確保配置的實時更新
            //        TimeUnit.SECONDS.sleep(5);
            //        channel.close();
            //        connection.close();
        }
    }

小結:

這是早期寫的一個demo代碼,是直接參照源碼的。若是是學習RabbitMq的話,仍是建議手寫一下這種比較原始的程序,瞭解其中每一個方法的做用,從而理解RabbitMq的思路。若是條件容許的話,還能夠查看一下RabbitMq的底層通訊協議-AMQP(若是不方便下載,也能夠私聊我)。

固然,此處能夠經過@Value直接導入相關配置(乃至到了SpringCloud後,能夠經過@Refreshscope等實現配置自動更新)。

與Spring集成:

producer:

package com.renewable.terminal.rabbitmq.producer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    import com.renewable.terminal.pojo.Terminal;
    import com.renewable.terminal.util.JsonUtil;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component("TerminalProducer")
    public class TerminalProducer {
    
        private static String rabbitmqHost = "47.92.249.250";
        private static String rabbitmqUser = "admin";
        private static String rabbitmqPassword = "123456";
        private static String rabbitmqPort = "5672";
    
        private static final String IP_ADDRESS = rabbitmqHost;
        private static final int PORT = Integer.parseInt(rabbitmqPort);
        private static final String USER_NAME = rabbitmqUser;
        private static final String USER_PASSWORD = rabbitmqPassword;
    
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol";
    
        public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setUsername(USER_NAME);
            factory.setPassword(USER_PASSWORD);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null);
            channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null);
            channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY);
    
            String terminalStr = JsonUtil.obj2String(terminal);
            channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes());
    
            channel.close();
            connection.close();
        }
    }

consumer:

package com.renewable.terminal.rabbitmq.consumer;
    
    import com.rabbitmq.client.*;
    import com.renewable.terminal.Init.SerialSensorInit;
    import com.renewable.terminal.Init.TerminalInit;
    import com.renewable.terminal.common.GuavaCache;
    import com.renewable.terminal.common.ServerResponse;
    import com.renewable.terminal.pojo.Terminal;
    import com.renewable.terminal.service.ITerminalService;
    import com.renewable.terminal.util.JsonUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_ID;
    import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_MAC;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component
    @Slf4j
    public class TerminalConsumer {
    
        @Autowired
        private ITerminalService iTerminalService;
    
        @Autowired
        private SerialSensorInit serialSensorInit;
    
    
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-centcontrol2terminal";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.centcontrol2terminal";
    
        //TODO_FINISHED 2019.05.16 完成終端機TerminalConfig的接收與判斷(ID是否爲長隨機數,是否須要從新分配)
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, declare = "true"),
                exchange = @Exchange(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, declare = "true", type = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE),
                key = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY
        ))
        @RabbitHandler
        public void messageOnTerminal(@Payload String terminalStr, @Headers Map<String, Object> headers, Channel channel) throws IOException {
    
            Terminal terminal = JsonUtil.string2Obj(terminalStr, Terminal.class);
            if (terminal == null){
                log.info("consume the null terminal config !");
                Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                channel.basicAck(deliveryTag, false);
            }
            if (!GuavaCache.getKey(TERMINAL_MAC).equals(terminal.getMac())){
                log.info("refuse target terminal with mac({}) configure to this terminal with mac({}).",terminal.getMac(), GuavaCache.getKey(TERMINAL_MAC));
                return;
            }
    
            // 2.業務邏輯
            ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(terminal);
            log.info("start serialSensorInit");
            serialSensorInit.init();
    
            // 3.確認
            if (response.isSuccess()) {
                Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                channel.basicAck(deliveryTag, false);
            }
        }
    }

配置:

# rabbitmq 消費端配置
    spring:
      rabbitmq:
        listener:
          simple:
            concurrency: 5
            max-concurrency: 10
            acknowledge-mode: manual
            # 限流
            prefetch: 1
        host: "localhost"
        port: 5672
        username: "admin"
        password: "123456"
        virtual-host: "/"
        connection-timeout: 15000

小結:

這裏不得不讚一下Spring,它經過提供RabbitMq地封裝API-ampq,極大地簡化了消息隊列的代碼。其實上述方法就是經過ampq的註解與yml配置來迅速實現RabbitMq的使用。

固然,這裏還有不少的提高空間。好比說,經過@Bean註解(創建目標配置)與公用方法提取,能夠有效提升代碼複用性。

簡單擴展(與SpringStream集成):

這段代碼並非線上的代碼,而是慕課網學習時留下的代碼。主要實際生產中並無使用SpringStream,但這確實是認識事件驅動模型的要給很好途徑。

producer:

```java

package com.imooc.order.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * @Description:
 * @Author: jarry
 */
public interface StreamClient {

    String INPUT = "myMessage";
    String INPUT2 = "myMessageACK";


    @Input(StreamClient.INPUT)
    SubscribableChannel input();

    @Output(StreamClient.INPUT)
    MessageChannel output();

    @Input(StreamClient.INPUT2)
    SubscribableChannel input2();

    @Output(StreamClient.INPUT2)
    MessageChannel output2();
}

​```

package com.imooc.order;
    
    import org.junit.Assert;
    import org.junit.Test;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component
    public class MqSenderTest extends OrderApplicationTests{
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Test
        public void send(){
            amqpTemplate.convertAndSend("myQueue", "now: " + new Date());
            Assert.assertNotNull(new Date());
        }
    }

consumer:

package com.imooc.order.message;
    
    import com.imooc.order.dto.OrderDTO;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Component;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component
    @EnableBinding(StreamClient.class)
    @Slf4j
    public class StreamReceiver {
    
    //  @StreamListener(StreamClient.INPUT)
    //  public void process(Object message){
    //      log.info("StreamReceiver: {}", message);
    //  }
    
        @StreamListener(StreamClient.INPUT)
        // 增長如下註解,能夠在INPUT消息消費後,返回一個消息。說白了就是RabbitMq對消息消費後的確認回調函數(貌似叫這個,意思就這樣,以後細查)
        @SendTo(StreamClient.INPUT2)
        public String process(OrderDTO message){
            log.info("StreamReceiver: {}", message);
            return "received.";
        }
    
        @StreamListener(StreamClient.INPUT2)
        public void process2(String message){
            log.info("StreamReceiver2: {}", message);
        }
    }

總結:

在學習技術的過程當中,一方面不斷地感覺到本身對技術瞭解的不足,另外一方面則是發現更重要的是系統設計中技術選型的權衡。

相關文章
相關標籤/搜索