springCloud學習5(Spring-Cloud-Stream事件驅動)

hei

springcloud 總集:https://www.tapme.top/blog/detail/2019-02-28-11-33java

代碼見文章結尾node

  想一想日常生活中作飯的場景,在用電飯鍋作飯的同時,咱們能夠洗菜、切菜,等待電飯鍋發出飯作好的提示咱們回去拔下電飯鍋電源(或者什麼也不知讓它處於保溫狀態),反正這個時候咱們知道飯作好了,接下來能夠炒菜了。從這裏能夠看出咱們在平常生活中與世界的互動並非同步的、線性的,不是簡單的請求--響應模型。它是事件驅動的,咱們不斷的發送消息、接受消息、處理消息。git

  一樣在軟件世界中也不全是請求--響應模型,也會須要進行異步的消息通訊。使用消息實現事件通訊的概念被稱爲消息驅動架構(Event Driven Architecture,EDA),也被稱爲消息驅動架構(Message Driven Architecture,MDA)。使用這類架構能夠構建高度解耦的系統,該系統可以對變化作出響應,且不須要與特定的庫或者服務緊密耦合。github

  在 Spring Cloud 項目中可使用Spirng Cloud Stream垂手可得的構建基於消息傳遞的解決方案。redis

爲何使用消息傳遞

  要解答這個問題,讓咱們從一個例子開始,以前一直使用的兩個服務:許可證服務和組織服務。每次對許可證服務進行請求,許可證服務都要經過 http 請求到組織服務上查詢組織信息。顯而易見此次額外的 http 請求會花費較長的時間。若是可以將緩存組織數據的讀操做,將會大幅提升許可證服務的響應時間。可是緩存數據有以下 2 個要求:spring

  • 緩存的數據須要在許可證服務的全部實例之間保存一致——這意味着不能將數據緩存到服務實例的內存中。
  • 在更新或者刪除一個組織數據時,許可證服務緩存的數據須要失效——避免讀取到過時數據,須要儘早讓過期數據失效並刪除。

  要實現上面的要求,如今有兩種辦法。docker

  1. 使用同步請求--響應模型來實現。組織服務在組織數據變化時調用許可證服務的接口通知組織服務已經變化,或者直接操做許可證服務的緩存。json

  2. 使用事件驅動。組織服務發出一個異步消息。許可證服務收到該消息後清除對應的緩存。api

同步請求-響應方式

  許可證服務在 redis 中緩存從組織服務中查詢到的服務信息,當組織數據更新時,組織服務同步 http 請求通知許可證服務數據過時。這種方式有如下幾個問題:緩存

  • 組織服務和許可證服務緊密耦合
  • 這種方式不夠靈活,若是要爲組織服務添加新的消費者,必須修改組織服務代碼,以讓其通知新的服務數據變更。

使用消息傳遞方式

  一樣的許可證服務在 redis 中緩存從組織服務中查詢到的服務信息,當組織數據更新時,組織服務將更新信息寫入到隊列中。許可證服務監聽消息隊列。使用消息傳遞有一下 4 個好處:

  • 鬆耦合性:將服務間的依賴,變成了服務對隊列的依賴,依賴關係變弱了。
  • 耐久性:即便服務消費者已經關閉了,也能夠繼續往裏發送消息,等消費者開啓後處理
  • 可伸縮性: 消息發送者不用等待消息消費者的響應,它們能夠繼續作各自的工做
  • 靈活性:消息發送者不用知道誰會消費這個消息,所以在有新的消息消費者時無需修改消息發送代碼

spring cloud 中使用消息傳遞

  spring cloud 項目中能夠經過 spring cloud stream 框架來輕鬆集成消息傳遞。該框架最大的特色是抽象了消息傳遞平臺的細節,所以能夠在支持的消息隊列中隨意切換(包括 Apache Kafka 和 RabbitMQ)。

