MaxCompute(原ODPS) 事件(Event)機制

摘要: 免費開通大數據服務:https://www.aliyun.com/product/odps 轉自habai 什麼是 MaxCompute事件機制 MaxCompute event 用於監控表和實例等MaxCompute資源(目前只用於監控表)。編程

 

免費開通大數據服務:https://www.aliyun.com/product/odpsapi

 

什麼是MaxCompute安全

 

大數據計算服務(MaxCompute,原名ODPS)是一種快速、徹底託管的TB/PB級數據倉庫解決方案。MaxCompute向用戶提供了完善的數據導入方案以及多種經典的分佈式計算模型,可以更快速的解決用戶海量數據計算問題,有效下降企業成本,並保障數據安全。服務器

 

什麼是 MaxCompute事件機制restful

 

MaxCompute event 用於監控表和實例等MaxCompute資源(目前只用於監控表)。當表狀態發生變化時,MaxCompute 會向預先註冊(訂閱)的 uri 發送信息。Event通知只有在訂閱Event以後才能收到。每一個project中能夠訂閱一個或多個Event。Event是用戶的數據,同表數據同樣,建立或修改時都須要有這個Project的操做權限。關於Event的Restful Api,在文章裏有介紹。網絡

 

爲何須要 MaxCompute 事件機制多線程

 

考慮如下場景:當一個用戶 A 關心某一個表 T 的操做(建立/刪除/插入/修改/...)時,若是表 T 不是用戶 A 建立的,那麼用戶 A 能夠採用什麼方法感知這個操做?一個方法是主動輪詢這個表是否作了某個操做,可是缺點是不言而喻的。另外一個方法是,註冊一個回調,當表被操做時,被動接受通知。用這種方法可使用戶邏輯沒必要輪詢和等待對錶的操做。異步

MaxCompute Event機制就是第二種方法的實現。socket

 

在實際的生產中,對以上應用場景有大量的需求,並已經造成了對MaxCompute Event豐富的應用,例如:分佈式

  • 數據地圖: 訂閱了一些 project 的 Event,並根據 Event 通知展現這些 project 中表的元數據。
  • 跨集羣複製: 監聽 Event 通知以複製相應的表。
  • 螞蟻金服: 依賴事件通知機制進行工做流管理,統計,受權等工做。 事實上,每一個 project 都有大量用戶訂閱了所屬project的表以及其它project表的事件通知。

 

MaxCompute 事件機制是怎樣實現的

 

本節首先將 MaxCompute 事件機制 做爲一個黑盒,從用戶的角度介紹其功能和使用方法。然後以此爲切入點,深刻剖析 MaxCompute 事件機制的內部機理。最後,提出一些對當前事件機制的思考。

 

訂閱(註冊)一個事件 & 事件通知

 

在網絡編程中,爲了減輕多線程的壓力,每每使用事件通知驅動的異步編程。如,libevent[2]。使用這個庫編寫一個服務器程序,能夠這樣作:

 

void on_accept(int sock, short event, void* arg);

 

int main(int argc, char* argv[])

{

// create socket s

struct sockadddr_in addrin;

int s = socket(AF_INET, SOCK_STREAM, 0);

BOOL bReuseaddr=TRUE;

setsockopt(s, SOL_SOCKET ,SO_REUSEADDR, (const char*)&bReuseaddr, sizeof(BOOL));

memset(&addrin, 0, sizeof(addrin));

addrin.sin_family = AF_INET;

addrin.sin_port = htons(PORT);

addrin.sin_addr.s_addr = INADDR_ANY;

bind(s, (struct sockaddr*)&addrin, sizeof(struct sockaddr));

listen(s, BACKLOG);

 

// 建立事件池 event base

struct event_base* eb = event_base_new();

 

// 建立事件 & 綁定回調

struct event e;

event_set(&e, s, EV_READ|EV_PERSIST, on_accept, NULL);

 

// 註冊事件

event_base_set(eb, &e);

event_add(&e, NULL);

 

// 啓動事件派發

event_base_dispatch(eb);

 

return 0;

}

