Apollo服務端設計原理剖析

本文摘自於 《Spring Cloud微服務 入門 實戰與進階》 一書。sql

1 配置發佈後的實時推送設計

配置中心最重要的一個特性就是實時推送了,正由於有這個特性,咱們能夠依賴配置中心作不少事情。在我本身開發的Smconf這個配置中心,Smconf是依賴於Zookeeper的Watch機制來實現實時推送。數據庫

來源於Apollo 文檔

上圖簡要描述了配置發佈的大體過程:json

  • 用戶在Portal中進行配置的編輯和發佈
  • Portal會調用Admin Service提供的接口進行發佈操做
  • Admin Service收到請求後,發送ReleaseMessage給各個Config Service,通知Config Service配置發生變化
  • Config Service收到ReleaseMessage後,通知對應的客戶端,基於Http長鏈接實現

2 發送ReleaseMessage的實現方式

ReleaseMessage消息是經過Mysql實現了一個簡單的消息隊列。之全部沒有采用消息中間件,是爲了讓Apollo在部署的時候儘可能簡單,儘量減小外部依賴。bash

release-message-design.png

上圖簡要描述了發送ReleaseMessage的大體過程:app

  • Admin Service在配置發佈後會往ReleaseMessage表插入一條消息記錄
  • Config Service會啓動一個線程定時掃描ReleaseMessage表,去查看是否有新的消息記錄
  • Config Service發現有新的消息記錄,那麼就會通知到全部的消息監聽器
  • 消息監聽器獲得配置發佈的信息後,則會通知對應的客戶端

3 Config Service通知客戶端的實現方式

通知是採用基於Http長鏈接實現,主要分爲下面幾個步驟:ide

  • 客戶端會發起一個Http請求到Config Service的notifications/v2接口
  • v2接口經過Spring DeferredResult把請求掛起,不會當即返回
  • 若是在60秒內沒有該客戶端關心的配置發佈,那麼會返回Http狀態碼304給客戶端
  • 若是發現配置有修改,則會調用DeferredResult的setResult方法,傳入有配置變化的namespace信息,同時該請求會當即返回
  • 客戶端從返回的結果中獲取到配置變化的namespace後,會當即請求Config Service獲取該namespace的最新配置

4 源碼解析實時推送設計

Apollo推送這塊代碼比較多,就不在本書中詳細分析了,我把推送這塊的代碼稍微簡化了下,給你們進行講解,這樣理解起來會更容易。固然我這邊會比較簡單,不少細節就不作考慮了,只是爲了可以讓你們明白Apollo推送的核心原理。微服務

發送ReleaseMessage的邏輯咱們就寫一個簡單的接口,用隊列存儲,測試的時候就調用這個接口模擬配置有更新,發送ReleaseMessage消息。測試

@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
	
    // 模擬配置更新,往裏插入數據表示有更新
    public static Queue<String> queue = new LinkedBlockingDeque<>();

    @GetMapping("/addMsg")
    public String addMsg() {
	    queue.add("xxx");
	    return "success";
    }

}
複製代碼

消息發送以後,前面咱們有講過Config Service會啓動一個線程定時掃描ReleaseMessage表,去查看是否有新的消息記錄,而後取通知客戶端,這邊咱們也啓動一個線程去掃描:ui

@Component
public class ReleaseMessageScanner implements InitializingBean {

	@Autowired
	private NotificationControllerV2 configController;
	
	@Override
	public void afterPropertiesSet() throws Exception {
		// 定時任務從數據庫掃描有沒有新的配置發佈
		new Thread(() -> {
			for (;;) {
				String result = NotificationControllerV2.queue.poll();
				if (result != null) {
					ReleaseMessage message = new ReleaseMessage();
					message.setMessage(result);
					configController.handleMessage(message);
				}
			}
		}).start();;
	}

}
複製代碼

循環去讀取NotificationControllerV2中的隊列,若是有消息的話就構造一個ReleaseMessage的對象,而後調用NotificationControllerV2中的handleMessage()方法進行消息的處理。this

ReleaseMessage就一個字段,模擬消息內容:

public class ReleaseMessage {
	private String message;
	
	public void setMessage(String message) {
		this.message = message;
	}
	public String getMessage() {
		return message;
	}
}
複製代碼

接下來,咱們看handleMessage作了什麼樣的工做

NotificationControllerV2實現了ReleaseMessageListener接口,ReleaseMessageListener中定義了handleMessage()方法。

public interface ReleaseMessageListener {
	void handleMessage(ReleaseMessage message);
}
複製代碼

handleMessage就是當配置發生變化的時候,通知的消息監聽器,消息監聽器獲得配置發佈的信息後,則會通知對應的客戶端:

@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
	
	private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
			.synchronizedSetMultimap(HashMultimap.create());
	
	@Override
	public void handleMessage(ReleaseMessage message) {
		System.err.println("handleMessage:"+ message);
		List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get("xxxx"));
		for (DeferredResultWrapper deferredResultWrapper : results) {
			List<ApolloConfigNotification> list = new ArrayList<>();
			list.add(new ApolloConfigNotification("application", 1));
			deferredResultWrapper.setResult(list);
		}
	}

}
複製代碼

Apollo的實時推送是基於Spring DeferredResult實現的,在handleMessage()方法中能夠看到是經過deferredResults獲取DeferredResult,deferredResults就是第一行的Multimap,Key其實就是消息內容,Value就是DeferredResult的業務包裝類DeferredResultWrapper,咱們來看下DeferredResultWrapper的代碼:

public class DeferredResultWrapper {
	private static final long TIMEOUT = 60 * 1000;// 60 seconds
	
