本文摘自於 《Spring Cloud微服務 入門 實戰與進階》 一書。sql
配置中心最重要的一個特性就是實時推送了,正由於有這個特性,咱們能夠依賴配置中心作不少事情。在我本身開發的Smconf這個配置中心,Smconf是依賴於Zookeeper的Watch機制來實現實時推送。數據庫
上圖簡要描述了配置發佈的大體過程:json
ReleaseMessage消息是經過Mysql實現了一個簡單的消息隊列。之全部沒有采用消息中間件,是爲了讓Apollo在部署的時候儘可能簡單,儘量減小外部依賴。bash
上圖簡要描述了發送ReleaseMessage的大體過程:app
通知是採用基於Http長鏈接實現,主要分爲下面幾個步驟:ide
Apollo推送這塊代碼比較多,就不在本書中詳細分析了,我把推送這塊的代碼稍微簡化了下,給你們進行講解,這樣理解起來會更容易。固然我這邊會比較簡單,不少細節就不作考慮了,只是爲了可以讓你們明白Apollo推送的核心原理。微服務
發送ReleaseMessage的邏輯咱們就寫一個簡單的接口,用隊列存儲,測試的時候就調用這個接口模擬配置有更新,發送ReleaseMessage消息。測試
@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
// 模擬配置更新,往裏插入數據表示有更新
public static Queue<String> queue = new LinkedBlockingDeque<>();
@GetMapping("/addMsg")
public String addMsg() {
queue.add("xxx");
return "success";
}
}
複製代碼
消息發送以後,前面咱們有講過Config Service會啓動一個線程定時掃描ReleaseMessage表,去查看是否有新的消息記錄,而後取通知客戶端,這邊咱們也啓動一個線程去掃描:ui
@Component
public class ReleaseMessageScanner implements InitializingBean {
@Autowired
private NotificationControllerV2 configController;
@Override
public void afterPropertiesSet() throws Exception {
// 定時任務從數據庫掃描有沒有新的配置發佈
new Thread(() -> {
for (;;) {
String result = NotificationControllerV2.queue.poll();
if (result != null) {
ReleaseMessage message = new ReleaseMessage();
message.setMessage(result);
configController.handleMessage(message);
}
}
}).start();;
}
}
複製代碼
循環去讀取NotificationControllerV2中的隊列,若是有消息的話就構造一個ReleaseMessage的對象,而後調用NotificationControllerV2中的handleMessage()方法進行消息的處理。this
ReleaseMessage就一個字段,模擬消息內容:
public class ReleaseMessage {
private String message;
public void setMessage(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
複製代碼
接下來,咱們看handleMessage作了什麼樣的工做
NotificationControllerV2實現了ReleaseMessageListener接口,ReleaseMessageListener中定義了handleMessage()方法。
public interface ReleaseMessageListener {
void handleMessage(ReleaseMessage message);
}
複製代碼
handleMessage就是當配置發生變化的時候,通知的消息監聽器,消息監聽器獲得配置發佈的信息後,則會通知對應的客戶端:
@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
.synchronizedSetMultimap(HashMultimap.create());
@Override
public void handleMessage(ReleaseMessage message) {
System.err.println("handleMessage:"+ message);
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get("xxxx"));
for (DeferredResultWrapper deferredResultWrapper : results) {
List<ApolloConfigNotification> list = new ArrayList<>();
list.add(new ApolloConfigNotification("application", 1));
deferredResultWrapper.setResult(list);
}
}
}
複製代碼
Apollo的實時推送是基於Spring DeferredResult實現的,在handleMessage()方法中能夠看到是經過deferredResults獲取DeferredResult,deferredResults就是第一行的Multimap,Key其實就是消息內容,Value就是DeferredResult的業務包裝類DeferredResultWrapper,咱們來看下DeferredResultWrapper的代碼:
public class DeferredResultWrapper {
private static final long TIMEOUT = 60 * 1000;// 60 seconds
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST =
new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;
public DeferredResultWrapper() {
result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}
public void onTimeout(Runnable timeoutCallback) {
result.onTimeout(timeoutCallback);
}
public void onCompletion(Runnable completionCallback) {
result.onCompletion(completionCallback);
}
public void setResult(ApolloConfigNotification notification) {
setResult(Lists.newArrayList(notification));
}
public void setResult(List<ApolloConfigNotification> notifications) {
result.setResult(new ResponseEntity<>(notifications, HttpStatus.OK));
}
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getResult() {
return result;
}
}
複製代碼
經過setResult()方法設置返回結果給客戶端,以上就是當配置發生變化,而後經過消息監聽器通知客戶端的原理,那麼客戶端是在何時接入的呢?
@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
// 模擬配置更新,往裏插入數據表示有更新
public static Queue<String> queue = new LinkedBlockingDeque<>();
private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
.synchronizedSetMultimap(HashMultimap.create());
@GetMapping("/getConfig")
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getConfig() {
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications();
if (!CollectionUtils.isEmpty(newNotifications)) {
deferredResultWrapper.setResult(newNotifications);
} else {
deferredResultWrapper.onTimeout(() -> {
System.err.println("onTimeout");
});
deferredResultWrapper.onCompletion(() -> {
System.err.println("onCompletion");
});
deferredResults.put("xxxx", deferredResultWrapper);
}
return deferredResultWrapper.getResult();
}
private List<ApolloConfigNotification> getApolloConfigNotifications() {
List<ApolloConfigNotification> list = new ArrayList<>();
String result = queue.poll();
if (result != null) {
list.add(new ApolloConfigNotification("application", 1));
}
return list;
}
}
複製代碼
NotificationControllerV2中提供了一個/getConfig的接口,客戶端在啓動的時候會調用這個接口,這個時候會執行getApolloConfigNotifications()方法去獲取有沒有配置的變動信息,若是有的話證實配置修改過,直接就經過deferredResultWrapper.setResult(newNotifications);返回結果給客戶端了,客戶端收到結果後從新拉取配置的信息進行覆蓋本地的配置。
若是getApolloConfigNotifications()方法沒有返回配置修改的信息,證實配置沒有發生修改,就將DeferredResultWrapper對象添加到deferredResults中,等待後續配置發生變化時消息監聽器進行通知。
同時這個請求就會掛起,不會當即返回,掛起是經過DeferredResultWrapper中的下面的代碼實現的:
private static final long TIMEOUT = 60 * 1000;// 60 seconds
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST =
new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;
public DeferredResultWrapper() {
result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}
複製代碼
在建立DeferredResult對象的時候指定了超時的時間和超時後返回的響應碼,若是60秒內沒有消息監聽器進行通知,那麼這個請求就會超時,超時後客戶端就收到的響應碼就是304。
整個Config Service的流程就走完了,接下來咱們看客戶端是怎麼實現的,咱們簡單的寫個測試類模擬客戶端註冊:
public class ClientTest {
public static void main(String[] args) {
reg();
}
private static void reg() {
System.err.println("註冊");
String result = request("http://localhost:8081/getConfig");
if (result != null) {
// 配置有更新,從新拉取配置
// ......
}
// 從新註冊
reg();
}
private static String request(String url) {
HttpURLConnection connection = null;
BufferedReader reader = null;
try {
URL getUrl = new URL(url);
connection = (HttpURLConnection) getUrl.openConnection();
connection.setReadTimeout(90000);
connection.setConnectTimeout(3000);
connection.setRequestMethod("GET");
connection.setRequestProperty("Accept-Charset", "utf-8");
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestProperty("Charset", "UTF-8");
System.out.println(connection.getResponseCode());
if (200 == connection.getResponseCode()) {
reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line = null;
while ((line = reader.readLine()) != null) {
result.append(line);
}
System.out.println("結果 " + result);
return result.toString();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (connection != null) {
connection.disconnect();
}
}
return null;
}
}
複製代碼
首先啓動/getConfig接口所在的服務,而後啓動客戶端,客戶端就會發起註冊請求,若是有修改直接獲取到結果,進行配置的更新操做。若是無修改,請求會掛起,這邊客戶端設置的讀取超時時間是90秒,大於服務端的60秒超時時間。
每次收到結果後,不管是有修改仍是沒修改,都必須從新進行註冊,經過這樣的方式就能夠達到配置實時推送的效果。
咱們能夠調用以前寫的/addMsg接口來模擬配置發生變化,調用以後客戶端就能立刻獲得返回結果。
本文摘自於**《Spring Cloud微服務 入門 實戰與進階》**一書。
去年出版的**《Spring Cloud微服務:全棧技術與案例解析》**一書,獲得了你們的支持以及反饋,基於你們的反饋,從新進行了更正和改進。
基於比較穩定的 Spring Cloud Finchley.SR2 版本和 Spring Boot 2.0.6.RELEASE 版本編寫。
同時將示列代碼進行標準的歸檔,以前的都在一塊兒,不方便讀者參考和運行。
同時還增長了像Apollo,Spring Cloud Gateway,生產實踐經驗等新的內容。