使用Beam,首先使用Beam SDK中編寫一個Beam程序。 在Beam程序中定義了Pipeline,包括全部輸入,變換和輸出; 同時也包含了設置Pipeline的參數(一般使用命令行選項傳遞)。 包括Pipeline的執行引擎選項,用來肯定Pipeline運行在那個執行引擎上(目前支持Beam的執行引擎包括Spark Flink Apex等)。html
Beam SDK提供了一些抽象,能夠簡化大規模分佈式數據處理的機制。 Beam用相同的抽象來統一表達批處理和流計算。 當建立Beam Pipeline時,能夠根據這些抽象設計數據處理任務。 包括如下:java
Pipeline從頭至尾封裝整個數據處理任務。包括讀取輸入數據,變換數據和寫入輸出數據。全部Beam程序必須建立一個Pipeline。建立Piepline時,還必須指定執行選項,告訴Pipeline在哪裏(如哪一種執行引擎,Spark Flink等)和如何運行(批處理或流式)。python
PCollection表示Beam Pipeline處理的的分佈式數據集。數據集能夠是有限的,例如來自於文件這樣的再也不變化的數據源,或是無限的,這意味着它來自於經過訂閱或其餘機制不斷更新的數據源。Pipeline一般經過從外部數據源讀取數據來建立初始PCollection,也能夠從Beam程序中的內存數據建立PCollection。PCollection是Pipeline中每一個步驟的輸入和輸出。算法
Transform表明Pipeline中的數據處理操做或步驟。每一個Transform將一個或多個PCollection對象做爲輸入,執行對該PCollection的元素提供的處理函數,並生成一個或多個輸出PCollection對象。數據庫
Beam提供Source和Sink API來分別表示讀取和寫入數據。 Source封裝了從一些外部來源(如雲端文件存儲或訂閱流式數據源)將數據讀入Beam Pipeline所需的代碼。 Sink一樣封裝將PCollection的元素寫入外部數據接收器所需的代碼。apache
典型的Beam程序的以下:編程
1. 建立Pipeline對象並設置Pipeline執行選項,包括Pipeline的執行引擎。 2. 爲Pipeline建立初始數據集PCollection,使用Source API從外部源讀取數據,或使用CreateTransform從內存數據構建PCollection。 3. 應用Transform到每一個PCollection。Transform能夠改變、過濾、分組、分析或以其餘方式處理PCollection中的元素。Transform建立一個新的輸出PCollection,而不改變輸入集合(函數式編程特性)。 典型的Pipeline依次將後續的Transform應用於每一個新的輸出PCollection,直處處理完成。 4.輸出最終的轉換PCollection,通常使用Sink API將數據寫入外部源。 5. 使用指定的執行引擎運行Pipeline代碼。
Pipeline封裝了數據處理任務中的全部數據和步驟。 Beam程序一般從構建一個Pipeline對象開始,而後使用該對象做爲建立管道數據集做爲PCollections的基礎,並將其做爲Transforms操做。緩存
要使用Beam,咱們編寫的程序必須首先建立Beam SDK類Pipeline的實例(一般在main()函數中)。 建立Pipeline時,還須要設置一些配置選項。 能夠以編程方式設置管道的配置選項,但提早設置選項(或從命令行讀取)一般更容易,並在建立對象時將其傳遞給Pipeline對象。安全
// 從建立Pipeline的options開始 PipelineOptions options = PipelineOptionsFactory.create(); // 而後建立Pipeline Pipeline p = Pipeline.create(options);
管道抽象封裝了數據處理任務中的全部數據和步驟。一般從構建一個PipelinePipeline對象開始,而後使用該對象做爲建立管道數據集做爲PCollections的基礎,並將其做爲Transforms操做。服務器
要使用Beam,必須首先建立Beam SDK類Pipeline的實例(一般在main()函數中)。 建立流水線時,您還須要設置一些配置選項。 您能夠以編程方式設置管道的配置選項,但提早設置選項(或從命令行讀取)一般更容易,並在建立對象時將其傳遞給管道對象。
雖然能夠經過建立PipelineOptions對象並直接設置字段來配置Pipeline,但Beam SDK包含一個命令行解析器,可使用它來使用命令行參數在PipelineOptions中設置字段。
要從命令行讀取選項,首先要建立一個PipelineOptions對象,如如下示例代碼所示:
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
命令行的格式以下:
--<option>=<value>
<font color=red>注意: 使用 .withValidation會校驗命令行的參數</font>
使用命令行的方式能夠爲Pipeline建立任何的參數。
除了標準的PipelineOptions以外,還能夠添加自定義選項。 要添加自定義選項,須要爲每一個選項定義一個帶有getter和setter方法的接口,如如下示例所示:
public interface MyOptions extends PipelineOptions { String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
還能夠爲每個參數設定默認值和參數描述,當用戶使用–help時顯示的描述和默認值。設定方法以下:
public interface MyOptions extends PipelineOptions { @Description("My custom command line argument.") @Default.String("DEFAULT") String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
而後使用PipelineOptionsFactory 註冊自定義參數的接口,建立PipelineOptions 的時候做爲參數傳遞進去。只有當在PipelineOptionsFactory 中註冊了接口以後,使用—help才能顯示接口中定義的參數的默認值和描述,PipelineOptionsFactory 纔會校驗命令行中輸入的參數,在全部已註冊的自定義參數中是否有匹配的。
下邊的代碼示例中,展現瞭如何在PipelineOptionsFactory中註冊自定義參數接口和如何使用自定義參數接口:
PipelineOptionsFactory.register(MyOptions.class); MyOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(MyOptions.class);
如今就能夠在Pipeline中使用 –myCustomOption=value 參數了。
PCollection抽象表示分佈式數據集。 您能夠將PCollection視爲Pipeline數據; Bean中的Transform使用PCollection對象做爲輸入和輸出。 所以,若是要處理Pipeline中的數據,則必須採用PCollection的形式。
建立Pipeline後,須要先建立一個至少一個PCollection。 建立的PCollection做爲Pipeline中第一個操做的輸入。
可使用Beam的Source API從外部源中讀取數據來建立PCollection,也能夠在程序中建立存儲在內存中集合類中的數據的PCollection。 前者一般是在生產環境中Pipeline讀取數據;Beam的源API提供了大量針對不一樣數據源的適配器從外部數據源讀取數據(如大型基於雲的文件,數據庫或訂閱服務)中讀取。 後者主要用於測試和調試目的。
要從外部源讀取,請使用Beam提供的I / O適配器之一。 適配器的用法有所不一樣,但它們的基本邏輯是讀取自某些外部數據源, 以PCollection返回從源中讀取的數據。
每一個數據源適配器都有一個Read Transform,要讀取,必須將該Transform應用於Pipeline。 例如,TextIO.Readio.TextFileSource從外部文本文件讀取並返回其元素爲String類型的PCollection,每一個String表示文本文件中的一行。
如下是將TextIO.Readio.TextFileSource應用於Pipeline以建立PCollection的方法:
public static void main(String[] args) { // 建立pipeline. PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); // 使用Read Transform建立PCollection 名爲'lines' PCollection<String> lines = p.apply( "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt")); }
參考 I/O部分了解Beam支持的適配器。
從內存中的Java集合建立PCollection,可使用Beam提供的Create Transform。 很像數據適配器的Read,能夠在Pipeline中使用Create。
Create接受Java Collection和Coder對象做爲參數。 Coder指定如何對集合中的元素進行序列化反序列化。
要從內存中的List建立PCollection,可使用Beam提供的Create Transform。
下邊的代碼示例中,展現瞭如何從內存中的List中建立PCollection:
public static void main(String[] args) { // 建立一個Java Collection ,元素類型爲String. static final List<String> LINES = Arrays.asList( "To be, or not to be: that is the question: ", "Whether 'tis nobler in the mind to suffer ", "The slings and arrows of outrageous fortune, ", "Or to take arms against a sea of troubles, "); // 建立pipeline. PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); // 使用Create Transform,用給定的字符串編碼器將上邊建立的Java Collectio轉換爲PCollection p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of()) }
PCollection由建立它的特定Pipeline對象擁有; Pipeline之間不能共享PCollection。 PCollection看起來很像集合類。 可是,PCollection和集合在幾個關鍵方面有所不一樣:
元素類型
PCollection的元素能夠是任何類型的,但都必須是相同的類型。 然而,爲了支持分佈式處理,Beam須要可以將每一個單獨的元素編碼爲字節串(所以元素能夠傳遞給分佈式工做人員)。 Beam SDK提供了一種數據序列化反序列化,內置了不少經常使用類型的Coder,也支持根據須要自定義Coder。
不變性
PCollection是不可變的。 建立後,沒法添加,刪除或更改單個元素。 Beam Transform能夠處理PCollection的每一個元素並生成新的Pipeline數據(做爲新的PCollection),但不會改變輸入的PCollection。
隨機訪問
PCollection不支持隨機訪問單個元素。 相反,Beam Transform能夠單獨考慮PCollection中的每一個元素。
Size和邊界
PCollection是一個大的,不可變的「包」元素。 PCollection能夠包含多少元素沒有上限;任何給定的PCollection能夠是在單機內容可以容納的數據集,也可能表示來自於數據存儲中的很是大的分佈式數據集。
PCollection能夠是有限的的或無限的。PCollection表示已知固定大小的數據集,而無限PCollection表示無限大小的數據集。 PCollection是有限仍是無限取決於它所表明的數據集的來源。從批量數據源(如文件或數據庫)讀取可建立有界的PCollection。從流或連續更新的數據源(如Pub / Sub或Kafka)讀取會建立一個無限的PCollection(除非您明確告訴它不要)。
根據PCollection的有限(或無限),Beam會採用不一樣的方式處理數據。使用批處理做業來處理有限PCollection,批處理做業能夠讀取整個數據集一次,並在有限長度的做業中執行處理。使用持續運行的流式做業來處理無限PCollection,流式的數據永遠不會在哪一時刻整個數據集是完整的,總會有數據源源不斷的進來。
當對無限PCollection中的元素進行分組的操做時,Beam須要一個稱爲 窗口(Window)的概念,將連續更新的數據集劃分爲有限大小的邏輯窗口。Beam將每一個窗口處理爲一個批次(bundle),而且隨着數據集的生成,處理繼續進行。這些邏輯窗口由與諸如時間戳之類的數據元素相關聯的一些特性來肯定。
元素時間戳
PCollection中的每一個元素都具備相關聯的時間戳。每一個元素的時間戳記最初由建立PCollection的數據源分配。建立無限PCollection的數據源一般會爲每一個新元素分配一個對應於元素被讀取或添加的時間戳。
注意:Beam 數據源在建立有限PCollection的時候,會爲每一個元素自動分配時間戳。最常規的作法是,全部的元素都賦予相同的時間戳。
時間戳對於包含具備固有時間概念的元素的PCollection是有用的。 若是Pipeline正在讀取一系列事件,例如推文或其餘社交媒體消息,則每一個元素可能會將事件發佈的時間用做元素時間戳。
若是Beam源沒有分配時間戳,也能夠手動將時間戳分配給PCollection的元素。 若是元素具備固有的時間戳,可是時間戳在元素自己的結構中(例如服務器日誌條目中的「時間」)字段,則您須要執行此操做。 Beam提供了Transform將原始的PCollection做爲輸入並輸出具備附加時間戳的PCollection; 有關如何執行此操做的更多信息,請參閱分配時間戳。
在BeamSDK中,Transform是Pipeline中的操做。Transform將PCollection(或多個PCollection)做爲輸入,對集合中的每個元素執行咱們編寫的操做(代碼),並生成新的輸出PCollection。必須Transform應用於輸入PCollection才能起做用。
Beam SDK包含許多不一樣的Transform,能夠將其應用於Pipeline的PCollection。包括通用的核心轉換,如ParDo或Combine。還包括SDK中包含的內置的組合Transform,將一個或多個核心變換組合在有用的處理模式中,例如計數或組合集合中的元素。還能夠自定義定義的更復雜的複合轉換,以知足Pipeline的業務用例場景。
Beam SDK中的每一個Transform都有一個通用的apply 方法(在python中是管道符|)。調用多個Beam變換相似於方法鏈。通常形式以下:
[輸出PCollection] = [輸入PCollection].apply([Transform])
因爲Beam使用PCollection的通用應用方法,所以您能夠依次連接變換,也能夠應用包含嵌套在其中的其餘變換的轉換(在Beam SDK中稱爲複合Transform)。
Pipeline中的處理順序取決於Pipeline的結構,Pipeline能夠理解爲一張有向無環圖,圖中的節點是PCollection,邊是Transform。以下圖所示,能夠在Pipeline中進行鏈式調用:
[最終輸出PCollection] = [原始輸入PCollection] .apply([First Transform]) .apply([Second Transform]) .apply([Third Transform])
Beam SDK提供了一些通用的Transform框架,能夠以函數對象(俗稱「用戶代碼」)的形式編寫編寫處理邏輯,處理輸入的PCollection的元素。 用戶代碼在實際執行的時候,可能在集羣中的不少不一樣的worker上並行執行,具體取決於選擇執行Beam Pipeline的執行引擎。 在每一個worker上運行用戶代碼,每一個worker輸出PCollection的一部分,最終彙總成1個完整的輸出PCollection。
Beam提供瞭如下Transform,對應於不一樣的處理範式:
• ParDo
• GroupByKey
• Combine
• Flatten 和Partition
ParDo是用於並行處理的通用Beam Transform。 ParDo處理範例與Map / Shuffle / Reduce樣式算法的「Map」階段類似:ParDo轉換考慮了輸入PCollection中的每一個元素,對該元素執行一些處理函數(用戶代碼),並輸出0個,1個或多個元素到輸出PCollection。
ParDo可用於各類常見的數據處理操做,包括:
過濾
使用ParDo來判斷PCollection中的每一個元素,是否該元素輸出到新集合,或者將其丟棄。
格式化或類型轉換
若是輸入PCollection包含元素的類型或者格式不是所期待的,,則可使用ParDoto對每一個元素執行轉換,並將結果輸出到新的PCollection。.
提取數據集中數據
例如,若是有一個具備多個字段的記錄的PCollection,則可使用ParDo將您想要考慮的字段解析爲新的PCollection。
對數據集中的每一個元素進行處理
使用ParDo對PCollection的每一個元素或某些元素執行簡單或複雜的計算,並將結果輸出爲新的PCollection。
在這樣的場景裏中,ParDo是一個通用的中間步驟。 可使用它從一組原始輸入記錄中提取某些字段,或將原始輸入轉換爲不一樣的格式; 還可使用ParDo將處理後的數據轉換爲適合輸出的格式,例如如數據庫錶行或可打印字符串。
當進行ParDo轉換時,須要以DoFn對象的形式提供用戶代碼。 DoFn是一個定義分佈式處理功能的Beam SDK類。
在PCollection 上調用apply 方法,用ParDo 做爲參數,以下代碼所示:
// 元素類型爲字符串類型的輸入PCollection PCollection<String> words = ...; // DoFn子類,用來具體計算每1個元素的長度 static class ComputeWordLengthFn extends DoFn<String, Integer> { ... } // 使用ParDo計算PCollection "words" 中每個單詞的長度 PCollection<Integer> wordLengths = words.apply( ParDo .of(new ComputeWordLengthFn()));
在該示例中,咱們的輸入PCollection包含String類型的值。 咱們使用一個ParDo Transform,ParDo中使用函數(ComputeWordLengthFn)來計算每一個字符串的長度,並將結果字符串的長度做爲值,輸出到一個新的元素類型爲Integer的PCollection中。
傳遞給ParDo的DoFn對象中包含對輸入集合中的元素的進行處理的。 當使用Beam時,一般最重要的代碼是這些DoFn函數,函數裏實現了業務邏輯。
DoFn從輸入的PCollection一次處理一個元素。 當建立DoFn的子類時,須要提供與輸入和輸出元素的類型相匹配的類型參數。 若是DoFn處理傳入的String元素並生成輸出集合的整數元素(像以前的例子ComputeWordLengthFn),類聲明將以下所示:
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
在DoFn子類中,使用@ProcessElement註解方法,在被註解的方法中實現處理邏輯。 不須要從輸入集合手動提取元素, Beam SDK已經封裝好。 @ProcessElement方法應該接受類型爲ProcessContext的對象。 ProcessContext對象提供了獲取輸入元素和發出輸出元素的方法:
static class ComputeWordLengthFn extends DoFn<String, Integer> { @ProcessElement public void processElement(ProcessContext c) { // Get the input element from ProcessContext. String word = c.element(); // Use ProcessContext.output to emit the output element. c.output(word.length()); } }
注意: 若是 PCollection 的元素是key/value鍵值對,能夠經過ProcessContext.element().getKey()獲取鍵(key), ProcessContext.element().getValue()獲取值(value)
給定的DoFn實例一般被調用一次或屢次來處理一些任意的元素組。 然而,Beam並不保證確切的調用次數; 能夠在worker節點上屢次調用它,以解決故障和重試。 所以,能夠將多個調用中的信息緩存處處理方法中,可是若是這樣作,請確保實現不依賴於調用數量。
處理方法中須要知足一些不可變性要求,以確保Beam和執行引擎能夠安全地序列化並緩存Pipeline中的值。 方法應符合如下要求:
不該以任何方式修改ProcessContext.element()或ProcessContext.sideInput()返回的元素(輸入集合中的傳入元素)。
使用ProcessContext.output()或ProcessContext.sideOutput()輸出一個值後,不該該以任何方式修改該值。
若是功能相對簡單,能夠經過提供一個輕量級的DoFn做爲匿名內部類實例來簡化對ParDo的使用。這是之前的例子,ParDo與ComputeLengthWordsFn,DoFn指定爲匿名內部類實例:
// 輸入PCollection. PCollection<String> words = ...; // 建立一個匿名類處理PCollection 「words」. // 輸出單詞的長度到新的輸出PCollection PCollection<Integer> wordLengths = words.apply( "ComputeWordLengths",// Transform 的自定義名稱 ParDo.of(new DoFn<String, Integer>() {// DoFn做爲匿名內部類 @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().length()); } }));
若是ParDo將輸入元素與輸出元素進行一對一映射,即對於每一個輸入元素,對應一個輸出,可使用更高級的MapElements Transform。 MapElements可使用匿名的Java 8 lambda函數來進一步簡化代碼。
如下是使用MapElements的上一個示例:
// 輸入PCollection. PCollection<String> words = ...; // 在MapElements中使用匿名lambda函數處理 PCollection 「words」. //輸出單詞的長度到新的輸出PCollection. PCollection<Integer> wordLengths = words.apply( MapElements.into(TypeDescriptors.integers()) .via((String word) -> word.length()));
注意: java8 lambda函數寫法,只能在Filter,FlatMapElements和Partition使用。
GroupByKey 是一個用於處理鍵/值對集合的Bean Transform,是一個並行Reduce操做,相似於Map / Shuffle / Reduce-style算法的Shuffle階段。 GroupByKey 的輸入是表示多重映射的鍵/值對的集合,其中集合包含具備相同鍵但具備不一樣值的多個對。給定這樣的集合,可使用GroupByKey 來收集與每一個惟一鍵相關聯的全部值。
GroupByKey 是彙總具備共同點的數據的好方法。例如,有一個存儲客戶訂單記錄的集合,須要未來自同一郵政編碼的全部訂單組合在一塊兒(其中鍵/值對的鍵(key)是郵政編碼字段,而值(value)是記錄的剩餘部分)。
來看一下GroupByKey 的一個簡單的例子,其中咱們的數據集由文本文件中的單詞和出現的行號組成。咱們想將全部共享相同單詞(鍵)的行號(值)組合在一塊兒,讓咱們看到文本中出現特定單詞的全部位置。
輸入是一個鍵/值對的PCollection ,其中每一個單詞都是一個鍵,該值是該文本出現的文件中的行號。如下是輸入集合中的鍵/值對列表:
cat, 1 dog, 5 and, 1 jump, 3 tree, 2 cat, 5 dog, 2 and, 2 cat, 9 and, 6 ...
GroupByKey 使用相同的鍵收集全部值,並輸出一個新的鍵值對,最後輸出一個包含惟一鍵和與輸入集合中的該關鍵字所關聯的全部值的集合。 若是咱們將GroupByKey 應用於上面的輸入集合,則輸出集合將以下所示:
cat, [1,5,9] dog, [5,2] and, [1,2,6] jump, [3] tree, [2] ..
所以,GroupByKey表示從多重映射(多個鍵到各個值)到單一映射(惟一鍵到值集合)的轉換。
CoGroupByKey關聯兩個或多個具備相同鍵類型的鍵/值PCollection,而後輸出KV<K, CoGbkResult>集合。 Design Your Pipeline展現瞭如何在Pipeline中使用Join。
以下兩個PCollection:
// collection 1 user1, address1 user2, address2 user3, address3 // collection 2 user1, order1 user1, order2 user2, order3 guest, order4 ...
CoGroupByKey從全部PCollection中收集具備相同鍵的值,並輸出一個由惟一鍵和包含與該鍵相關聯的全部值的對象CoGbkResult組成的對。 若是將CoGroupByKey應用於上述輸入集合,則輸出集合將以下所示:
user1, [[address1], [order1, order2]] user2, [[address2], [order3]] user3, [[address3], []] guest, [[], [order4]] ...
鍵/值對的注意事項:根據使用的語言和SDK不一樣,Beam表示鍵/值的方式對略有不一樣。 在Beam SDK for Java中,使用KV<K, V>類型的對象來表示一個鍵/值對。 在Python中,使用2-tuple表示鍵/值對。
Combine是一種用於組合數據中元素或值集合的Beam Transform。Combine有一種實現是對鍵值對PCollection進行處理,根據鍵值對中的鍵組合值。
應用Combine Transform時,必須提供一個函數用於組合元素或者鍵值對中的值。組合函數應該知足交換律和結合律,由於函數不必定在給定鍵的全部值上精確調用一次。因爲輸入數據(包括價值收集)能夠分佈在多個worker之間,因此在每一個worker上都會計算出部分結果,因此能夠屢次調用Combine函數,以在值集合的子集上執行部分組合。Beam SDK還提供了一些預先構建的組合功能,用來對數值型的PCollection進行組合,如sum,min和max。
簡單的組合操做(如sum)一般能夠實現爲一個簡單的功能。更復雜的組合操做可能須要建立一個具備與輸入/輸出類型不一樣的累加類型的CombineFn 的子類。
// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction. public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> { @Override public Integer apply(Iterable<Integer> input) { int sum = 0; for (int item : input) { sum += item; } return sum; } }
經過繼承CombineFn 類能夠實現複雜的組合功能。如須要一個複雜的累加器,必須進行預處理或者後處理,輸出的類型和輸入的類型不同,組合的時候須要考慮鍵值對的鍵(key)等,則須要使用CombineFn來實現。
組合由4種操做組成。當實現一個CombineFn 的子類的時候必須重寫這4個操做:
代碼示例以下:
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> { public static class Accum { int sum = 0; int count = 0; } @Override public Accum createAccumulator() { return new Accum(); } @Override public Accum addInput(Accum accum, Integer input) { accum.sum += input; accum.count++; return accum; } @Override public Accum mergeAccumulators(Iterable<Accum> accums) { Accum merged = createAccumulator(); for (Accum accum : accums) { merged.sum += accum.sum; merged.count += accum.count; } return merged; } @Override public Double extractOutput(Accum accum) { return ((double) accum.sum) / accum.count; } }
一般狀況下,對元素爲鍵值對的PCollection 進行組合運算,CombineFn就夠了。有一些特殊狀況下,須要根據不一樣的key作不一樣的處理,例如對某些用戶計算最小值,對另外的用戶計算最大值。使用KeyedCombineFn 能夠在代碼中獲取到key。
對於輸入的PCollection,使用全局的組合運算,最終輸出只有1個值的PCollection。以下例所示,使用Beam SDK中內置的Sum組合運算,處理輸入的PCollection,最終獲得一個元素類型爲Integer的PCollection:
// Sum.SumIntegerFn() combines the elements in the input PCollection. // The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection. PCollection<Integer> pc = ...; PCollection<Integer> sum = pc.apply( Combine.globally(new Sum.SumIntegerFn()));
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());
非全局窗口
若是輸入PCollection使用任何非全局窗口函數,則Beam不提供默認行爲。 進行組合運算時,必須指定如下選項之一:
一、指定.withoutDefaults,其中輸入PCollection中爲空的窗口在輸出集合中一樣爲空。
二、指定.asSingletonView,其中輸出當即轉換爲PCollectionView,當用做邊輸入時,它將爲每一個空窗口提供默認值。 通常來講,若是Pipeline組合運算的結果在後面的Pipeliine中被用做旁路輸入(side inputs),那麼一般只須要使用此選項。
在建立以key分組的集合(例如,經過使用GroupByKey Transform)以後,常規模式是將與每一個key相關聯的值的集合合併成單個值。 根據GroupByKey的前一個例子,一個名爲groupingWords的按鍵組合的PCollection以下所示:
cat, [1,5,9] dog, [5,2] and, [1,2,6] jump, [3] tree, [2] ...
在上述PCollection中,每一個元素都有一個字符串類型的鍵(例如「cat」)和一個可迭代的整數集合(在第一個元素中,包含[1,5,9])。 若是Pipeline的下一個處理步驟組合這些值(而不是單獨考慮它們),則能夠組合整數集合,以建立要與每一個鍵配對的單個合併值。 GroupByKey而後接着對值進行合併,這種處理模式至關於Beam的Combine PerKey轉換。 Combine PerKey提供的Combine函數必須是知足結合律的Reduce函數或CombineFn的子類。
// 對PCollection按照key進行分組,對每一個分組中的Double類型的值進行求和 ,值類型與以前同樣 PCollection<KV<String, Double>> salesRecords = ...; PCollection<KV<String, Double>> totalSalesPerPerson = salesRecords.apply(Combine.<String, Double, Double>perKey( new Sum.SumDoubleFn())); // 聚合以後的值與PCollection原始值的類型不一樣 // PCollection的元素爲KV類型的,Key是String,Value是Integer,聚合以後的值是Double PCollection<KV<String, Integer>> playerAccuracy = ...; PCollection<KV<String, Double>> avgAccuracyPerPlayer = playerAccuracy.apply(Combine.<String, Integer, Double>perKey( new MeanInts())));
Flatten和Partition是存儲相同數據類型的PCollection對象的的Beam Transform。 Flatten將多個PCollection對象合併到1個PCollection中,而且Partition將單個PCollection拆分爲固定數量的較小集合。
以下代碼示例,展現瞭如何將Flatten應用在PCollection上。
// Flatten接受一個PCollectionList,PCollectionList是一組具備相同元素類型的PCollection //將PCollectionList中全部子PCollection的元素放到一個新的PCollection中,並返回這個新的PCollection PCollection<String> pc1 = ...; PCollection<String> pc2 = ...; PCollection<String> pc3 = ...; PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3); PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
合併以後的PCollection中數據編碼
默認狀況下,輸出PCollection的Coder與輸入PCollectionList中第一個PCollection的Coder相同。 可是,輸入的PCollection對象能夠分別使用不一樣的Coder,只要Java包含相同的數據類型便可。
合併窗口集合
當使用Flatten合併應用了窗口策略的PCollection對象時,要合併的全部PCollection對象必須使用兼容的窗口策略和窗口大小。 例如,合併的全部集合必須所有使用(假設)相同的5分鐘長度固定窗口或4分鐘長度的滑動窗口每30秒滑動一次。
若是Pipiline嘗試使用Flatten將PCollection對象與不兼容的窗口合併,則當構建Pipeline時,Beam會生成IllegalStateException錯誤。
Partition用來切分PCollection。 Partition功能包含肯定如何將輸入PCollection的元素分解爲每一個生成的分區PCollection的邏輯。 分區數必須在Pipeline構建時肯定。 例如,能夠在運行時將分區數做爲命令行選項傳遞(而後用於構建Pipeline圖),但不能在運行時流水線中的再肯定分區數(基於之後計算的數據) 例如,您的流水線圖是構建的)。
// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function. // In this example, we define the PartitionFn in-line. // Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects. PCollection<Student> students = ...; // Split students up into 10 partitions, by percentile: PCollectionList<Student> studentsByPercentile = students.apply(Partition.of(10, new PartitionFn<Student>() { public int partitionFor(Student student, int numPartitions) { return student.getPercentile() // 0..99 * numPartitions / 100; }})); // You can extract each partition from the PCollectionList using the get method, as follows: PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
當編寫一個Beam Transform代碼時,須要理解最終的代碼是要分佈式執行的。 例如,編寫的代碼,會生成不少副本,在不一樣的機器上並行執行,相互獨立,而無需與任何其餘機器上的副本通訊或共享狀態。 根據Pipeline的執行引擎,能夠選擇Pipeline,代碼的每一個副本可能會重試或運行屢次。 所以應該謹慎地在代碼中包括狀態依賴關係。
簡單來講,編寫的代碼至少要知足如下兩個要求:
• 函數必須是可序列化的。 • 函數必須是線程兼容的,Beam SDK並非線程安全的。
除了同樣的要求,強烈建議函數是知足冪等特性。
注意:以上的要求適用於DoFn(ParDo 內使用),ConbineFn( Combine 內使用)和WindowFn(Window 內使用)
提供給Transform的任何函數必須是徹底可序列化的。 這是由於函數的副本須要序列化並傳輸處處理集羣中的遠程worker。 用戶代碼的父類,如DoFn,CombineFn和WindowFn,已經實現了Serializable; 可是,在子類不能添加任何不可序列化的成員。
須要時刻記住的序列化要點以下:
• 函數對象中的瞬態字段不會傳輸到工做實例,由於它們不會自動序列化。 • 在序列化以前避免加載大量數據的字段。 • 函數對象實例之間不能共享數據。 • 函數對象在應用後會變得無效。 • 經過使用匿名內部類實例來內聯聲明函數對象時要當心。 在非靜態上下文中,內部類實例將隱含地包含一個指向封閉類和該類的狀態的指針。 該內部類也將被序列化,所以適用於函數對象自己的相同注意事項也適用於此外部類。
編寫的函數應該兼容線程的特性。在執行時,每個worker會啓動一個線程執行代碼,若是想實現多線程,須要在代碼中本身實現。可是Beam SDK不是線程安全的,因此實現多線程須要開發者本身控制同步。注意,靜態變量並不會序列化傳遞到不一樣的worker上,還可能會被多個線程使用。
強烈建議開發者編寫的函數符合冪等性—即不管重複執行多少次都不會帶來意外的反作用。Beam模型中,並不能保證函數的執行次數,鑑於此,符合冪等性,可讓Pipeline的是肯定的,Transform的行爲是可預測的,更容易測試。
PCollection主輸入PCollection,還能夠以旁路輸入(side inputs)的形式爲ParDo Transform提供額外的輸入。 旁路輸入是DoFn每次處理輸入PCollection中的元素時能夠訪問的附加輸入。 當指定邊輸入時,能夠建立一個能夠在ParDo Transform的DoFn中讀取的其餘數據的視圖,同時處理每一個元素。
若是ParDo在處理輸入PCollection中的每一個元素時須要注入附加數據,旁路輸入會很是有用,但須要在運行時肯定附加數據(而不是硬編碼)。 這些值可能由輸入數據肯定,或者取決於Pipeline的不一樣分支。
// 調用.withSideInputs將爲ParDo添加旁路輸入 side input //在DoFn內,經過DoFn.Processecontext.sideInput可使用旁路輸入 side input // ParDo的輸入PCollection. PCollection<String> words = ...; //包含了單詞長度的PCollection,將PCollection中的值聚合爲1個值 PCollection<Integer> wordLengths = ...; // Singleton PCollection // 使用Combine.globally and View.asSingleton來計算單詞的總長度,生成一個一個單例的PCollectionView final PCollectionView<Integer> maxWordLengthCutOffView = wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView()); // 在ParDo中使用maxWordLengthCutOffView做爲side input. PCollection<String> wordsBelowCutOff = words.apply(ParDo .of(new DoFn<String, String>() { public void processElement(ProcessContext c) { String word = c.element(); // 在DoFn內使用side input. int lengthCutOff = c.sideInput(maxWordLengthCutOffView); if (word.length() <= lengthCutOff) { c.output(word); } } }).withSideInputs(maxWordLengthCutOffView) );
窗口化的PCollection多是無限的,所以不能被壓縮成單個值(或單個集合類)。當建立一個基於窗口化PCollection的PCollectionView時,PCollectionView表示每一個窗口的一個實例(能夠是每窗口一個值,也能夠是每一個窗口一個列表等)。
Beam使用主輸入元素的窗口來查找旁路輸入元素的適當窗口。Beam將主輸入元素的窗口投影到側面輸入的窗口集合中,而後使用來自窗口的旁路輸入。若是主輸入和側輸入具備相同的窗口,投影將提供準確的相應窗口。然而,若是輸入具備不一樣的窗口,則Beam使用投影來選擇最合適的旁路輸入窗口。
例如,若是使用1分鐘的固定時間窗口對主輸入進行了窗口化,而且使用1個小時的固定時間窗口對邊輸入進行了窗口化,則Beam將主輸入窗口映射到爲旁路輸入窗口,並從旁路輸入中的合適窗口選擇值。
若是主輸入元素存在於多個窗口中,那麼processElement被調用屢次,每一個窗口一次。對processElement的每一個調用都會爲主輸入元素投射「當前」窗口,所以可能會每次提供不一樣的旁路輸入視圖。
若是側面輸入有多個觸發器,則Beam將使用最近觸發器觸發的值。使用用帶有觸發器的單個全局窗口的旁路輸入時,此功能特別有用。
雖然ParDo老是生成主輸出PCollection(做爲從apply方法返回值),可是也可讓ParDo生成任意數量的附加輸出PCollection。 若是使用具備多個輸出,ParDo將返回捆綁在一塊兒的全部輸出PCollection(包括主輸出)。
使用Tags多路輸出的代碼示例:
// 爲了將數據元素髮送給多個下游PCollection,須要建立TupleTag來標示每一個PCollection //例如若是想在ParDo中建立三個輸出PCollection(1個主輸出,兩個旁路輸出),必需要建立3個TupleTag // 下邊的代碼示例中展現瞭如何建立爲3個輸出PCollection建立TupleTag // 輸入PCollection PCollection<String> words = ...; // 輸入PCollection中低於cutoff的單詞發送給主輸出PCollection<String> // 若是單詞的長度大於cutoff,單詞的長度發送給1個旁路輸出PCollection<Integer> // 若是單詞一"MARKER"開頭, 將單詞發送給旁路輸出PCollection<String> // ou final int wordLengthCutOff = 10; //爲每一個輸出PCollection建立1個TupleTag // 單子低於cutoff長度的輸出PCollection final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){}; // 包含單詞長度的輸出PCollection final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){}; // 以"MARKER"開頭的單詞的輸出PCollection final TupleTag<String> markedWordsTag = new TupleTag<String>(){}; // 將輸出TupleTag傳給ParDo //調用.withOutputTags爲每一個輸出指定TupleTag // 先爲主輸出指定TupleTag,而後旁路輸出 //在上邊例子的基礎上,爲輸出PCollection設定tag // 全部的輸出,包括主輸出PCollection都被打包到PCollectionTuple中。 PCollectionTuple results = words.apply(ParDo .of(new DoFn<String, String>() { //DoFn內的業務邏輯. ... }) // 爲主輸出指定tag. .withOutputTags(wordsBelowCutOffTag, // 使用TupleTagList爲旁路輸出設定ta TupleTagList.of(wordLengthsAboveCutOffTag) .and(markedWordsTag)));
DoFn中多路輸出代碼示例:
// 在ParDo的DoFn中,在調用ProcessContext.output的時候可使用TupleTag指定將結果發送給哪一個下游PCollection // 在ParDo以後從PCollectionTuple中解出輸出PCollection // 在前邊例子的基礎上,本例示意了將結果輸出到主輸出和兩個旁路輸出 .of(new DoFn<String, String>() { public void processElement(ProcessContext c) { String word = c.element(); if (word.length() <= wordLengthCutOff) { // 將長度較短的單詞發送到主輸出 // 在本例中,是wordsBelowCutOffTag表明的輸出 c.output(word); } else { // 將長度較長的單詞發送到 wordLengthsAboveCutOffTag表明的輸出中. c.output(wordLengthsAboveCutOffTag, word.length()); } if (word.startsWith("MARKER")) { // 將以MARKER爲開頭的單詞發送到markedWordsTag的輸出中 c.output(markedWordsTag, word); } }}));
Transform能夠嵌套,複雜變換執行多個更簡單的變換(例如多個ParDo,Combine,GroupByKey或甚至其餘複合Transform)。 將多個Transform嵌入到單個複合變換中可使代碼更加模塊化,更易於理解。
Beam SDK包含許多有用的複合轉換。 有關轉換列表,請參閱API參考頁面:
WordCount 示例程序中的CountWordsTransform是複合Transform的示例。 CountWords是由多個嵌套Transform組成的PTransform子類。
在expand展方法中,CountWordsTransform邏輯以下:
它將 Beam SDK庫中的Count Transform應用於PCollection的單詞,產生一個鍵/值對的PCollection。 每一個鍵表示文本中的一個單詞,每一個值表示單詞在原始數據中出現的次數。
注意,這也是嵌套複合Transform的示例,由於Count自己就是複合Transform。
複合Transform的參數和返回值必須與整個變換的初始輸入類型和最終返回類型相匹配,即便Transform的處理過程當中的中間數據的數據類型變化屢次。
public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> expand(PCollection<String> lines) { //將每行文本分割成單詞 PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // 統計每一個單詞出現的次數 PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } }
要建立複合Transform,集成PTransform類,並重寫expand方法,在方法中實現具體的邏輯。 而後,就能夠像使用Beam SDK的內置Transform同樣使用此複合Transform。
對於PTransform類類型參數,您將傳遞您的Transform所用的PCollection類型做爲輸入,並生成輸出。 要將多個PCollections做爲輸入,或者產生多個PCollections做爲輸出,從多個PCollection中的選取一個PCollection的類型,做爲Transform的輸出類型參數。
以下例所示,Transform使用子元素爲String類型的PCollection做爲輸入,子元素爲Integer的PCollection做爲輸出:
static class ComputeWordLengths extends PTransform<PCollection<String>, PCollection<Integer>> { ... }
在繼承PTransform子類中,須要重寫expand方法。 expand方法是添加PTransform的處理邏輯的地方。 重寫的expand方法必須接受適當類型的輸入PCollection做爲參數,並將輸出PCollection指定爲返回值。
如下代碼示例顯示如何覆蓋上一個示例中聲明的ComputeWordLengths類的expand方法:
static class ComputeWordLengths extends PTransform<PCollection<String>, PCollection<Integer>> { @Override public PCollection<Integer> expand(PCollection<String>) { ... // 轉換邏輯 ... }
只要重寫的PTransform子類中的expand方法來接受適當的輸入PCollection並返回相應的輸出PCollection,就能夠包含任意數量的Transform。 這些變換能夠包括Beam核心Transform,複合Transform或Beam SDK庫中包含的Transform。
注意:PTransform的expand方法並不意味着轉換用戶直接調用。 相反,您應該在PCollection自己調用apply方法,以變換爲參數。 這容許將轉換嵌套在管道的結構中。
建立Pipeline時,常常須要從外部數據源或數據庫中讀取數據。 一樣,Pipeline會將其結果數據輸出到相似的外部數據接收器。 Beam爲許多常見的數據存儲類型提供讀寫Transform。 若是要讓Pipeline讀取或寫入內置Transform中還不支持的數據存儲格式,能夠 實現自定義的讀寫Transform。
讀取Transform從外部源讀取數據並返回數據的PCollection,供Pipeline使用。 通常在Pipeline建立時讀取數據是最多見的,同時也容許在Pipeline中任何須要的地方讀取數據。
PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));
寫入Transform將PCollection中的數據寫入外部數據源。通常在Pipeline結束時讀取數據是最多見的,同時也容許在Pipeline中任何須要的地方寫入數據到外部數據源。
output.apply(TextIO.write().to("gs://some/outputData"));
許多讀取Transform支持用glob運算符匹配的多個輸入文件中讀取數據。 請注意,glob運算符是特定於文件系統的,並遵循文件系統特定的一致性模型。 如下TextIO示例使用glob運算符(*)讀取在給定位置中具備前綴「input-」和後綴「.csv」的全部匹配輸入文件:
p.apply(「ReadFromText」,
TextIO.read().from("protocol://my_bucket/path/to/input-*.csv");
要未來自不一樣來源的數據讀取到單個PCollection中,能夠分別讀取每一個數據源,而後使用FlattenTransform建立合併成單個PCollection。
對於基於文件的輸出數據,默認狀況下,寫入Transform寫入多個輸出文件。 將輸出文件名傳遞給寫入Transform時,文件名將用做文件的前綴。 能夠經過指定一個後綴來爲每一個輸出文件附加一個後綴。
如下寫入變換示例將多個輸出文件寫入到某個位置。 每一個文件都有前綴「數字」,數字標籤和後綴「.csv」。
records.apply("WriteToText", TextIO.write().to("protocol://my_bucket/path/to/numbers") .withSuffix(".csv"));
當Beam的執行引擎運行Pipeline時,常常須要序列化反序列化PCollections中的中間數據,這就須要將元素轉換爲二進制字節碼和從二進制字節碼中轉換。 Beam SDK使用被稱爲「Coders」的對象來描述如何對給定的PCollection的元素進行編碼和解碼。
請注意,Coder與外部數據源或匯點交互時與解析或格式化數據無關。 這種解析或格式化一般應該在諸如ParDo或MapElements之類的Transform中明確指定。
在Beam Java SDK中,Coder提供編碼和解碼數據所需的方法。 Java SDK爲Java中的標準類型提供了Coder的實現,例如Integer,Long,Double,StringUtf8等。 能夠在Coder包中找到全部可用的Coder子類。
請注意,Coder與類型不定是1:1的關係。 例如,整數類型能夠有多個有效的編碼器,輸入和輸出數據可使用不一樣的整數編碼器。 Transform可能使用使用BigEndianIntegerCoder輸入數據,而使用VarIntCoder輸出數據。
Beam要求Pipeline中的每一個PCollection都有Coder。在大多數狀況下,Beam SDK可以根據PCollection元素類型或生成它的Transform來自動推斷PCollection的Coder,可是在某些狀況下,Pipeline的開發者須要明確指定Coder,或者開發一個自定義類型的Coder。
可使用PCollection.setCoder方法顯式設置現有PCollection的Coder。請注意,沒法在已完成的PCollection上調用setCoder(例如,調用.apply以後)。
可使用getCoder方法獲取現有PCollection的Coder。若是Coder還沒有設置且不能推斷PCollection的Coder,則此方法將調用失敗,並拋出IllegalStateException。
Beam SDK在嘗試自動推斷PCollection的Coder時使用了多種機制。
每一個Pipeline對象都有一個CoderRegistry。 CoderRegistry表示Java類型與Pipeline應用於每種類型的PCollection的默認Coder的對應關係。
默認狀況下,Beam Java SDK 會自動使用Transform函數對象的類型參數(如DoFn)做爲PTransform生成的PCollection的Coder。在ParDo的狀況下,例如,DoFn
每一個Pipeline對象都有一個CoderRegistry對象,它將語言類型映射到Pipeline要使用的類型的默認Coder。 您能夠本身使用CoderRegistry查找給定類型的默認編碼器,或者爲給定類型註冊新的默認編碼器。CoderRegistry包含了Beam Java SDK建立的Pipeline的Coder與標準Java類型的默認映射。
下表顯示了標準對應關係:
Java 類型 | 默認Coder |
---|---|
Double | DoubleCoder |
Instant | InstantCoder |
Integer | VarIntCoder |
Iterable | IterableCoder |
KV | KvCoder |
List | ListCoder |
Map | MapCoder |
Long | VarLongCoder |
String | StringUtf8Coder |
TableRow | TableRowJsonCoder |
Void | VoidCoder |
byte[ ] | ByteArrayCoder |
TimestampedValue | TimestampedValueCoder |
使用CoderRegistry.getDefaultCoder方法能夠獲取Java類型的默認Coder。 使用Pipeline.getCoderRegistry方法能夠訪問Pipeline的CoderRegistry。 這樣就能夠基於每一個流水線肯定(或設置)Java類型的默認Coder:即「對於此Pipeline,驗證是不是使用BigEndianIntegerCoder對Integer值進行編碼」。
要爲Pipeline爲Java類型設置默認編碼器,能夠獲取並修改管道的CoderRegistry。 可以使用Pipeline.getCoderRegistry方法獲取CoderRegistry對象,而後使用CoderRegistry.registerCoder方法爲目標類型註冊新的Coder。
如下示例代碼演示瞭如何爲流水線的整數值設置默認Coder(在本例中爲BigEndianIntegerCoder)。
PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options); CoderRegistry cr = p.getCoderRegistry(); cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
若是Pipeline使用了自定義數據類型,則可使用@DefaultCoder註釋來指定要與該類型一塊兒使用的Coder。 例如,假設要使用SerializableCoder的自定義數據類型,可使用@DefaultCoder註釋,以下所示:
@DefaultCoder(AvroCoder.class) public class MyCustomDataType { ... }
若是建立了一個自定義Coder來匹配數據類型,而且要使用@DefaultCoder註釋,則自定義的Coder類必須實現靜態Coder.of(Class )工廠方法。
public class MyCustomCoder implements Coder { public static Coder<T> of(Class<T> clazz) {...} ... } @DefaultCoder(MyCustomCoder.class) public class MyCustomDataType { ... }
窗口根據PCollection中的每一個元素的時間戳細分PCollection。 聚合運算(如GroupByKey和Combine)在每一個窗口的基礎上隱式工做 - 它們將每一個PCollection做爲多個有限窗口的連續過程進行處理,儘管整個集合自己多是無限大小的。
觸發器用來決定什麼時候在無限數據到達時發出聚合結果,使用觸發器可有優化PCollection的窗口策略。 觸發器容許處理遲到的數據或在窗口結束前預先計算不完整的結果。 有關詳細信息,請參閱觸發器部分。
一些Beam Transform,如GroupByKey和Combine,經過公共key對多個元素進行分組。 一般,分組操做將在整個數據集中具備相同key的全部元素分組。 使用無限數據集,因爲新元素不斷被添加而且多是無限多的(例如流數據),因此不可能在某一個時刻是PCollection包含了全部的元素,此時窗口特別有用。
在Beam模型中,任何PCollection(包括無限PCollections)均可以使用邏輯上的窗口進行切分。 PCollection中的每一個元素根據PCollection的窗口功能分配給一個或多個窗口,每一個窗口包含有限數量的元素。 分組Transform而後在每一個窗口的基礎上處理PCollection的每一個元素。 GroupByKey,例如,經過鍵和窗口隱式地分組PCollection的元素。
注意:Beam的默認窗口行爲是將PCollection的全部元素分配到單個全局窗口,並丟棄遲到的數據,即便對於無限PCollections也是如此。 在無限PCollection使用GroupByKey之類的分組變換以前,必須至少執行如下操做之一:
• 設置一個非全局的窗口函數,參見爲PCollection設置窗口函數. • 設置一個非默認的 觸發器,這能夠防止觸發窗口的默認行爲(等待全部的數據到達).
若是沒有爲無限PCollection設置非全局窗口函數或非默認觸發器,隨後使用GroupByKey或Combine等分組Transform是,在構建Pipeline時將會發生錯誤,做業會失敗。
爲PCollection設置窗口函數後,下次將組合Transform應用於PCollection時,將使用窗口做爲基礎。 窗口分組根據須要進行。 若是您使用Window轉換設置了一個窗口函數,則將每一個元素分配給一個窗口,只有在GroupByKey或Combine這樣的操做中才會用到窗口。 這可能會對Pipeline產生不一樣的影響。 考慮下圖中的示例Pipeline:
在上述Pipeline中,使用KafkaIO讀取一組鍵/值對來建立一個無限PCollection,而後使用WindowTransform將該窗口函數應用於該集合, 而後將ParDo應用於該集合,而後使用GroupByKey將ParDo的結果分組。 窗口函數對ParDoTransform沒有影響,由於在GroupByKey須要以前,窗口實際上並無被使用。 而後,GroupByKey以後的處理就是基於鍵和窗口的分組。
在有限PCollections中可使用具備固定大小的窗口。 可是,請注意,窗口僅考慮附加到PCollection的每一個元素的隱式時間戳,建立固定數據集的數據源(如TextIO)會爲每一個元素分配相同的時間戳。 這意味着默認的全部元素都屬於單個全局窗口。
要在限數據集上使用窗口,能夠爲每一個元素分配本身的時間戳。 要爲元素分配時間戳,請使用具備DoFn的ParDo轉換,在DoFn中爲每一個元素附加一個新的時間戳(例如,在Beam Java SDK中的WithTimestamps Transform)。
爲了說明如何使用有限PCollection進行窗口化可能會影響Pipeline如何處理數據,以下圖所示:
在上面的Pipeline中,使用TextIO讀取一組鍵/值對來建立一個有限PCollection。 而後,使用GroupByKey對集合進行分組,並將ParDo轉換應用於分組的PCollection。 在此示例中,GroupByKey建立一個惟一的鍵值對(值是輸入元素的值的集合),而後ParDo對每一個key處理1次。
請注意,即便沒有設置窗口函數,仍然有1個窗口 - PCollection中的全部元素都分配給單個全局窗口。
如今對相同的Pipeline使用窗口函數,以下圖所示:
如上所示,Pipeline建立一個元素爲鍵值對的PCollection,而後爲PCollection設置一個窗口函數,GroupByKeyTransform基於窗口,經過鍵和窗口對PCollection的元素進行分組。 隨後的ParDo Transform對每一個key應用屢次,每一個窗口一次。
可使用不一樣類型的窗口來切分PCollection的元素。 Beam提供了幾個窗口功能,包括:
• 固定時間窗口Fixed Time Windows • 滑動時間窗口Sliding Time Windows • 會話窗口Per-Session Windows • 單一全局窗口Single Global Window • 基於日曆的時間窗口Calendar-based Windows
注意:每一個元素能夠邏輯上屬於多個窗口,具體取決於使用的窗口函數。 例如,滑動時間窗口建立重疊的窗口,其中能夠將單個元素分配給多個窗口。
最簡單的窗口形式是使用固定時間窗口:有1個持續更新的時間戳PCollection,每一個窗口能夠捕獲(例如)全部時間戳在5分鐘時間間隔內的元素。
固定時間窗口表示數據流中一致的連續、不重疊的時間間隔。 好比5分鐘固定長度窗口:無限PCollection中的全部元素,時間戳值從0:00:00到(但不包括)0:05:00屬於第一個窗口,時間戳值爲0的元素 :05:00(但不包括)0:10:00屬於第二個窗口,依此類推。
滑動時間窗口也表示數據流中的時間間隔; 然而,滑動時間窗口能夠重疊。 例如,每一個窗口可能捕獲五分鐘的數據,可是每十秒鐘會啓動一個新窗口。 滑動窗口開始的頻率稱爲週期。 所以,示例中的窗口的時間長度爲5分鐘,滑動週期爲10秒鐘。
因爲多個窗口重疊,數據集中的大多數元素將屬於多個窗口。 這種窗口對於計算不斷變化的數據的均值很是有用; 使用滑動時間窗口,能夠在示例中計算過去5分鐘的數據的運行平均值,每10秒更新一次。
會話窗口是一種在時間上非連續的窗口。 會話窗口適用於每一個key,對於在時間上呈現不規則分佈的數據頗有用。 例如,表示用戶鼠標活動的數據流可能具備長時間的空閒時間,而在另外一個時間範圍內點擊不少。 若是數據在最小時間隙以後到達,則啓動一個新的窗口。
注意: 每個key由於數據在時間分佈上的差別,而具備不一樣的窗口。
默認狀況下,PCollection中的全部數據都被分配給單一全局窗口,而且丟棄遲到的數據。 若是是有限數據集,則可使用PCollection的全局窗口默認值。
若是是無限數據集(例如來自流式數據源),也可使用單個全局窗口,但在應用聚合Transform時(如GroupByKey和Combine)時要當心。 帶有默認觸發器的單個全局窗口一般要求整個數據集在處理以前可用,這在連續更新數據時是不可能的。 要在使用全局窗口的無限PCollection上執行聚合,應爲該PCollection指定非默認觸發器。
能夠經過apply窗口Transform來設置PCollection的窗口函數。 進行WindowTransform時,必須提供一個WindowFn。 WindowFn用來肯定PCollection使用哪一種窗口函數來切分PCollection,如固定或滑動時間窗口。
Beam爲此處描述的基本窗口功能提供預約義的WindownFn。 若有更復雜的需求,您還能夠自定義WindowFn。
設置窗口函數時,可能還須要爲PCollection設置觸發器(trigger)。 觸發器用來決定每一個窗口什麼時候被聚合和發出,而且能讓窗口函數可以對遲到的數據的處理和在窗口超時前預先計算結果有更好的方式。 有關詳細信息,請參閱觸發器部分。
如下示例代碼顯示瞭如何1分鐘長度的固定時間窗口應用在PCollection上:
PCollection<String> items = ...;
PCollection<String> fixed_windowed_items = items.apply(
Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
如下示例代碼顯示瞭如何使用滑動時間窗口將PCollection切分。 每一個窗口長度爲30分鐘,每5秒鐘開1個新窗口:
PCollection<String> items = ...;
PCollection<String> sliding_windowed_items = items.apply(
Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));
如下示例代碼顯示瞭如何使用會話窗口切分PCollection,最小的時間跨度爲10分鐘:
PCollection<String> items = ...;
PCollection<String> session_windowed_items = items.apply(
Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));
注意:會話窗口首先是基於key的,每一個key有本身的會話窗口,有多少個會話窗口,取決於數據在時間上的分佈。
若是您的PCollection是有限的(大小是固定的),能夠將全部元素分配給單一全局窗口。 如下示例代碼顯示如何爲PCollection設置單一全局窗口:
PCollection<String> items = ...;
PCollection<String> batch_items = items.apply(
Window.<String>into(new GlobalWindows()));
在任何數據處理系統中,數據事件產生時間(由數據元素自己的產生的時刻,肯定的「事件時間」)與實際數據元素的處理時刻之間存在必定的滯後(「處理時間」,由系統上數據被處理的時刻決定)。此外,不能保證數據事件將按照生成的順序在Pipeline中進行處理。
例如,假設咱們有一個使用固定時間窗口的PCollection,窗口長度爲五分鐘。對於每一個窗口,Beam必須在給定的窗口範圍內(例如在第一個窗口的0:00和4:59之間)收集全部的數據,判斷依據是事件時間。時間戳超出該範圍(5:00或更晚的數據)的數據屬於不一樣的窗口。
然而,數據沒法保證按照事件時間的順序到達Pipeline,或者始終以可預測的延遲到達。Beam使用Watermark的概念,即認爲某個窗口中的全部數據都到達的時刻。在Watermark以後的數據叫作延遲數據。
從咱們的例子中,假設咱們有一個簡單的水印,假設數據時間戳(事件時間)和數據出如今Pipeline的時間(處理時間)之間大約30秒的滯後時間,那麼Beam將在5 :30關閉第一個窗口。若是數據記錄到達5:34,可是時間戳記會在0:00-4:59窗口(好比說3:38)中,那麼該記錄是延遲數據。
注意:爲簡單起見,咱們假設使用了一個很是簡單的Watermark來估計滯後時間。實際上,PCollection的數據源決定了Watermark,而且Watermark可能更精確或更復雜。
Beam的默認窗口配置,會基於數據源的類型,嘗試肯定全部數據什麼時候到達,而後將Watermark提早移動到窗口的末尾。此默認配置下延遲數據會被丟棄。使用觸發器(Trigger)能夠修改和優化PCollection的窗口策略,來決定每一個窗口什麼時候聚合並報告其結果,同時包含了窗口如何處理延遲數據的策略。
設置PCollection的窗口策略時,調用.withAllowedLateness操做來容許延遲數據。 如下代碼示例演示了窗口策略,容許在窗口結束後最多兩天的延遲數據。
PCollection<String> items = ...;
PCollection<String> fixed_windowed_items = items.apply(
Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) .withAllowedLateness(Duration.standardDays(2)));
當在PCollection中設置.withAllowedLateness時,設置的容許延遲時間會向前傳播到該PCollection的任何後續PCollection。 若是要稍後在Pipeline中更改容許的延遲,則必須經過顯式調用Window.configure().withAllowedLateness()來修改。
無限數據源爲每一個元素附加了時間戳。可是因爲數據來源類型的不一樣,時間戳可能不符合須要,可能須要從原始數據流中從新提取時間戳。有限數據源(例如來自TextIO的文件)不提供時間戳, 若是須要時間戳,則必須將它們添加到PCollection的元素中。
能夠經過使用ParDo Transform爲PCollection的元素分配新的時間戳,在ParDo Transform中添加時間戳後,輸入一個新的PCollection。例如:若是Pipeline從輸入文件讀取日誌記錄,而且每一個日誌記錄都包含時間戳字段; 因爲Pipeline從文件讀取記錄,文件源不會自動分配時間戳。 此時須要從每一個記錄中解析時間戳字段,並使用帶DoFn的ParDo Transform將時間戳附加到PCollection中的每一個元素。
PCollection<LogEntry> unstampedLogs = ...;
PCollection<LogEntry> stampedLogs =
unstampedLogs.apply(ParDo.of(new DoFn<LogEntry, LogEntry>() { public void processElement(ProcessContext c) { // 從當前處理的日誌記錄中解析出時間戳E Instant logTimeStamp = extractTimeStampFromLogEntry(c.element()); // 使用ProcessContext.outputWithTimestamp (而不是 // ProcessContext.output)發出帶有時間錯的日誌記錄 c.outputWithTimestamp(c.element(), logTimeStamp); } }));
當收集數據並將數據按照窗口進行分組時,Beam使用觸發器來肯定什麼時候發出每一個窗口的聚合結果(稱爲窗格)。 若是使用Beam的窗口默認設置和默認觸發器,Beam會在估計全部數據到達時輸出聚合結果,並丟棄該窗口的全部延遲數據。
能夠爲PCollections設置觸發器來更改此默認行爲。 Beam提供了一些內置觸發器:
事件時間觸發器
這類觸發器根據事件時間進行觸發,Beam的默認觸發器是事件時間觸發器。
處理時間觸發器
這類觸發器根據事件的處理時間(在Pipeline中的每一個階段處理數據元素的時間)進行觸發。
數據驅動觸發器
這類觸發器經過在數據到達每一個窗口時檢查數據,並在數據知足某個屬性時觸發操做。 目前,數據驅動的觸發器只支持在必定數量的數據元素以後觸發。
複合觸發器
這類觸發器組合使用事件時間觸發器、處理時間觸發器、數據驅動觸發器等。
從更高的層次看,與簡單的在窗口結束時輸出數據相比,觸發器提供兩個附加功能:
觸發器容許Beam在窗口中的全部數據到達以前,先計算併發出結果。
例如,在通過一段時間以後或在必定數量的元素到達以後計算併發出,此時窗口還沒有關閉.
觸發器提供了在事件時間的Watermark超過窗口結束時間以後處理延遲數據的機會。
這些特性讓開發者可以控制數據流和在不一樣約束以前取捨:
例如,時間敏感的系統可能會使用嚴格的基於時間的觸發器,每N秒發出一個窗口,數據的時效性的重要程度大於完整性。 數據完整性超過結果的時效性的系統可能會選擇使用Beam的默認觸發器,該觸發器在窗口的末尾觸發。還能夠爲無限PCollection設置觸發器,該觸發器使用單個全局窗口進行PCollection切分。 當但願Pipeline在無限數據集上提供按期更新時,這可能會頗有用 - 例如,當前所擁有的數據的平均值,每N秒更新一次或每N個元素。
AfterWatermark觸發器以事件時間爲基礎觸發。 當Watermark超過窗口末尾時觸發,將窗口中的數據發送到下游。Watermark是全局的進程指標,在Beam的概念中,表示輸入是否完整。 AfterWatermark.pastEndOfWindow()僅在Watermark經過窗口的末尾時觸發。 此外,可使用.withEarlyFirings(trigger)和.withLateFirings(trigger)來配置觸發器,當Pipeline在窗口結束以前或以後收到數據,則觸發器將觸發。
// 在月末的時候生成帳單 AfterWatermark.pastEndOfWindow() //持續的實時產生預計帳單 .withEarlyFirings( AfterProcessingTime .pastFirstElementInPane() .plusDuration(Duration.standardMinutes(1)) // 當延遲數據到達的時候持續的修正帳單,最終帳單是準確的F .withLateFirings(AfterPane.elementCountAtLeast(1))
PCollection的默認觸發是基於事件時間,當Beam的Watermark超過窗口的末尾時,發出窗口的結果,而後每當延遲數據到達時觸發。
可是,若是同時使用窗口默認設置和默認觸發器,則默認觸發器將會發出一次,而且丟棄延遲的數據。 這是由於默認窗口配置的容許的延遲值爲0.有關修改此行爲的信息,請參閱處理延遲數據部分。
AfterProcessingTime觸發器根據處理時間進行觸發。 例如,AfterProcessingTime.pastFirstElementInPane()觸發器在收到數據後通過必定的處理時間後會發出一個窗口。 處理時間由系統時鐘決定,而不是數據元素的時間戳。AfterProcessingTime觸發器可用於觸發窗口的早期結果,特別是時間跨度很是大的窗口(如單個全局窗口)。
Beam提供了一個數據驅動的觸發器AfterPane.elementCountAtLeast()。 該觸發器對元素計數起做用; 它在當前窗格至少收集了N個元素後觸發。 這容許窗口發出早期的結果(在全部數據已經累積以前),若是使用單個全局窗口,這可能特別有用。須要注意的是,例如,若是使用.elementCountAtLeast(50)而且只有32個元素到達,則這32個元素永遠沒有機會觸發,若是32個元素很重要,考慮使用複合觸發器來組合多個觸發條件,例如「當我收到50個元素或每1秒觸發」時。
使用Window Transform爲PCollection設置窗口函數時,能夠指定觸發器。經過在Window.into()轉換結果上調用方法.triggering()來設置PCollection的觸發器,以下所示:
PCollection<String> pc = ...;
pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1))) .discardingFiredPanes());
此代碼示例爲PCollection設置基於時間的觸發器,該觸發器在該窗口中的第一個元素已被處理後1分鐘發出結果。 代碼示例中的最後一行.discardingFiredPanes()是窗口的累加模式。
當指定觸發器時,還必須設置窗口的累積模式。 當觸發器觸發時,它將窗口的當前內容做爲窗格發出。 因爲觸發器能夠屢次觸發,因此累積模式決定系統是否在觸發器觸發時累加窗口窗格,或者丟棄它們。
要設置窗口以累積觸發器觸發時生成的窗格,請在設置觸發器時調用.accumulatingFiredPanes()。 要設置一個窗口來放棄已觸發的窗格,請調用.discardingFiredPanes()。
咱們來看一個使用具備固定時間窗口和基於數據的觸發器的PCollection的例子。 例如,若是窗口長度爲10分鐘,而後計算數據的均值,可是但願在UI中更新頻繁顯示當前平均值,而不是每10分鐘更新一次。
下圖顯示了具備key = X事件,到達PCollection並將其分配給窗口。 爲了使圖表更簡單,假設事件都按順序到達:
若是觸發器設置爲.accumulatingFiredPanes,觸發器將在每次觸發時發出如下值。 記住,每次3個新元素到達時觸發器都會觸發:
第1次觸發: [5, 8, 3] 第2次觸發: [5, 8, 3, 15, 19, 23] 第3次觸發: [5, 8, 3, 15, 19, 23, 9, 13, 10]
若是觸發器設置爲 .discardingFiredPanes,觸發器每次觸發時,發出的數據以下:
第1次觸發: [5, 8, 3] 第2次觸發: [15, 19, 23] 第3次觸發: [9, 13, 10]
若是但願Pipeline處理Watermark超過窗口末尾後到達的數據,能夠在設置窗口時設置容許的延遲時間。 這使觸發器有機會處理延遲數據。 若是設置了容許的延遲時間,默認的觸發器會在延遲數據到達時當即發出新的結果。
使用.withAllowedLateness() 容許的延遲時間:
PCollection<String> pc = ...;
pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1))) .withAllowedLateness(Duration.standardMinutes(30));
容許的延遲時間會向下傳播,設置了PCollection的後續PCollection都會繼承。 若是要稍後在Pipeline中更改容許的延遲時間,能夠顯式調用Window.configure().AllowedLateness()。
能夠組合多個觸發器來造成複合觸發器,而且能夠指定觸發器的觸發方式:重複發出結果,最多一次或其餘自定義條件。
Beam包括如下複合觸發器:
能夠經過.withEarlyFirings和.withLateFirings向AfterWatermark.pastEndOfWindow添加額外的提早觸發或延遲啓動。
Repeatedly.forever指定一個永遠重複執行的觸發器。任何觸發條件知足時,都會致使窗口發出結果,而後重置並從新開始。將Repeatedly.forever與.orFinally組合能夠指定重複觸發器中止的條件。
AfterEach.inOrder組合多個觸發器以特定的順序啓動。每次序列中的觸發器發出一個窗口,而後指向下一個觸發器。
AfterFirst須要多個觸發器,而且首次發出任何一個參數觸發器都被知足。至關於多個觸發器的邏輯OR運算。
AfterAll須要多個觸發器,並在其全部參數觸發器都知足時發出。至關於多個觸發器的邏輯AND運算。
當Beam估計全部的數據已經到達時(即當水印經過窗口的末端)與如下二者或二者結合使用時,一些最有用的複合觸發器觸發一次:
Watermark超過窗口末尾以前的推測性觸發,以容許更快地處理但有可能只發出部分結果(不是完整的)。
**Watermark超過窗口的末尾以後發生的延遲觸發,以容許處理延遲數據
可使用AfterWatermark.pastEndOfWindow來表達此模式。 例如,如下示例代碼表示在以下條件下觸發:
•當Beam估計,全部的數據已經到達(Watermark超過窗口的末尾)時觸發。 •通過10分鐘延遲後,每一次延遲數據到達觸發。 •2天后,咱們認爲不再會有新數據到達,觸發器中止執行。
.apply(Window
.configure()
.triggering(AfterWatermark
.pastEndOfWindow()
.withLateFirings(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(10)))) .withAllowedLateness(Duration.standardDays(2)));
觸發器能夠組合使用,構建其餘類型的複合觸發器。 如下示例代碼顯示了一個簡單的複合觸發器,每當窗格至少有100個元素或每1分鐘觸發1次。
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(100), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))