SpringCloud實戰8-Bus消息總線

好了如今咱們接着上一篇的隨筆,繼續來說。上一篇咱們講到,咱們若是要去更新全部微服務的配置,在不重啓的狀況下去更新配置,只能依靠spring cloud config了,可是,是咱們要一個服務一個服務的發送post請求,git

咱們能受的了嗎?這比以前的沒配置中心好多了,那麼咱們如何繼續避免挨個挨個的向服務發送Post請求來告知服務,你的配置信息改變了,須要及時修改內存中的配置信息。web

這時候咱們就不要忘記消息隊列的發佈訂閱模型。讓全部爲服務來訂閱這個事件,當這個事件發生改變了,就能夠通知全部微服務去更新它們的內存中的配置信息。這時Bus消息總線就能解決,你只須要在springcloud Config Server端發出refresh,就能夠觸發全部微服務更新了。spring

以下架構圖所示:apache

 

Spring Cloud Bus除了支持RabbitMQ的自動化配置以外,還支持如今被普遍應用的Kafka。在本文中,咱們將搭建一個Kafka的本地環境,並經過它來嘗試使用Spring Cloud Bus對Kafka的支持,實現消息總線的功能。bootstrap

Kafka使用Scala實現,被用做LinkedIn的活動流和運營數據處理的管道,如今也被諸多互聯網企業普遍地用做爲數據流管道和消息系統。windows

Kafak架構圖以下:服務器

 

Kafka是基於消息發佈/訂閱模式實現的消息系統,其主要設計目標以下:架構

  1.消息持久化:以時間複雜度爲O(1)的方式提供消息持久化能力,即便對TB級以上數據也能保證常數時間複雜度的訪問性能。併發

  2.高吞吐:在廉價的商用機器上也能支持單機每秒100K條以上的吞吐量app

  3.分佈式:支持消息分區以及分佈式消費,並保證分區內的消息順序

  4.跨平臺:支持不一樣技術平臺的客戶端(如:Java、PHP、Python等)

  5.實時性:支持實時數據處理和離線數據處理

  6.伸縮性:支持水平擴展

Kafka中涉及的一些基本概念:

  1.Broker:Kafka集羣包含一個或多個服務器,這些服務器被稱爲Broker。

  2.Topic:邏輯上同Rabbit的Queue隊列類似,每條發佈到Kafka集羣的消息都必須有一個Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個Broker上,但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)

  3.Partition:Partition是物理概念上的分區,爲了提供系統吞吐率,在物理上每一個Topic會分紅一個或多個Partition,每一個Partition對應一個文件夾(存儲對應分區的消息內容和索引文件)。

  4.Producer:消息生產者,負責生產消息併發送到Kafka Broker。

  5.Consumer:消息消費者,向Kafka Broker讀取消息並處理的客戶端。

  6.Consumer Group:每一個Consumer屬於一個特定的組(可爲每一個Consumer指定屬於一個組,若不指定則屬於默認組),組能夠用來實現一條消息被組內多個成員消費等功能。

 

能夠從kafka的架構圖看到Kafka是須要Zookeeper支持的,你須要在你的Kafka配置裏面指定Zookeeper在哪裏,它是經過Zookeeper作一些可靠性的保證,作broker的主從,咱們還要知道Kafka的消息是以topic形式做爲組織的,Producers發送topic形式的消息,
Consumer是按照組來分的,因此,一組Consumers都會都要一樣的topic形式的消息。在服務端,它還作了一些分片,那麼一個Topic可能分佈在不一樣的分片上面,方便咱們拓展部署多個機器,Kafka是天生分佈式的。
這裏爲了演示,咱們只須要用它的默認配置,在windows上作個小Demo便可。

咱們這裏主要針對Spring Cloud Bus對Kafka的支持,實現消息總線的功能,具體的Kafka,RabbitMQ消息隊列但願本身去找資料來學習一下。
 
有了一些概念的支持後,咱們進行一些Demo。以下:
首先新建一個springCloud-config-client1模塊,方便咱們進行測試
所引入的依賴以下:
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
            <version>1.4.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
            <version>1.3.2.RELEASE</version>
        </dependency>

 

接着要注意一下,client1的配置文件要改成bootstrap.yml,由於這種配置格式,是優先加載的,上一篇隨筆有講過,client1的配置以下:

server:
  port: 7006
spring:
  application:
    name: cloud-config
  cloud:
    config:
