分佈式事務,EventBus 解決方案:CAP【中文文檔】(轉)

出處:http://www.cnblogs.com/savorboard/p/cap-document.htmlcss

前言

不少同窗想對CAP的機制以及用法等想有一個詳細的瞭解,因此花了將近兩週時間寫了這份中文的CAP文檔,對 CAP 還不知道的同窗能夠先看一下這篇文章html

本文檔爲 CAP 文獻(Wiki),本文獻同時提供中文和英文版本,英文版本目前還在翻譯中,會放到Github Wiki 中。mysql

目錄

一、Getting Started

1.1 介紹

CAP 是一個遵循 .NET Standard 標準庫的C#庫,用來處理分佈式事務以及提供EventBus的功能,她具備輕量級,高性能,易使用等特色。git

目前 CAP 使用的是 .NET Standard 1.6 的標準進行開發,目前最新預覽版本已經支持 .NET Standard 2.0.github

1.2 應用場景

CAP 的應用場景主要有如下兩個:sql

    1. 分佈式事務中的最終一致性(異步確保)的方案。

分佈式事務是在分佈式系統中不可避免的一個硬性需求,而目前的分佈式事務的解決方案也無外乎就那麼幾種,在瞭解 CAP 的分佈式事務方案前,能夠閱讀如下 這篇文章數據庫

CAP 沒有采用兩階段提交(2PC)這種事務機制,而是採用的 本地消息表+MQ 這種經典的實現方式,這種方式又叫作 異步確保。json

    1. 具備高可用性的 EventBus。

CAP 實現了 EventBus 中的發佈/訂閱,它具備 EventBus 的全部功能。也就是說你能夠像使用 EventBus 同樣來使用 CAP,另外 CAP 的 EventBus 是具備高可用性的,這是什麼意思呢?api

CAP 藉助於本地消息表來對 EventBus 中的消息進行了持久化,這樣能夠保證 EventBus 發出的消息是可靠的,當消息隊列出現宕機或者鏈接失敗的狀況時,消息也不會丟失。markdown

1.3 Quick Start

  • 引用 NuGet 包

使用一下命令來引用CAP的NuGet包:

PM> Install-Package DotNetCore.CAP

根據使用的不一樣類型的消息隊列,來引入不一樣的擴展包:

PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.Kafka

根據使用的不一樣類型的數據庫,來引入不一樣的擴展包:

PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql
  • 啓動配置

在 ASP.NET Core 程序中,你能夠在 Startup.cs 文件 ConfigureServices() 中配置 CAP 使用到的服務:

