Flink源碼分析之深度解讀流式數據寫入hive

  • 前言java

  • 數據流處理linux

  • hive基本信息獲取sql

  • 流、批判斷數據庫

  • 寫入格式判斷apache

  • 構造分區提交算子微信

  • 詳解StreamingFileWriter併發

  • 簡述StreamingFileSinkapp

  • 分區信息提交ide

  • 提交分區算子oop

  • 分區提交觸發器

  • 分區提交策略

  • 總結

前言

前段時間咱們講解了flink1.11中如何將流式數據寫入文件系統和hive [flink 1.11 使用sql將流式數據寫入hive],今天咱們來從源碼的角度深刻分析一下。以便朋友們對flink流式數據寫入hive有一個深刻的瞭解,以及在出現問題的時候知道該怎麼調試。

其實咱們能夠想一下這個工做大概是什麼流程,首先要寫入hive,咱們首先要從hive的元數據裏拿到相關的hive表的信息,好比存儲的路徑是哪裏,以便往那個目錄寫數據,還有存儲的格式是什麼,orc仍是parquet,這樣咱們須要調用對應的實現類來進行寫入,其次這個表是不是分區表,寫入數據是動態分區仍是靜態分區,這些都會根據場景的不一樣而選擇不一樣的寫入策略。

寫入數據的時候確定不會把全部數據寫入一個文件,那麼文件的滾動策略是什麼呢?寫完了數據咱們如何更新hive的元數據信息,以便咱們能夠及時讀取到相應的數據呢?

我畫了一個簡單的流程圖,你們能夠先看下,接下來咱們帶着這些疑問,一步步的從源碼裏探索這些功能是如何實現的。

數據流處理

咱們此次主要是分析flink如何將相似kafka的流式數據寫入到hive表,咱們先來一段簡單的代碼:

//構造hive catalog		String name = "myhive";		String defaultDatabase = "default";		String hiveConfDir = "/Users/user/work/hive/conf"; // a local path		String version = "3.1.2";		HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);		tEnv.registerCatalog("myhive", hive);		tEnv.useCatalog("myhive");		tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);		tEnv.useDatabase("db1");		tEnv.createTemporaryView("kafka_source_table", dataStream);		String insertSql = "insert into  hive.db1.fs_table SELECT userId, amount, " +		                   " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM kafka_source_table";		tEnv.executeSql(insertSql);

系統在啓動的時候會首先解析sql,獲取相應的屬性,而後會經過java的SPI機制加載TableFactory的全部子類,包含TableSourceFactory和TableSinkFactory,以後,會根據從sql中解析的屬性循環判斷使用哪一個工廠類,具體的操做是在TableFactoryUtil類的方法裏面實現的。

好比對於上面的sql,解析以後,發現是要寫入一個表名爲hive.db1.fs_table的hive sink。因此係統在調用TableFactoryUtil#findAndCreateTableSink(TableSinkFactory.Context context)方法之後,獲得了TableSinkFactory的子類HiveTableFactory,而後調用相應的createTableSink方法來建立相應的sink,也就是HiveTableSink。

咱們來簡單看下HiveTableSink的變量和結構。

/** * Table sink to write to Hive tables. */public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {	private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);	private final boolean userMrWriter;	//是否有界,用來區分是批處理仍是流處理	private final boolean isBounded;	private final JobConf jobConf;	private final CatalogTable catalogTable;	private final ObjectIdentifier identifier;	private final TableSchema tableSchema;	private final String hiveVersion;	private final HiveShim hiveShim;	private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();	private boolean overwrite = false;	private boolean dynamicGrouping = false;

咱們看到它實現了AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink三個接口,這三個接口決定了hive sink實現的功能,數據只能是append模式的,數據是可分區的、而且數據是能夠被覆蓋寫的。

類裏面的這些變量,看名字就大概知道是什麼意思了,就不作解釋了,講一下HiveShim,咱們在構造方法裏看到hiveShim是和hive 的版本有關的,因此其實這個類咱們能夠理解爲對不一樣hive版本操做的一層封裝。

hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);

tablesink處理數據流的方法是consumeDataStream,咱們來重點分析下。

hive基本信息獲取

首先會經過hive的配置鏈接到hive的元數據庫,獲得hive表的基本信息。

