Gateway 動態路由(金絲雀發佈 / 灰度發佈)

參考https://windmt.com/2019/01/20/spring-cloud-20-gateway-dynamic-routing/前端

 

爲何須要動態路由?

以前說過 Gateway 的路由配置,經常使用的有兩種方式:web

  • Fluent API
  • 配置文件

這二者之間由於配置文件的方式修改起來比較靈活,而後經過 Stream+Bus 的方式刷新路由配置,因此你們使用的比較多。redis

可是若是咱們在網關層須要相似於 Canary Release(金絲雀發佈,也稱灰度發佈)這樣的能力的話,那麼以上兩種配置路由的方式就都顯得太笨拙了。spring

礦井中的金絲雀
17 世紀,英國礦井工人發現,金絲雀對瓦斯這種氣體十分敏感。空氣中哪怕有極其微量的瓦斯,金絲雀也會中止歌唱;而當瓦斯含量超過必定限度時,雖然魯鈍的人類毫無察覺,金絲雀卻早已毒發身亡。當時在採礦設備相對簡陋的條件下,工人們每次下井都會帶上一隻金絲雀做爲 「瓦斯檢測指標」,以便在危險情況下緊急撤離。數據庫

Spring Cloud Gateway 中雖然已經提供了關於權重的斷言,咱們在配置文件中能夠直接這樣配置json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
spring:
application:
name: cloud-gateway
cloud:
gateway:
routes:
- id: service1_prod
uri: http://localhost:8081
predicates:
- Path=/test
- Weight=service1, 90
- id: service1_canary
uri: http://localhost:8082
predicates:
- Path=/test
- Weight=service1, 10

 


以實現 Canary Release 的能力,可是每次發佈都配置一遍未免太過麻煩了。api

出於 「懶」 的本性,咱們固然但願在發佈腳本里能在運行時直接動態修改service1_prodservice1_canary的權重,這樣咱們就不用手動修改還提心吊膽的擔憂改錯了。app

這其實就是 「動態路由」 了。分佈式

Spring Cloud Gateway 默認動態路由實現

Spring Cloud Gateway 在去年 6 月份發佈了 2.0 第一個 release 版本,其實已經自帶動態路由了, 可是官方文檔並無講如何動態配置。
不過咱們翻看 Spring Cloud Gateway 源碼,會發現類 org.springframework.cloud.gateway.actuate.GatewayControllerEndpoint 中提供了網關配置的 RESTful 接口,默認是沒有啓用的。
在配置類 org.springframework.cloud.gateway.config.GatewayAutoConfiguration 中配置了 GatewayControllerEndpointide

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
@ConditionalOnClass(Health.class)
protected static class GatewayActuatorConfiguration {

@Bean
@ConditionalOnEnabledEndpoint
public GatewayControllerEndpoint gatewayControllerEndpoint(RouteDefinitionLocator routeDefinitionLocator, List<GlobalFilter> globalFilters,
List<GatewayFilterFactory> GatewayFilters, RouteDefinitionWriter routeDefinitionWriter,
RouteLocator routeLocator) {
return new GatewayControllerEndpoint(routeDefinitionLocator, globalFilters, GatewayFilters, routeDefinitionWriter, routeLocator);
}
}

 

也就是說在存在org.springframework.boot.actuate.health.Health時啓用,咱們想用自帶的接口就須要添加 actuator 依賴

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

 

而且還要將 actuator 的端點暴露出來

1
2
3
4
5
management:
endpoints:
web:
exposure:
include: "*"

 

而後咱們就能經過自帶的GatewayControllerEndpoint的 RESTful API 修改運行時的路由了
GatewayControllerEndpoint

GatewayControllerEndpoint

 

此時咱們已經能實現以前的目標了
JVM 級別的動態路由

JVM 級別的動態路由


可是 Gateway 自帶的這套是僅僅支持了 JVM 級別的動態路由,不能序列化存儲的。
默認的實現:

 

1
2
3
4
5
6
// GatewayAutoConfiguration
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
return new InMemoryRouteDefinitionRepository();
}

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// InMemoryRouteDefinitionRepository
public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {

private final Map<String, RouteDefinition> routes = synchronizedMap(new LinkedHashMap<String, RouteDefinition>());

@Override
public Mono<Void> save(Mono<RouteDefinition> route) {
return route.flatMap( r -> {
routes.put(r.getId(), r);
return Mono.empty();
});
}

@Override
public Mono<Void> delete(Mono<String> routeId) {
return routeId.flatMap(id -> {
if (routes.containsKey(id)) {
routes.remove(id);
return Mono.empty();
}
return Mono.defer(() -> Mono.error(new NotFoundException("RouteDefinition not found: "+routeId)));
});
}

@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(routes.values());
}
}

這樣就致使咱們的路由配置要分散存儲在兩個地方:Config Server 和 內存中,很是不利於管理。
另外在生產環境使用的話,Gateway 必定是一個集羣,一個個去調用每一個實例的 refresh 端口並不利於擴展。

Spring Cloud Gateway 路由加載過程

