消息隊列 概念 配合SpringBoot使用Demo

http://www.jianshu.com/p/048e954dab40 

概念: java

分佈式消息隊列web

‘分佈式消息隊列’包含兩個概念ajax

一是‘消息隊列’,二是‘分佈式’spring

那麼就先看下消息隊列的概念,和爲何須要分佈式apache


消息隊列的定義vim

消息」指進程間傳送的數據緩存

隊列」是在消息的傳輸過程當中保存消息的容器安全

消息被髮送到隊列中,消息隊列充當中間人,將消息從源發送給目標服務器

當系統中出現「生產「和「消費「的速度或穩定性等因素不一致時,就須要消息隊列,做爲抽象層,彌合雙方的差別websocket

例如

(1)服務員點菜快,廚師作菜慢,服務員只須要下單給廚師,而後就能夠繼續去服務顧客,不須要等待廚師把菜作完

點菜單就至關於消息,放單子的位置就至關於隊列

(2)業務系統須要發短信,但短信發送模塊速度跟不上,業務系統就能夠把發送短信的相關信息封裝爲一個消息,放入隊列,短信發送模塊從隊列中獲取消息進行處理

消息隊列的好處

(1)提升系統響應速度

使用了消息隊列,生產者一方,把消息往隊列裏一扔,就能夠立馬返回響應用戶了,無需等待處理結果

(2)保證消息的傳遞

若是發送消息時接收者不可用,消息隊列會保留消息,直到成功地傳遞它

(3)解耦

只要消息格式不變,即便接收者的接口、位置、或者配置改變,也不會給發送者帶來任何改變

消息發送者無需知道消息接收者是誰,使得系統設計更清晰


爲何須要分佈式消息隊列

(1)多系統協做須要分佈式

例如消息隊列中的數據須要在多個系統間共享,因此須要提供分佈式通訊機制、協同機制

(2)可靠

消息會被持久化到分佈式存儲中,這樣避免了單臺機器存儲的消息因爲機器問題致使消息的丟失

(3)可擴展

分佈式消息隊列,會隨着訪問量的增長而方便的增長處理服務器


2. 消息隊列技術是分佈式應用間交換信息的一種技術

 如今電商網站某個搶購活動,併發怎麼辦?消息隊列

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。有玩RabbitMq的哥們,多多交流!

用戶請求會post到後臺一些信息如(用戶信息,商品信息)到消息隊列中。

消息隊列經過Windows服務去處理解析過來的信息,單線程處理(多線程可能會出現問題,你懂得)!

成功的話,插入到本次活動的成功記錄表裏面;失敗的話,插入到有意購買表(方便業務人員銷售)!

怎樣通知用戶?

1,ajax異步去請求(隔兩分鐘去請求一次成功記錄,若是不請求庫的話咱們會用Redis緩存)

2,長鏈接的方式(websocket,signalr之類的)

 


在整套的微服務架構中, 消息隊列是不可或缺的部分, 它可以起到線程內同步或者異步調用沒法達到的做用,優缺點分別是:
優勢:

  1. 解耦
    i. 只依賴消息的格式, 而不依賴發送者的ip和端口
    ii. 多消費者的狀況下, 發送者不須要關注消費者的任何信息
  2. 路由
    不能互相訪問的網絡之間能夠消息隊列實現訪問, 能夠減小對現有網絡的修改。
  3. 消息可靠性
    當消費者發生故障時, 消息能夠被有效保存下來, 等待恢復後繼續訪問.
  4. 異步調用
    發送者異步發送消息, 不等待消息ack,不會對發送者自己產生響應速度的影響, 固然異步調用也是能夠實現的。
  5. 方便擴展
    集羣部署消息隊列, 當流量增大和減少是能夠經過調整部署來實現和發送方, 消費方無關。

缺點:
多出一個環節,須要保證消息隊列的可用性。

二. 經常使用消息隊列

目前經常使用的消息隊列大概有三種類型,RabbitMQ等AMQP系列, Kafka, Redis等kev value系列,它們的使用場景分別是:
1.RabbitMQ: 相對重量級高併發的狀況,好比數據的異步處理 任務的串行執行等.
2.Kafka: 基於Pull的模式來處理,具體很高的吞吐量,通常用來進行 日誌的存儲和收集.
3.Redis: 輕量級高併發,實時性要求高的狀況,好比緩存,秒殺,及時的數據分析(ELK日誌分析框架,使用的就是Redis).

