SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: across several articles we work backwards from the code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, as a way to study how Alibaba approaches design.
This is the second article in the series; it covers SOFARegistry's network encapsulation and operations.
Some readers may not have read the earlier MetaServer article, so let's first recap the overall SOFARegistry architecture.
Application server cluster. The Client layer is the application layer: each application system uses the registry's service publish and subscribe capabilities programmatically by depending on the registry client jar.
Session server cluster. The Session layer is, as the name suggests, the session layer: it keeps long-lived connections to the application servers in the Client layer and receives their service publish and subscribe requests. This layer only holds the publish/subscribe relationships of each service in memory; the concrete service data is merely forwarded between the Client layer and the Data layer. The Session layer is stateless and can be scaled out as the Client layer grows.
Data server cluster. The Data layer stores the service registration data of all applications using sharded storage. Data is partitioned by consistent hashing on dataInfoId (the unique identifier of each piece of service data) and replicated across multiple copies for high availability. A later article focuses on how the Data layer scales in and out smoothly, without affecting the business, as data volume grows.
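To make the sharding idea concrete, here is a minimal sketch of consistent hashing with virtual nodes, mapping a dataInfoId to a data node. This is not SOFARegistry's actual implementation; the class and method names are invented for illustration.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal consistent-hash ring: dataInfoId -> node, with virtual nodes
// so that adding/removing a node only remaps a fraction of the keys.
public class ConsistentHashRing {
    private final SortedMap<Integer, String> ring = new TreeMap<>();
    private final int virtualNodes;

    public ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    private int hash(String key) {
        // spread the default String hash; a real system would use a stronger hash
        int h = key.hashCode();
        h ^= (h >>> 16);
        return h & 0x7fffffff;
    }

    public void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    public void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i));
        }
    }

    // the first virtual node clockwise from the key's hash owns the key
    public String nodeFor(String dataInfoId) {
        SortedMap<Integer, String> tail = ring.tailMap(hash(dataInfoId));
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }
}
```

Because each key's owner is determined only by its position on the ring, removing a node remaps only the keys that node owned, which is what makes smooth scale-in/scale-out possible.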
Meta server cluster. This cluster manages the server information of the Session cluster and the Data cluster; its role is effectively that of a service registry inside the SOFARegistry architecture. Whereas SOFARegistry as a service registry serves the application layer at large, the Meta cluster serves SOFARegistry's own Session and Data clusters: the Meta layer can sense changes of Session and Data nodes and notify the other nodes of the cluster.
DataServer, SessionServer, and MetaServer are all, in essence, network applications, which makes network encapsulation and operations a foundational module of this system. Let's look at the application scenarios.
SOFARegistry targets a unitized (cell-based) deployment.
In a unitized deployment, a unit is a fully functional, scaled-down copy of the whole site. It is complete, because every application is deployed in it; but it is not total, because it only operates on a subset of the data. A system that can be unitized is easy to deploy across machine rooms: a few units can be placed in one machine room and the rest in others. By putting a traffic dispatcher at the business entry point, the traffic ratio between units can be adjusted.
So what SOFARegistry has to solve is inter-node communication inside an IDC private network: high-throughput, high-concurrency communication, management of very large numbers of connections (the C10K problem), convenient upgrade mechanisms, compatibility guarantees, flexible thread-pool models, and careful exception handling and log instrumentation — all of which require work at the level of the wire protocol and the implementation framework.
The servers also have some configuration needs, for which a plain HTTP protocol is sufficient.
In an intra-network unitized scenario, the issues one can anticipate are as follows:
For high-performance, high-concurrency scenarios on the JVM, non-blocking I/O multiplexing is the inevitable choice, which naturally leads to building on Netty.
Alibaba uses the SOFABolt communication framework to implement node liveness detection and push-mode change notification over TCP long connections, with service up/down notification latency within seconds.
sofa-bolt is a Netty-based network communication framework open-sourced by Ant Financial. It wraps the common concerns of network programming on top of Netty so that middleware developers can focus on the middleware product itself.
Its main features are:
SOFABolt can be seen as a set of Netty best practices, plus some additional optimization work.
We may analyze the SOFABolt framework in a dedicated series later. For now we take it that SOFABolt meets our needs, so we introduce it only briefly and focus on how it is used and how the business logic is built on it.
Once SOFABolt is chosen, the issues mentioned earlier are largely solved by it; the remaining questions we can think of are roughly:
To give away the ending: logically, Alibaba provides two levels of encapsulation.
From the connection point of view, Alibaba builds a wrapper around netty.channel.Channel; from bottom to top it is:
From the application point of view, Alibaba builds Server and Client layers; from bottom to top they are:
The overall logic is roughly as follows:
```
+-------------------+          +-------------------+
| DataNodeExchanger |          | MetaNodeExchanger |
+---------+---------+          +---------+---------+
          |                              |
          +--------------+---------------+
                         v
     +--------------------------------------+
     |             BoltExchange             |
     |  Map<String, Client>                 |
     |  ConcurrentHashMap<Integer, Server>  |
     +-------+---------------------+--------+
             |                     |
             v                     v
+---------------------------+  +----------------------------------+
| BoltClient                |  | BoltServer                       |
|  remoting.rpc.RpcClient   |  |  remoting.rpc.RpcServer          |
|   ConnectionEventHandler  |  |  Map<String, Channel> channels   |
|   ConnectionFactory       |  |  List<ChannelHandler>            |
+------+-------------+------+  |    channelHandlers               |
       |             |         +------+--------------------+------+
       v             v                v                    v
+----------------+ +-----------------+ +------------------------------+
| ChannelHandler | | CallbackHandler | | BoltChannel                  |
+----------------+ +-----------------+ |  com.alipay.remoting         |
                                       |    .Connection               |
                                       +--------------+---------------+
                                                      |
                                                      v
                                       +------------------------------+
                                       | netty.channel.Channel        |
                                       +------------------------------+
```
SOFARegistry wraps the basic network functionality and exposes an API. Below is the encapsulation module and external interface, registry-remoting-api.
```
├── CallbackHandler.java
├── Channel.java
├── ChannelHandler.java
├── Client.java
├── Endpoint.java
├── RemotingException.java
├── Server.java
└── exchange
    ├── Exchange.java
    ├── NodeExchanger.java
    ├── RequestException.java
    └── message
        ├── Request.java
        └── Response.java
```
The four key interfaces are Server, Client, Exchange, and Channel; these are therefore the most basic concepts of the network encapsulation.
Channel is a fairly universal concept: it represents an open connection between an I/O source and a target, together with the operations on that connection. Let's first use Java's Channel as an illustration.
Java's Channel is defined in the java.nio.channels package. A Channel represents an open connection between an I/O source and a target. It resembles a traditional "stream", except that a Channel cannot access data directly: it can only interact with a Buffer.
A channel transfers data efficiently between a byte buffer and the entity on the other side of the channel (typically a file or a socket). Channels are the conduits through which we access the operating system's I/O services with minimal overhead; buffers, in turn, are the endpoints a channel uses for sending and receiving data.
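The Channel/Buffer relationship described above can be shown in a tiny, self-contained example: data never flows through the Channel directly, it is always staged in a ByteBuffer.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ChannelBufferDemo {
    // Write a string through a FileChannel, then read it back.
    // The channel only ever sees ByteBuffers, never raw streams.
    public static String roundTrip(String text) {
        try {
            Path tmp = Files.createTempFile("nio-demo", ".txt");
            try (FileChannel ch = FileChannel.open(tmp,
                    StandardOpenOption.WRITE, StandardOpenOption.READ)) {
                ch.write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));

                ByteBuffer buf = ByteBuffer.allocate(64);
                ch.read(buf, 0);   // absolute read from position 0 into the buffer
                buf.flip();        // switch the buffer from writing to reading
                byte[] bytes = new byte[buf.remaining()];
                buf.get(bytes);
                return new String(bytes, StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```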
From SOFARegistry's Channel definition we can see that its basic capabilities are mostly attribute-related.
```java
public interface Channel {
    InetSocketAddress getRemoteAddress();
    InetSocketAddress getLocalAddress();
    boolean isConnected();
    Object getAttribute(String key);
    void setAttribute(String key, Object value);
    WebTarget getWebTarget();
    void close();
}
```
Server is the wrapper for the server side; as its definition shows, its basic capability is sending over a Channel.
```java
public interface Server extends Endpoint {
    boolean isOpen();
    Collection<Channel> getChannels();
    Channel getChannel(InetSocketAddress remoteAddress);
    Channel getChannel(URL url);
    void close(Channel channel);
    int getChannelCount();
    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
    void sendCallback(final Channel channel, final Object message, CallbackHandler callbackHandler,
                      final int timeoutMillis);
}
```
Client is the wrapper for the client side; its basic capability is likewise to interact through a Channel.
```java
public interface Client extends Endpoint {
    Channel getChannel(URL url);
    Channel connect(URL url);
    Object sendSync(final URL url, final Object message, final int timeoutMillis);
    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
    void sendCallback(final URL url, final Object message, CallbackHandler callbackHandler,
                      final int timeoutMillis);
}
```
Exchange is the router: a further abstraction above Client / Server connections, responsible for connections between servers of the same type.
```java
public interface Exchange<T> {
    String DATA_SERVER_TYPE = "dataServer";
    String META_SERVER_TYPE = "metaServer";
    Client connect(String serverType, URL serverUrl, T... channelHandlers);
    Client connect(String serverType, int connNum, URL serverUrl, T... channelHandlers);
    Server open(URL url, T... channelHandlers);
    Client getClient(String serverType);
    Server getServer(Integer port);
}
```
When establishing a connection, a series of handlers for different tasks (called ChannelHandler) can be configured.
Some of these ChannelHandlers act as Listeners that handle connection events, while others act as Processors that handle specific events such as service data changes or Subscriber registration.
```java
public interface ChannelHandler<T> {
    enum HandlerType { LISENTER, PROCESSER }
    enum InvokeType { SYNC, ASYNC }
    void connected(Channel channel) throws RemotingException;
    void disconnected(Channel channel) throws RemotingException;
    void received(Channel channel, T message) throws RemotingException;
    Object reply(Channel channel, T message) throws RemotingException;
    void caught(Channel channel, T message, Throwable exception) throws RemotingException;
    HandlerType getType();
    Class interest();
    default InvokeType getInvokeType() {
        return InvokeType.SYNC;
    }
    default Executor getExecutor() {
        return null;
    }
}
```
So the basic external network interfaces are as follows:
```
+-----------------------------------------------------------+
| [registry-remoting-api]                                   |
|                                                           |
|    +----------+          +-------------+                  |
|    | Exchange |          |NodeExchanger|                  |
|    +--+----+--+          +------+------+                  |
|       |    |                    |                         |
|       |    +---------+----------+                         |
|       v              v                                    |
|   +--------+     +--------+                               |
|   | Server |     | Client |                               |
|   +---+----+     +---+----+                               |
|       |              |                                    |
|       v              v                                    |
| +----------------+ +---------+ +-----------------+        |
| | ChannelHandler | | Channel | | CallbackHandler |        |
| +----------------+ +---------+ +-----------------+        |
+-----------------------------------------------------------+
```
Since SOFARegistry is built mainly on SOFABolt and cannot avoid it, we first need a brief introduction to SOFABolt.
Bolt is based on Netty, so we start with the Netty Channel.
In Netty, Channel is one of the core concepts: it is the principal actor of Netty network communication, responsible for communicating with the peer, registration, and data operations.
Netty wraps and enhances the JDK's native ServerSocketChannel into NioXXXChannel; compared with the native JDK channel, Netty's Channel adds the following components.
By server/client role, Channels fall into two categories:
inbound and outbound identify the type of handler a Context corresponds to. In Netty, events are divided into Inbound and Outbound events; the class comment of ChannelPipeline contains the following diagram:
```
 *                                              I/O Request
 *                                         via Channel or
 *                                     ChannelHandlerContext
 *                                                   |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                   \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+
```
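The ordering shown in the diagram can be mimicked without Netty: inbound events traverse the handlers in insertion order, while outbound events traverse them in reverse. A hypothetical miniature pipeline (the names are invented; this is not Netty's API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy pipeline: inbound events visit handlers head-to-tail,
// outbound events visit them tail-to-head, as in Netty's ChannelPipeline.
public class ToyPipeline {
    private final List<String> handlers = new ArrayList<>();

    public ToyPipeline addLast(String name) {
        handlers.add(name);
        return this;
    }

    // order in which handlers would see an inbound event (e.g. Socket.read())
    public List<String> fireInbound() {
        return new ArrayList<>(handlers);
    }

    // order in which handlers would see an outbound event (e.g. write())
    public List<String> fireOutbound() {
        List<String> reversed = new ArrayList<>(handlers);
        Collections.reverse(reversed);
        return reversed;
    }
}
```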
A trimmed-down definition of Connection is shown below; its key member is the Netty channel instance:
```java
public class Connection {
    private Channel channel;
    private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4);
    /** Attribute key for connection */
    public static final AttributeKey<Connection> CONNECTION = AttributeKey.valueOf("connection");
    /** Attribute key for heartbeat count */
    public static final AttributeKey<Integer> HEARTBEAT_COUNT = AttributeKey.valueOf("heartbeatCount");
    /** Attribute key for heartbeat switch for each connection */
    public static final AttributeKey<Boolean> HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch");
    /** Attribute key for protocol */
    public static final AttributeKey<ProtocolCode> PROTOCOL = AttributeKey.valueOf("protocol");
    /** Attribute key for version */
    public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version");
    private Url url;
    private final ConcurrentHashMap<Integer/* id */, String/* poolKey */> id2PoolKey = new ConcurrentHashMap<Integer, String>(256);
    private Set<String> poolKeys = new ConcurrentHashSet<String>();
    private final ConcurrentHashMap<String/* attr key */, Object/* attr value */> attributes = new ConcurrentHashMap<String, Object>();
}
```
Connection has many helper classes; a selection:
Some additional points worth noting:
Whether on the server side or the client side, the essence is the same: create a ConnectionEventHandler instance and add it to the Netty pipeline. The basic principle is:
ConnectionEventHandler handles two kinds of events.
Then, whenever a ConnectionEvent fires (whether a Netty-defined event or a SOFABolt-defined event is triggered), ConnectionEventHandler notifies the ConnectionEventListener through an asynchronous executor, and the ConnectionEventListener dispatches the message to the concrete ConnectionEventProcessor implementations.
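The handler → listener → processor chain just described can be sketched as follows. This is a simplification with invented names, not SOFABolt's actual classes: the handler hands the event to an executor, and the listener fans it out to the processors registered for that event type.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch of ConnectionEventHandler -> ConnectionEventListener -> processors:
// events are dispatched asynchronously and fanned out per event type.
public class EventDispatchSketch {
    enum EventType { CONNECT, CLOSE, EXCEPTION }

    private final ConcurrentHashMap<EventType, List<Consumer<String>>> processors =
        new ConcurrentHashMap<>();
    private final ExecutorService asyncExecutor = Executors.newSingleThreadExecutor();

    public void addProcessor(EventType type, Consumer<String> processor) {
        processors.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(processor);
    }

    // Called when a connection event fires; listeners are notified off the I/O thread.
    public void onEvent(EventType type, String remoteAddress) {
        asyncExecutor.execute(() ->
            processors.getOrDefault(type, List.of())
                      .forEach(p -> p.accept(remoteAddress)));
    }

    public void shutdown() {
        asyncExecutor.shutdown();  // queued events still get delivered
        try {
            asyncExecutor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```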
RpcServer implements the basic machinery a Server needs and can be used directly, for example:
A few points deserve explanation:
The definition is as follows:
```java
public class RpcServer extends AbstractRemotingServer {
    /** server bootstrap */
    private ServerBootstrap bootstrap;
    /** channelFuture */
    private ChannelFuture channelFuture;
    /** connection event handler */
    private ConnectionEventHandler connectionEventHandler;
    /** connection event listener */
    private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
    /** user processors of rpc server */
    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>(4);
    /** boss event loop group, boss group should not be daemon, need shutdown manually */
    private final EventLoopGroup bossGroup = NettyEventLoopUtil
        .newEventLoopGroup(1, new NamedThreadFactory("Rpc-netty-server-boss", false));
    /** worker event loop group. Reuse I/O worker threads between rpc servers. */
    private static final EventLoopGroup workerGroup = NettyEventLoopUtil
        .newEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2,
            new NamedThreadFactory("Rpc-netty-server-worker", true));
    /** address parser to get custom args */
    private RemotingAddressParser addressParser;
    /** connection manager */
    private DefaultConnectionManager connectionManager;
    /** rpc remoting */
    protected RpcRemoting rpcRemoting;
    /** rpc codec */
    private Codec codec = new RpcCodec();
}
```
The main mechanisms of RpcClient are as follows:
The code is:
```java
public class RpcClient extends AbstractConfigurableInstance {
    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>();
    /** connection factory */
    private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors, this);
    /** connection event handler */
    private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(switches());
    /** reconnect manager */
    private ReconnectManager reconnectManager;
    /** connection event listener */
    private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
    /** address parser to get custom args */
    private RemotingAddressParser addressParser;
    /** connection select strategy */
    private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(switches());
    /** connection manager */
    private DefaultConnectionManager connectionManager = new DefaultConnectionManager(
        connectionSelectStrategy, connectionFactory, connectionEventHandler,
        connectionEventListener, switches());
    /** rpc remoting */
    protected RpcRemoting rpcRemoting;
    /** task scanner */
    private RpcTaskScanner taskScanner = new RpcTaskScanner();
    /** connection monitor */
    private DefaultConnectionMonitor connectionMonitor;
    /** connection monitor strategy */
    private ConnectionMonitorStrategy monitorStrategy;
}
```
For the basic encapsulation described above, the system provides both a Bolt implementation and an HTTP implementation. Below is the SOFABolt-based module, registry-remoting-bolt.
```
├── AsyncUserProcessorAdapter.java
├── BoltChannel.java
├── BoltChannelUtil.java
├── BoltClient.java
├── BoltServer.java
├── ConnectionEventAdapter.java
├── SyncUserProcessorAdapter.java
└── exchange
    └── BoltExchange.java
```
BoltChannel mainly wraps com.alipay.remoting.Connection, which in turn wraps io.netty.channel.Channel.
The Channel encapsulation feels incomplete: the Connection is still exposed, and it is used to obtain the local IP and port and the remote IP and port.
```java
public class BoltChannel implements Channel {
    private Connection                connection;
    private AsyncContext              asyncContext;
    private BizContext                bizContext;
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
}
```
BoltServer wraps com.alipay.remoting.rpc.RpcServer.
At initialization time it registers the handlers with the RpcServer via addConnectionEventProcessor, registerUserProcessor, and so on.
A ConcurrentHashMap records all Channels connected to this Server, keyed by IP:port.
```java
public class BoltServer implements Server {
    /** the server URL (port) */
    private final URL url;
    private final List<ChannelHandler> channelHandlers;
    /** bolt server */
    private RpcServer boltServer;
    /** started status */
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private Map<String, Channel> channels = new ConcurrentHashMap<>();
    private AtomicBoolean initHandler = new AtomicBoolean(false);
}
```
Its main functions, shown below, essentially delegate to Bolt:
```java
@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        Url boltUrl = null;
        try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(),
                channel.getRemoteAddress().getPort());
            return boltServer.invokeSync(boltUrl, message, timeoutMillis);
        } catch (Exception e) {
            // exception handling elided
        }
    }
    // fall-through handling elided
}

@Override
public void sendCallback(Channel channel, Object message, CallbackHandler callbackHandler,
                         int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        Url boltUrl = null;
        try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(),
                channel.getRemoteAddress().getPort());
            boltServer.invokeWithCallback(boltUrl, message, new InvokeCallback() {
                @Override
                public void onResponse(Object result) {
                    callbackHandler.onCallback(channel, result);
                }

                @Override
                public void onException(Throwable e) {
                    callbackHandler.onException(channel, e);
                }

                @Override
                public Executor getExecutor() {
                    return callbackHandler.getExecutor();
                }
            }, timeoutMillis);
            return;
        } catch (Exception e) {
            // exception handling elided
        }
    }
}
```
BoltClient mainly wraps com.alipay.remoting.rpc.RpcClient.
At initialization time it registers the handlers with the RpcClient via addConnectionEventProcessor, registerUserProcessor, and so on.
```java
public class BoltClient implements Client {
    private RpcClient rpcClient;
    private AtomicBoolean closed = new AtomicBoolean(false);
    private int connectTimeout = 2000;
    private final int connNum;
}
```
The main methods are as follows:
```java
@Override
public Channel connect(URL url) {
    try {
        Connection connection = getBoltConnection(rpcClient, url);
        BoltChannel channel = new BoltChannel();
        channel.setConnection(connection);
        return channel;
    } catch (Exception e) {
        // exception handling elided
    }
}

protected Connection getBoltConnection(RpcClient rpcClient, URL url) throws RemotingException {
    Url boltUrl = createBoltUrl(url);
    try {
        Connection connection = rpcClient.getConnection(boltUrl, connectTimeout);
        if (connection == null || !connection.isFine()) {
            if (connection != null) {
                connection.close();
            }
        }
        return connection;
    } catch (Exception e) {
        // exception handling elided
    }
}

@Override
public Object sendSync(URL url, Object message, int timeoutMillis) {
    return rpcClient.invokeSync(createBoltUrl(url), message, timeoutMillis);
}

@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        BoltChannel boltChannel = (BoltChannel) channel;
        return rpcClient.invokeSync(boltChannel.getConnection(), message, timeoutMillis);
    }
    // fall-through handling elided
}

@Override
public void sendCallback(URL url, Object message, CallbackHandler callbackHandler,
                         int timeoutMillis) {
    try {
        Connection connection = getBoltConnection(rpcClient, url);
        BoltChannel channel = new BoltChannel();
        channel.setConnection(connection);
        rpcClient.invokeWithCallback(connection, message, new InvokeCallback() {
            @Override
            public void onResponse(Object result) {
                callbackHandler.onCallback(channel, result);
            }

            @Override
            public void onException(Throwable e) {
                callbackHandler.onException(channel, e);
            }

            @Override
            public Executor getExecutor() {
                return callbackHandler.getExecutor();
            }
        }, timeoutMillis);
        return;
    } catch (Exception e) {
        // exception handling elided
    }
}
```
The main job of BoltExchange is to maintain two ConcurrentHashMaps of Clients and Servers — that is, all the Clients and all the Servers.
This is the first level of connection bookkeeping.
Map<String, Client> clients distinguishes Clients by a String key, which is one of the following:
```java
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
```
In other words, if a Data Server uses BoltExchange, it holds only two BoltClients internally, reused respectively for all interactions with dataServer peers and with metaServer peers.
ConcurrentHashMap<Integer, Server> serverMap distinguishes the locally started Servers by their own port.
```java
public class BoltExchange implements Exchange<ChannelHandler> {
    private Map<String, Client> clients = new ConcurrentHashMap<>();
    private ConcurrentHashMap<Integer, Server> serverMap = new ConcurrentHashMap<>();

    @Override
    public Client connect(String serverType, URL serverUrl, ChannelHandler... channelHandlers) {
        return this.connect(serverType, 1, serverUrl, channelHandlers);
    }

    @Override
    public Client connect(String serverType, int connNum, URL serverUrl,
                          ChannelHandler... channelHandlers) {
        Client client = clients.computeIfAbsent(serverType,
            key -> newBoltClient(connNum, channelHandlers));
        client.connect(serverUrl);
        return client;
    }

    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }

    @Override
    public Client getClient(String serverType) {
        return clients.get(serverType);
    }

    @Override
    public Server getServer(Integer port) {
        return serverMap.get(port);
    }

    /**
     * add server into serverMap
     */
    public void setServer(Server server, URL url) {
        serverMap.putIfAbsent(url.getPort(), server);
    }

    private BoltClient newBoltClient(int connNum, ChannelHandler[] channelHandlers) {
        BoltClient boltClient = createBoltClient(connNum);
        boltClient.initHandlers(Arrays.asList(channelHandlers));
        return boltClient;
    }

    protected BoltClient createBoltClient(int connNum) {
        return new BoltClient(connNum);
    }

    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}
```
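The one-client-per-server-type reuse hinges on ConcurrentHashMap.computeIfAbsent: the factory runs at most once per key, so repeated connect calls for the same type share one client. A stripped-down sketch with invented types (not the real BoltExchange):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Demonstrates why BoltExchange can hold just one Client per server type:
// computeIfAbsent creates the client once, later calls reuse it.
public class ClientRegistrySketch {
    static final AtomicInteger CREATED = new AtomicInteger();

    static class StubClient {
        final String serverType;
        StubClient(String serverType) {
            this.serverType = serverType;
            CREATED.incrementAndGet();  // count how many clients are ever built
        }
    }

    private final ConcurrentHashMap<String, StubClient> clients = new ConcurrentHashMap<>();

    public StubClient connect(String serverType) {
        // the mapping function runs only when the key is absent
        return clients.computeIfAbsent(serverType, StubClient::new);
    }
}
```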
Internally, the corresponding Bolt Client is fetched from boltExchange according to the server type.
```java
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
```
A Client is put with the following call:
```java
Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
```
And a Client is retrieved as follows:
```java
Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
```
With the corresponding Client in hand, a Channel is established, or a request is sent, based on the URL parameter.
```java
Channel channel = client.getChannel(url);
client.sendCallback(request.getRequestUrl()....;
```
此時大體邏輯以下:
```
     +--------------------------------------+
     |             BoltExchange             |
     |  Map<String, Client>                 |
     |  ConcurrentHashMap<Integer, Server>  |
     +-------+---------------------+--------+
             |                     |
             v                     v
+---------------------------+  +----------------------------------+
| BoltClient                |  | BoltServer                       |
|  remoting.rpc.RpcClient   |  |  remoting.rpc.RpcServer          |
|   ConnectionEventHandler  |  |  Map<String, Channel> channels   |
|   ConnectionFactory       |  |  List<ChannelHandler>            |
+------+-------------+------+  |    channelHandlers               |
       |             |         +------+--------------------+------+
       v             v                v                    v
+----------------+ +-----------------+ +------------------------------+
| ChannelHandler | | CallbackHandler | | BoltChannel                  |
+----------------+ +-----------------+ |  com.alipay.remoting         |
                                       |    .Connection               |
                                       +--------------+---------------+
                                                      |
                                                      v
                                       +------------------------------+
                                       | netty.channel.Channel        |
                                       +------------------------------+
```
Below is the Jetty-based HTTP module, registry-remoting-http.
```
├── JerseyChannel.java
├── JerseyClient.java
├── JerseyJettyServer.java
├── exchange
│   └── JerseyExchange.java
└── jetty
    └── server
        ├── HttpChannelOverHttpCustom.java
        ├── HttpConnectionCustom.java
        └── HttpConnectionCustomFactory.java
```
Since the HTTP service is not SOFARegistry's main function, we skip it here.
The directory structure gives a rough view of how the functional modules are divided.
```
├── remoting
│   ├── dataserver
│   │   ├── handler
│   │   └── task
│   ├── handler
│   ├── metaserver
│   └── sessionserver
```
Since the major modules are broadly similar, we focus on the dataserver directory below, touching on the special parts of the metaserver and sessionserver directories.
Data Server is fairly complex — it is both a server and a client — so separate components abstract these two roles.
The DataServerBootstrap#start method launches a series of initialization services. In this method, several network services are started to expose the external interfaces.
```java
public void start() {
    try {
        openDataServer();
        openDataSyncServer();
        openHttpServer();
        startRaftClient();
    } catch (Exception e) {
        // exception handling elided
    }
}
```
The specific role of each Handler is described below.
DataServer and DataSyncServer are Bolt Servers, the bolt communication components between nodes, where:
The code is as follows:
```java
private void openDataServer() {
    try {
        if (serverForSessionStarted.compareAndSet(false, true)) {
            server = boltExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()),
                serverHandlers.toArray(new ChannelHandler[serverHandlers.size()]));
        }
    } catch (Exception e) {
        // exception handling elided
    }
}

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(),
                    dataServerConfig.getSyncDataPort()),
                serverSyncHandlers.toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } catch (Exception e) {
        // exception handling elided
    }
}
```
The handlers of these two servers partially overlap; one suspects the developers were in the middle of migrating functionality.
```java
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());
    return list;
}

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(clientOffHandler());
    list.add(getDataVersionsHandler());
    list.add(publishDataProcessor());
    list.add(sessionServerRegisterHandler());
    list.add(unPublishDataHandler());
    list.add(dataServerConnectionHandler());
    list.add(renewDatumHandler());
    list.add(datumSnapshotHandler());
    return list;
}
```
We use DataSyncServer for the detailed walkthrough.
When DataSyncServer starts, the following handlers are registered to process bolt requests:
This Handler is mainly used for data retrieval: when a request arrives, it fetches the data stored on the current DataServer node according to the DataCenter and DataInfoId in the request.
When a data publisher comes online or goes offline, publishDataProcessor or unPublishDataHandler fires respectively. The Handler adds a data-change event to the dataChangeEventCenter, asynchronously notifying the event change center of the change. On receipt, the event center enqueues the event, and dataChangeEventCenter then processes the online/offline data asynchronously according to the event type.
At the same time, DataChangeHandler publishes this change via ChangeNotifier, notifying other nodes to synchronize the data.
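The publish/unpublish path described above — a handler enqueues an event, and the event center drains the queue on its own thread and dispatches by event type — can be sketched like this (the names are invented; the real dataChangeEventCenter is considerably more elaborate):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of a data-change event center: producers enqueue change events,
// a worker thread drains the queue and handles each by type.
public class DataChangeEventCenterSketch {
    enum ChangeType { PUBLISH, UNPUBLISH }

    static class ChangeEvent {
        final ChangeType type;
        final String dataInfoId;
        ChangeEvent(ChangeType type, String dataInfoId) {
            this.type = type;
            this.dataInfoId = dataInfoId;
        }
    }

    private final BlockingQueue<ChangeEvent> queue = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();
    private final Thread worker;

    public DataChangeEventCenterSketch() {
        worker = new Thread(() -> {
            try {
                while (true) {
                    ChangeEvent e = queue.take();
                    // in the real system this is where type-specific processing
                    // and change notification to other nodes would happen
                    handled.add(e.type + ":" + e.dataInfoId);
                }
            } catch (InterruptedException ignored) {
                // shutdown requested
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    public void onChange(ChangeEvent event) {
        queue.offer(event);   // handlers return immediately; processing is async
    }

    public void stop() {
        try {
            while (!queue.isEmpty()) Thread.sleep(10);  // crude drain for the demo
            Thread.sleep(50);
            worker.interrupt();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```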
The data-pull request Handler: when triggered, the current DataServer node compares version numbers; if the version in the request is higher than the version in the local cache, a data synchronization is performed to ensure the data is up to date.
This is the DataServer online-notification Handler: when another node comes online, it triggers this Handler, and the current node stores the new node's information in its cache. It is used to manage node state — INITIAL versus WORKING.
The inter-node data synchronization Handler: when triggered, it compares version numbers; if the data versions stored on the current DataServer include the requested version, it returns all data with versions greater than the requested one, enabling data synchronization between nodes.
The connection-management Handler: when another DataServer node connects to the current one, the connect method fires and the connection information is registered in the local cache; when a peer disconnects, the disconnect method fires and the cached entry is removed, so the current DataServer always knows all DataServer nodes connected to it.
The dataSyncServer call chain is as follows.
In DataServerBootstrap we have:
```java
private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(),
                    dataServerConfig.getSyncDataPort()),
                serverSyncHandlers.toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } catch (Exception e) {
        // exception handling elided
    }
}
```
Then we have:
```java
public class BoltExchange implements Exchange<ChannelHandler> {
    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }

    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}
```
BoltServer starts as follows, with user-defined UserProcessors registered:
```java
public class BoltServer implements Server {
    public BoltServer(URL url, List<ChannelHandler> channelHandlers) {
        this.channelHandlers = channelHandlers;
        this.url = url;
    }

    public void startServer() {
        if (isStarted.compareAndSet(false, true)) {
            boltServer = new RpcServer(url.getPort(), true);
            initHandler();
            boltServer.start();
        }
    }

    private void initHandler() {
        if (initHandler.compareAndSet(false, true)) {
            boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
                new ConnectionEventAdapter(ConnectionEventType.CONNECT,
                    getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
                new ConnectionEventAdapter(ConnectionEventType.CLOSE,
                    getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
                new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
                    getConnectionEventHandler(), this));
            registerUserProcessorHandler();
        }
    }

    // sync and async invocation are distinguished here
    private void registerUserProcessorHandler() {
        if (channelHandlers != null) {
            for (ChannelHandler channelHandler : channelHandlers) {
                if (HandlerType.PROCESSER.equals(channelHandler.getType())) {
                    if (InvokeType.SYNC.equals(channelHandler.getInvokeType())) {
                        boltServer.registerUserProcessor(new SyncUserProcessorAdapter(channelHandler));
                    } else {
                        boltServer.registerUserProcessor(new AsyncUserProcessorAdapter(channelHandler));
                    }
                }
            }
        }
    }
}
```
HttpServer is the HTTP communication component used for control, plus its configuration; it exposes a set of REST endpoints for dashboard management, data queries, and so on.
```java
private void openHttpServer() {
    try {
        if (httpServerStarted.compareAndSet(false, true)) {
            bindResourceConfig();
            httpServer = jerseyExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(),
                    dataServerConfig.getHttpServerPort()),
                new ResourceConfig[] { jerseyResourceConfig });
        }
    } catch (Exception e) {
        // exception handling elided
    }
}
```
RaftClient is a Raft-protocol client, used to obtain the meta server leader information via Raft.
```java
private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
```
The concrete implementation in DefaultMetaServiceImpl is:
```java
@Override
public void startRaftClient() {
    try {
        if (clientStart.compareAndSet(false, true)) {
            String serverConf = getServerConfig();
            raftClient = new RaftClient(getGroup(), serverConf);
            raftClient.start();
        }
    } catch (Exception e) {
        // exception handling elided
    }
}
```
The final logic of the functional modules is roughly:
```
                                            +-> getDataHandler
                                            +-> publishDataProcessor
                                            +-> unPublishDataHandler
                      +----------------+    +-> notifyFetchDatumHandler
                +---> | DataSyncServer +--->+
                |     +----------------+    +-> notifyOnlineHandler
                |                           +-> syncDataHandler
                |     +------------+        +-> dataSyncServerConnectionHandler
                +---> | HttpServer |
+---------------+     +------------+
| DataServer    |
| Bootstrap     |---->+------------+
+-------+-------+     | RaftClient |
                |     +------------+        +-> getDataHandler
                |                           +-> clientOffHandler
                |                           +-> getDataVersionsHandler
                |     +------------+        +-> publishDataProcessor
                +---> | DataServer +------->+
                      +------------+        +-> sessionServerRegisterHandler
                                            +-> unPublishDataHandler
                                            +-> dataServerConnectionHandler
                                            +-> renewDatumHandler
                                            +-> datumSnapshotHandler
```
Exchange, as the abstraction of Client / Server connections, is responsible for connections between nodes.
On the Data Server the main ones are DataNodeExchanger and MetaNodeExchanger, which:
wrap BoltExchange;
abstract away the Bolt Client and Bolt Channel;
provide a directly usable network API: scattered beans like ForwardServiceImpl and GetSyncDataHandler can use DataNodeExchanger directly for network interaction.
From the external interfaces we can see the following.
There are two questions here:
Probably because there can be many Session Servers, there is no need to keep a Bolt client and Server for them; but Sessions do have a corresponding ConnectionFactory. ConnectionFactory is a low-level wrapper, explained later.
Take DataNodeExchanger as an example.
All Data-Server-related network operations that are not directly tied to the "Server, Client" concepts are centralized here.
As you can see, it is mainly a higher-level wrapper over boltExchange.
The code is as follows:
```java
public class DataNodeExchanger implements NodeExchanger {
    @Autowired
    private Exchange boltExchange;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Resource(name = "dataClientHandlers")
    private Collection<AbstractClientHandler> dataClientHandlers;

    @Override
    public Response request(Request request) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
        if (null != request.getCallBackHandler()) {
            client.sendCallback(request.getRequestUrl(), request.getRequestBody(),
                request.getCallBackHandler(),
                request.getTimeout() != null ? request.getTimeout()
                    : dataServerConfig.getRpcTimeout());
            return () -> Response.ResultStatus.SUCCESSFUL;
        } else {
            final Object result = client.sendSync(request.getRequestUrl(),
                request.getRequestBody(), dataServerConfig.getRpcTimeout());
            return () -> result;
        }
    }

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.DATA_SERVER_TYPE, url,
                        dataClientHandlers.toArray(new ChannelHandler[dataClientHandlers.size()]));
                }
            }
        }

        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }
        return channel;
    }
}
```
The code above uses dataClientHandlers, which belong to the BoltClient; the Server pushes to this client, and these two Handlers process the pushes.
```java
@Bean(name = "dataClientHandlers")
public Collection<AbstractClientHandler> dataClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(notifyDataSyncHandler());
    list.add(fetchDataHandler());
    return list;
}
```
The concrete network picture at this point is:
```
+--------------------+
| ForwardServiceImpl |-------------------+
+--------------------+                   |
+--------------------+                   v
| GetSyncDataHandler |-------> +-------------------+
+--------------------+         | DataNodeExchanger |
+------------------------+     +---------+---------+
| LocalDataServerChange  |               |
| EventHandler           |--+            v
+------------------------+  |     +--------------+
+------------------------+  +---> | BoltExchange |
| DataServerChange       |------> +------+-------+
| EventHandler           |               |
+------------------------+        +------+------+
      +----------------+          |             |
      | JerseyExchange |          v             v
      +-------+--------+    +------------+ +------------+
              |             | BoltServer | | BoltServer |
              v             +------------+ +------------+
   +-------------------+        server      dataSyncServer
   | JerseyJettyServer |
   +-------------------+
        httpServer
```
AbstractServerHandler 和 AbstractClientHandler 對 com.alipay.sofa.registry.remoting.ChannelHandler 進行了實現。
This needs to be explained in conjunction with SOFABolt.
Taking RpcServer as an example, SOFABolt is used here through two kinds of processors: connection event processors (ConnectionEventProcessor) and user processors (UserProcessor).
The main code of the two handler base classes is as follows:
public abstract class AbstractServerHandler<T> implements ChannelHandler<T> {

    protected NodeType getConnectNodeType() {
        return NodeType.DATA;
    }

    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            return buildFailedResponse(e.getMessage());
        }
    }
}

public abstract class AbstractClientHandler<T> implements ChannelHandler<T> {

    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            return buildFailedResponse(e.getMessage());
        }
    }
}
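The `reply` method above is a classic template method: log, validate, delegate to the subclass's `doHandle`, and convert any exception into a failure response so the transport layer always gets a reply. A self-contained sketch of the pattern (the Channel parameter is dropped and the response type simplified to a String; these are illustrative, not SOFARegistry's signatures):

```java
// Simplified template-method sketch of AbstractServerHandler.reply.
abstract class SketchServerHandler<T> {

    // The fixed skeleton: validate, delegate, and never let an exception escape.
    public Object reply(T request) {
        try {
            checkParam(request);              // validation hook
            return doHandle(request);         // subclass-specific business logic
        } catch (Exception e) {
            return "FAILED: " + e.getMessage(); // stands in for buildFailedResponse
        }
    }

    protected void checkParam(T request) {
        if (request == null) {
            throw new IllegalArgumentException("request is null");
        }
    }

    // Each derived handler fills in only this step.
    protected abstract Object doHandle(T request);
}

// Example derived handler: simply echoes the request.
class EchoHandler extends SketchServerHandler<String> {
    @Override
    protected Object doHandle(String request) {
        return "OK: " + request;
    }
}
```

The payoff is that every concrete handler inherits uniform logging and error handling for free and only supplies `doHandle`.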
The system derives its various concrete handler classes from these two base classes.
Note that ChannelHandlers are divided into two categories, corresponding to the listener and the userProcessor in RpcServer:
enum HandlerType {
    LISENTER,
    PROCESSER
}
Taking serverSyncHandlers as an example: only dataSyncServerConnectionHandler is a Listener; the rest are Processors.
This also stands to reason, since there should be only one connection response handler.
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler()); // the only Listener; all others are Processors
    return list;
}
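When the server registers this mixed list, it has to split it by HandlerType: the single LISENTER goes to the connection-event side, the PROCESSERs become user processors. A sketch of that partitioning step (the enum spelling follows the SOFARegistry source quoted above; `SketchHandler` and the handler names are stand-ins):

```java
import java.util.*;
import java.util.stream.*;

// HandlerType as spelled in the SOFARegistry source.
enum HandlerType { LISENTER, PROCESSER }

// Minimal stand-in for a handler that reports its type.
record SketchHandler(String name, HandlerType type) { }

class HandlerRegistry {
    // Split one handler list into listeners vs processors,
    // as the server must before registering them with SOFABolt.
    static Map<HandlerType, List<SketchHandler>> partition(List<SketchHandler> all) {
        return all.stream().collect(Collectors.groupingBy(SketchHandler::type));
    }
}
```

Registration code can then wire `LISENTER` entries into connection event processors and `PROCESSER` entries into user processors.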
To summarize:
                        +--> getDataHandler
                        |
                        +--> publishDataProcessor
                        |
                        +--> unPublishDataHandler
+--------------------+  |
| serverSyncHandlers +-----> notifyFetchDatumHandler
+--------------------+  |
                        +--> notifyOnlineHandler
                        |
                        +--> syncDataHandler
                        |
                        +--> dataSyncServerConnectionHandler (Listener)
At startup, serverSyncHandlers is used to bring up the BoltServer:
private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(),
                    dataServerConfig.getSyncDataPort()),
                serverSyncHandlers.toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } catch (Exception e) {
        // error handling elided in the original excerpt
        throw new RuntimeException("Data sync server start error!", e);
    }
}
BoltExchange contains:
@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
    BoltServer server = createBoltServer(url, channelHandlers);
    setServer(server, url);
    server.startServer();
    return server;
}

protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
    return new BoltServer(url, Arrays.asList(channelHandlers));
}
BoltServer contains the following code, whose main job is to install the handlers.
public void startServer() {
    if (isStarted.compareAndSet(false, true)) {
        try {
            boltServer = new RpcServer(url.getPort(), true);
            initHandler();
            boltServer.start();
        } catch (Exception e) {
            // error handling elided in the original excerpt
            throw new RuntimeException("Start bolt server error!", e);
        }
    }
}

private void initHandler() {
    if (initHandler.compareAndSet(false, true)) {
        boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
            new ConnectionEventAdapter(ConnectionEventType.CONNECT,
                getConnectionEventHandler(), this));
        boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
            new ConnectionEventAdapter(ConnectionEventType.CLOSE,
                getConnectionEventHandler(), this));
        boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
            new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
                getConnectionEventHandler(), this));
        registerUserProcessorHandler();
    }
}
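Both startServer and initHandler guard one-time initialization with AtomicBoolean.compareAndSet: the flag flips from false to true for exactly one caller, so concurrent or repeated calls execute the body exactly once without holding a lock. A self-contained sketch of the idiom (class name and counter are illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// One-shot start guard, as used by BoltServer.startServer / initHandler.
class OneShotServer {
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private int startCount = 0;

    void start() {
        // compareAndSet flips false -> true for exactly one caller;
        // every later (or losing concurrent) caller sees true and skips the body.
        if (isStarted.compareAndSet(false, true)) {
            startCount++; // stands in for new RpcServer(...) + boltServer.start()
        }
    }

    int startCount() {
        return startCount;
    }
}
```

This is cheaper than a synchronized block for an operation that must happen once and is idempotent to skip thereafter.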
Ultimately this calls into RpcServer, registering both the connection event processors and the user-defined processors:
public void addConnectionEventProcessor(ConnectionEventType type,
                                        ConnectionEventProcessor processor) {
    this.connectionEventListener.addConnectionEventProcessor(type, processor);
}

@Override
public void registerUserProcessor(UserProcessor<?> processor) {
    UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors);
}
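In SOFABolt, each UserProcessor declares an interest — the class name of the request type it handles — and the server dispatches each incoming request to the processor registered under that key, rejecting duplicate registrations. A simplified, self-contained sketch of such a registry (these are not SOFABolt's actual classes; `Function` stands in for UserProcessor):

```java
import java.util.*;
import java.util.function.Function;

// Simplified sketch of user-processor registration and dispatch by request type.
class SketchProcessorRegistry {
    private final Map<String, Function<Object, Object>> userProcessors = new HashMap<>();

    // Mirrors UserProcessorRegisterHelper's contract: one processor per interest.
    void register(String interest, Function<Object, Object> processor) {
        if (userProcessors.putIfAbsent(interest, processor) != null) {
            throw new IllegalStateException("processor already registered for " + interest);
        }
    }

    // Route a request to the processor keyed by its concrete class name.
    Object dispatch(Object request) {
        Function<Object, Object> p = userProcessors.get(request.getClass().getName());
        if (p == null) {
            throw new IllegalStateException("no processor for " + request.getClass());
        }
        return p.apply(request);
    }
}
```

This explains why each request type in the handler list (GetDataRequest, PublishDataRequest, ...) maps to exactly one processor.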
The resulting relationship with SOFABolt looks like this:
+----------------------------+
|        [RpcServer]         |
|                            |      +--> CONNECT +----------------> DataSyncServerConnectionHandler
|  connectionEventListener +------> +--> CLOSE +------------------> DataSyncServerConnectionHandler
|                            |      +--> EXCEPTION +--------------> DataSyncServerConnectionHandler
|                            |
|                            |      +--> CONNECT +----------------> DataSyncServerConnectionHandler
|  connectionEventHandler +-------> +--> CLOSE +------------------> DataSyncServerConnectionHandler
|                            |      +--> EXCEPTION +--------------> DataSyncServerConnectionHandler
|                            |
|                            |      +--> GetDataRequest +---------> getDataHandler
|                            |      +--> PublishDataRequest +-----> publishDataProcessor
|  userProcessors +---------------> +--> UnPublishDataRequest +---> unPublishDataHandler
|                            |      +--> NotifyFetchDatumRequest +> notifyFetchDatumHandler
|                            |      +--> NotifyOnlineRequest +----> notifyOnlineHandler
+----------------------------+      +--> SyncDataRequest +--------> syncDataHandler
At this point we have given a rough overview of SOFARegistry's network encapsulation and operations.
From a logical point of view, Alibaba provides two levels of encapsulation:
From the connection perspective, a wrapper stack built on netty.channel.Channel; from the bottom up: netty.channel.Channel, com.alipay.remoting.Connection, and BoltChannel.
From the application perspective, Server- and Client-level encapsulation; from the bottom up: RpcServer/RpcClient, BoltServer/BoltClient, BoltExchange, and DataNodeExchanger/MetaNodeExchanger.
具體邏輯大體以下:
Alibaba's encapsulation is very meticulous. Because SOFARegistry is a complex system, wrapping the network concepts and functions this way is well worth it. In everyday development you may not need such fine-grained encapsulation; use Alibaba's approach as a reference and trim it to fit your own needs.
★★★★★★ Thoughts on life and technology ★★★★★★
WeChat public account: 羅西的思考 (Rossi's Thinking)
If you would like timely notifications of my newly written articles, or to see the technical material I recommend, please follow the account.