springcloud 總集:www.tapme.top/blog/detail…java
代碼見文章結尾node
想一想日常生活中作飯的場景,在用電飯鍋作飯的同時,咱們能夠洗菜、切菜,等待電飯鍋發出飯作好的提示咱們回去拔下電飯鍋電源(或者什麼也不知讓它處於保溫狀態),反正這個時候咱們知道飯作好了,接下來能夠炒菜了。從這裏能夠看出咱們在平常生活中與世界的互動並非同步的、線性的,不是簡單的請求--響應模型。它是事件驅動的,咱們不斷的發送消息、接受消息、處理消息。git
一樣在軟件世界中也不全是請求--響應模型,也會須要進行異步的消息通訊。使用消息實現事件通訊的概念被稱爲消息驅動架構(Event Driven Architecture,EDA),也被稱爲消息驅動架構(Message Driven Architecture,MDA)。使用這類架構能夠構建高度解耦的系統,該系統可以對變化作出響應,且不須要與特定的庫或者服務緊密耦合。github
在 Spring Cloud 項目中可使用Spirng Cloud Stream垂手可得的構建基於消息傳遞的解決方案。redis
要解答這個問題,讓咱們從一個例子開始,以前一直使用的兩個服務:許可證服務和組織服務。每次對許可證服務進行請求,許可證服務都要經過 http 請求到組織服務上查詢組織信息。顯而易見此次額外的 http 請求會花費較長的時間。若是可以將緩存組織數據的讀操做,將會大幅提升許可證服務的響應時間。可是緩存數據有以下 2 個要求:spring
要實現上面的要求,如今有兩種辦法。docker
使用同步請求--響應模型來實現。組織服務在組織數據變化時調用許可證服務的接口通知組織服務已經變化,或者直接操做許可證服務的緩存。json
使用事件驅動。組織服務發出一個異步消息。許可證服務收到該消息後清除對應的緩存。api
許可證服務在 redis 中緩存從組織服務中查詢到的服務信息,當組織數據更新時,組織服務同步 http 請求通知許可證服務數據過時。這種方式有如下幾個問題:緩存
一樣的許可證服務在 redis 中緩存從組織服務中查詢到的服務信息,當組織數據更新時,組織服務將更新信息寫入到隊列中。許可證服務監聽消息隊列。使用消息傳遞有一下 4 個好處:
spring cloud 項目中能夠經過 spring cloud stream 框架來輕鬆集成消息傳遞。該框架最大的特色是抽象了消息傳遞平臺的細節,所以能夠在支持的消息隊列中隨意切換(包括 Apache Kafka 和 RabbitMQ)。
spring cloud stream 中有 4 個組件涉及到消息發佈和消息消費,分別爲:
發射器
當一個服務準備發送消息時,它將使用發射器發佈消息。發射器是一個 Spring 註解接口,它接收一個普通 Java 對象,表示要發佈的消息。發射器接收消息,而後序列化(默認序列化爲 JSON)後發佈到通道中。
通道
通道是對隊列的一個抽象。通道名稱是與目標隊列名稱相關聯的。可是隊列名稱並不會直接公開在代碼中,代碼永遠只會使用通道名。
綁定器
綁定器是 spring cloud stream 框架的一部分,它是與特定消息平臺對話的 Spring 代碼。經過綁定器,使得開發人員沒必要依賴於特定平臺的庫和 API 來發布和消費消息。
接收器
服務經過接收器來從隊列中接收消息,並將消息反序列化。
處理邏輯以下:
繼續使用以前的項目,在許可證服務中緩存組織數據到 redis 中。
爲方便起見,使用 docker 建立 redis,創建腳本以下:
docker run -itd --name redis --net host redis:
複製代碼
首先在 organization 服務中引入 spring cloud stream 和 kafka 的依賴。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
複製代碼
而後在 events 類中編寫SimpleSouce
類,用於組織數據修改,產生一條消息到隊列中。代碼以下:
@EnableBinding(Source.class)
public class SimpleSource {
private Logger logger = LoggerFactory.getLogger(SimpleSource.class);
private Source source;
@Autowired
public SimpleSource(Source source) {
this.source = source;
}
public void publishOrChange(String action, String orgId) {
logger.info("在請求:{}中,發送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId);
OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id);
source.output().send(MessageBuilder.withPayload(change).build());
}
}
複製代碼
這裏使用的是默認通道,Source 類定義的 output 通道發消息。後面經過 Sink 定義的 input 通道收消息。
而後在OrganizationController
類中定義一個 delete 方法,並注入 SimpleSouce 類,代碼以下:
@Autowired
private SimpleSource simpleSource;
@DeleteMapping(value = "/organization/{orgId}")
public void deleteOne(@PathVariable("orgId") String id) {
logger.debug("刪除了組織:{}", id);
simpleSource.publishOrChange("delete", id);
}
複製代碼
最後在配置文件中加入消息隊列的配置:
# 省略了其餘配置
spring:
cloud:
stream:
bindings:
output:
destination: orgChangeTopic
content-type: application/json
kafka:
binder:
# 替換爲部署kafka的ip和端口
zk-nodes: 192.168.226.5:2181
brokers: 192.168.226.5:9092
複製代碼
如今咱們能夠測試下訪問localhost:5555/apis/org/organization/12,能夠看到控制檯打印消息生成的日誌。
首先引入依賴,依賴項同上面組織服務。
而後在 event 包下建立OrgChange
的類,代碼以下:
@EnableBinding(Sink.class) //使用Sink接口中定義的通道來監聽傳入消息
public class OrgChange {
private Logger logger = LoggerFactory.getLogger(OrgChange.class);
@StreamListener(Sink.INPUT)
public void loggerSink(OrganizationChange change){
logger.info("收到一個消息,組織id爲:{},關聯id爲:{}",change.getOrgId(),change.getId());
//刪除失效緩存
RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));
}
}
//下面兩個都在util包下
//RedisKeyUtils.java代碼以下
public class RedisKeyUtils {
private static final String ORG_CACHE_PREFIX = "orgCache_";
public static String getOrgCacheKey(String orgId){
return ORG_CACHE_PREFIX+orgId;
}
}
//RedisUtils.java代碼以下
@Component
@SuppressWarnings("all")
public class RedisUtils {
public static RedisTemplate redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
RedisUtils.redisTemplate = redisTemplate;
}
public static boolean setObj(String key,Object value){
return setObj(key,value,0);
}
/** * Description: * * @author fanxb * @date 2019/2/21 15:21 * @param key 鍵 * @param value 值 * @param time 過時時間,單位ms * @return boolean 是否成功 */
public static boolean setObj(String key,Object value,long time){
try{
if(time<=0){
redisTemplate.opsForValue().set(key,value);
}else{
redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);
}
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
public static Object get(String key){
if(key==null){
return null;
}
try{
Object obj = redisTemplate.opsForValue().get(key);
return obj;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public static void del(String... key){
if(key!=null && key.length>0){
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
複製代碼
上面用到的是 Sink.INPUT 通道,這個和以前的 Source.OUTPUT 通道恰好一隊,一個負責收,一個負責發。
而後修改OrganizationByRibbonService.java
文件中的getOrganizationWithRibbon
方法:
public Organization getOrganizationWithRibbon(String id) {
String key = RedisKeyUtils.getOrgCacheKey(id);
//先從redis緩存取數據
Object res = RedisUtils.get(key);
if (res == null) {
logger.info("當前數據無緩存:{}", id);
try{
ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}",
HttpMethod.GET, null, Organization.class, id);
res = responseEntity.getBody();
RedisUtils.setObj(key, res);
}catch (Exception e){
e.printStackTrace();
}
} else {
logger.info("當前數據爲緩存數據:{}", id);
}
return (Organization) res;
}
複製代碼
最後修改配置文件,爲 input 通道指定 topic,配置以下:
spring:
cloud:
stream:
bindings:
input:
destination: orgChangeTopic
content-type: application/json
# 定義將要消費消息的消費者組的名稱
# 可能多個服務監聽同一個消息隊列。若是定義了消費者組,那麼同組中只要有一個消費了消息,剩餘的不會再次消費該消息,保證只有消息的
# 一個副本會被該組的某個實例所消費
group: licensingGroup
kafka:
binder:
zk-nodes: 192.168.226.5:2181
brokers: 192.168.226.5:9092
複製代碼
基本和發送的配置相同,只是這裏是爲input
通道映射隊列,而後還定義了一個組名,避免一個消息被重複消費。
如今來屢次訪問localhost:5555/apis/licensingservice/licensingByRibbon/12,能夠看到 licensingservice 控制檯打印數據從緩存中讀取,以下所示:
而後再以 delete 訪問localhost:5555/apis/org/organization/12清除緩存,再次訪問 licensingservice 服務,結果以下:
上面用的是Spring Cloud Stream
自帶的 input/output 通道,那麼要如何自定義通道呢?下面以自定義customInput/customOutput
通道爲例。
public interface CustomOutput {
@Output("customOutput")
MessageChannel out();
}
複製代碼
對於每一個自定義的發數據通道,需使用@OutPut 註解標記的返回 MessageChannel 類的方法。
public interface CustomInput {
@Input("customInput")
SubscribableChannel in();
}
複製代碼
同上,對應自定義的收數據通道,須要使用@Input 註解標記的返回 SubscribableChannel 類的方法。
看完本篇你應該已經可以在 Spring Cloud 中集成 Spring Cloud Stream 消息隊列了,貌似這個也能用到普通的 spring boot 項目中,比直接集成 mq 更加的優雅。
2019,Fighting!
本篇原創發佈於:FleyX 的我的博客
本篇所用所有代碼:FleyX 的 github
掃碼關注微信公衆號:FleyX 學習筆記,獲取更多幹貨