【From】 https://blog.csdn.net/u010990043/article/details/82842995java
最近整理了一下spark SQL內置配。加粗配置項是對sparkSQL 調優性能影響比較大的項,小夥伴們按需酌情配置。後續會挑出一些通用調優配置,共你們參考。有不正確的地方,歡迎你們在留言區留言討論。 node
配置項 | 默認值 | 概述 |
spark.sql.optimizer.maxIterations | 100 | sql優化器最大迭代次數 |
spark.sql.optimizer.inSetConversionThreshold | 10 | 插入轉換的集合大小閾值 |
spark.sql.inMemoryColumnarStorage.compressed | TRUE | 當設置爲true時,SCAPK SQL將根據數據的統計自動爲每一個列選擇壓縮編解碼器 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制用於列緩存的批處理的大小。較大的批處理大小能夠提升內存利用率和壓縮率,但緩存數據時會出現OOM風險 |
spark.sql.inMemoryColumnarStorage.partitionPruning | TRUE | 啓用內存中的列表分區剪枝 |
spark.sql.join.preferSortMergeJoin | TRUE | When true, 使用sort merge join 代替 shuffle hash join |
spark.sql.sort.enableRadixSort | TRUE | 使用基數排序,基數排序性能很是快,可是會額外使用over heap.當排序比較小的Row時,overheap 須要提升50% |
spark.sql.autoBroadcastJoinThreshold | 10L * 1024 * 1024 | 當執行join時,被廣播到worker節點上表最大字節。當被設置爲-1,則禁用廣播。當前僅僅支持 Hive Metastore tables,表大小的統計直接基於hive表的源文件大小 |
spark.sql.limit.scaleUpFactor | 4 | 在執行查詢時,兩次嘗試之間讀取partation數目的增量。較高的值會致使讀取過多分區,較少的值會致使執行時間過長,由於浙江運行更多的做業 |
spark.sql.statistics.fallBackToHdfs | FALSE | 當不能從table metadata中獲取表的統計信息,返回到hdfs。這否有用取決與表是否足夠小到可以使用auto broadcast joins |
spark.sql.defaultSizeInBytes | Long.MaxValue | 在查詢計劃中表默認大小,默認被設置成Long.MaxValue 大於spark.sql.autoBroadcastJoinThreshold的值,也就意味着默認狀況下不會廣播一個表,除非他足夠小 |
spark.sql.shuffle.partitions | 200 | 當爲join/aggregation shuffle數據時,默認partition的數量 |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 64 * 1024 * 1024byte | The target post-shuffle input size in bytes of a task. |
spark.sql.adaptive.enabled | FALSE | 是否開啓adaptive query execution(自適應查詢執行) |
spark.sql.adaptive.minNumPostShufflePartitions | -1 | 測試用 |
spark.sql.subexpressionElimination.enabled | TRUE | When true, common subexpressions will be eliminated 當爲真時,將刪除公共子表達式 |
spark.sql.caseSensitive | FALSE | 查詢分析器是否區分大小寫,默認狀況下不區分。強烈建議不區分大小寫 |
spark.sql.constraintPropagation.enabled | 是否開啓優化,在查詢優化器期間推斷和傳播查詢計劃中的數據約束。對於某種類型的查詢計劃(例若有大量謂語和別名的查詢),約束傳播是昂貴的,會對整個運行時間產生負面影響。 | |
spark.sql.parser.escapedStringLiterals | FALSE | 2.0以前默認值爲true,知否默認是否。正常文字可否包含在正則表達式中。 |
spark.sql.parquet.mergeSchema | FALSE | 若爲true,在讀取parquet數據源時,schema從全部文件中合併出來。不然若是沒有可用的摘要文件,則從概要文件或隨機文件中選擇模式 |
spark.sql.parquet.respectSummaryFiles | FALSE | 若爲ture,假設parquet的全部部分文件和概要文件一致,在合併模式時會忽略他們。不然將會合並全部的部分文件 |
spark.sql.parquet.binaryAsString | FALSE | 是否向下兼容其餘parquet生產系統(eg impala or older version spark sql ),不區分字節數據和string數據寫到parquet schema,這個配置促使spark sql將二進制數據做爲string達到兼容 |
spark.sql.parquet.int96AsTimestamp | TRUE | 是否使用Int96做爲timestamp的存儲格式,能夠避免精度損失丟失納秒部分,爲其餘parquet系統提供兼容(impala) |
spark.sql.parquet.int64AsTimestampMillis | FALSE | 當爲true,timestamp值將以Int64做爲mlibs的存儲擴展類型,這種模式微秒將被丟棄 |
spark.sql.parquet.cacheMetadata | TRUE | 是否緩存parquet的schema數據元,能夠提高靜態數據的查詢性能 |
spark.sql.parquet.compression.codec | snappy | 支持類型:uncompressed", "snappy", "gzip", "lzo"。 指定parquet寫文件的壓縮編碼方式 |
spark.sql.parquet.filterPushdown | TRUE | 是否開啓parquet過濾條件下推 |
spark.sql.parquet.writeLegacyFormat | FALSE | spark sql在拼接schema時是否遵循parquet的schema的規範 |
spark.sql.parquet.output.committer.class | org.apache.parquet.hadoop.ParquetOutputCommitter | parquet輸出提交器類,同城必須是org.apache.hadoop.mapreduce.OutputCommitter的子類,若是不是將不會建立數據源摘要,即便配置開啓了parquet.enable.summary-metadata |
spark.sql.parquet.enableVectorizedReader | TRUE | 開啓parquet向量解碼 |
spark.sql.orc.filterPushdown | FALSE | 是否開啓條件下推到orc文件寫 |
spark.sql.hive.verifyPartitionPath | FALSE | 當爲true時,在讀取HDFS中存儲的數據時,檢查表根目錄下的全部分區路徑 |
spark.sql.hive.metastorePartitionPruning | TRUE | 當爲true,spark sql的謂語將被下推到hive metastore中,更早的消除不匹配的分區,會影響到違背轉換成文件源關係的hive表 |
spark.sql.hive.manageFilesourcePartitions | TRUE | 是否使用hive metastore管理spark sql的 dataSource表分區,若爲true,dataSource表會在執行計劃期間使用分區剪枝 |
spark.sql.hive.filesourcePartitionFileCacheSize | 250 * 1024 * 1024 | 當非0時,開啓將分區文件數據元緩存到內存中,全部表共享一個緩存,當開啓 hive filesource partition management(spark.sql.hive.manageFilesourcePartitions)時纔會生效 |
spark.sql.hive.caseSensitiveInferenceMode | INFER_AND_SAVE | 設置沒法從hive表屬性讀取分區大小寫模式時所採起的操做,雖然Spice SQL自己不區分大小寫,但hive兼容的文件格式如parquet。Spark sql必須使用一個保持狀況的模式,當查詢由包含區分大小寫字段名或查詢的文件支持的任何表可能沒法返回準確的結果時。有效選項包括INFER_AND_SAVE(默認模式——從基礎數據文件推斷出區分大小寫的模式,並將其寫入表屬性),INFER_ONLY(推斷schema但不嘗試將其寫入表屬性)和NEVER_INFER(回退到使用區分大小寫間接轉移模式代替推斷) |
spark.sql.optimizer.metadataOnly | TRUE | 當爲true時,啓用僅使用表的元數據的元數據查詢優化來生成分區列,而不是表掃描。當掃描的全部列都是分區列,而且查詢具備知足不一樣語義的聚合運算符時,它適用。 |
spark.sql.columnNameOfCorruptRecord | _corrupt_record | 當json/csv數據內部列解析失敗時,失敗列的名稱 |
spark.sql.broadcastTimeout" | 5*60 | 在broadCast join時 ,廣播等待的超時時間 |
spark.sql.thriftserver.scheduler.pool | 爲JDBC客戶端會話設置公平調度程序池 | |
spark.sql.thriftServer.incrementalCollect | FALSE | 當TRUE時,啓用增量集合以在thrift server中執行 |
spark.sql.thriftserver.ui.retainedStatements | 200 | JDBC/ODBC Web用戶界面歷史記錄中SQL語句的數量 |
spark.sql.thriftserver.ui.retainedSessions | 200 | JDBC/ODBC Web UI歷史中保存的SQL客戶端會話數 |
spark.sql.sources.default | parquet | 輸入輸出默認數據元 |
spark.sql.hive.convertCTAS | FALSE | 若是時true,將使用spark.sql.sources.default.設置數據源,不指定任何存儲屬性到hive ctas語句 |
spark.sql.hive.gatherFastStats | TRUE | 在修復表分區時,將快速收集STATS(文件數量和全部文件的總大小),以免HIVE轉移子中的順序列表。 |
spark.sql.sources.partitionColumnTypeInference.enabled | TRUE | 是否自動推斷分區列的數據類型 |
spark.sql.sources.bucketing.enabled | TRUE | 當false時,分桶表看成普通表處理 |
spark.sql.crossJoin.enabled | FALSE | 當false時,若是查詢中語法笛卡兒積 卻語法中沒有顯示join,將會拋出異常 |
spark.sql.orderByOrdinal | TRUE | 當爲true時,排序字段放置到seleect List,不然被忽略 |
spark.sql.groupByOrdinal | TRUE | 當爲true時,按組子句的序號被視爲選擇列表中的位置。當爲false時,序數被忽略。 |
spark.sql.groupByAliases | TRUE | group by後的別名是否可以被用到 select list中,若爲否將拋出分析異常 |
spark.sql.sources.parallelPartitionDiscovery.threshold | 32 | 容許在driver端列出文件的最大路徑數。若是在分區發現期間檢測到的路徑的數量超過該值,則嘗試用另外一個SCAPLE分佈式做業來列出文件。這適用於parquet、ORC、CSV、JSON和LIbSVM數據源。 |
spark.sql.sources.parallelPartitionDiscovery.parallelism | 10000 | 遞歸地列出路徑集合的並行數,設置阻止文件列表生成太多任務的序號 |
spark.sql.selfJoinAutoResolveAmbiguity | TRUE | 自動解決子連接中的鏈接條件歧義,修復bug SPARK-6231 |
spark.sql.retainGroupColumns | TRUE | 是否保留分組列 |
spark.sql.pivotMaxValues | 10000 | |
spark.sql.runSQLOnFiles | TRUE | 當爲true,在sql查詢時,可以使用dataSource.path做爲表(eg:"select a,b from hdfs://xx/xx/*") |
spark.sql.codegen.wholeStage | TRUE | 當爲true,多個算子的整個stage將被便宜到一個java方法中 |
spark.sql.codegen.maxFields | 100 | 在激活整個stage codegen以前支持的最大字段(包括嵌套字段) |
spark.sql.codegen.fallback | TRUE | 當爲true,在整個stage的codegen,對於編譯generated code 失敗的query 部分,將會暫時關閉 |
spark.sql.codegen.maxCaseBranches | 20 | 支持最大的codegen |
spark.sql.files.maxPartitionBytes | 128 * 1024 * 1024 | 在讀取文件時,一個分區最大被讀取的數量,默認值=parquet.block.size |
spark.sql.files.openCostInBytes | 4 * 1024 * 1024 | 爲了測定打開一個文件的耗時,經過同時掃描配置的字節數來測定,最好是過分估計,那麼小文件的分區將比具備較大文件的分區更快(首先調度 |
spark.sql.files.ignoreCorruptFiles | FALSE | 是否自動跳過不正確的文件 |
spark.sql.files.maxRecordsPerFile | 0 | 寫入單個文件的最大條數,若是時0或者負數,則無限制 |
spark.sql.exchange.reuse | TRUE | planer是否嘗試找出重複的 exchanges並複用 |
spark.sql.streaming.stateStore.minDeltasForSnapshot | 10 | 在合併成快照以前須要生成的狀態存儲增量文件的最小數目 |
spark.sql.streaming.checkpointLocation | 檢查點數據流的查詢的默認存儲位置 | |
spark.sql.streaming.minBatchesToRetain | 100 | 流式計算最小批次長度 |
spark.sql.streaming.unsupportedOperationCheck | TRUE | streaming query的logical plan 檢查不支持的操做 |
spark.sql.variable.substitute | TRUE | |
spark.sql.codegen.aggregate.map.twolevel.enable | 啓用兩級聚合哈希映射。當啓用時,記錄將首先「插入/查找第一級、小、快的映射,而後在第一級滿或沒法找到鍵時回落到第二級、更大、較慢的映射。當禁用時,記錄直接進入第二級。默認爲真 | |
spark.sql.view.maxNestedViewDepth | 100 | 嵌套視圖中視圖引用的最大深度。嵌套視圖能夠引用其餘嵌套視圖,依賴關係被組織在有向無環圖(DAG)中。然而,DAG深度可能變得太大,致使意外的行爲。此配置限制了這一點:當分析期間視圖深度超過該值時,咱們終止分辨率以免潛在錯誤。 |
spark.sql.objectHashAggregate.sortBased.fallbackThreshold | 128 | 在ObjectHashAggregateExec的狀況下,當內存中哈希映射的大小增加過大時,咱們將回落到基於排序的聚合。此選項爲哈希映射的大小設置行計數閾值。 |
spark.sql.execution.useObjectHashAggregateExec | TRUE | 是否使用 ObjectHashAggregateExec |
spark.sql.streaming.fileSink.log.deletion | TRUE | 是否刪除文件流接收器中的過時日誌文件 |
spark.sql.streaming.fileSink.log.compactInterval | 10 | 日誌文件合併閾值,而後將全部之前的文件壓縮到下一個日誌文件中 |
spark.sql.streaming.fileSink.log.cleanupDelay | 10min | 保證一個日誌文件被全部用戶可見的時長 |
spark.sql.streaming.fileSource.log.deletion | TRUE | 是否刪除文件流源中過時的日誌文件 |
spark.sql.streaming.fileSource.log.compactInterval | 10 | 日誌文件合併閾值,而後將全部之前的文件壓縮到下一個日誌文件中 |
spark.sql.streaming.fileSource.log.cleanupDelay | 10min | 保證一個日誌文件被全部用戶可見的時長 |
spark.sql.streaming.schemaInference | FALSE | 基於文件的流,是否推斷它的模式 |
spark.sql.streaming.pollingDelay | 10L(MILLISECONDS) | 在沒有數據可用時延遲查詢新數據多長時間 |
spark.sql.streaming.noDataProgressEventInterval | 10000L(MILLISECONDS) | 在沒有數據的狀況下,在兩個進度事件之間等待時間 |
spark.sql.streaming.metricsEnabled | FALSE | 是否爲活動流查詢報告DoopWalth/CODAHALE度量 |
spark.sql.streaming.numRecentProgressUpdates | 100 | streaming query 保留的進度更新數量 |
spark.sql.statistics.ndv.maxError | 0.05 | 生成列級統計量時超對數G+++算法容許的最大估計偏差 |
spark.sql.cbo.enabled | FALSE | 在設定true時啓用CBO來估計計劃統計信息 |
spark.sql.cbo.joinReorder.enabled | FALSE | Enables join reorder in CBO. |
spark.sql.cbo.joinReorder.dp.threshold | 12 | The maximum number of joined nodes allowed in the dynamic programming algorithm |
spark.sql.cbo.joinReorder.card.weight | 0.07 | The weight of cardinality (number of rows) for plan cost comparison in join reorder: rows * weight + size * (1 - weight) |
spark.sql.cbo.joinReorder.dp.star.filter | FALSE | Applies star-join filter heuristics to cost based join enumeration |
spark.sql.cbo.starSchemaDetection | FALSE | When true, it enables join reordering based on star schema detection |
spark.sql.cbo.starJoinFTRatio | 0.9 | Specifies the upper limit of the ratio between the largest fact tables for a star join to be considered |
spark.sql.session.timeZone | TimeZone.getDefault.getID | 時間時區 |
spark.sql.windowExec.buffer.in.memory.threshold | 4096 | 窗口操做符保證存儲在內存中的行數的閾值 |
spark.sql.windowExec.buffer.spill.threshold | spark.sql.windowExec.buffer.in.memory.threshold | 窗口操做符溢出的行數的閾值 |
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | Int.MaxValue | 由sortMergeJoin運算符保證存儲在內存中的行數的閾值 |
spark.sql.sortMergeJoinExec.buffer.spill.threshold | spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | 由排序合併鏈接運算符溢出的行數的閾值 |
spark.sql.cartesianProductExec.buffer.in.memory.threshold | 4096 | 笛卡爾乘積算子保證存儲在內存中的行數的閾值 |
spark.sql.cartesianProductExec.buffer.spill.threshold | spark.sql.cartesianProductExec.buffer.in.memory.threshold | 笛卡爾乘積算子溢出的行數閾值 |
spark.sql.redaction.options.regex | "(?i)url".r |
即使join的hive表沒有10M,也沒有觸發 mapjoin[解決方案]
spark在join的時候,用來判斷一個表的大小是否達到了10M這個限制,是不會去計算這個表在hdfs上的具體的文件大小的,而是使用hive metadata中的信息,具體以下圖:正則表達式
explain出來spark的執行計劃以下:算法
== Physical Plan ==sql
*Project [device#57, pkg#58]express
+- *BroadcastHashJoin [pkg#58], [apppkg#62], Inner, BuildRightapache
:- *Filter isnotnull(pkg#58)json
: +- HiveTableScan [device#57, pkg#58], MetastoreRelation dm_sdk_mapping, device_applist, [isnotnull(day#56), (cast(day#56 as double) = 2.0180501E7)]緩存
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))session
+- *Filter isnotnull(apppkg#62)
+- HiveTableScan [apppkg#62], MetastoreRelation dm_sdk_mapping, app_category_mapping
當有些hive沒有totalSize這個信息的時候,spark就會用sortMergeJoin來作join了,可使用下面的命令從新生成metadata信息:
ANALYZE TABLE dm_sdk_mapping.app_category_mapping COMPUTE STATISTICS