public void ConfigureServices(IServiceCollection services) { services.AddDbContext<AppDbContext>(); services.AddCap(x => { // If your SqlServer is using EF for data operations, you need to add the following configuration: // Notice: You don't need to config x.UseSqlServer(""") again! x.UseEntityFramework<AppDbContext>(); // If you are using Dapper,you need to add the config: x.UseSqlServer("Your ConnectionStrings"); // If your Message Queue is using RabbitMQ you need to add the config: x.UseRabbitMQ("localhost"); // If your Message Queue is using Kafka you need to add the config: x.UseKafka("localhost"); }); }

Configure() 中配置啓動 CAP :

public void Configure(IApplicationBuilder app) { app.UseCap(); }

二、API接口

CAP 的 API 接口只有一個,就是 ICapPublisher 接口,你能夠從 DI 容器中獲取到該接口的實例進行調用。

2.1 發佈/發送

你可使用 ICapPublisher 接口中的 Publish<T> 或者 PublishAsync<T> 方法來發送消息:

public class PublishController : Controller { private readonly ICapPublisher _publisher; //在構造函數中獲取接口實例 public PublishController(ICapPublisher publisher) { _publisher = publisher; } [Route("~/checkAccount")] public async Task<IActionResult> PublishMessage() { await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); return Ok(); } }

下面是PublishAsync這個接口的簽名:

PublishAsync<T>(string name,T object)

默認狀況下,在調用此方法的時候 CAP 將在內部建立事務,而後將消息寫入到 Cap.Published 這個消息表。

2.1.1 事務

事務在 CAP 具備重要做用,它是保證消息可靠性的一個基石。 在發送一條消息到消息隊列的過程當中,若是不使用事務,咱們是沒有辦法保證咱們的業務代碼在執行成功後消息已經成功的發送到了消息隊列,或者是消息成功的發送到了消息隊列,可是業務代碼確執行失敗。

這裏的失敗緣由多是多種多樣的,好比鏈接異常,網絡故障等等。

只有業務代碼和CAP的Publish代碼必須在同一個事務中,纔可以保證業務代碼和消息代碼同時成功或者失敗。

如下是兩種使用事務進行Publish的代碼:

  • EntityFramework
using (var transaction = dbContext.Database.BeginTransaction()) { await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); // 你的業務代碼。 transaction.Commit(); }

你的業務代碼能夠位於 Publish 以前或者以後,只須要保證在同一個事務。

當CAP檢測到 Publish 是在EF事務區域內的時候,將使用當前的事務上下文進行消息的存儲。

其中,發送的內容會序列化爲Json存儲到消息表中。

  • Dapper
var connString = "數據庫鏈接字符串"; using (var connection = new MySqlConnection(connString)) { connection.Open(); using (var transaction = connection.BeginTransaction()) { await _publisher.PublishAsync("xxx.services.bar", new Person { Name = "Foo", Age = 11 }, connection, transaction); // 你的業務代碼。 transaction.Commit(); } }

在 Dapper 中,因爲不能獲取到事務上下文,因此須要用戶手動的傳遞事務上下文到CAP中。

2.2 訂閱/消費

注意:消息端在方法實現的過程當中須要實現冪等性。

使用 CapSubscribeAttribute 來訂閱 CAP 發佈出去的消息。

[CapSubscribe("xxx.services.bar")] public void BarMessageProcessor() { } 

這裏,你也可使用多個 CapSubscribe[""] 來同時訂閱多個不一樣的消息 :

[CapSubscribe("xxx.services.bar")] [CapSubscribe("xxx.services.foo")] public void BarAndFooMessageProcessor() { } 

其中,xxx.services.bar 爲訂閱的消息名稱,內部實現上,這個名稱在不一樣的消息隊列具備不一樣的表明。 在 Kafka 中,這個名稱即爲 Topic Name。 在RabbitMQ 中,爲 RouteKey。

RabbitMQ 中的 RouteKey 支持綁定鍵表達式寫法,有兩種主要的綁定鍵:

*(星號)能夠代替一個單詞.

# (井號) 能夠代替0個或多個單詞.

好比在下面這個圖中(P爲發送者,X爲RabbitMQ中的Exchange,C爲消費者,Q爲隊列)

在這個示例中,咱們將發送一條關於動物描述的消息,也就是說 Name(routeKey) 字段中的內容包含 3 個單詞。第一個單詞是描述速度的(celerity),第二個單詞是描述顏色的(colour),第三個是描述哪一種動物的(species),它們組合起來相似:「..」。

而後在使用 CapSubscribe 綁定的時候,Q1綁定爲 CapSubscribe["*.orange.*"], Q2 綁定爲 CapSubscribe["*.*.rabbit"][CapSubscribe["lazy.#]

那麼,當發送一個名爲 "quick.orange.rabbit" 消息的時候,這兩個隊列將會同時收到該消息。一樣名爲 lazy.orange.elephant的消息也會被同時收到。另外,名爲 "quick.orange.fox" 的消息將僅會被髮送到Q1隊列,名爲 "lazy.brown.fox" 的消息僅會被髮送到Q2。"lazy.pink.rabbit" 僅會被髮送到Q2一次,即便它被綁定了2次。"quick.brown.fox" 沒有匹配到任何綁定的隊列,因此它將會被丟棄。

另一種狀況,若是你違反約定,好比使用 4個單詞進行組合,例如 "quick.orange.male.rabbit",那麼它將匹配不到任何的隊列,消息將會被丟棄。

可是,假如你的消息名爲 "lazy.orange.male.rabbit",那麼他們將會被髮送到Q2,由於 #(井號)能夠匹配 0 或者多個單詞。

在 CAP 中,咱們把每個擁有 CapSubscribe[]標記的方法叫作訂閱者,你能夠把訂閱者進行分組。

組(Group),是訂閱者的一個集合,每一組能夠有一個或者多個消費者,可是一個訂閱者只能屬於某一個組。同一個組內的訂閱者訂閱的消息只能被消費一次。

若是你在訂閱的時候沒有指定組,CAP會將訂閱者設置到一個默認的組 cap.default.group

如下是使用組進行訂閱的示例:

[CapSubscribe("xxx.services.foo", Group = "moduleA")] public void FooMessageProcessor() { }

2.2.1 例外狀況

這裏有幾種狀況可能須要知道:

① 消息發佈的時候訂閱方還未啓動

Kafka:

當 Kafka 中,發佈的消息存儲於持久化的日誌文件中,因此消息不會丟失,當訂閱者所在的程序啓動的時候會消費掉這些消息。

RabbitMQ:

在 RabbitMQ 中,應用程序首次啓動會建立具備持久化的 Exchange 和 Queue,CAP 會針對每個訂閱者Group會新建一個消費者隊列,因爲首次啓動時候訂閱者未啓動的因此是沒有隊列的,消息沒法進行持久化,這個時候生產者發的消息會丟失

針對RabbitMQ的消息丟失的問題,有兩種解決方式:

i. 部署應用程序以前,在RabbitMQ中手動建立具備durable特性的Exchange和Queue,默認狀況他們的名字分別是(cap.default.topic, cap.default.group)。

ii. 提早運行一遍全部實例,讓Exchange和Queue初始化。

咱們建議採用第 ii 種方案,由於很容易作到。

② 消息沒有任何訂閱者

若是你發送了一條個沒有被任何訂閱者訂閱的消息,那麼此消息將會被丟棄。

三、配置

Cap 使用 Microsoft.Extensions.DependencyInjection 進行配置的注入,你也能夠依賴於 DI 從json文件中讀取配置。

3.1 Cap Options

你可使用以下方式來配置 CAP 中的一些配置項,例如

services.AddCap(capOptions => { capOptions.FailedCallback = //... });

CapOptions 提供了一下配置項:

NAME DESCRIPTION TYPE DEFAULT
PollingDelay 處理消息的線程默認輪詢等待時間(秒) int 15 秒
QueueProcessorCount 啓動隊列中消息的處理器個數 int 2
FailedMessageWaitingInterval 輪詢失敗消息的間隔(秒) int 180 秒
FailedCallback 執行失敗消息時的回調函數,詳情見下文 Action NULL

CapOptions 提供了 FailedCallback 爲處理失敗的消息時的回調函數。當消息屢次發送失敗後,CAP會將消息狀態標記爲Failed,CAP有一個專門的處理者用來處理這種失敗的消息,針對失敗的消息會從新放入到隊列中發送到MQ,在這以前若是FailedCallback具備值,那麼將首先調用此回調函數來告訴客戶端。

FailedCallback 的類型爲 Action<MessageType,string,string>,第一個參數爲消息類型(發送的仍是接收到),第二個參數爲消息的名稱(name),第三個參數爲消息的內容(content)。

3.2 RabbitMQ Options

CAP 採用的是針對 CapOptions 進行擴展來實現RabbitMQ的配置功能,因此針對 RabbitMQ 的配置用法以下:

services.AddCap(capOptions => { capOptions.UseRabbitMQ(rabbitMQOption=>{ // rabbitmq options. }); });

RabbitMQOptions 提供了有關RabbitMQ相關的配置:

NAME DESCRIPTION TYPE DEFAULT
HostName 宿主地址 string localhost
UserName 用戶名 string guest
Password 密碼 string guest
VirtualHost 虛擬主機 string /
Port 端口號 int -1
TopicExchangeName CAP默認Exchange名稱 string cap.default.topic
RequestedConnectionTimeout RabbitMQ鏈接超時時間 int 30,000 毫秒
SocketReadTimeout RabbitMQ消息讀取超時時間 int 30,000 毫秒
SocketWriteTimeout RabbitMQ消息寫入超時時間 int 30,000 毫秒
QueueMessageExpires 隊列中消息自動刪除時間 int (10天) 毫秒

3.3 Kafka Options

CAP 採用的是針對 CapOptions 進行擴展來實現 Kafka 的配置功能,因此針對 Kafka 的配置用法以下:

services.AddCap(capOptions => { capOptions.UseKafka(kafkaOption=>{ // kafka options. // kafkaOptions.MainConfig.Add("", ""); }); });

KafkaOptions 提供了有關 Kafka 相關的配置,因爲Kafka的配置比較多,因此此處使用的是提供的 MainConfig 字典來支持進行自定義配置,你能夠查看這裏來獲取對配置項的支持信息。

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

3.4 SqlServer Options

若是你使用的是 EntityFramewrok,你用不到該配置項下的內容。

CAP 採用的是針對 CapOptions 進行擴展來實現 SqlServer 的配置功能,因此針對 SqlServer 的配置用法以下:

services.AddCap(capOptions => { capOptions.UseSqlServer(sqlserverOptions => { // sqlserverOptions.ConnectionString }); });
NAME DESCRIPTION TYPE DEFAULT
Schema Cap表架構 string Cap
ConnectionString 數據庫鏈接字符串 string null

3.5 MySql Options

若是你使用的是 EntityFramewrok,你用不到該配置項下的內容。

CAP 採用的是針對 CapOptions 進行擴展來實現 MySql 的配置功能,因此針對 MySql 的配置用法以下:

services.AddCap(capOptions => { capOptions.UseMySql(mysqlOptions => { // mysqlOptions.ConnectionString }); });
NAME DESCRIPTION TYPE DEFAULT
TableNamePrefix Cap表名前綴 string cap
ConnectionString 數據庫鏈接字符串 string null

四、設計原理

4.1 動機

隨着微服務架構的流行,愈來愈多的人在嘗試使用微服務來架構他們的系統,而在這其中咱們會遇到例如分佈式事務的問題,爲了解決這些問題,我沒有發現簡單而且易於使用的解決方案,因此我決定來打造這樣一個庫來解決這個問題。

最初 CAP 是爲了解決分佈式系統中的事務問題,她採用的是 異步確保 這種機制實現了分佈式事務的最終一致性,更多這方面的信息能夠查看第6節。

如今 CAP 除了解決分佈式事務的問題外,她另一個重要的功能就是做爲 EventBus 來使用,她具備 EventBus 的全部功能,而且提供了更加簡化的方式來處理EventBus中的發佈/訂閱。

4.2 持久化

CAP 依靠本地數據庫實現消息的持久化,CAP 使用這種方式來應對一切環境或者網絡異常致使消息丟失的狀況,消息的可靠性是分佈式事務的基石,因此在任何狀況下消息都不能丟失。

對於消息的持久化分爲兩種:

① 消息進入消息隊列以前的持久化

在消息進入到消息隊列以前,CAP使用本地數據庫表對消息進行持久化,這樣能夠保證當消息隊列出現異常或者網絡錯誤時候消息是沒有丟失的。

爲了保證這種機制的可靠性,CAP使用和業務代碼相同的數據庫事務來保證業務操做和CAP的消息在持久化的過程當中是強一致的。也就是說在進行消息持久化的過程當中,任何一方發生異常狀況數據庫都會進行回滾操做。

② 消息進入到消息隊列以後的持久化

消息進入到消息隊列以後,CAP會啓動消息隊列的持久化功能,咱們須要說明一下在 RabbitMQ 和 Kafka 中CAP的消息是如何持久化的。

針對於 RabbitMQ 中的消息持久化,CAP 使用的是具備消息持久化功能的消費者隊列,可是這裏面可能有例外狀況,參加 2.2.1 章節。

因爲 Kafka 天生設計的就是使用文件進行的消息持久化,在因此在消息進入到Kafka以後,Kafka會保證消息可以正確被持久化而不丟失。

4.3 通信數據流

CAP 中消息的流轉過程大體以下:

「 P 」 表明消息發送者(生產者)。 「 C 」 表明消息消費者(訂閱者)。

4.4 一致性

CAP 採用最終一致性做爲的一致性方案,此方案是遵循 CAP 理論,如下是CAP理論的描述。

C(一致性)一致性是指數據的原子性,在經典的數據庫中經過事務來保障,事務完成時,不管成功或回滾,數據都會處於一致的狀態,在分佈式環境下,一致性是指多個節點數據是否一致;

A(可用性)服務一直保持可用的狀態,當用戶發出一個請求,服務能在必定的時間內返回結果;

P(分區容忍性)在分佈式應用中,可能由於一些分佈式的緣由致使系統沒法運轉,好的分區容忍性,使應用雖然是一個分佈式系統,可是好像一個能夠正常運轉的總體

根據 「CAP」分佈式理論, 在一個分佈式系統中,咱們每每爲了可用性和分區容錯性,忍痛放棄強一致支持,轉而追求最終一致性。大部分業務場景下,咱們是能夠接受短暫的不一致的。

第 6 節將對此作進一步介紹。

五、實現

CAP 封裝了在 ASP.NET Core 中的使用依賴注入來獲取 Publisher (ICapPublisher)的接口。而啓動方式相似於 「中間件」 的形式,經過在 Startup.cs 配置 ConfigureServicesConfigure 進行啓動。

5.1 消息表

當系統引入CAP以後並首次啓動後,CAP會在客戶端生成 3 個表,分別是 Cap.Published, Cap.Received, Cap.Queue。注意表名可能在不一樣的數據庫具備不一樣的大小寫區分,若是你在運行項目的時候沒有顯式的指定數據庫生成架構(SQL Server)或者表名前綴(MySql)的話,默認狀況下就是以上3個名字。

Cap.Published:這個表主要是用來存儲 CAP 發送到MQ(Message Queue)的客戶端消息,也就是說你使用 ICapPublisher 接口 Publish 的消息內容。

Cap.Received:這個表主要是用來存儲 CAP 接收到 MQ(Message Queue) 的客戶端訂閱的消息,也就是使用 CapSubscribe[] 訂閱的那些消息。

Cap.Queue: 這個表主要是CAP內部用來處理髮送和接收消息的一個臨時表,一般狀況下,若是系統不出現問題,這個表將是空的。

PublishedReceived 表具備 StatusName 字段,這個字段用來標識當前消息的狀態。目前共有 Scheduled,Enqueued,Processing,Successed,Failed 等幾個狀態。CAP 在處理消息的過程當中會依次從 Scheduled 到 Successed 來改變這些消息狀態的值。若是是狀態值爲 Successed,表明該消息已經成功的發送到了 MQ 中。若是爲 Failed 則表明消息發送失敗,消息發送失敗後 CAP 會對消息進行重試,直到成功。

關於數據清理: CAP 默認狀況下會每隔一個小時將消息表的數據進行清理刪除,避免數據量過多致使性能的下降。清理規則爲 ExpiresAt 不爲空而且小於當前時間的數據。

5.2 消息格式

CAP 採用 JSON 格式進行消息傳輸,如下是消息的對象模型:

NAME DESCRIPTION TYPE
Id 消息編號 int
Name 消息名稱 string
Content 內容 string
Group 所屬消費組 string
Added  建立時間 DateTime
ExpiresAt 過時時間 DateTime
Retries 重試次數 int
StatusName 狀態 string

對於 Cap.Received 中的消息,會多一個 Group 字段來標記所屬的消費者組。

5.3 EventBus

EventBus 採用 發佈-訂閱 風格進行組件之間的通信,它不須要顯式在組件中進行註冊。

上圖是EventBus的一個Event的流程,關於 EventBus 的更多信息就不在這裏介紹了...

在 CAP 中,爲何說 CAP 實現了 EventBus 中的所有特性,由於 EventBus 具備的兩個大功能就是發佈和訂閱, 在 CAP 中 使用了另一種優雅的方式來實現的,另一個 CAP 提供的強大功能就是消息的持久化,以及在任何異常狀況下消息的可靠性,這是EventBus不具備的功能。

CAP 裏面發送一個消息能夠看作是一個 「Event」,一個使用了CAP的ASP.NET Core 應用程序既能夠進行發送也能夠進行訂閱接收。

5.4 重試

重試在實現分佈式事務中具備重要做用,CAP 中會針對發送失敗或者執行失敗的消息進行重試。在整個 CAP 的設計過程當中有如下幾處採用的重試策略。

① 消息發送重試

在消息發送過程當中,當出現 Broker 宕機或者鏈接失敗的狀況亦或者出現異常的狀況下,這個時候 CAP 會對發送的重試,重試策略爲默認 15 次失敗重試,當15次事後仍然失敗時,CAP會將此消息狀態標記爲失敗。

② 消息消費重試

當 Consumer 接收到消息時,會執行消費者方法,在執行消費者方法出現異常時,會進行重試。這個重試策略和 ① 是相同的。

③ 失敗消息重試

CAP 會按期針對 ① 和 ② 中狀態爲「失敗的」消息進行重試,CAP會對他們進行從新「入隊(Enqueue)」,入隊時會將消息中的重試次數標記爲0,狀態置爲 Enqueued。

六、分佈式事務

針對於分佈式事務的處理,CAP 採用的是「異步確保」這種方案。

6.1 異步確保

異步確保這種方案又叫作本地消息表,這是一種經典的方案,方案最初來源於 eBay,參考資料見段末連接。這種方案目前也是企業中使用最多的方案之一。

相對於 TCC 或者 2PC/3PC 來講,這個方案對於分佈式事務來講是最簡單的,並且它是去中心化的。在TCC 或者 2PC 的方案中,必須具備事務協調器來處理每一個不一樣服務之間的狀態,而此種方案不須要事務協調器。
另外 2PC/TCC 這種方案若是服務依賴過多,會帶來管理複雜性增長和穩定性風險增大的問題。試想若是咱們強依賴 10 個服務,9 個都執行成功了,最後一個執行失敗了,那麼是否是前面 9 個都要回滾掉?這個成本仍是很是高的。

可是,並非說 2PC 或者 TCC 這種方案很差,由於每一種方案都有其相對優點的使用場景和優缺點,這裏就不作過多介紹了。

中文:http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html
英文:http://queue.acm.org/detail.cfm?id=1394128

七、FAQ

暫無


本文地址:http://www.cnblogs.com/savorboard/p/cap-document.html
做者博客:Savorboard 歡迎轉載,請在明顯位置給出出處及連接

相關文章
相關標籤/搜索