看了上面的源碼後,是否是感受其實咱們徹底能夠替換掉InMemoryRouteDefinitionRepository來用 DB 或 Redis 作持久化存儲,來實現持久化的動態路由。
不過在動手以前,咱們仍是要先看一下 Gateway 的路由加載過程,這樣才更好的實現咱們的需求。

Gateway 路由加載過程

Gateway 路由加載過程

 

  1. DispatcherHandler 接管用戶請求
  2. RoutePredicateHandlerMapping 路由匹配
    1. 根據 RouteLocator 獲取 RouteDefinitionLocator
    2. 返回多個 RouteDefinitionLocator.getRouteDefinitions() 的路由定義信息
  3. FilteringWebHandler 執行路由定義中的 filter 最後路由到具體的業務服務中

從加載流程上能夠看出,咱們要擴展動態路由的話,最核心的是要從RouteDefinitionLocator上入手。

持久化的分佈式動態路由組件

咱們如今能夠對 Gateway 作一些擴展來改善上述的問題。

擴展思路

  1. 增長一個路由管理模塊
    • 參考GatewayControllerEndpoint實現
    • 路由配置所有存儲在 MySQL 中(Config Server 還須要,但再也不存儲路由配置了)
    • 啓動時將路由配置加載到 Redis 中,運行時雙寫
    • 提供 RESTful API 以便腳本調用
    • 前端頁面能夠配合 JSON Viewer 或相似插件,便於修改展現
  2. 網關模塊擴展
    • 提供一個RouteDefinitionRepository,使它直接從 Redis 獲取路由配置
    • 網關集羣刷新路由配置,這裏用 Redis Pub/Sub 來充當 MQ 來實現

持久化的動態路由

持久化的動態路由


注:用 Redis 一方面是爲了支持 WebFlux(Reactor) 的背壓(Backpressure),另外一方面是爲了刷新 Gateway 集羣。

 

具體實現

路由管理模塊

數據庫的表結構
表結構

表結構

 

定義相關實體,這裏參考 Gateway 源碼的相關定義,涉及到三個類:

  • org.springframework.cloud.gateway.route.RouteDefinition
  • org.springframework.cloud.gateway.handler.predicate.PredicateDefinition
  • org.springframework.cloud.gateway.filter.FilterDefinition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class GatewayRoute {
private String routeId;
private String uri;
private Integer order;
private List<GatewayPredicateDefinition> predicates;
private List<GatewayFilterDefinition> filters;

private Long id;
private LocalDateTime createTime;
private LocalDateTime updateTime;
private EntityStatus status;
}

public class GatewayPredicateDefinition {
private String name;
private Map<String, String> args = new LinkedHashMap<>();
}

public class GatewayFilterDefinition {
private String name;
private Map<String, String> args = new LinkedHashMap<>();

}

Controller 參考GatewayControllerEndpoint實現便可。由於個人實現是軟刪除,因此對建立 / 更新作了明確區分。
注意裏邊有個refresh()方法,並非像GatewayControllerEndpoint同樣發RefreshRoutesEvent,而是往 Redis publish 了一條消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@Slf4j
@RestController
@RequestMapping("")
public class GatewayDynamicRouteController {


@Autowired
private GatewayRouteService gatewayRouteService;

/**
* 建立路由
*
* @param model
* @return
*/
@PostMapping("/routes")
public Mono<ResponseEntity<Map>> create(@RequestBody Mono<GatewayRoute> model) {
return model.flatMap(r -> {
String routeId = r.getRouteId();
return gatewayRouteService.findOneByRouteId(routeId)
.defaultIfEmpty(new GatewayRoute())
.flatMap(old -> {
if (old.getId() != null) {
return Mono.defer(() -> Mono.just(ResponseEntity.status(HttpStatus.FORBIDDEN).body(buildRetBody(403, "routeId " + routeId + " 已存在", null))));
}
log.info("[ROUTE] <biz> creating. {}", defer(() -> JsonUtils.toJSON(r)));
return gatewayRouteService.insert(Mono.just(r))
.flatMap(id -> {
return Mono.just((ResponseEntity.created(URI.create("/routes/" + id))
.body(buildRetBody(0, "success", ImmutableMap.of("id", id)))));
});
});
});
}

/**
* 修改路由
*
* @param id
* @param model
* @return
*/
@PutMapping("/routes/{id}")
public Mono<ResponseEntity<Map>> update(@PathVariable Long id, @RequestBody Mono<GatewayRoute> model) {
return model.flatMap(r -> {
String routeId = r.getRouteId();
return gatewayRouteService.findOneById(id)
.flatMap(old -> {
if (old == null) {
return Mono.defer(() -> Mono.just(ResponseEntity.status(HttpStatus.FORBIDDEN).body(buildRetBody(403, "routeId " + routeId + " 還未建立", null))));
}
log.info("[ROUTE] <biz> updating. id:{}\n before:{}\n after:{}",
id, defer(() -> JsonUtils.toJSON(old)), defer(() -> JsonUtils.toJSON(r)));
return gatewayRouteService.update(Mono.just(r))
.then(Mono.defer(() -> Mono.just((ResponseEntity.ok(buildRetBody(0, "success", null))))));
});
});
}

/**
* @param id
* @param status 0 正常,1 刪除
* @return
*/
@PutMapping("/routes/{id}/{status}")
public Mono<ResponseEntity<Object>> updateStatus(@PathVariable Long id, @PathVariable Integer status) {
EntityStatus entityStatus = EntityStatus.fromValue(status);
if (entityStatus == null) {
return Mono.defer(() -> Mono.just(ResponseEntity.status(HttpStatus.BAD_REQUEST).build()));
}
return gatewayRouteService.updateStatus(id, entityStatus)
.then(Mono.defer(() -> Mono.just(ResponseEntity.ok().build())))
.onErrorResume(t -> t instanceof NotFoundException, t -> Mono.just(ResponseEntity.notFound().build()));
}

/**
* 獲取單個路由信息
*
* @param id
* @return
*/
@GetMapping("/routes/{id}")
public Mono<ResponseEntity<GatewayRoute>> route(@PathVariable Long id) {
return gatewayRouteService.findOneById(id)
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
}

/**
* 刷新路由
*
* @return
*/
@PostMapping("/routes/refresh")
public Mono<ResponseEntity<Object>> refresh() {
return gatewayRouteService.refresh()
.map(aLong -> {
if (aLong > 0) {
return ResponseEntity.ok().build();
} else {
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
}
});
}

private Map<String, Object> buildRetBody(int code, String msg, Object data) {
Map<String, Object> map = new HashMap<>();
map.put("code", code);
map.put("message", msg);
map.put("data", data);
return map;
}


}

 