抽取出上面事件通知邏輯的主線:建立事件池,建立一個 event 並綁定回調函數, 把 event 註冊到事件池並啓動事件派發器。

在這個過程當中,事件生產者是socket(嚴格說是綁定在這個socket上的事件多路複用接口,如epoll),事件中轉者是libevent中的事件池(event base)和事件派發器,事件消費者是事件處理回調函數。

一樣的過程適用於MaxCompute event。事件池和派發器不須要用戶建立。用戶首先建立一個事件,而後綁定回調處理邏輯,最後把事件註冊到事件池。代碼以下:

 

public class TestOdpsEvent {

/**

* 建立事件方法

*/

static Event buildEvent(String eventName, String tableName, String callbackUri, String comment) throws URISyntaxException {

Event event = new Event();

event.setName(eventName); // 指定事件名稱

event.setComment(comment); // 事件註釋

event.setType(Event.SourceType.TABLE); // 指定事件類型,目前支持 TABLE

Event.Config config = new Event.Config();

config.setName("source");

config.setValue(tableName); // 指定事件源(即 表名). "*" 表示全部表.

event.setConfig(config);

event.setUri(new URI(callbackUri)); // 指定了一個回調地址

return event;

}

 

public static void main(String[] args) throws OdpsException, InterruptedException, URISyntaxException {

 

Account account = new AliyunAccount("xxxxxx", "xxxxxx");

Odps odps = new Odps(account);

String odps_endpoint = "http://xxxx/xxx";

odps.setEndpoint(odps_endpoint);

odps.setDefaultProject("project1");

InternalOdps iodps = new InternalOdps(odps);

 

// 建立事件 & 綁定回調

String callbackUri = "http://xxx.xxx.xxx.xxx:xxxx/xxxxx"; // this is different from odps_endpoint

Event e = buildEvent("table_create_event_1", "table1", callbackUri, "this is a test event");

 

// 註冊事件

iodps.events().create(e);

 

// 查看已建立事件

Iterator<Event> events = iodps.events().iterator();

while(events.hasNext()) {

Event event1 = events.next();

System.out.println("Event found: " + event1.getName());

System.out.println("Event uri: " + event1.getUri());

// iodps.events().delete(event1.getName()); // 刪除事件

}

}

}

在上面的代碼中,指定了一個回調地址。當表發生變化時,就會通知這個回調地址。用戶根據在這個回調地址接收到事件通知,使用相應的處理邏輯處理。事件回調地址做爲事件處理邏輯入口,支持多種協議,包括但不限於kuafu, http, https等。與libevent不一樣的是,MaxCompute event的生產者,中轉者和消費者能夠位於不一樣網絡區域。在用戶註冊事件以後,MaxCompute event機制會在該事件發生後當即通知用戶註冊的回調地址。

 

剖析 MaxCompute 事件機制

 

圖3-1的三個部分分別表示了註冊事件,轉發通知,刪除事件的過程。MessageService是 MaxCompute 內部消息服務,做用是轉發事件通知到用戶註冊的回調地址。爲方便理解,把 Create topic, Create subscription, Add endpoint 看做註冊事件在消息服務層的三個操做。事件機制在消息服務層具體的實現將在後邊介紹。

 

圖3-1: 事件建立,轉發,刪除

 

在圖3-1註冊事件的過程當中,用戶的請求由 OdpsWorker 的 createEventHandler 處理。createEventHandler 依次檢查相應的 MessageService topic,subscription,endpoint 是否存在,若是不存在,建立。

 

在圖3-1刪除事件的過程相對簡單,用戶的請求由 OdpsWorker 的 deleteEventHandler 處理。deleteEventHandler 直接刪除相應的 MessageService subscription。

 