spring cloud stream 架構

  spring cloud stream 中有 4 個組件涉及到消息發佈和消息消費,分別爲:

  1. 發射器

      當一個服務準備發送消息時,它將使用發射器發佈消息。發射器是一個 Spring 註解接口,它接收一個普通 Java 對象,表示要發佈的消息。發射器接收消息,而後序列化(默認序列化爲 JSON)後發佈到通道中。

  2. 通道

      通道是對隊列的一個抽象。通道名稱是與目標隊列名稱相關聯的。可是隊列名稱並不會直接公開在代碼中,代碼永遠只會使用通道名。

  3. 綁定器

      綁定器是 spring cloud stream 框架的一部分,它是與特定消息平臺對話的 Spring 代碼。經過綁定器,使得開發人員沒必要依賴於特定平臺的庫和 API 來發布和消費消息。

  4. 接收器

      服務經過接收器來從隊列中接收消息,並將消息反序列化。

處理邏輯以下:

Spring-Cloud-Stream架構

實戰

  繼續使用以前的項目,在許可證服務中緩存組織數據到 redis 中。

創建 redis 服務

  爲方便起見,使用 docker 建立 redis,創建腳本以下:

docker run -itd --name redis --net host redis:

創建 kafka 服務

在組織服務中編寫消息生產者

  首先在 organization 服務中引入 spring cloud stream 和 kafka 的依賴。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

  而後在 events 類中編寫SimpleSouce類,用於組織數據修改,產生一條消息到隊列中。代碼以下:

@EnableBinding(Source.class)
public class SimpleSource {
    private Logger logger = LoggerFactory.getLogger(SimpleSource.class);

    private Source source;

    @Autowired
    public SimpleSource(Source source) {
        this.source = source;
    }

    public void publishOrChange(String action, String orgId) {
        logger.info("在請求:{}中,發送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId);
        OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id);
        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

這裏使用的是默認通道,Source 類定義的 output 通道發消息。後面經過 Sink 定義的 input 通道收消息。

  而後在OrganizationController類中定義一個 delete 方法,並注入 SimpleSouce 類,代碼以下:

@Autowired
private SimpleSource simpleSource;

@DeleteMapping(value = "/organization/{orgId}")
public void deleteOne(@PathVariable("orgId") String id) {
    logger.debug("刪除了組織:{}", id);
    simpleSource.publishOrChange("delete", id);
}

  最後在配置文件中加入消息隊列的配置:

# 省略了其餘配置
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orgChangeTopic
          content-type: application/json
      kafka:
        binder:
          # 替換爲部署kafka的ip和端口
          zk-nodes: 192.168.226.5:2181
          brokers: 192.168.226.5:9092

  如今咱們能夠測試下訪問localhost:5555/apis/org/organization/12,能夠看到控制檯打印消息生成的日誌。

在許可證服務中編寫消息消費者

  集成 redis 的方法,參看。這裏不做說明。

  首先引入依賴,依賴項同上面組織服務。

  而後在 event 包下建立OrgChange的類,代碼以下:

@EnableBinding(Sink.class) //使用Sink接口中定義的通道來監聽傳入消息
public class OrgChange {

    private Logger logger = LoggerFactory.getLogger(OrgChange.class);

    @StreamListener(Sink.INPUT)
    public void loggerSink(OrganizationChange change){
        logger.info("收到一個消息,組織id爲:{},關聯id爲:{}",change.getOrgId(),change.getId());
        //刪除失效緩存
        RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));
    }
}

//下面兩個都在util包下
//RedisKeyUtils.java代碼以下
public class RedisKeyUtils {

    private static final String  ORG_CACHE_PREFIX = "orgCache_";

    public static String getOrgCacheKey(String orgId){
        return ORG_CACHE_PREFIX+orgId;
    }
}

//RedisUtils.java代碼以下
@Component
@SuppressWarnings("all")
public class RedisUtils {

    public static RedisTemplate redisTemplate;