String[] partitionColumns = getPartitionKeys().toArray(new String[0]);		String dbName = identifier.getDatabaseName();		String tableName = identifier.getObjectName();		try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(				new HiveConf(jobConf, HiveConf.class), hiveVersion)) {			Table table = client.getTable(dbName, tableName);			StorageDescriptor sd = table.getSd();
  • 獲取到hive的表的信息,也就是Table對象。

  • 獲取表的一些存儲信息,StorageDescriptor對象,這裏麪包含了hive表的存儲路徑、存儲格式等等。

流、批判斷

接下來判斷寫入hive是批處理仍是流處理

if (isBounded){   ......   //batch    } else {   ......   //streaming    }

因爲此次咱們主要分析flink的流處理,因此對於batch就暫且跳過,進入else,也就是流處理。

在這裏,定義了一些基本的配置:

  • 桶分配器TableBucketAssigner,簡單來講就是如何肯定數據的分區,好比按時間,仍是按照字段的值等等。

  • 滾動策略,如何生成下一個文件,按照時間,仍是文件的大小等等。

  • 構造bulkFactory,目前只有parquet和orc的列存儲格式使用bulkFactory

//桶分配器	TableBucketAssigner assigner = new TableBucketAssigner(partComputer);		//滾動策略	TableRollingPolicy rollingPolicy = new TableRollingPolicy(						true,						conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),						conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());    //構造bulkFactory	Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);

createBulkWriterFactory方法主要是用於構造寫入列存儲格式的工廠類,目前只支持parquet和orc格式,首先定義用於構造工廠類的一些參數,好比字段的類型,名稱等等,以後根據不一樣類型構造不一樣的工廠類。若是是parquet格式,最終構造的是ParquetWriterFactory工廠類,若是是orc格式,根據hive的版本不一樣,分別構造出OrcBulkWriterFactory或者是OrcNoHiveBulkWriterFactory。

寫入格式判斷

若是是使用MR的writer或者是行格式,進入if邏輯,使用HadoopPathBasedBulkFormatBuilder,若是是列存儲格式,進入else邏輯,使用StreamingFileSink來寫入數據.

if (userMrWriter || !bulkFactory.isPresent()) {					HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);					builder = new HadoopPathBasedBulkFormatBuilder<>(							new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)							.withRollingPolicy(rollingPolicy)							.withOutputFileConfig(outputFileConfig);					LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");				} else {					builder = StreamingFileSink.forBulkFormat(							new org.apache.flink.core.fs.Path(sd.getLocation()),							new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))							.withBucketAssigner(assigner)							.withRollingPolicy(rollingPolicy)							.withOutputFileConfig(outputFileConfig);					LOG.info("Hive streaming sink: Use native parquet&orc writer.");				}

在大數據處理中,列式存儲比行存儲有着更好的查詢效率,因此咱們此次以列式存儲爲主,聊聊StreamingFileSink是如何寫入列式數據的。經過代碼咱們看到在構造buckets builder的時候,使用了前面剛生成的bucket assigner、輸出的配置、以及文件滾動的策略。

構造分區提交算子

在HiveTableSink#consumeDataStream方法的最後,進入了FileSystemTableSink#createStreamingSink方法,這個方法主要作了兩件事情,一個是建立了用於流寫入的算子StreamingFileWriter,另外一個是當存在分區列而且在配置文件配置了分區文件提交策略的時候,構造了一個用於提交分區文件的算子StreamingFileCommitter,這個算子固定的只有一個併發度。

StreamingFileWriter fileWriter = new StreamingFileWriter(				rollingCheckInterval,				bucketsBuilder);		DataStream<CommitMessage> writerStream = inputStream.transform(				StreamingFileWriter.class.getSimpleName(),				TypeExtractor.createTypeInfo(CommitMessage.class),				fileWriter).setParallelism(inputStream.getParallelism());		DataStream<?> returnStream = writerStream;		// save committer when we don't need it.		if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {			StreamingFileCommitter committer = new StreamingFileCommitter(					path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);			returnStream = writerStream					.transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)					.setParallelism(1)					.setMaxParallelism(1);		}

咱們看到在代碼中,inputStream通過transform方法,最終將要提交的數據轉換成CommitMessage格式,而後發送給它的下游StreamingFileCommitter算子,也就是說StreamingFileCommitter將會接收StreamingFileWriter中收集的數據。

詳解StreamingFileWriter

這個StreamingFileWriter咱們能夠理解爲一個算子級別的寫入文件的sink,它對StreamingFileSink進行了一些包裝,而後添加了一些其餘操做,好比提交分區信息等等。咱們簡單看下這個類的結構,並簡單聊聊各個方法的做用。

public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>		implements OneInputStreamOperator<RowData, CommitMessage>, BoundedOneInput{			@Override	public void initializeState(StateInitializationContext context) throws Exception {	    .........................	}				@Override	public void snapshotState(StateSnapshotContext context) throws Exception {	  .........................	}	@Override	public void processWatermark(Watermark mark) throws Exception {	  .........................	}	@Override	public void processElement(StreamRecord<RowData> element) throws Exception {	  .........................	}	/**	 * Commit up to this checkpoint id, also send inactive partitions to downstream for committing.	 */	@Override	public void notifyCheckpointComplete(long checkpointId) throws Exception {		  .........................	}	    		@Override	public void endInput() throws Exception {		  .........................	}	@Override	public void dispose() throws Exception {	  .........................	}	    		    		   }
  • initializeState :初始化狀態的方法,在這裏構造了要寫入文件的buckets,以及具體寫入文件的StreamingFileSinkHelper等等。

  • snapshotState:這個方法主要是進行每次checkpoint的時候調用。

  • processWatermark這個方法經過名字就能看出來,是處理水印的,好比往下游發送水印等等。

  • processElement:處理元素最核心的方法,每來一條數據,都會進入這個方法進行處理。

  • notifyCheckpointComplete,每次checkpoint完成的時候調用該方法。在這裏,收集了一些要提交的分區的信息,用於分區提交。

  • endInput:再也不有更多的數據進來,也就是輸入結束的時候調用。

  • dispose:算子的生命週期結束的時候調用。

簡述StreamingFileSink

StreamingFileSink咱們來簡單的描述下,經過名字咱們就能看出來,這是一個用於將流式數據寫入文件系統的sink,它集成了checkpoint提供exactly once語義。

在StreamingFileSink裏有一個bucket的概念,咱們能夠理解爲數據寫入的目錄,每一個bucket下能夠寫入多個文件。它提供了一個BucketAssigner的概念用於生成bucket,進來的每個數據在寫入的時候都會判斷下要寫入哪一個bucket,默認的實現是DateTimeBucketAssigner,每小時生成一個bucket。

它根據不一樣的寫入格式分別使用StreamingFileSink#forRowFormat或者StreamingFileSink#forBulkFormat來進行相應的處理。

此外,該sink還提供了一個RollingPolicy用於決定數據的滾動策略,好比文件到達多大或者通過多久就關閉當前文件,開啓下一個新文件。

具體的寫入ORC格式的數據,能夠參考下這個文章:flink 1.11 流式數據ORC格式寫入file,因爲咱們此次主要是講總體寫入hive的流程,這個sink就不作太具體的講解了。

分區信息提交

StreamingFileWriter#notifyCheckpointComplete 調用commitUpToCheckpoint在checkpoint完成的時候觸發了分區的提交操做。

private void commitUpToCheckpoint(long checkpointId) throws Exception {		helper.commitUpToCheckpoint(checkpointId);		CommitMessage message = new CommitMessage(				checkpointId,				getRuntimeContext().getIndexOfThisSubtask(),				getRuntimeContext().getNumberOfParallelSubtasks(),				new ArrayList<>(inactivePartitions));		output.collect(new StreamRecord<>(message));		inactivePartitions.clear();	}

在這裏,咱們看到,使用inactivePartitions構造了CommitMessage對象,而後使用output.collect將這個提交數據收集起來,也就是上文咱們提到的這裏收集到的這個數據將會發給StreamingFileCommitter算子來處理。

而inactivePartitions裏面的數據是何時添加進來的呢,也就是何時纔會生成要提交的分區呢?咱們跟蹤一下代碼,發現是給寫入文件的buckets添加了一個監聽器,在bucket成爲非活躍狀態以後,觸發監聽器,而後將對應的bucket id 添加到inactivePartitions集合。

@Override	public void initializeState(StateInitializationContext context) throws Exception {        ..........................		buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {			@Override			public void bucketCreated(Bucket<RowData, String> bucket) {			}			@Override			public void bucketInactive(Bucket<RowData, String> bucket) {				inactivePartitions.add(bucket.getBucketId());			}		});	}

而通知bucket變爲非活動狀態又是什麼狀況會觸發呢?從代碼註釋咱們看到,到目前爲止該bucket已接收的全部記錄都已提交後,則該bucket將變爲非活動狀態。

提交分區算子

這是一個單並行度的算子,用於提交寫入文件系統的分區信息。具體的處理步驟以下:

  • 從上游收集要提交的分區信息

  • 判斷某一個checkpoint下,全部的子任務是否都已經接收了分區的數據

  • 獲取分區提交觸發器。(目前支持partition-time和process-time)

  • 使用分區提交策略去依次提交分區信息(能夠配置多個分區策略)

這裏咱們主要講一下 StreamingFileCommitter#processElement方法是如何對進來的每一個提交數據進行處理的。

@Override	public void processElement(StreamRecord<CommitMessage> element) throws Exception {		CommitMessage message = element.getValue();		for (String partition : message.partitions) {			trigger.addPartition(partition);		}		if (taskTracker == null) {			taskTracker = new TaskTracker(message.numberOfTasks);		}		boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);		if (needCommit) {			commitPartitions(message.checkpointId);		}	}

