After two years of constant overtime at Didi I have become lazy and rarely blog any more. Lately I have been working on integrating Carbondata with Hive, so I need to study Carbondata in depth.
So I am starting a new series to record the bits and pieces of my Carbondata learning.
The version studied here is 1.2.0-SNAPSHOT.
git clone https://github.com/apache/carbondata.git
First open the carbondata source in IDEA and go to View -> Tool Windows -> Maven Projects; check the profiles you need and build the format module first.
Let's start from the entry class CarbonDataFrameWriter and find the writeToCarbonFile method:
private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
  val options = new CarbonOption(parameters)
  val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
  if (options.tempCSV) {
    loadTempCSV(options, cc)
  } else {
    loadDataFrame(options, cc)
  }
}
It offers two paths: loadTempCSV and loadDataFrame.
loadTempCSV first writes the data out as CSV files and then imports them with a LOAD DATA INPATH ... command.
Here we only study loadDataFrame, the path that writes the data files directly.
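For orientation, this is roughly how user code reaches this writer on the Spark 1.x / CarbonContext path the snippet above belongs to. A minimal sketch only: the store path, input path and table name are placeholders, and the exact writer options may differ by version; tempCSV is the option read by CarbonOption above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{CarbonContext, SaveMode}

object WriteExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("carbon-write-example"))
    // the store path is a placeholder
    val cc = new CarbonContext(sc, "/user/hive/carbon/store")

    val df = cc.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("/tmp/sample.csv")          // placeholder input

    // tempCSV = "false" takes the loadDataFrame branch discussed below,
    // writing carbondata files directly instead of staging a CSV first
    df.write
      .format("carbondata")
      .option("tableName", "sample")
      .option("tempCSV", "false")
      .mode(SaveMode.Overwrite)
      .save()
  }
}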
Drilling down through the calls, we land in the run method of LoadTable in carbonTableSchema, followed by a couple of hundred lines of setter calls. At its core it simply builds a CarbonLoadModel:
val carbonLoadModel = new CarbonLoadModel()
carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
carbonLoadModel.setStorePath(relation.tableMeta.storePath)

val table = relation.tableMeta.carbonTable
carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
carbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
That code is there to prepare for loading a text file; when loading from a DataFrame you can skip it and jump straight to the line if (carbonLoadModel.getUseOnePass).
This flag controls how the dictionary is generated. It defaults to false, so let's ignore the true branch and follow the main flow; the two calls below are what we are after.
// generate the dictionary files
GlobalDictionaryUtil.generateGlobalDictionary(
  sparkSession.sqlContext,
  carbonLoadModel,
  relation.tableMeta.storePath,
  dictionaryDataFrame)

// generate the data files
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
  carbonLoadModel,
  relation.tableMeta.storePath,
  columnar,
  partitionStatus,
  None,
  loadDataFrame,
  updateModel)
Let's look at the GlobalDictionaryUtil.generateGlobalDictionary method first.
if (StringUtils.isEmpty(allDictionaryPath)) {
  LOGGER.info("Generate global dictionary from source data files!")
  // load data by using dataSource com.databricks.spark.csv
  var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
  var headers = carbonLoadModel.getCsvHeaderColumns
  headers = headers.map(headerName => headerName.trim)
  val colDictFilePath = carbonLoadModel.getColDictFilePath
  if (colDictFilePath != null) {
    // generate predefined dictionary
    generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
      dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
  }
  if (headers.length > df.columns.length) {
    val msg = "The number of columns in the file header do not match the " +
              "number of columns in the data file; Either delimiter " +
              "or fileheader provided is not correct"
    LOGGER.error(msg)
    throw new DataLoadingException(msg)
  }
  // use fact file to generate global dict
  val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, df.columns)
  if (requireDimension.nonEmpty) {
    // select column to push down pruning
    df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
    val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
      requireDimension, storePath, dictfolderPath, false)
    // combine distinct value in a block and partition by column
    val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
      .partitionBy(new ColumnPartitioner(model.primDimensions.length))
    // generate global dictionary files
    val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
    // check result status
    checkStatus(carbonLoadModel, sqlContext, model, statusList)
  } else {
    LOGGER.info("No column found for generating global dictionary in source data files")
  }
} else {
  generateDictionaryFromDictionaryFiles(sqlContext, carbonLoadModel, storePath,
    carbonTableIdentifier, dictfolderPath, dimensions, allDictionaryPath)
}
It covers two cases: no dictionary files exist yet, or dictionary files already exist.
Let's look at the case where they do not exist first.
// use fact file to generate global dict
val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, df.columns)
if (requireDimension.nonEmpty) {
  // select only the dimension columns that are flagged as dictionary columns
  df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
  val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
    requireDimension, storePath, dictfolderPath, false)
  // deduplicate the values, then partition by column
  val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
    .partitionBy(new ColumnPartitioner(model.primDimensions.length))
  // generate the global dictionary files
  val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
  // check result status
  checkStatus(carbonLoadModel, sqlContext, model, statusList)
} else {
  LOGGER.info("No column found for generating global dictionary in source data files")
}
It first reads all the dictionary dimension columns from the source data, deduplicates their values, partitions them by column, and then writes the dictionaries out; the write itself happens in the internalCompute method of CarbonGlobalDictionaryGenerateRDD.
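To make the "deduplicate per block, one partition per column" idea concrete, here is a heavily simplified sketch in plain Spark. It is not the actual CarbonBlockDistinctValuesCombineRDD / ColumnPartitioner code, just the shape of what they do, assuming the DataFrame already contains only the dictionary dimension columns.

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

// One partition per dictionary column, keyed by the column index.
class ColumnIndexPartitioner(numColumns: Int) extends Partitioner {
  override def numPartitions: Int = numColumns
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

// Simplified idea only: emit (columnIndex, distinct values seen in this block),
// then shuffle so that each column's values land in a single partition.
def distinctValuesPerColumn(df: DataFrame): RDD[(Int, Set[String])] = {
  val numColumns = df.columns.length
  df.rdd
    .mapPartitions { rows: Iterator[Row] =>
      val sets = Array.fill(numColumns)(scala.collection.mutable.Set[String]())
      rows.foreach { row =>
        var i = 0
        while (i < numColumns) {
          if (!row.isNullAt(i)) sets(i) += row.get(i).toString
          i += 1
        }
      }
      sets.iterator.zipWithIndex.map { case (s, i) => (i, s.toSet) }
    }
    .partitionBy(new ColumnIndexPartitioner(numColumns))
}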
val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
  dictionaryForDistinctValueLookUp,
  model.table,
  model.columnIdentifier(split.index),
  model.hdfsLocation,
  model.primDimensions(split.index).getColumnSchema,
  model.dictFileExists(split.index)
)
// execute dictionary writer task to get distinct values
val distinctValues = dictWriteTask.execute()
val dictWriteTime = System.currentTimeMillis() - t3
val t4 = System.currentTimeMillis()
// if new data came than rewrite sort index file
if (distinctValues.size() > 0) {
  val sortIndexWriteTask = new SortIndexWriterTask(model.table,
    model.columnIdentifier(split.index),
    model.primDimensions(split.index).getDataType,
    model.hdfsLocation,
    dictionaryForDistinctValueLookUp,
    distinctValues)
  sortIndexWriteTask.execute()
}
val sortIndexWriteTime = System.currentTimeMillis() - t4
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
// After sortIndex writing, update dictionaryMeta
dictWriteTask.updateMetaData()
The dictionary files live in the Metadata directory under the table directory. Three kinds of files are generated (a small path-building sketch follows this list):
1. the dictionary file itself, named <columnId>.dict
2. the sort index file, named <columnId>.sortindex
3. the dictionary column's meta information, named <columnId>.dictmeta
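As a purely illustrative helper (the store path, database, table and column id below are placeholders, and the real paths are assembled by Carbondata's own path utilities rather than by hand):

// Hypothetical helper, not Carbondata code: build the three dictionary-related
// file paths for one column under <storePath>/<db>/<table>/Metadata.
object DictionaryPaths {
  def forColumn(storePath: String, db: String, table: String, columnId: String): Seq[String] = {
    val metadataDir = s"$storePath/$db/$table/Metadata"
    Seq(
      s"$metadataDir/$columnId.dict",      // the dictionary values
      s"$metadataDir/$columnId.sortindex", // the sort index over the dictionary
      s"$metadataDir/$columnId.dictmeta"   // the dictionary column's meta information
    )
  }

  def main(args: Array[String]): Unit =
    forColumn("/user/hive/carbon/store", "default", "sample", "col_0").foreach(println)
}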
Now open CarbonDataRDDFactory and find the loadCarbonData method. It handles both the LOAD command path and the DataFrame path, and the code is admittedly long-winded; we only care about the loadDataFrame path.
def loadDataFrame(): Unit = {
  try {
    val rdd = dataFrame.get.rdd

    // find the nodes where the data is located
    val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
      DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
    }.distinct.size
    // make sure there are as many executors as data nodes
    val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
      sqlContext.sparkContext)
    val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)

    // generate the data files
    status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
      new DataLoadResultImpl(),
      carbonLoadModel,
      currentLoadCount,
      tableCreationTime,
      schemaLastUpdatedTime,
      newRdd).collect()
  } catch {
    case ex: Exception =>
      LOGGER.error(ex, "load data frame failed")
      throw ex
  }
}
Open the NewDataFrameLoaderRDD class and look at its internalCompute method; its core is this single line:
new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
Open DataLoadExecutor; the heart of its execute method is DataLoadProcessBuilder's build method. Depending on the table's settings, the build process differs slightly:
public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
    CarbonIterator[] inputIterators) throws Exception {
  CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
  SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
  if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
    // no sort columns, or carbon.load.sort.scope is set to NO_SORT
    return buildInternalForNoSort(inputIterators, configuration);
  } else if (configuration.getBucketingInfo() != null) {
    // table with bucketing configured
    return buildInternalForBucketing(inputIterators, configuration);
  } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
    // carbon.load.sort.scope is set to BATCH_SORT
    return buildInternalForBatchSort(inputIterators, configuration);
  } else {
    return buildInternal(inputIterators, configuration);
  }
}
Below we only walk through the standard load path, buildInternal:
private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
    CarbonDataLoadConfiguration configuration) {
  // 1. Reads the data input iterators and parses the data.
  AbstractDataLoadProcessorStep inputProcessorStep =
      new InputProcessorStepImpl(configuration, inputIterators);
  // 2. Converts the data like dictionary or non dictionary or complex objects depends on
  //    data types and configurations.
  AbstractDataLoadProcessorStep converterProcessorStep =
      new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
  // 3. Sorts the data by SortColumn
  AbstractDataLoadProcessorStep sortProcessorStep =
      new SortProcessorStepImpl(configuration, converterProcessorStep);
  // 4. Writes the sorted data in carbondata format.
  return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
}
There are four main steps (a stripped-down sketch of how the steps chain together follows this list):
1. Read the input iterators and parse the data; this step mainly serves CSV files, since rows coming from a DataFrame are already in the right shape.
2. Convert each field according to its data type and configuration: dictionary columns have their values replaced by dictionary keys, while non-dictionary columns are turned into byte arrays.
3. Sort the rows by the sort columns.
4. Write the sorted rows out in the Carbondata format.
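The four steps form a simple pull chain: each step wraps its child and transforms the iterator the child produces. Here is a stripped-down sketch of that wiring with simplified types; this is not the real AbstractDataLoadProcessorStep interface, just its shape.

object PipelineSketch {
  type Row = Array[AnyRef]

  // each step wraps its child and transforms the row iterator it produces
  trait Step { def execute(): Iterator[Row] }

  class InputStep(rows: Iterator[Row]) extends Step {
    def execute(): Iterator[Row] = rows
  }
  class ConvertStep(child: Step, convert: Row => Row) extends Step {
    def execute(): Iterator[Row] = child.execute().map(convert)
  }
  class SortStep(child: Step, ord: Ordering[Row]) extends Step {
    // the real step is an external, file-backed sort; in-memory here only to show the chaining
    def execute(): Iterator[Row] = child.execute().toSeq.sorted(ord).iterator
  }
  class WriteStep(child: Step, write: Row => Unit) extends Step {
    def execute(): Iterator[Row] = child.execute().map { r => write(r); r }
  }

  // mirrors buildInternal: write(sort(convert(input)))
  def build(input: Iterator[Row], convert: Row => Row,
      ord: Ordering[Row], write: Row => Unit): Step =
    new WriteStep(new SortStep(new ConvertStep(new InputStep(input), convert), ord), write)
}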
Let's start with step 2, DataConverterProcessorStepImpl. In its getIterator method you can see that every CarbonRowBatch goes through the convert method of localConverter, and localConverter holds a single converter, RowConverterImpl.
RowConverterImpl is composed of many FieldConverters; its initialize method shows that they are created by FieldEncoderFactory's createFieldEncoder method.
public FieldConverter createFieldEncoder(DataField dataField,
    Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
    CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
    DictionaryClient client, Boolean useOnePass, String storePath, boolean tableInitialize,
    Map<Object, Integer> localCache, boolean isEmptyBadRecord) throws IOException {
  // Converters are only needed for dimensions and measures it return null.
  if (dataField.getColumn().isDimension()) {
    if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
        !dataField.getColumn().isComplex()) {
      return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
          isEmptyBadRecord);
    } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
        !dataField.getColumn().isComplex()) {
      return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier,
          nullFormat, index, client, useOnePass, storePath, tableInitialize, localCache,
          isEmptyBadRecord);
    } else if (dataField.getColumn().isComplex()) {
      return new ComplexFieldConverterImpl(
          createComplexType(dataField, cache, carbonTableIdentifier, client, useOnePass,
              storePath, tableInitialize, localCache), index);
    } else {
      return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index,
          isEmptyBadRecord);
    }
  } else {
    return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
  }
}
From this code we can see the converters fall into several categories:
1. Dimension columns encoded as Encoding.DIRECT_DICTIONARY and not complex use DirectDictionaryFieldConverterImpl (mainly TIMESTAMP and DATE types); the value is converted into its offset from a base time (a sketch of this idea follows the list).
2. Dimension columns encoded as Encoding.DICTIONARY and not complex use DictionaryFieldConverterImpl (non-high-cardinality columns); the value is replaced by its dictionary key (an int).
3. Complex dimension columns use ComplexFieldConverterImpl (Struct and Array types); the value is converted to binary.
4. High-cardinality dimension columns use NonDictionaryFieldConverterImpl; the value is passed through unchanged.
5. Measure columns use MeasureFieldConverterImpl (numeric types such as float, double, int, bigint, decimal); the value is passed through unchanged.
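The "offset from a base time" in case 1 means the surrogate key can be computed directly from the value itself, so such columns need no dictionary files at all. Below is a hedged sketch of that idea for DATE values; the real logic lives in the direct-dictionary generators and also handles nulls, bad records and a reserved key range, and the base date and the offset of 2 used here are assumptions.

import java.time.LocalDate
import java.time.temporal.ChronoUnit

object DirectDictionarySketch {
  // surrogate key = days since a fixed base date, shifted by a small reserved
  // range (assumed to be 2 here) so that special keys such as null fit below it
  private val baseDate = LocalDate.of(1970, 1, 1)
  private val reservedKeys = 2

  def dateToSurrogate(value: LocalDate): Int =
    ChronoUnit.DAYS.between(baseDate, value).toInt + reservedKeys

  def surrogateToDate(key: Int): LocalDate =
    baseDate.plusDays((key - reservedKeys).toLong)

  def main(args: Array[String]): Unit = {
    val key = dateToSurrogate(LocalDate.of(2017, 8, 1))
    println(s"key=$key, back=${surrogateToDate(key)}")   // round-trips to 2017-08-01
  }
}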
Step 3 is SortProcessorStepImpl; the key is how SorterFactory.createSorter is implemented.
public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
  boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
  SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
  Sorter sorter;
  if (offheapsort) {
    if (configuration.getBucketingInfo() != null) {
      sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
          configuration.getBucketingInfo());
    } else {
      sorter = new UnsafeParallelReadMergeSorterImpl(counter);
    }
  } else {
    if (configuration.getBucketingInfo() != null) {
      sorter = new ParallelReadMergeSorterWithBucketingImpl(counter,
          configuration.getBucketingInfo());
    } else {
      sorter = new ParallelReadMergeSorterImpl(counter);
    }
  }
  if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
    if (configuration.getBucketingInfo() == null) {
      sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
    } else {
      LOGGER.warn(
          "Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass()
              .getName());
    }
  }
  return sorter;
}
It can even sort with off-heap memory: setting enable.unsafe.sort to true turns it on. Let's stick with the default ParallelReadMergeSorterImpl.
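If you want to experiment with these knobs, they are ordinary Carbon properties. A minimal sketch, assuming the property keys named in the code above; where exactly you set them in user code, and the LOCAL_SORT value shown, are assumptions.

import org.apache.carbondata.core.util.CarbonProperties

object SortConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = CarbonProperties.getInstance()
    props.addProperty("enable.unsafe.sort", "true")          // off-heap sort
    props.addProperty("carbon.load.sort.scope", "LOCAL_SORT") // or NO_SORT / BATCH_SORT
  }
}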
Whenever 100000 records have been buffered, they are sorted and spilled as one intermediate file; once there are more than 20 such files, a file merge is performed (a much simplified sketch of this spill-and-merge scheme follows the parameter list below).
The comparison rules live in NewRowComparator and NewRowComparatorForNormalDims.
Related parameters:
carbon.sort.size 100000
carbon.sort.intermediate.files.limit 20
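Here is that spill-and-merge scheme in a heavily simplified form, with in-memory vectors standing in for the sort temp files; the real implementation is file based and multi-threaded, and its final step is a streaming k-way merge rather than the naive re-sort used below.

import scala.collection.mutable.ArrayBuffer

object ExternalSortSketch {
  type Row = Array[String]

  val sortSize = 100000              // stands in for carbon.sort.size
  val intermediateFileLimit = 20     // stands in for carbon.sort.intermediate.files.limit

  def externalSort(rows: Iterator[Row], ord: Ordering[Row]): Seq[Row] = {
    val spills = ArrayBuffer[Vector[Row]]()   // each entry models one sorted temp file
    val buffer = ArrayBuffer[Row]()

    def spill(): Unit = if (buffer.nonEmpty) {
      spills += buffer.sorted(ord).toVector
      buffer.clear()
      // too many temp files: merge them into one larger sorted run
      if (spills.length > intermediateFileLimit) {
        val merged = spills.flatten.sorted(ord).toVector
        spills.clear()
        spills += merged
      }
    }

    rows.foreach { r =>
      buffer += r
      if (buffer.length >= sortSize) spill()
    }
    spill()
    // final pass; the real code streams a k-way merge of the sorted runs instead
    spills.flatten.sorted(ord)
  }
}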
On to the last step. Open the DataWriterProcessorStepImpl class: it creates a CarbonFactHandler via CarbonFactHandlerFactory.createCarbonFactHandler and pushes each CarbonRow into the handler through its addDataToStore method.
addDataToStore is simple: once the number of buffered rows reaches the blocklet size, an asynchronous Producer task is submitted to a thread pool.
public void addDataToStore(CarbonRow row) throws CarbonDataWriterException {
  dataRows.add(row);
  this.entryCount++;
  // if entry count reaches to leaf node size then we are ready to write
  // this to leaf node file and update the intermediate files
  if (this.entryCount == this.blockletSize) {
    try {
      semaphore.acquire();
      producerExecutorServiceTaskList.add(
          producerExecutorService.submit(
              new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
          )
      );
      blockletProcessingCount.incrementAndGet();
      // set the entry count to zero
      processedDataCount += entryCount;
      LOGGER.info("Total Number Of records added to store: " + processedDataCount);
      dataRows = new ArrayList<>(this.blockletSize);
      this.entryCount = 0;
    } catch (InterruptedException e) {
      LOGGER.error(e, e.getMessage());
      throw new CarbonDataWriterException(e.getMessage(), e);
    }
  }
}
A producer-consumer pattern is used here: the Producers run on multiple threads while the Consumer runs on a single thread. The Producers are mainly responsible for compressing the data, the Consumer writes it out, and the two sides exchange data through blockletDataHolder (a bare-bones sketch of this handshake follows the parameter list below).
Related parameters:
carbon.number.of.cores.while.loading 2 (number of Producer threads)
number.of.rows.per.blocklet.column.page 32000
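A bare-bones, Executors-based stand-in for that handshake: the real Producer encodes and compresses a blocklet and the real Consumer writes the carbondata file, so the queue, semaphore sizing and byte "encoding" below are simplifications, not the actual implementation.

import java.util.concurrent.{ExecutorService, Executors, LinkedBlockingQueue, Semaphore}

object ProducerConsumerSketch {
  type Row = Array[String]

  val producerThreads = 2        // carbon.number.of.cores.while.loading
  val blockletSize = 32000       // rows handed off per Producer task

  private val producers: ExecutorService = Executors.newFixedThreadPool(producerThreads)
  private val consumer: ExecutorService = Executors.newSingleThreadExecutor()
  private val holder = new LinkedBlockingQueue[Array[Byte]]()   // stands in for blockletDataHolder
  private val semaphore = new Semaphore(producerThreads)

  // single consumer thread draining "encoded" blocklets and writing them out
  consumer.submit(new Runnable {
    def run(): Unit = while (true) {
      val blocklet = holder.take()
      println(s"write blocklet of ${blocklet.length} bytes")
    }
  })

  // called once blockletSize rows have been buffered, mirroring addDataToStore
  def submitBlocklet(rows: Seq[Row]): Unit = {
    semaphore.acquire()
    producers.submit(new Runnable {
      def run(): Unit = try {
        // producer: "encode/compress" the batch (grossly simplified)
        holder.put(rows.flatMap(_.mkString(",").getBytes("UTF-8")).toArray)
      } finally semaphore.release()
    })
  }
}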
That covers the main flow of data file generation. To keep this article to a reasonable length, the details of the Carbondata data file format will follow in the next post.
岑玉海
Please credit the source when reposting, thanks!