在圖3-1轉發事件通知的過程當中,事件的生產者主要是ddl task(事實上,因爲歷史緣由,還有HiveServer,CREATETABLE 事件從這裏發出),當執行對錶的meta相關的操做時,就會觸發ddl task。如drop table, add partition, insert into partition等。ddl task 會發送相應操做的事件通知。事件通知發送給事件中轉者——消息服務。消息服務將這個事件通知發送給相應事件綁定的回調地址。

 

消息服務做爲事件中轉者,主要完成以下功能:

1)做爲事件池維護不一樣事件和回調地址的對應關係(一個事件對應一個或多個回調地址)

2)做爲事件派發器根據事件通知匹配相應事件並將該事件通知轉發到對應事件的各個回調地址。

目前不一樣的事件依據兩個屬性區分:project名和事件源。事件源目前是表名。在ddl task發出的事件通知中,包含了這兩個關鍵信息。消息服務根據這兩個信息匹配相應事件。在介紹消息服務匹配的實現方式時,首先須要瞭解 MaxCompute消息服務 的基本概念(爲便於理解,本文簡化了消息服務的一些概念,如隱藏了partition概念。在文章[3]中,具體介紹了消息服務的設計和實現)。如圖3-2:

 

圖3-2: 消息服務基本概念

MaxCompute消息服務包含四個基本概念:topic, subscription, filter, endpoint。消息服務使用了典型的發佈訂閱模型。用戶能夠建立topic。建立一個或多個subscription(包含一個或多個endpoint)訂閱這個topic。消息發佈者向topic發送消息。該消息被轉發到該topic的全部filter匹配的subscription的全部的endpoint。其中,topic的建立者,subscription的建立者,消息的發送者,以及消息的接收者能夠是不一樣的用戶。在建立subscription時,須要指定filter matcher。在消息發送時須要指定filter。當某條消息發送到某個topic時,消息中的filter須要和這個topic的各個subscription的filter matcher匹配,若是匹配成功,將這個消息的一個副本發送給這個subscription的全部endpoint,不然不發送給它們,而後繼續匹配其餘的subscription。filter和filter_matcher的示例和匹配規則以下:

filter_matcher filter is matched
"" "k=v" yes. If filter_matcher is "", it will match forever.
"k=v" "k=v" yes
"k=v" "k=v1" no
"k1=v" "k=v" no
"k=v1|v2" "k=v1" yes
"k=v1|v2" "k=v2" yes
"k=v1|v2" "k=v1|v2" no. filter's value is 'v1|v2', not 'v1' or 'v2'.
"k1=v1,k2=v2" "k1=v1,k2=v2" yes
"k1=v1" "k1=v1,k2=v2" yes
"k1=v1,k2=v2" "k1=v1" no
"k=v" "" no
"" "" yes. If filter is "", filter_matcher will never hit except its value is ""

消息服務的這個機制,能夠實現上述事件中轉者的功能。將一個事件表達爲一個subscription,將一個事件通知表達爲一條消息,每一個endpoint記錄一個回調地址。每一個project對應一個topic,用filter區分事件源。當一個事件通知產生以後,會被髮送到產生通知的project所在的topic上。而後,通過匹配,轉發全部的endpoint對應回調地址上。事件通知消息體示例以下:

 

<?xml version="1.0" encoding="UTF-8"?>

<Notification>

<Account>ALIYUN$odpstest1@aliyun.com</Account>

<Project>a_2_test_event</Project>

<SourceType>Table</SourceType>

<SourceName>backup_partition</SourceName>

<Reason>CREATETABLE</Reason>

<TimeStamp>Sun, 18 Sep 2016 14:21:32 GMT</TimeStamp>

<Properties/>

<OdpsMessagerId>1</OdpsMessagerId>

<OdpsMessagerTime>1474208492</OdpsMessagerTime>

</Notification>

 

<?xml version="1.0" encoding="UTF-8"?>

<Notification>

<Account>ALIYUN$odpstest1@aliyun.com</Account>

<Project>a_2_test_event</Project>

<SourceType>Table</SourceType>

<SourceName>backup_partition</SourceName>

<Reason>ADDPARTITION</Reason>

<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>

<Properties>

<Property>