咱們看到,從上游接收到CommitMessage元素,而後從裏面獲得要提交的分區,添加到PartitionCommitTrigger裏(變量trigger),而後經過taskTracker來判斷一下,該checkpoint每一個子任務是否已經接收到了分區數據,最後經過commitPartitions方法來提交分區信息。

進入commitPartitions方法,看看是如何提交分區的。

private void commitPartitions(long checkpointId) throws Exception {		List<String> partitions = checkpointId == Long.MAX_VALUE ?				trigger.endInput() :				trigger.committablePartitions(checkpointId);		if (partitions.isEmpty()) {			return;		}		try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {			for (String partition : partitions) {				LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));				LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);				Path path = new Path(locationPath, generatePartitionPath(partSpec));				PartitionCommitPolicy.Context context = new PolicyContext(						new ArrayList<>(partSpec.values()), path);				for (PartitionCommitPolicy policy : policies) {					if (policy instanceof MetastoreCommitPolicy) {						((MetastoreCommitPolicy) policy).setMetastore(metaStore);					}					policy.commit(context);				}			}		}	}

從trigger中獲取該checkpoint下的全部要提交的分區,放到一個List集合partitions中,在提交的分區不爲空的狀況下,循環遍歷要配置的分區提交策略PartitionCommitPolicy,而後提交分區。

