[轉] Spark sql 內置配置(V2.2)

【From】 https://blog.csdn.net/u010990043/article/details/82842995java

 

 最近整理了一下spark SQL內置配。加粗配置項是對sparkSQL 調優性能影響比較大的項,小夥伴們按需酌情配置。後續會挑出一些通用調優配置,共你們參考。有不正確的地方,歡迎你們在留言區留言討論。 node

 

配置項 默認值 概述
spark.sql.optimizer.maxIterations 100 sql優化器最大迭代次數
spark.sql.optimizer.inSetConversionThreshold 10 插入轉換的集合大小閾值
spark.sql.inMemoryColumnarStorage.compressed TRUE 當設置爲true時,SCAPK SQL將根據數據的統計自動爲每一個列選擇壓縮編解碼器
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制用於列緩存的批處理的大小。較大的批處理大小能夠提升內存利用率和壓縮率,但緩存數據時會出現OOM風險
spark.sql.inMemoryColumnarStorage.partitionPruning  TRUE 啓用內存中的列表分區剪枝
spark.sql.join.preferSortMergeJoin TRUE When true, 使用sort merge join 代替 shuffle hash join
spark.sql.sort.enableRadixSort TRUE 使用基數排序,基數排序性能很是快,可是會額外使用over heap.當排序比較小的Row時,overheap 須要提升50%
spark.sql.autoBroadcastJoinThreshold 10L * 1024 * 1024 當執行join時,被廣播到worker節點上表最大字節。當被設置爲-1,則禁用廣播。當前僅僅支持 Hive Metastore tables,表大小的統計直接基於hive表的源文件大小
spark.sql.limit.scaleUpFactor 4 在執行查詢時,兩次嘗試之間讀取partation數目的增量。較高的值會致使讀取過多分區,較少的值會致使執行時間過長,由於浙江運行更多的做業
spark.sql.statistics.fallBackToHdfs FALSE 當不能從table metadata中獲取表的統計信息,返回到hdfs。這否有用取決與表是否足夠小到可以使用auto broadcast joins
spark.sql.defaultSizeInBytes Long.MaxValue 在查詢計劃中表默認大小,默認被設置成Long.MaxValue 大於spark.sql.autoBroadcastJoinThreshold的值,也就意味着默認狀況下不會廣播一個表,除非他足夠小
spark.sql.shuffle.partitions 200 當爲join/aggregation  shuffle數據時,默認partition的數量
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 64 * 1024 * 1024byte The target post-shuffle input size in bytes of a task.
spark.sql.adaptive.enabled FALSE 是否開啓adaptive query execution(自適應查詢執行)
spark.sql.adaptive.minNumPostShufflePartitions -1 測試用
spark.sql.subexpressionElimination.enabled TRUE When true, common subexpressions will be eliminated
當爲真時,將刪除公共子表達式
spark.sql.caseSensitive FALSE 查詢分析器是否區分大小寫,默認狀況下不區分。強烈建議不區分大小寫
spark.sql.constraintPropagation.enabled   是否開啓優化,在查詢優化器期間推斷和傳播查詢計劃中的數據約束。對於某種類型的查詢計劃(例若有大量謂語和別名的查詢),約束傳播是昂貴的,會對整個運行時間產生負面影響。
spark.sql.parser.escapedStringLiterals FALSE 2.0以前默認值爲true,知否默認是否。正常文字可否包含在正則表達式中。
spark.sql.parquet.mergeSchema FALSE 若爲true,在讀取parquet數據源時,schema從全部文件中合併出來。不然若是沒有可用的摘要文件,則從概要文件或隨機文件中選擇模式
spark.sql.parquet.respectSummaryFiles FALSE 若爲ture,假設parquet的全部部分文件和概要文件一致,在合併模式時會忽略他們。不然將會合並全部的部分文件
spark.sql.parquet.binaryAsString FALSE 是否向下兼容其餘parquet生產系統(eg impala or older version spark sql ),不區分字節數據和string數據寫到parquet schema,這個配置促使spark sql將二進制數據做爲string達到兼容
spark.sql.parquet.int96AsTimestamp TRUE 是否使用Int96做爲timestamp的存儲格式,能夠避免精度損失丟失納秒部分,爲其餘parquet系統提供兼容(impala)
spark.sql.parquet.int64AsTimestampMillis FALSE 當爲true,timestamp值將以Int64做爲mlibs的存儲擴展類型,這種模式微秒將被丟棄
spark.sql.parquet.cacheMetadata TRUE 是否緩存parquet的schema數據元,能夠提高靜態數據的查詢性能
spark.sql.parquet.compression.codec snappy 支持類型:uncompressed", "snappy", "gzip", "lzo"。 指定parquet寫文件的壓縮編碼方式
spark.sql.parquet.filterPushdown TRUE 是否開啓parquet過濾條件下推
spark.sql.parquet.writeLegacyFormat FALSE spark sql在拼接schema時是否遵循parquet的schema的規範
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter parquet輸出提交器類,同城必須是org.apache.hadoop.mapreduce.OutputCommitter的子類,若是不是將不會建立數據源摘要,即便配置開啓了parquet.enable.summary-metadata
spark.sql.parquet.enableVectorizedReader TRUE 開啓parquet向量解碼
spark.sql.orc.filterPushdown FALSE 是否開啓條件下推到orc文件寫
spark.sql.hive.verifyPartitionPath FALSE 當爲true時,在讀取HDFS中存儲的數據時,檢查表根目錄下的全部分區路徑
spark.sql.hive.metastorePartitionPruning TRUE 當爲true,spark sql的謂語將被下推到hive metastore中,更早的消除不匹配的分區,會影響到違背轉換成文件源關係的hive表
spark.sql.hive.manageFilesourcePartitions TRUE 是否使用hive metastore管理spark sql的 dataSource表分區,若爲true,dataSource表會在執行計劃期間使用分區剪枝 
spark.sql.hive.filesourcePartitionFileCacheSize 250 * 1024 * 1024 當非0時,開啓將分區文件數據元緩存到內存中,全部表共享一個緩存,當開啓 hive filesource partition management(spark.sql.hive.manageFilesourcePartitions)時纔會生效
spark.sql.hive.caseSensitiveInferenceMode INFER_AND_SAVE 設置沒法從hive表屬性讀取分區大小寫模式時所採起的操做,雖然Spice SQL自己不區分大小寫,但hive兼容的文件格式如parquet。Spark sql必須使用一個保持狀況的模式,當查詢由包含區分大小寫字段名或查詢的文件支持的任何表可能沒法返回準確的結果時。有效選項包括INFER_AND_SAVE(默認模式——從基礎數據文件推斷出區分大小寫的模式,並將其寫入表屬性),INFER_ONLY(推斷schema但不嘗試將其寫入表屬性)和NEVER_INFER(回退到使用區分大小寫間接轉移模式代替推斷)
spark.sql.optimizer.metadataOnly TRUE 當爲true時,啓用僅使用表的元數據的元數據查詢優化來生成分區列,而不是表掃描。當掃描的全部列都是分區列,而且查詢具備知足不一樣語義的聚合運算符時,它適用。
spark.sql.columnNameOfCorruptRecord _corrupt_record 當json/csv數據內部列解析失敗時,失敗列的名稱
spark.sql.broadcastTimeout" 5*60 在broadCast join時 ,廣播等待的超時時間
spark.sql.thriftserver.scheduler.pool   爲JDBC客戶端會話設置公平調度程序池
spark.sql.thriftServer.incrementalCollect FALSE 當TRUE時,啓用增量集合以在thrift server中執行
spark.sql.thriftserver.ui.retainedStatements 200 JDBC/ODBC Web用戶界面歷史記錄中SQL語句的數量
spark.sql.thriftserver.ui.retainedSessions 200 JDBC/ODBC Web UI歷史中保存的SQL客戶端會話數
spark.sql.sources.default parquet 輸入輸出默認數據元
spark.sql.hive.convertCTAS FALSE 若是時true,將使用spark.sql.sources.default.設置數據源,不指定任何存儲屬性到hive ctas語句
spark.sql.hive.gatherFastStats TRUE 在修復表分區時,將快速收集STATS(文件數量和全部文件的總大小),以免HIVE轉移子中的順序列表。
spark.sql.sources.partitionColumnTypeInference.enabled TRUE 是否自動推斷分區列的數據類型
spark.sql.sources.bucketing.enabled TRUE 當false時,分桶表看成普通表處理
spark.sql.crossJoin.enabled FALSE 當false時,若是查詢中語法笛卡兒積 卻語法中沒有顯示join,將會拋出異常
spark.sql.orderByOrdinal TRUE 當爲true時,排序字段放置到seleect List,不然被忽略
spark.sql.groupByOrdinal TRUE 當爲true時,按組子句的序號被視爲選擇列表中的位置。當爲false時,序數被忽略。
spark.sql.groupByAliases TRUE group by後的別名是否可以被用到 select list中,若爲否將拋出分析異常
spark.sql.sources.parallelPartitionDiscovery.threshold 32 容許在driver端列出文件的最大路徑數。若是在分區發現期間檢測到的路徑的數量超過該值,則嘗試用另外一個SCAPLE分佈式做業來列出文件。這適用於parquet、ORC、CSV、JSON和LIbSVM數據源。
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 遞歸地列出路徑集合的並行數,設置阻止文件列表生成太多任務的序號
spark.sql.selfJoinAutoResolveAmbiguity TRUE 自動解決子連接中的鏈接條件歧義,修復bug SPARK-6231
spark.sql.retainGroupColumns TRUE 是否保留分組列
spark.sql.pivotMaxValues 10000  
spark.sql.runSQLOnFiles TRUE 當爲true,在sql查詢時,可以使用dataSource.path做爲表(eg:"select a,b from hdfs://xx/xx/*")
spark.sql.codegen.wholeStage TRUE 當爲true,多個算子的整個stage將被便宜到一個java方法中
spark.sql.codegen.maxFields 100 在激活整個stage codegen以前支持的最大字段(包括嵌套字段)
spark.sql.codegen.fallback TRUE 當爲true,在整個stage的codegen,對於編譯generated code 失敗的query 部分,將會暫時關閉
spark.sql.codegen.maxCaseBranches 20 支持最大的codegen
spark.sql.files.maxPartitionBytes 128 * 1024 * 1024 在讀取文件時,一個分區最大被讀取的數量,默認值=parquet.block.size
spark.sql.files.openCostInBytes 4 * 1024 * 1024 爲了測定打開一個文件的耗時,經過同時掃描配置的字節數來測定,最好是過分估計,那麼小文件的分區將比具備較大文件的分區更快(首先調度
spark.sql.files.ignoreCorruptFiles FALSE 是否自動跳過不正確的文件
spark.sql.files.maxRecordsPerFile 0 寫入單個文件的最大條數,若是時0或者負數,則無限制
spark.sql.exchange.reuse TRUE planer是否嘗試找出重複的 exchanges並複用
spark.sql.streaming.stateStore.minDeltasForSnapshot 10 在合併成快照以前須要生成的狀態存儲增量文件的最小數目
spark.sql.streaming.checkpointLocation   檢查點數據流的查詢的默認存儲位置
spark.sql.streaming.minBatchesToRetain 100 流式計算最小批次長度
spark.sql.streaming.unsupportedOperationCheck TRUE streaming query的logical plan 檢查不支持的操做
spark.sql.variable.substitute TRUE  
spark.sql.codegen.aggregate.map.twolevel.enable   啓用兩級聚合哈希映射。當啓用時,記錄將首先「插入/查找第一級、小、快的映射,而後在第一級滿或沒法找到鍵時回落到第二級、更大、較慢的映射。當禁用時,記錄直接進入第二級。默認爲真
spark.sql.view.maxNestedViewDepth 100 嵌套視圖中視圖引用的最大深度。嵌套視圖能夠引用其餘嵌套視圖,依賴關係被組織在有向無環圖(DAG)中。然而,DAG深度可能變得太大,致使意外的行爲。此配置限制了這一點:當分析期間視圖深度超過該值時,咱們終止分辨率以免潛在錯誤。
spark.sql.objectHashAggregate.sortBased.fallbackThreshold 128 在ObjectHashAggregateExec的狀況下,當內存中哈希映射的大小增加過大時,咱們將回落到基於排序的聚合。此選項爲哈希映射的大小設置行計數閾值。
spark.sql.execution.useObjectHashAggregateExec TRUE 是否使用 ObjectHashAggregateExec
spark.sql.streaming.fileSink.log.deletion TRUE 是否刪除文件流接收器中的過時日誌文件
spark.sql.streaming.fileSink.log.compactInterval 10 日誌文件合併閾值,而後將全部之前的文件壓縮到下一個日誌文件中
spark.sql.streaming.fileSink.log.cleanupDelay 10min 保證一個日誌文件被全部用戶可見的時長
spark.sql.streaming.fileSource.log.deletion TRUE 是否刪除文件流源中過時的日誌文件
spark.sql.streaming.fileSource.log.compactInterval 10 日誌文件合併閾值,而後將全部之前的文件壓縮到下一個日誌文件中
spark.sql.streaming.fileSource.log.cleanupDelay 10min 保證一個日誌文件被全部用戶可見的時長
spark.sql.streaming.schemaInference FALSE 基於文件的流,是否推斷它的模式
spark.sql.streaming.pollingDelay 10L(MILLISECONDS) 在沒有數據可用時延遲查詢新數據多長時間
spark.sql.streaming.noDataProgressEventInterval 10000L(MILLISECONDS) 在沒有數據的狀況下,在兩個進度事件之間等待時間
spark.sql.streaming.metricsEnabled FALSE 是否爲活動流查詢報告DoopWalth/CODAHALE度量
spark.sql.streaming.numRecentProgressUpdates 100 streaming query 保留的進度更新數量
spark.sql.statistics.ndv.maxError 0.05 生成列級統計量時超對數G+++算法容許的最大估計偏差
spark.sql.cbo.enabled FALSE 在設定true時啓用CBO來估計計劃統計信息
spark.sql.cbo.joinReorder.enabled FALSE Enables join reorder in CBO.
spark.sql.cbo.joinReorder.dp.threshold 12 The maximum number of joined nodes allowed in the dynamic programming algorithm
spark.sql.cbo.joinReorder.card.weight 0.07 The weight of cardinality (number of rows) for plan cost comparison in join reorder:  rows * weight + size * (1 - weight)
spark.sql.cbo.joinReorder.dp.star.filter FALSE Applies star-join filter heuristics to cost based join enumeration
spark.sql.cbo.starSchemaDetection FALSE When true, it enables join reordering based on star schema detection
spark.sql.cbo.starJoinFTRatio 0.9 Specifies the upper limit of the ratio between the largest fact tables  for a star join to be considered
spark.sql.session.timeZone TimeZone.getDefault.getID  時間時區
spark.sql.windowExec.buffer.in.memory.threshold 4096 窗口操做符保證存儲在內存中的行數的閾值
spark.sql.windowExec.buffer.spill.threshold spark.sql.windowExec.buffer.in.memory.threshold 窗口操做符溢出的行數的閾值
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold Int.MaxValue 由sortMergeJoin運算符保證存儲在內存中的行數的閾值
spark.sql.sortMergeJoinExec.buffer.spill.threshold spark.sql.sortMergeJoinExec.buffer.in.memory.threshold 由排序合併鏈接運算符溢出的行數的閾值
spark.sql.cartesianProductExec.buffer.in.memory.threshold 4096 笛卡爾乘積算子保證存儲在內存中的行數的閾值
spark.sql.cartesianProductExec.buffer.spill.threshold spark.sql.cartesianProductExec.buffer.in.memory.threshold 笛卡爾乘積算子溢出的行數閾值
spark.sql.redaction.options.regex "(?i)url".r  

 

 

即使join的hive表沒有10M,也沒有觸發 mapjoin[解決方案]
spark在join的時候,用來判斷一個表的大小是否達到了10M這個限制,是不會去計算這個表在hdfs上的具體的文件大小的,而是使用hive metadata中的信息,具體以下圖:正則表達式

 

explain出來spark的執行計劃以下:算法

== Physical Plan ==sql

*Project [device#57, pkg#58]express

+- *BroadcastHashJoin [pkg#58], [apppkg#62], Inner, BuildRightapache

   :- *Filter isnotnull(pkg#58)json

   :  +- HiveTableScan [device#57, pkg#58], MetastoreRelation dm_sdk_mapping, device_applist, [isnotnull(day#56), (cast(day#56 as double) = 2.0180501E7)]緩存

   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))session

      +- *Filter isnotnull(apppkg#62)

         +- HiveTableScan [apppkg#62], MetastoreRelation dm_sdk_mapping, app_category_mapping

當有些hive沒有totalSize這個信息的時候,spark就會用sortMergeJoin來作join了,可使用下面的命令從新生成metadata信息:

ANALYZE TABLE dm_sdk_mapping.app_category_mapping COMPUTE STATISTICS

相關文章
相關標籤/搜索