做者簡介html
Black,一個喜歡電子和機械的軟件工程師,陰差陽錯的走上了程序猿這條道路。上道以後發現寫代碼原來那麼有意思,就是頭冷!( ̄▽ ̄)~java
說到分佈式系統,不得不說集中式系統。傳統集中式系統中整個項目全部的東西都在一個應用裏面。一個網站就是一個應用,當系統壓力較大時,只能橫向擴展,增長多個服務器或者多個容器去作負載均衡,避免單點故障而影響到整個系統。集中式最明顯的優勢就是開發,測試,運維會比較方便,不用考慮複雜的分佈式環境。弊端也很明顯,系統大而複雜、不易擴展、難於維護,每次更新都必須更新全部的應用。node
介於集中式系統的種種弊端,促成了分佈式系統的造成,分佈式系統背後是由一系列的計算機組成,但用戶感知不到背後的邏輯,就像訪問單個計算機同樣。自然的避免了單機故障的問題。應用能夠按業務類型拆分紅多個應用或服務,再按結構分紅接口層、服務層。咱們也能夠按訪問入口分,如移動端、PC端等定義不一樣的接口應用。數據庫能夠按業務類型拆分紅多個實例,還能夠對單表進行分庫分表。同時增長分佈式緩存、消息隊列、非關係型數據庫、搜索等中間件。分佈式系統雖好,可是增長了系統的複雜性,如分佈式事務、分佈式鎖、分佈式session、數據一致性等都是如今分佈式系統中須要解決的難題。分佈式系統也增長了開發測試運維的成本,工做量增長,其管理很差反而會變成一種負擔。git
分佈式系統最爲核心的要屬分佈式服務框架,有了分佈式服務框架,咱們只需關注各自的業務,而無需去關注那些複雜的服務之間調用的過程。github
目前業界比較流行的分佈式服務框架有:阿里的Dubbo、Spring Cloud。這裏不對這些分佈式服務框架作對比,簡單的說說他們都作了些什麼,能使咱們掉用遠程服務就像掉用本地服務那麼簡單高效。web
服務是對使用用戶有功能輸出的模塊,以技術框架做爲基礎,能實現用戶的需求。好比日誌記錄服務、權限管理服務、後臺服務、配置服務、緩存服務、存儲服務、消息服務等,這些服務能夠靈活的組合在一塊兒,也能夠獨立運行。服務須要有接口,與系統進行對接。面向服務的開發,應該是把服務拆分開發,把服務組合運行。更加直接的例子如:歷史詳情、留言板、評論、評級服務等。他們之間能獨立運行,也要能組合在一塊兒做爲一個總體。redis
註冊中心對整個分佈式系統起着最爲核心的整合做用,支持對等集羣,須要提供CRUD接口,支持訂閱發佈機制且可靠性要求很是之高,通常拿zookeeper集羣來作爲註冊中心。
分佈式環境中服務提供方的服務會在多臺服務器上部署,每臺服務器會向註冊中心提供服務方標識、服務列表、地址、對應端口、序列化協議等信息。註冊中心記錄下服務和服務地址的映射關係,通常一個服務會對應多個地址,這個過程咱們稱之爲服務發佈或服務註冊。服務調用方會根據服務方標識、服務列表從註冊中心獲取所需服務的信息(地址端口信息、序列化協議等),這些信息會緩存至本地。當服務須要調用其它服務時,直接在這裏找到服務的地址,進行調用,這個過程咱們稱之爲服務發現。算法
下面是以zookeeper做爲註冊中心的簡單實現:數據庫
/** * 建立node節點 * @param node * @param data */
public boolean createNode(String node, String data) {
try {
byte[] bytes = data.getBytes();
//同步建立臨時順序節點
String path = zk.create(ZkConstant.ZK_RPC_DATA_PATH+"/"+node+"-", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("create zookeeper node ({} => {})", path, data);
}catch (KeeperException e) {
log.error("", e);
return false;
}catch (InterruptedException ex){
log.error("", ex);
return false;
}
return true;
}
複製代碼
以下面zookeeper中寫入的臨時順序節點信息:編程
com.black.blackrpc.test.HelloWord (發佈服務時對外的名稱)
00000000010,00000000011 (zk 順序節點id)
127.0.0.1:8888,127.0.0.1:8889 (服務地址端口)
Protostuff (序列化方式) 1.0 (權值,負載均衡策略使用)
這裏使用的是zookeeper的臨時順序節點,爲何使用臨時順序節點。主要是考慮如下兩點:
1、 當服務提供者異常下線時,與zookeeper的鏈接會中斷,zookeeper服務器會主動刪除臨時節點,同步給服務消費者。這樣就能避免服務消費者去請求異常的服務器。
校稿注: 通常消費方也會在實際發起請求前,對當前獲取到的服務提供方節點進行心跳,避免請求鏈接有問題的節點
2、 zk下面是不容許建立2個名稱相同的zk子節點的,經過順序節點就能避免建立相同的名稱。固然也能夠不用順序節點的方式,直接以com.black.blackrpc.test.HelloWord建立節點,在該節點下建立數據節點。
下面是zk的數據同步過程:
/** * 同步節點 (通知模式) * syncNodes會經過級聯方式,在每次watcher被觸發後,就會再掛上新的watcher。完成了相似鏈式觸發的功能 */
public boolean syncNodes() {
try {
List<String> nodeList = zk.getChildren(ZkConstant.ZK_RPC_DATA_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
syncNodes();
}
}
});
Map<String,List<String>> map =new HashMap<String,List<String>>();
for (String node : nodeList) {
byte[] bytes = zk.getData(ZkConstant.ZK_RPC_DATA_PATH + "/" + node, false, null);
String key =node.substring(0, node.lastIndexOf(ZkConstant.DELIMITED_MARKER));
String value=new String(bytes);
Object object =map.get(key);
if(object!=null){
((List<String>)object).add(value);
}else {
List<String> dataList = new ArrayList<String>();
dataList.add(value);
map.put(key,dataList);
}
log.info("node: [{}] data: [{}]",node,new String(bytes));
}
/**修改鏈接的地址緩存*/
if(MapUtil.isNotEmpty(map)){
log.debug("invoking service cache updateing....");
InvokingServiceCache.updataInvokingServiceMap(map);
}
return true;
} catch (KeeperException | InterruptedException e) {
log.error(e.toString());
return false;
}
}
複製代碼
當數據同步到本地時,通常會寫入到本地文件中,防止因zookeeper集羣異常下線而沒法獲取服務提者信息。
服務消費者不管是與註冊中心仍是與服務提供者,都須要存在網絡鏈接傳輸數據,而這就涉及到通信。筆者以前也作過這方面的工做,當時使用的是java BIO簡單的寫了一個通信包,使用場景沒有多大的併發,阻塞式的BIO也未暴露太多問題。java BIO因其創建鏈接以後會阻塞線程等待數據,這種方式必須以一鏈接一線程的方式,即客戶端有鏈接請求時服務器端就須要啓動一個線程進行處理。當鏈接數過大時,會創建至關多的線程,性能直線降低。
Java NIO : 同步非阻塞,服務器實現模式爲一個請求一個線程,即客戶端發送的鏈接請求都會註冊到多路複用器上,多路複用器輪詢到鏈接有I/O請求時才啓動一個線程進行處理。
Java AIO : 異步非阻塞,服務器實現模式爲一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務器應用去啓動線程進行處理, BIO、NIO、AIO適用場景分析:
BIO 用於鏈接數目比較小且固定的架構,這種方式對服務器資源要求比較高,併發侷限於應用中,但程序直觀簡單易理解。
NIO 適用於鏈接數目多且鏈接比較短(輕操做)的架構,好比聊天服務器,併發侷限於應用中,編程比較複雜,目前主流的通信框架 Netty、Apache Mina、Grizzl、NIO Framework都是基於其實現的。
AIO 用於鏈接數目多且鏈接比較長(重操做)的架構,好比圖片服務器,文件傳輸等,充分調用OS參與併發操做,編程比較複雜。
(有興趣能夠看看這篇文章:BIO與NIO、AIO的區別 )
做爲基石的通信,其實要考慮不少東西。如:丟包粘包的狀況,心跳機制,斷連重連,消息緩存重發,資源的優雅釋放,長鏈接仍是短鏈接等。
下面是Netty創建服務端,客戶端的簡單實現:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*** * netty tcp 服務端 * @author v_wangshiyu * */
public class NettyTcpService {
private static final Logger log = LoggerFactory.getLogger(NettyTcpService.class);
private String host;
private int port;
public NettyTcpService(String address) throws Exception{
String str[] = address.split(":");
this.host=str[0];
this.port=Integer.valueOf(str[1]);
}
public NettyTcpService(String host,int port) throws Exception{
this.host=host;
this.port=port;
}
/**用於分配處理業務線程的線程組個數 */
private static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默認
/** 業務出現線程大小*/
private static final int BIZTHREADSIZE = 4;
/* * NioEventLoopGroup實際上就是個線程, * NioEventLoopGroup在後臺啓動了n個NioEventLoop來處理Channel事件, * 每個NioEventLoop負責處理m個Channel, * NioEventLoopGroup從NioEventLoop數組裏挨個取出NioEventLoop來處理Channel */
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);
public void start() throws Exception {
log.info("Netty Tcp Service Run...");
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("decoder", new ByteArrayDecoder());
pipeline.addLast("encoder", new ByteArrayEncoder());
// pipeline.addLast(new Encoder());
// pipeline.addLast(new Decoder());
pipeline.addLast(new TcpServerHandler());
}
});
b.bind(host, port).sync();
log.info("Netty Tcp Service Success!");
}
/** * 中止服務並釋放資源 */
public void shutdown() {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/** * 服務端處理器 */
public class TcpServerHandler extends SimpleChannelInboundHandler<Object>{
private static final Logger log = LoggerFactory.getLogger(TcpServerHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
byte[] data=(byte[])msg;
}
}
複製代碼
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.util.concurrent.Future;
/** * netty tcp 客戶端 * @author v_wangshiyu * */
public class NettyTcpClient {
private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class);
private String host;
private int port;
private Bootstrap bootstrap;
private Channel channel;
private EventLoopGroup group;
public NettyTcpClient(String host,int port){
bootstrap=getBootstrap();
channel= getChannel(host,port);
this.host=host;
this.port=port;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
/** * 初始化Bootstrap * @return */
public final Bootstrap getBootstrap(){
group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// pipeline.addLast(new Encoder());
// pipeline.addLast(new Decoder());
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("decoder", new ByteArrayDecoder());
pipeline.addLast("encoder", new ByteArrayEncoder());
pipeline.addLast("handler", new TcpClientHandler());
}
});
b.option(ChannelOption.SO_KEEPALIVE, true);
return b;
}
/** * 鏈接,獲取Channel * @param host * @param port * @return */
public final Channel getChannel(String host,int port){
Channel channel = null;
try {
channel = bootstrap.connect(host, port).sync().channel();
return channel;
} catch (Exception e) {
log.info(String.format("connect Server(IP[%s],PORT[%s]) fail!", host,port));
return null;
}
}
/** * 發送消息 * @param msg * @throws Exception */
public boolean sendMsg(Object msg) throws Exception {
if(channel!=null){
channel.writeAndFlush(msg).sync();
log.debug("msg flush success");
return true;
}else{
log.debug("msg flush fail,connect is null");
return false;
}
}
/** * 鏈接斷開 * 而且釋放資源 * @return */
public boolean disconnectConnect(){
//channel.close().awaitUninterruptibly();
Future<?> future =group.shutdownGracefully();//shutdownGracefully釋放全部資源,而且關閉全部當前正在使用的channel
future.syncUninterruptibly();
return true;
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/** * 客戶端處理器 */
public class TcpClientHandler extends SimpleChannelInboundHandler<Object>{
private static final Logger log = LoggerFactory.getLogger(TcpClientHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
byte[] data=(byte[])msg;
}
}
複製代碼
說到通信就不能不說協議,通訊時所遵照的規則,訪問什麼,傳輸的格式等都屬於協議。做爲一個開發人員,應該都瞭解TCP/IP協議,它是一個網絡通訊模型,以及一整套網絡傳輸協議家族,是互聯網的基礎通訊架構。也都應該用過http(超文本傳輸協議),Web服務器傳輸超文本到本地瀏覽器的傳送協議,該協議創建在TCP/IP協議之上。分佈式服務框架服務間的調用也會規定協議。爲了支持不一樣場景,分佈式服務框架會存在多種協議,如Dubbo就支持7種協議:dubbo協議(默認),rmi協議,hessian協議,http協議,webservice協議,thrift協議,memcached協議,redis協議每種協議應對的場景不盡相同,具體場景具體對待。
(這裏詳細介紹了Dubbo 的協議:Dubbo 的7種協議 )
分佈式服務上線時都是集羣組網部署,集羣中會存在某個服務的多實例,消費者如何從服務列表中選擇合適的服務提供者進行調用,這就涉及到服務路由。分佈式服務框架須要可以知足用戶靈活的路由需求。
不少開源的RPC框架調用者須要配置服務提供者的地址信息,儘管能夠經過讀取數據庫的服務地址列表等方式避免硬編碼地址信息,可是消費者依然要感知服務提供者的地址信息,這違反了透明化路由原則。而基於服務註冊中心的服務訂閱發佈,消費者經過主動查詢和被動通知的方式獲取服務提供者的地址信息,而再也不須要經過硬編碼方式獲得提供者的地址信息,只須要知道當前系統發佈了那些服務,而不須要知道服務具體存在於什麼位置,這就是透明化路由。
負載均衡策略是服務的重要屬性,分佈式服務框架一般會提供多種負載均衡策略,同時支持用戶擴展負載均衡策略。
一般在對等集羣組網中,採用隨機算法進行負債均衡,隨機路由算法消息分發仍是比較均勻的,採用JDK提供的java.util.Random或者java.security.SecureRandom在指定服務提供者列表中生成隨機地址。消費者基於隨機生成的服務提供者地址進行遠程調用。
/** * 隨機 */
public class RandomStrategy implements ClusterStrategy {
@Override
public RemoteServiceBase select(List<RemoteServiceBase> list) {
int MAX_LEN = list.size();
int index = RandomUtil.nextInt(MAX_LEN);
return list.get(index);
}
}
複製代碼
隨機仍是存在缺點的,可能出現部分節點的碰撞的機率較高,另外硬件配置差別較大時,會致使各節點負載不均勻。爲避免這些問題,須要對服務列表加權,性能好的機器接收的請求的機率應該高於通常機器。
/** * 加權隨機 */
public class WeightingRandomStrategy implements ClusterStrategy {
@Override
public RemoteServiceBase select(List<RemoteServiceBase> list) {
//存放加權後的服務提供者列表
List<RemoteServiceBase> weightingList = new ArrayList<RemoteServiceBase>();
for (RemoteServiceBase remoteServiceBase : list) {
//擴大10倍
int weight = (int) (remoteServiceBase.getWeight()*10);
for (int i = 0; i < weight; i++) {
weightingList.add(remoteServiceBase);
}
}
int MAX_LEN = weightingList.size();
int index = RandomUtil.nextInt(MAX_LEN);
return weightingList.get(index);
}
}
複製代碼
逐個請求服務地址,到達邊界以後,繼續繞接。主要缺點:慢的提供者會累積請求。例如第二臺機器很慢,但沒掛。當請求第二臺機器時被卡在那。長此以往,全部請求都卡在第二臺機器上。 輪詢策略實現很是簡單,順序循環遍歷服務提供者列表,達到邊界以後從新歸零開始,繼續順序循環。
/** * 輪詢 */
public class PollingStrategy implements ClusterStrategy {
//計數器
private int index = 0;
private Lock lock = new ReentrantLock();
@Override
public RemoteServiceBase select(List<RemoteServiceBase> list) {
RemoteServiceBase service = null;
try {
lock.tryLock(10, TimeUnit.MILLISECONDS);
//若計數大於服務提供者個數,將計數器歸0
if (index >= list.size()) {
index = 0;
}
service = list.get(index);
index++;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
//兜底,保證程序健壯性,若未取到服務,則直接取第一個
if (service == null) {
service = list.get(0);
}
return service;
}
}
複製代碼
加權輪詢的話,須要給服務地址添加權重。
/** * 加權輪詢 */
public class WeightingPollingStrategy implements ClusterStrategy {
//計數器
private int index = 0;
//計數器鎖
private Lock lock = new ReentrantLock();
@Override
public RemoteServiceBase select(List<RemoteServiceBase> list) {
RemoteServiceBase service = null;
try {
lock.tryLock(10, TimeUnit.MILLISECONDS);
//存放加權後的服務提供者列表
List<RemoteServiceBase> weightingList = new ArrayList<RemoteServiceBase>();
for (RemoteServiceBase remoteServiceBase : list) {
//擴大10倍
int weight = (int) (remoteServiceBase.getWeight()*10);
for (int i = 0; i < weight; i++) {
weightingList.add(remoteServiceBase);
}
}
//若計數大於服務提供者個數,將計數器歸0
if (index >= weightingList.size()) {
index = 0;
}
service = weightingList.get(index);
index++;
return service;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
//兜底,保證程序健壯性,若未取到服務,則直接取第一個
return list.get(0);
}
}
複製代碼
消費者緩存全部服務提供者的調用時延,週期性的計算服務調用平均時延。而後計算每一個服務提供者服務調用時延與平均時延的差值,根據差值大小動態調整權重,保證服務時延大的服務提供者接收更少的消息,防止消息堆積。 該策略的特色:保證處理能力強的服務接受更多的消息,經過動態的權重分配消除服務調用時延的震盪範圍,使全部服務的調用時延接近平均值,實現負載均衡。
相同參數的請求老是發送到統一服務提供者,當某一臺服務提供者宕機時,本來發往跟提供者的請求,基於虛擬節點,平攤到其餘提供者,不會引發劇烈變更,平臺提供默認的虛擬節點數,能夠經過配置文件修改虛擬節點個數。一致性Hash環工做原理以下圖所示:
負載均衡只能保證服務提供者壓力的平衡,可是在一些業務場景中須要設置一些過濾規則,比較經常使用的是基本表達式的條件路由。
經過IP條件表達式配置黑白名單訪問控制:consumerIP != 192.168.1.1。
只暴露部分服務提供者,防止這個集羣服務都被沖垮,致使其餘服務也不可用。例如providerIP = 192.168.3*。 讀寫分離:method=find*,list*,get*,query*=>providerIP=192.168.1.。 先後臺分離:app=web=>providerIP=192.168.1.,app=java=>providerIP=192.168.2.。 灰度升級:將WEB前臺應用理由到新的服務版本上:app=web=>provicerIP=192.168.1.*。
因爲篇幅緣由這裏不細說,仍是丟個說的比較詳細的文章地址: 服務路由
把對象轉換爲字節序列的過程稱爲序列化,把字節序列恢復爲對象的過程稱爲反序列化。運程調用的時候,咱們須要先將Java對象進行序列化,而後經過網絡,IO進行傳輸,當到達目的地以後,再進行反序列化獲取到咱們想要的結果對象。分佈式系統中,傳輸的對象會不少,這就要求序列化速度快,產生字節序列小的序列化技術。
序列化技術:Serializable, xml, Jackson, MessagePack, fastjson, Protocol Buffer, Thrift,Gson, Avro,Hessian等
Serializable 是java自帶的序列化技術,沒法跨平臺,序列化和反序列化的速度相對較慢。
XML技術多平臺支持好,經常使用於與銀行交互的報文,可是其字節序列產生較大,不太適合用做分佈式通信框架。
Fastjson是Java語言編寫的高性能的JSON處理器,由阿里巴巴公司開發,字節序列爲json串,可讀性好,序列化也速度很是的快。
Protocol Buffer 序列化速度很是快,字節序列較小,可是可讀性較差。
( 這裏就不一一介紹,有興趣能夠看看這篇文章:序列化技術比較 )
通常分佈式服務框架會內置多種序列化協議可供選擇,如Dubbo 支持的7種協議用到的序列化技術就不徹底相同。
本地環境下,使用某個接口很簡單,直接調用就行。分佈式環境下就不是那麼簡單了,消費者方只會存在接口的定義,沒有具體的實現。想要像本地環境下直接調用遠程接口那就得耗費一些功夫了,須要用到遠程代理。
下面是我盜的圖:
通訊時序以下:
消費者端沒有具體的實現,須要調用接口時會動態的去建立一個代理類。與spirng集成的狀況,那直接在bean構建的時候注入代理類。
下面是構建代理類:
import java.lang.reflect.Proxy;
public class JdkProxy {
public static Object getInstance(Class<?> cls){
JdkMethodProxy invocationHandler = new JdkMethodProxy();
Object newProxyInstance = Proxy.newProxyInstance(
cls.getClassLoader(),
new Class[] { cls },
invocationHandler);
return (Object)newProxyInstance;
}
}
複製代碼
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class JdkMethodProxy implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] parameters) throws Throwable {
//若是傳進來是一個已實現的具體類
if (Object.class.equals(method.getDeclaringClass())) {
try {
return method.invoke(this, parameters);
} catch (Throwable t) {
t.printStackTrace();
}
//若是傳進來的是一個接口
} else {
//實現接口的核心方法
//return RemoteInvoking.invoking(serviceName, serializationType, //timeOut,loadBalanceStrategy,method, parameters);
}
return null;
}
}
複製代碼
代理會作不少事情,對請求服務的名稱及參數信息的的序列化、經過路由選擇最爲合適服務提供者、創建通信鏈接發送請求信息(或者直接發起http請求)、最後返回獲取到的結果。固然這裏面須要考慮不少問題,如調用超時,請求異常,通信鏈接的緩存,同步服務調用仍是異步服務調用等等。
同步服務調用:客戶端發起遠程服務調用請求,用戶線程完成消息序列化以後,將消息投遞到通訊框架,而後同步阻塞,等待通訊線程發送請求並接收到應答以後,喚醒同步等待的用戶線程,用戶線程獲取到應答以後返回。
異步服務調用:基於JAVA的Future機制,客戶端發起遠程服務調用請求,該請求會被標上requestId,同時創建一個與requestId對應 Future,客戶端經過Future 的 get方法獲取結果時會被阻塞。服務端收到請求應達會回傳requestId,經過requestId去解除對應Future的阻塞,同時set對應結果,最後客戶端獲取到結果。
構建Future,以requestId爲key,put到線程安全的map中。get結果時須要寫入timeOut超時時間,防止因爲結果的未返回而致使的長時間的阻塞。
SyncFuture<RpcResponse> syncFuture =new SyncFuture<RpcResponse>();
SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(), syncFuture);
try {
RpcResponse rpcResponse= syncFuture.get(timeOut,TimeUnit.MILLISECONDS); return rpcResponse.getResult();
}catch (Exception e){
throw e;
}finally {
SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());
}
複製代碼
結果返回時經過回傳的requestId獲取對應Future寫入Response,Future線程解除阻塞。
log.debug("Tcp Client receive head:"+headAnalysis+"Tcp Client receive data:" +rpcResponse);
SyncFuture<RpcResponse> syncFuture= SyncFutureCatch.syncFutureMap.get(rpcResponse.getRequestId());
if(syncFuture!=null){
syncFuture.setResponse(rpcResponse);
}
複製代碼
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class SyncFuture<T> implements Future<T> {
// 由於請求和響應是一一對應的,所以初始化CountDownLatch值爲1。
private CountDownLatch latch = new CountDownLatch(1);
// 須要響應線程設置的響應結果
private T response;
// Futrue的請求時間,用於計算Future是否超時
private long beginTime = System.currentTimeMillis();
public SyncFuture() {
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
if (response != null) {
return true;
}
return false;
}
// 獲取響應結果,直到有結果才返回。
@Override
public T get() throws InterruptedException {
latch.await();
return this.response;
}
// 獲取響應結果,直到有結果或者超過指定時間就返回。
@Override
public T get(long timeOut, TimeUnit unit) throws InterruptedException {
if (latch.await(timeOut, unit)) {
return this.response;
}
return null;
}
// 用於設置響應結果,而且作countDown操做,通知請求線程
public void setResponse(T response) {
this.response = response;
latch.countDown();
}
public long getBeginTime() {
return beginTime;
}
}
複製代碼
SyncFuture<RpcResponse> syncFuture =new SyncFuture<RpcResponse>();
SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(), syncFuture);
RpcResponse rpcResponse= syncFuture.get(timeOut,TimeUnit.MILLISECONDS);
SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());
複製代碼
除了同步服務調用,異步服務調用,還有並行服務調用,泛化調用等調用形式
( 這裏就不作介紹,有興趣能夠看看這篇文章:服務框架多形式的服務調用:同步、異步、並用、泛化 )
簡單的介紹了下分佈式服務框架,下面來講下分佈式系統的高可用。一個系統設計開發出來,三天兩晚就出個大問題,致使沒法使用,那這個系統也不是什麼好系統。業界流傳一句話:"咱們系統支持X個9的可靠性"。這個X是表明一個數字,X個9表示在系統1年時間的使用過程當中,系統能夠正常使用時間與總時間(1年)之比。
3個9:(1-99.9%)*365*24=8.76小時,表示該系統在連續運行1年時間裏最多可能的業務中斷時間是8.76小時,4個9即52.6分鐘,5個9即5.26分鐘。要作到如此高的可靠性,是很是大的挑戰。一個大型分佈式項目多是由幾十上百個項目構成,涉及到的服務成千上萬,主鏈上的一個流程就須要流轉多個團隊維護的項目。拿4個9的可靠性來講,平攤到每一個團隊的時間可能不到10分鐘。這10分鐘內須要頂住壓力,以最快的時間找到並解決問題,恢復系統的可用。
下面說說爲了提升系統的可靠性都有哪些方案:
服務檢測:某臺服務器與註冊中心的鏈接中斷,其提供的服務也無響應時,系統應該能主動去重啓該服務,使其能正常對外提供。
故障隔離:集羣環境下,某臺服務器能對外提供服務,可是由於其餘緣由,請求結果始終異常。這時就須要主動將該節點從集羣環境中剔除,避免繼續對後面的請求形成影響,非高峯時期再嘗試修復該問題。至於機房故障的狀況,只能去屏蔽整個機房了。目前餓了麼作的是異地多活,即使單邊機房掛了,流量也能夠全量切換至另一邊機房,保證系統的可用。
監控:包含業務監控、服務異常監控、db中間件性能的監控等,系統出現異常的時候能及時的通知到開發人員。等到線下報上來的時候,可能影響已經很大了。
壓測:產線主鏈路的壓測是必不可少的,單靠集成測試,有些高併發的場景是沒法覆蓋到的,壓測能暴露日常狀況沒法出現的問題,也能直觀的提現系統的吞吐能力。當業務激增時,能夠考慮直接作系統擴容。
sop方案與演練:產線上隨時均可能會發生問題,抱着出現問題時再想辦法解決的態度是確定不行的,時間根原本不及。提早作好對應問題的sop方案,能節省大量時間,儘快的恢復系統的正常。固然日常的演練也是不可少的,一旦產線故障能夠作到從容不迫的去應對和處理。
除了上述方案外,還能夠考慮服務策略的使用:
業務高峯期,爲了保證核心服務,須要停掉一些不過重要的業務,如雙十一期間不容許發起退款(* ̄▽ ̄)、只容許查看3個月以內的歷史訂單等業務的降級,調用服務接口時,直接返回的空結果或異常等服務的降級,都屬於分佈式系統的降級策略。服務降級是可逆操做,當系統壓力恢復到必定值不須要降級服務時,須要去除降級,將服務狀態恢復正常。 服務降級主要包括屏蔽降級和容錯降級:
屏蔽降級:分佈式服務框架直接屏蔽對遠程接口的請求,不發起對遠程服務的調用,直接返回空結果、拋出指定異常、執行本地模擬接口實現等方式。
容錯降級:非核心服務不可調用時,能夠對故障服務作業務放通,保證主流程不受影響。如請求超時、消息解碼異常、系統擁塞保護異常, 服務提供方系統異常等狀況。 筆者以前就碰到過因雙方沒有作容錯降級致使的系統故障的狀況。午高峯時期,對方調用咱們的一個非核心查詢接口,咱們系統由於bug問題一直異常,致使對方調用這個接口的頁面異常而沒法跳轉到主流程頁面,影響了產線的生產。當時對方緊急發版才使系統恢復正常。
說到限流,最早想到的就是秒殺活動了,一場秒殺活動的流量多是正常流量的幾百至幾千倍,如此高的流量系統根本沒法處理,只能經過限流來避免系統的崩潰。服務的限流本質和秒殺活動的限流是同樣的,都是限制請求的流入,防止服務提供方因大量的請求而崩潰。
限流算法:令牌桶、漏桶、計數器算法
上述算法適合單機的限流,但涉及到整個集羣的限流時,得考慮使用緩存中間件了。例如:某個服務1分鐘內只容許請求2次,或者一天只容許使用1000次。因爲負載均衡存在,可能集羣內每臺機器都會收到請求,這種時候就須要緩存來記錄調用方某段時間內的請求次數,再作限流處理。redis就很適合作此事。 限流算法的實現
熔斷本質上是一種過載保護機制,這一律念來源於電子工程中的斷路器,當電流過大時,保險絲會熔斷,從而保護整個電路。一樣在分佈式系統中,當被調用的遠程服務沒法使用時,若是沒有過載保護,就會致使請求的資源阻塞在遠程服務器上耗盡資源。不少時候,剛開始可能只是出現了局部小規模的故障,然而因爲種種緣由,故障影響範圍愈來愈大,最終致使全局性的後果。當下遊服務因訪問壓力過大而響應變慢或失敗,上游服務爲了保護本身以及系統總體的可用性,能夠暫時切斷對下游服務的調用。
熔斷器的設計思路
Closed:初始狀態,熔斷器關閉,正常提供服務
Open: 失敗次數,失敗百分比達到必定的閾值以後,熔斷器打開,中止訪問服務
Half-Open:熔斷必定時間以後,小流量嘗試調用服務,若是成功則恢復,熔斷器變爲Closed狀態
一個系統設計開發出來,必須保證其運行的數據準確和一致性。拿支付系統來講:用戶銀行卡已經扣款成功,系統裏卻顯示失敗,沒有給用戶的虛擬賬戶充值上,這會引發客訴。說的再嚴重點,用戶發起提現,資金已經轉到其銀行帳戶,系統卻沒扣除對應虛擬賬號的餘額,直接致使資金損失了。若是這時候用戶一直髮起提現,那就酸爽了。
說到數據一致性,就不得不說到CAP原則。CAP原則中指出任何一個分佈式系統中,Consistency(一致性 C)、 Availability(可用性 A)、Partition tolerance(分區容錯性P),三者不可兼得。傳統單機數據庫基於ACID特性(原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)、持久性(Durability)) ,放棄了分區容錯性,能作到可用性和一致性。對於一個分佈式系統而言,分區容錯性是一個最基本的要求。既然是一個分佈式系統,那麼分佈式系統中的組件必然須要被部署到不一樣的節點,會出現節點與節點之間的網絡通信,而網絡問題又是必定會出現的異常狀況,分區容錯性也就成爲了一個分佈式系統必然須要面對和解決的問題。系統架構師每每須要把精力花在如何根據業務特色在一致性和可用性之間尋求平衡。
集中式系統,經過數據庫事務的控制,能作到數據的強一致性。可是分佈式系統中,涉及多服務間的調用,經過分佈式事務的方案:兩階段提交(2PC)、三階段提交(3PC)、**補償事務(TCC)**等雖然能實現數據的強一致,可是都是經過犧牲可用性來實現。
BASE理論是對CAP原則中一致性和可用性權衡的結果:Basically Available(基本可用)、Soft state(軟狀態)和Eventually consistent(最終一致性)。BASE理論,其來源於對大規模互聯網系統分佈式實踐的總結,是基於CAP原則逐步演化而來的。其最核心思想是:即便沒法作到強一致性,但每一個應用均可以根據自身業務特色,採用適當的方式來使系統達到最終一致性。
基本可用
基本可用是指分佈式系統在出現不可預知故障的時候,容許損失部分可用性,這不等價於系統不可用。
軟狀態
軟狀態指容許系統中的數據存在中間狀態,並認爲該中間狀態的存在不會影響系統的總體可用性,即容許系統在不一樣節點的數據副本之間進行數據同步的過程存在延時
最終一致性
最終一致性強調的是全部的數據副本,在通過一段時間的同步以後,最終都可以達到一致的狀態。所以,最終一致性的本質是須要系統保證最終數據可以達到一致,而不須要實時保證系統數據的強一致性。
總的來講,BASE理論面向的是大型高可用可擴展的分佈式系統,和傳統的事物ACID特性是相反的,它徹底不一樣於ACID的強一致性模型,而是經過犧牲強一致性來得到可用性,並容許數據在一段時間內是不一致的,但最終達到一致狀態。同時,在實際的分佈式場景中,不一樣業務單元和組件對數據一致性的要求是不一樣的,所以在具體的分佈式系統架構設計過程當中,ACID特性和BASE理論每每又會結合在一塊兒。
下面2篇文章對分佈式事務和數據一致性這塊有較深的講解。
聊聊分佈式事務,再說說解決方案
微服務下的數據一致性的幾種實現方式之概述
分佈式系統涉及到的東西還有不少,如:分佈式鎖、定時調度、數據分片、性能問題、各類中間件的使用等。筆者分享只是瞭解到的那一小部分的知識而已。以前本着學習的目的也寫過一個很是簡單的分佈式服務框架blackRpc ,經過它瞭解了分佈式服務框架內部的一些活動。本文中全部代碼都能在該項目中找到,有興趣讀者能夠看看。
閱讀博客還不過癮?
歡迎你們掃二維碼經過添加羣助手,加入交流羣,討論和博客有關的技術問題,還能夠和博主有更多互動
博客轉載、線下活動及合做等問題請郵件至 shadowfly_zyl@hotmail.com 進行溝通