#啓動什麼環境下的配置,dev 表示開發環境,這跟你倉庫的文件的後綴有關,好比,倉庫配置文件命名格式是cloud-config-dev.properties,因此profile 就要寫dev
      profile: dev
      discovery:
        enabled: true
#這個名字是Config Server端的服務名字,不能瞎寫。
        service-id: config-server
#註冊中心
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8888/eureka/,http://localhost:8889/eureka/
#是否須要權限拉去,默認是true,若是不false就不容許你去拉取配置中心Server更新的內容
management:
  security:
    enabled: false

接着啓動類以下:

@SpringBootApplication
@EnableDiscoveryClient
public class Client1Application {

    public static void main(String[] args) {
        SpringApplication.run(Client1Application.class, args);
    }
}

 

接着將client中的TestController賦值一份到client1中,代碼以下:

@RestController
//這裏面的屬性有可能會更新的,git中的配置中心變化的話就要刷新,沒有這個註解內,配置就不能及時更新
@RefreshScope
public class TestController {

    @Value("${name}")
    private String name;
    @Value("${age}")
    private Integer age;

    @RequestMapping("/test")
    public String test(){
        return this.name+this.age;
    }
}

 

 

接着還要在先前的隨筆中的模塊中的Config Server加入以下配置:

#是否須要權限拉去,默認是true,若是不false就不容許你去拉取配置中心Server更新的內容
management:
  security:
    enabled: false

 

接着還要作一點就是,在config-client,config-client1,和config-Server都要引入kafka的依賴,以下:

      <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
            <version>1.3.2.RELEASE</version>
        </dependency>

咱們工程準備好了,暫時先放在這裏,下面進行Kafka的安裝下載,首先咱們去Kafka官網kafka.apache.org/downloads  下來官網推薦的版本,

 

 首先咱們進到下載好的Kafka目錄中kafka_2.11-1.1.0\bin\windows 下編輯kafka-run-class.bat以下:

找到這條配置 以下:

set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*

 

能夠看到%CLASSPATH%沒有雙引號,

所以用雙引號括起來,否則啓動不起來的,報你JDK沒安裝好,修改後以下:

set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

接着,打開config文件夾中的server.properties配置以下:

能夠看到是鏈接到本地的zookeeper就好了。

接着咱們進行先啓動zookeeper,再啓動Kafka,以下:

當看到上面的信息證實啓動Zookeeper啓動成功。、

接下來再開一個CMD啓動Kafka,以下:

看到這些信息說明Kafka啓動成功了

 

好了,接下來把前面的工程,兩個註冊中心,一個springcloud-config-server,兩個springcloud-config-client,springcloud-config-client1啓動起來,

能夠看到springcloudBus是在0分片上,若是兩個config-client啓動都出現上面信息,證實啓動成功了。

好了如今咱們進行訪問一下config-server端,以下:

 

再訪問兩個client,以下:

 

 

好了,好戲開始了,如今咱們去git倉庫上修改配置中心的文件,將年齡改成24,以下:

 

接下來,咱們咱們用refresh刷新配置服務端配置,通知兩個client去更新內存中的配置信息。用postman發送localhost:7000/bus/refresh,以下:

能夠看到沒有返回什麼信息,可是不要擔憂,這是成功的通知全部client去更新了內存中的信息了。

接着咱們分別從新請求config-server,兩個client,刷新頁面,結果以下:

兩個client以下:

能夠看到全部client自動更新內存中的配置信息了。

 

到目前爲止,上面都是刷新說有的配置的信息的,若是咱們想刷新某個特定服務的配置信息也是能夠的。咱們能夠指定刷新範圍,以下:

指定刷新範圍

  上面的例子中,咱們經過向服務實例請求Spring Cloud Bus的/bus/refresh接口,從而觸發總線上其餘服務實例的/refresh。可是有些特殊場景下(好比:灰度發佈),咱們但願能夠刷新微服務中某個具體實例的配置。

  Spring Cloud Bus對這種場景也有很好的支持:/bus/refresh接口還提供了destination參數,用來定位具體要刷新的應用程序。好比,咱們能夠請求/bus/refresh?destination=服務名字:9000,此時總線上的各應用實例會根據destination屬性的值來判斷是否爲本身的實例名,

若符合才進行配置刷新,若不符合就忽略該消息。

  destination參數除了能夠定位具體的實例以外,還能夠用來定位具體的服務。定位服務的原理是經過使用Spring的PathMatecher(路徑匹配)來實現,好比:/bus/refresh?destination=customers:**,該請求會觸發customers服務的全部實例進行刷新。

相關文章
相關標籤/搜索