<Name>Name</Name>

<Value>ds=ds1/pt=pt1</Value>

</Property>

</Properties>

<OdpsMessagerId>4</OdpsMessagerId>

<OdpsMessagerTime>1474289142</OdpsMessagerTime>

</Notification>

當用戶訂閱了 Project a_2_test_event 的 Table "backup_partition" 的事件後,當發生對這個表的 CREATETABLE 和 ADDPARTITION 操做後,會接收到上面的兩個事件通知。每一個事件通知是一個 xml 格式的消息。SourceType 表示訂閱的是表的事件通知仍是其它類型資源的事件通知(目前只支持表)。SourceName 表示訂閱的表的名字。Reason 表示在該表上發生的操做,上例中分別是建立表的操做和增長分區的操做(在 附錄 中列舉了更多的操做類型)。Properties 中會有一些附加的通知屬性,經常使用來指出操做發生在表的哪一個 parition 上。OdpsMessagerId 在一個 Project 的全部表中是惟一的。OdpsMessagerTime 是這條通知產生的時刻。

 

在MaxCompute線上服務環境中,每個project p1,就會對應一個名爲 SQL_p1的topic(由於歷史緣由hardcode了前綴SQL_,不過前綴是什麼無所謂,只要能夠區分事件機制的topic和其它應用中的topic就好),這個 topic 在第一次註冊事件的時候自動建立(也能夠在建立project時手動建立)。p1的全部事件通知都會發到這個topic上。這個topic在其對應的project刪除時被刪除。

 

MaxCompute消息服務爲事件機制提供了對事件通知的持久化,保序,failover的功能,盡最大努力保證消息不丟,可是依然不能保證絕對不丟。下面分析事件機制可能出現的丟消息狀況:事件生產者失敗,消息服務失敗,事件接收者失敗,消息服務熱升級。

 

1) 事件生產者失敗:在事件通知到達消息服務以前,存在事件通知生產者失敗的可能。具體的消息丟失機率取決於事件生產者的持久化,failover能力以及重試機制。

2) 消息服務失敗:消息服務失敗包括兩種狀況:消息到達消息服務前失敗和消息到達以後失敗。若是消息到達以前失敗,那麼消息服務提供的message client會重試3次,每次間隔5毫秒。若是事件成功地發送到消息服務,消息會首先被持久化。在消息服務中的一條消息只有知足下列兩種狀況纔會被刪除:a. 消息發送成功;b. 消息發送失敗且超太重試次數(目前重試3600次,每次間隔60秒)。能夠看到,事件(消息)丟失最大的風險在於發送到達消息服務以前的一段時間。

3) 事件接收者失敗:若是接收者失敗,且在得到事件通知以後,處理事件通知以前,消息服務不提供接口使接收者重獲這條消息。固然對於這個問題,還有另外一種解決方法,就是使用相似kafka的消息服務模型做爲中轉者,能夠保證事件通知更強大的可靠性。kafka模型[4]不會主動推送消息,僅僅對消息作持久化以實現高吞吐和高可靠。消息訂閱者須要給定消息id的範圍從某個topic的partition拉消息。當訂閱者失敗,但願從新得到歷史的消息時,只要給定消息id的範圍,若是這個範圍內的消息沒有過時,就能夠被從新得到。可是kafka模型使用拉消息的模式不具有完整的事件派發器功能,也就不能支持如今odps須要的異步事件通知編程方式。而事實上,MaxCompute消息服務設計的出發點,就是MaxCompute事件通知機制(在沒有消息服務以前,MaxCompute worker履行着消息服務的職責)。

4)消息服務熱升級:雖說是熱升級,可是新老服務之間切換也是須要時間的。在線上這個時間最誇張的一次達到了4個多小時(全部topic所有切換完成的時間間隔)。而在切換的過程當中,新老消息服務中處於切換中間狀態的 topic 是拒絕服務的(切換完成的 topic 能夠服務)。

 