網關模塊

重寫一個新的RouteDefinitionRepository,主要是要實現getRouteDefinitions()方法。
對於savedelete這兩個方法,我是故意不處理的,由於路由的管理均在上邊的路由管理模塊實現了,網關模塊只關注路由的獲取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@Component
public class DynamicRouteDefinitionRepository implements RouteDefinitionRepository {

@Autowired
private ReactiveRedisTemplate<String, String> redisTemplate;

@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return redisTemplate.opsForHash()
.values(GATEWAY_ROUTES)
.map(json -> JsonUtils.fromJson(json.toString(), RouteDefinition.class));
}

@Override
public Mono<Void> save(Mono<RouteDefinition> route) {
return Mono.empty();
}

@Override
public Mono<Void> delete(Mono<String> routeId) {
return Mono.empty();
}
}

 

除此以外,爲了配合路由管理模塊實現網關集羣的刷新路由配置,網關模塊裏還須要加一個 Redis 的配置以訂閱刷新消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
@Configuration
public class RedisConfig {

@Bean
ReactiveRedisMessageListenerContainer container(GatewayRouteService routeService, ReactiveRedisConnectionFactory connectionFactory) {
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
container.destroyLater().subscribe();
}));
container.receive(topic())
.map(p -> p.getMessage())
.subscribe(message -> {
log.info("Received <{}>", message);
routeService.publishRefreshEvent();
});

return container;
}


@Bean
public ChannelTopic topic() {
return new ChannelTopic("gateway-route-refresh-topic");
}

}

 

自此也就大功告成了~

配置格式

這樣的動態路由,是用 JSON 格式來配置的,若是格式不對,但是要報 500 錯誤的!
這裏簡單舉個栗子:

若是咱們在配置文件裏要配的路由是這樣

1
2
3
4
5
6
7
8
9
10
11
12
spring:
cloud:
gateway:
routes:
- id: user-api
uri: http://user-api:8080
order: 0
predicates:
- Path=/user/**
- Weight=user-service, 90
filters:
- StripPrefix=1

 

那麼翻譯成 JSON 格式就是要這樣(其中 status 是我本身加的,能夠忽略)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"routeId": "user-api",
"uri": "http://user-api:8080",
"order": 0,
"predicates": [
{
"name": "Path",
"args": {
"pattern": "/user/**"
}
},
{
"name": "Weight",
"args": {
"weight.group": "user-service",
"weight.weight": "90"
}
}
],
"filters": [
{
"name": "StripPrefix",
"args": {
"parts": "1"
}
}
],
"status": 0
}

 

至於其中predicatefiltername字段都還好理解,即便是 yaml 格式的咱們也是要寫的。這個有相關的文檔,目前的規則就是RoutePredicateFactoryGatewayFilterFactory這兩個接口下全部的實現類去掉這兩個後綴後的名字(見org.springframework.cloud.gateway.support.NameUtils)。

那麼args裏邊的 key 的名字又是哪來的呢?
這個沒有文檔,翻看源碼發現此處的 key 有兩種配置方式:

    1. _genkey_0_genkey_1_genkey_n這種形式,比較方便可是可讀性比較差,還得注意順序。(這個的源碼也在 NameUtils 裏)
    2. 另外一種就是像我上邊例子中寫的,這須要去各個RoutePredicateFactoryGatewayFilterFactory的源碼找對應的命名規則。(還須要參考org.springframework.cloud.gateway.support.ShortcutConfigurable
相關文章
相關標籤/搜索