分區提交觸發器

目前系統提供了兩種分區提交的觸發器,PartitionTimeCommitTigger和ProcTimeCommitTigger,分別用於處理何時提交分區。

  • ProcTimeCommitTigger 主要依賴於分區的建立時間和delay,當處理時間大於'partition creation time' + 'delay'的時候,將提交這個分區

  • PartitionTimeCommitTigger 依賴於水印,當水印的值大於 partition-time + delay的時候提交這個分區。

分區提交策略

目前系統提供了一個接口PartitionCommitPolicy,用於提交分區的信息,目前系統提供瞭如下幾種方案,

  • 一種是METASTORE,主要是用於提交hive的分區,好比建立hive分區等等

  • 還有一種是SUCCESS_FILE,也就是往對應的分區目錄下寫一個success文件。

  • 此外,系統還提供了一個對外的自定義實現,用於用戶自定義分區提交,好比提交分區以後合併小文件等等。自定義提交策略的時候,須要實現PartitionCommitPolicy接口,並將提交策略置爲custom。

我在網上也看到過一些實現該接口用於合併小文件的示例,可是我我的以爲其實有點不太完美,由於這個合併小文件可能會涉及不少的問題:

  • 合併的時候如何保證事務,保證合併的同時如何有讀操做不會發生髒讀

  • 事務的一致性,若是合併出錯了怎麼回滾

  • 合併小文件的性能是否跟得上,目前flink只提供了一個單並行度的提交算子。

  • 如何多併發合併寫入

因此暫時我也沒有想到一個完美的方案用於flink來合併小文件。

總結

經過上述的描述,咱們簡單聊了一下flink是如何將流式數據寫入hive的,可是可能每一個人在作的過程當中仍是會遇到各類各類的環境問題致使的寫入失敗,好比window和linux系統的差別,hdfs版本的差別,系統時區的配置等等,在遇到一些個性化的問題以後,就可能須要你們去針對本身的問題去個性化的debug了。

更多幹貨信息,歡迎關注個人公衆號【大數據技術與應用實戰】

本文分享自微信公衆號 - 大數據技術與應用實戰(bigdata_bigdata)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索