三. SpingBoot 集成消息隊列

在SpingBoot中對這個三種都有支持

  1. Redis
    請參照 其餘做者的 文章 http://www.jianshu.com/p/a2ab17707eff

  2. RabbitMQ
    i. 配置

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>

    ii.實現方式 RabbitMQDemoConfigration.java

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    /** * RabbitMQDemo * * @author: sunjie * @date: 16/01/10 */
    @Configuration
    @SuppressWarnings("SpringJavaAutowiringInspection")
    public class RabbitMQDemoConfigration {
    @Bean
     public CachingConnectionFactory myConnectionFactory() {
         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
         connectionFactory.setUsername("guest");
         connectionFactory.setPassword("guest");
         connectionFactory.setHost("192.168.1.1");
         connectionFactory.setPort(5672);
         connectionFactory.setVirtualHost("/myHost");
         return connectionFactory;
     }
     // 生成CachingConnectionFactory 也可使用下面的方式,在application.properties
     // 中定義好屬性便可
     // @Autowired
     // ConnectionFactory connectionFactory;
    @Bean
     public DirectExchange myExchange() {
         return new DirectExchange("myExchangeDemo", true, false);
     }
     @Bean
     public Queue myQueue() {
         return new Queue("myQueueDemo", true);
     }
     @Bean
     public Binding myExchangeBinding(@Qualifier("myExchange") DirectExchange directExchange, @Qualifier("myQueue") Queue queue) {
         return BindingBuilder.bind(queue).to(directExchange).with("routeDemo");
     }
     @Bean
     public RabbitTemplate myExchangeTemlate() {
         RabbitTemplate r = new RabbitTemplate(myConnectionFactory());
         r.setExchange("myExchangeDemo");
         r.setRoutingKey("routeDemo");
         return r;
     }
    /** * 發送消息,工業使用須要本身作個性化實現 */
     @Bean
     public void sendMessage(RabbitTemplate myExchangeTemlate) {
         String string = "Hello RabbitmQ";
         myExchangeTemlate.convertAndSend(string);
     }
     /** * 接受消息,工業使用時須要在監聽類中實現process邏輯 */
     @RabbitListener(queues = "myQueueDemo")
     public void process(Message message) {
         System.out.println("__________" + message.getBody().toString() + "__________");
         try {
             this.wait(1000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
  3. Kafka 使用
    i. pom.xml 配置

    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
      </dependency>

    ii. spring-integration-kafka.xml 也能夠研究經過bean方式來實現

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
        xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
     http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
     <int:channel id="inputToKafka">
         <int:queue/>
     </int:channel>
     <int-kafka:outbound-channel-adapter
             id="kafkaOutboundChannelAdapter"
             kafka-producer-context-ref="kafkaProducerContext"
             channel="inputToKafka">
         <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
     </int-kafka:outbound-channel-adapter>
     <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="50000"/>
     <int-kafka:producer-context id="kafkaProducerContext">
         <int-kafka:producer-configurations>
             <int-kafka:producer-configuration broker-list="192.168.2.2:9092"
                                               key-class-type="java.lang.String"
                                               value-class-type="java.lang.String"
                                               topic="1_service"
                                               value-encoder="kafkaEncoder"
                                               key-encoder="kafkaEncoder"/>
         </int-kafka:producer-configurations>
     </int-kafka:producer-context>
     <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>
    </beans>

    iii. 客戶端實現代碼:

    @Autowired
     MessageChannel             inputToKafka;
     String value = "Hello Kafka";
     Message<String> mess = MessageBuilder.withPayload(value)
             .setHeader(KafkaHeaders.TOPIC, appSetting.getStrValue("topic")).build();
     inputToKafka.send(mess);

四. 後記

消息中間件有不少,實際使用中每每是根據架構師的技術棧相關,作到了解使用場景和基本原理,在項目中提高本身細節能力,作到我的和公司共同成長,纔是好的方式.

相關文章
相關標籤/搜索