總之,MaxCompute事件通知機制提供了必定程度的高可用保證,可是尚未把丟消息的機率下降爲0,其最大風險在於消息服務不服務。而此時,消息服務上某個 topic 丟失消息的數量和該 topic 不服務的時間成正比。

 

總結

 

MaxCompute事件機制給用戶監聽資源的變化帶來了很大的便利。它借鑑了通用的事件異步編程模型,提供了友好的用戶接口,支持了線上數據地圖,跨集羣複製等衆多服務。可是,依然有不足之處,例如:

1) 事件監聽(訂閱/註冊)的粒度粗且不可定製:咱們曾經接到用戶的需求,想監聽一個表的 CREATETABLE 事件,可是現有的機制只支持監聽到表的級別,這樣用戶就不得不本身過濾這個表的各類事件。

2) 事件機制的可靠性須要進一步提升:曾經出現過熱升級切換消息服務4個小時的狀況,緣由是其中一個 topic 向某個 endpoint 發送消息卡在了發送那裏一直沒法退出,形成該 topic 上丟失大量消息。

3) 消息服務的 生產者qps(生產環境接收消息1000-2000),消費者 qps(消費極限qps未測過,因其取決於) 與 開源消息服務如 kafka 生產者 qps (50,000),消費者 qps (22,000) 依然有必定差距[4]。

 

對於當前但願使用MaxCompute消息服務的用戶,最好確保知足如下條件:

1) 容許丟失少許的消息通知,由於的確存在小几率丟消息的可能;

2) 事件處理系統具備必定的事件處理能力,接受事件qps最好能夠達到500以上。

3) 不用的事件(回調uri發不通且再也不使用)請刪除,不然會在消息服務中留下永久性垃圾,形成消息在pangu中大量堆積,由於消息服務沒法判斷用戶的事件是否須要刪除!!!

解決上述的問題目前依然有一些挑戰,可是咱們會不斷改進和完善事件機制的各項功能,減少事件丟失率,細化事件訂閱粒度,優化用戶體驗。

 

參考資料

[1] Event restful api

[2] Libevent: http://libevent.org

[3] Odps Message Service

[4] Kreps J, Narkhede N, Rao J. Kafka: A distributed messaging system for log processing[C]//Proceedings of the NetDB. 2011: 1-7.

 

附錄

 

MaxCompute事件機制事件類型列表

 

在觸發 DDL 時,Event 會向預先註冊的 url 發送 POST 請求,消息體格式以下

 

<?xml version="1.0" encoding="UTF-8"?>

<Notification>

<Account>ALIYUN$odpstest1@aliyun.com</Account>

<Project>a_2_test_event</Project>

<SourceType>Table</SourceType>

<SourceName>backup_partition</SourceName>

<Reason>ADDPARTITION</Reason>

<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>

<Properties>

<Property>

<Name>Name</Name>

<Value>ds=ds1/pt=pt1</Value>

</Property>

</Properties>

<OdpsMessagerId>4</OdpsMessagerId>

<OdpsMessagerTime>1474289142</OdpsMessagerTime>

</Notification>

其中:

Reason 可能取值 事件生產者
CREATETABLE hiveserver
DROPTABLE ddltask
ALTERTABLE ddltask
ADDPARTITION ddltask
DROPPARTITION ddltask
ALTERPARTITION ddltask
INSERTOVERWRITETABLE ddltask
INSERTINTOTABLE ddltask
INSERTOVERWRITEPARTITION ddltask
INSERTINTOPARTITION ddltask
MERGETABLE ddltask
MERGEPARTITION ddltask
ALTERVOLUMEPARTITION ddltask
ADDVOLUMEPARTITION ddltask
  • SourceType 可能取值:Table

 

使用限制

 

1) 目前只有 Project Owner 能夠建立 event,沒法受權給其餘人建立 event

2) 接收 post 信息的 url 應返回 http code 200,server 端 post 時並不支持如 302 這樣的跳轉。

 

原文連接

閱讀更多幹貨好文,請關注掃描如下二維碼:

相關文章
相關標籤/搜索