	private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = 
			new ResponseEntity<>(HttpStatus.NOT_MODIFIED);

	private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;

	public DeferredResultWrapper() {
		result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
	}

	public void onTimeout(Runnable timeoutCallback) {
		result.onTimeout(timeoutCallback);
	}

	public void onCompletion(Runnable completionCallback) {
		result.onCompletion(completionCallback);
	}

	public void setResult(ApolloConfigNotification notification) {
		setResult(Lists.newArrayList(notification));
	}

	public void setResult(List<ApolloConfigNotification> notifications) {
		result.setResult(new ResponseEntity<>(notifications, HttpStatus.OK));
	}

	public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getResult() {
		return result;
	}
}
複製代碼

經過setResult()方法設置返回結果給客戶端,以上就是當配置發生變化,而後經過消息監聽器通知客戶端的原理,那麼客戶端是在何時接入的呢?

@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
	
	// 模擬配置更新,往裏插入數據表示有更新
	public static Queue<String> queue = new LinkedBlockingDeque<>();

	private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
			.synchronizedSetMultimap(HashMultimap.create());
	
	@GetMapping("/getConfig")
	public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getConfig() {
		DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
		List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications();
		if (!CollectionUtils.isEmpty(newNotifications)) {
			deferredResultWrapper.setResult(newNotifications);
		} else {
			deferredResultWrapper.onTimeout(() -> {
				System.err.println("onTimeout");
			});

			deferredResultWrapper.onCompletion(() -> {
				System.err.println("onCompletion");
			});
			deferredResults.put("xxxx", deferredResultWrapper);
		}
		return deferredResultWrapper.getResult();
	}

	private List<ApolloConfigNotification> getApolloConfigNotifications() {
		List<ApolloConfigNotification> list = new ArrayList<>();
		String result = queue.poll();
		if (result != null) {
			list.add(new ApolloConfigNotification("application", 1));
		}
		return list;
	}
}
複製代碼

NotificationControllerV2中提供了一個/getConfig的接口,客戶端在啓動的時候會調用這個接口,這個時候會執行getApolloConfigNotifications()方法去獲取有沒有配置的變動信息,若是有的話證實配置修改過,直接就經過deferredResultWrapper.setResult(newNotifications);返回結果給客戶端了,客戶端收到結果後從新拉取配置的信息進行覆蓋本地的配置。

若是getApolloConfigNotifications()方法沒有返回配置修改的信息,證實配置沒有發生修改,就將DeferredResultWrapper對象添加到deferredResults中,等待後續配置發生變化時消息監聽器進行通知。

同時這個請求就會掛起,不會當即返回,掛起是經過DeferredResultWrapper中的下面的代碼實現的:

private static final long TIMEOUT = 60 * 1000;// 60 seconds
	
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = 
			new ResponseEntity<>(HttpStatus.NOT_MODIFIED);

private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;

public DeferredResultWrapper() {
	result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}
複製代碼

在建立DeferredResult對象的時候指定了超時的時間和超時後返回的響應碼,若是60秒內沒有消息監聽器進行通知,那麼這個請求就會超時,超時後客戶端就收到的響應碼就是304。

整個Config Service的流程就走完了,接下來咱們看客戶端是怎麼實現的,咱們簡單的寫個測試類模擬客戶端註冊:

public class ClientTest {
	public static void main(String[] args) {
		reg();
	}

	private static void reg() {
		System.err.println("註冊");
		String result = request("http://localhost:8081/getConfig");
		if (result != null) {
			// 配置有更新,從新拉取配置
			// ......
		}
		// 從新註冊
		reg();
	}
	
	private static String request(String url) {
		HttpURLConnection connection = null;
		BufferedReader reader = null;
		try {
			URL getUrl = new URL(url);
			connection = (HttpURLConnection) getUrl.openConnection();
			connection.setReadTimeout(90000);
			connection.setConnectTimeout(3000);
			connection.setRequestMethod("GET");
			connection.setRequestProperty("Accept-Charset", "utf-8");
			connection.setRequestProperty("Content-Type", "application/json");
			connection.setRequestProperty("Charset", "UTF-8");
			System.out.println(connection.getResponseCode());
			if (200 == connection.getResponseCode()) {
				reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
				StringBuilder result = new StringBuilder();
				String line = null;
				while ((line = reader.readLine()) != null) {
					result.append(line);
				}
				System.out.println("結果 " + result);
				return result.toString();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				connection.disconnect();
			}
		}
		return null;
	}
}

複製代碼

首先啓動/getConfig接口所在的服務,而後啓動客戶端,客戶端就會發起註冊請求,若是有修改直接獲取到結果,進行配置的更新操做。若是無修改,請求會掛起,這邊客戶端設置的讀取超時時間是90秒,大於服務端的60秒超時時間。

每次收到結果後,不管是有修改仍是沒修改,都必須從新進行註冊,經過這樣的方式就能夠達到配置實時推送的效果。

咱們能夠調用以前寫的/addMsg接口來模擬配置發生變化,調用以後客戶端就能立刻獲得返回結果。

本文摘自於**《Spring Cloud微服務 入門 實戰與進階》**一書。

去年出版的**《Spring Cloud微服務:全棧技術與案例解析》**一書,獲得了你們的支持以及反饋,基於你們的反饋,從新進行了更正和改進。

基於比較穩定的 Spring Cloud Finchley.SR2 版本和 Spring Boot 2.0.6.RELEASE 版本編寫。

同時將示列代碼進行標準的歸檔,以前的都在一塊兒,不方便讀者參考和運行。

同時還增長了像Apollo,Spring Cloud Gateway,生產實踐經驗等新的內容。

相關文章
相關標籤/搜索