    @Autowired
    public void setRedisTemplate(RedisTemplate redisTemplate) {
        RedisUtils.redisTemplate = redisTemplate;
    }

    public static boolean setObj(String key,Object value){
        return setObj(key,value,0);
    }

    /**
     * Description:
     *
     * @author fanxb
     * @date 2019/2/21 15:21
     * @param key 鍵
     * @param value 值
     * @param time 過時時間,單位ms
     * @return boolean 是否成功
     */
    public static boolean setObj(String key,Object value,long time){
        try{
            if(time<=0){
                redisTemplate.opsForValue().set(key,value);
            }else{
                redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);
            }
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    public static Object get(String key){
        if(key==null){
            return null;
        }
        try{
            Object obj = redisTemplate.opsForValue().get(key);
            return obj;
        }catch (Exception e){
            e.printStackTrace();
            return null;
        }
    }

    public static void del(String... key){
        if(key!=null && key.length>0){
            redisTemplate.delete(CollectionUtils.arrayToList(key));
        }
    }
}

  上面用到的是 Sink.INPUT 通道,這個和以前的 Source.OUTPUT 通道恰好一隊,一個負責收,一個負責發。

  而後修改OrganizationByRibbonService.java文件中的getOrganizationWithRibbon方法:

public Organization getOrganizationWithRibbon(String id) {
        String key = RedisKeyUtils.getOrgCacheKey(id);
        //先從redis緩存取數據
        Object res = RedisUtils.get(key);
        if (res == null) {
            logger.info("當前數據無緩存:{}", id);
            try{

            ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}",
                    HttpMethod.GET, null, Organization.class, id);
            res = responseEntity.getBody();
            RedisUtils.setObj(key, res);
            }catch (Exception e){
                e.printStackTrace();
            }
        } else {
            logger.info("當前數據爲緩存數據:{}", id);
        }
        return (Organization) res;
    }

  最後修改配置文件,爲 input 通道指定 topic,配置以下:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: orgChangeTopic
          content-type: application/json
          # 定義將要消費消息的消費者組的名稱
          # 可能多個服務監聽同一個消息隊列。若是定義了消費者組,那麼同組中只要有一個消費了消息,剩餘的不會再次消費該消息,保證只有消息的
          # 一個副本會被該組的某個實例所消費
          group: licensingGroup
      kafka:
        binder:
          zk-nodes: 192.168.226.5:2181
          brokers: 192.168.226.5:9092

基本和發送的配置相同,只是這裏是爲input通道映射隊列,而後還定義了一個組名,避免一個消息被重複消費。

  如今來屢次訪問localhost:5555/apis/licensingservice/licensingByRibbon/12,能夠看到 licensingservice 控制檯打印數據從緩存中讀取,以下所示:

緩存

而後再以 delete 訪問localhost:5555/apis/org/organization/12清除緩存,再次訪問 licensingservice 服務,結果以下:

清除緩存

自定義通道

  上面用的是Spring Cloud Stream自帶的 input/output 通道,那麼要如何自定義通道呢?下面以自定義customInput/customOutput通道爲例。

自定義發數據通道

public interface CustomOutput {
    @Output("customOutput")
    MessageChannel out();
}

  對於每一個自定義的發數據通道,需使用@OutPut 註解標記的返回 MessageChannel 類的方法。

自定義收數據通道

public interface CustomInput {

    @Input("customInput")
    SubscribableChannel in();
}

  同上,對應自定義的收數據通道,須要使用@Input 註解標記的返回 SubscribableChannel 類的方法。

結束

  看完本篇你應該已經可以在 Spring Cloud 中集成 Spring Cloud Stream 消息隊列了,貌似這個也能用到普通的 spring boot 項目中,比直接集成 mq 更加的優雅。

2019,Fighting!

本篇原創發佈於:FleyX 的我的博客

本篇所用所有代碼:FleyX 的 github

相關文章
相關標籤/搜索