Note: this article is based on a talk given by Adam Muise of Hortonworks at the Toronto Hadoop User Group meetup on July 23, 2013. It has only been lightly edited and reorganized here, for future reference.
Original slides: http://www.slideshare.net/adammuise/2013-jul-23thughivetuningdeepdive
• Scalable SQL processing over data in Hadoop
• Scales to 100PB+
• Structured and unstructured data
Hive | RDBMS
--- | ---
SQL interface. | SQL interface.
Focus on analytics. | May focus on online or analytics.
No transactions. | Transactions usually supported.
Partition adds, no random INSERTs. In-place updates not natively supported (but are possible). | Random INSERT and UPDATE supported.
Distributed processing via map/reduce. | Distributed processing varies by vendor (if available).
Scales to hundreds of nodes. | Seldom scales beyond 20 nodes.
Built for commodity hardware. | Often built on proprietary hardware (especially when scaling out).
Low cost per petabyte. | What's a petabyte? (the slide author being cheeky here)
• "Joins are evil" – Cal Henderson
– Joins should be avoided in online systems.
• Joins are unavoidable in analytics.
– Making joins fast is the key design point.
• Star schemas use dimension tables small enough to fit in RAM.
• Small tables held in memory by all nodes.
• Single pass through the large table.
• Used for star-schema type joins common in DW.
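The map-join pattern above can be sketched in HiveQL. The table and column names here are illustrative, not from the talk:

```sql
-- Let Hive broadcast small dimension tables into each mapper's memory
SET hive.auto.convert.join=true;

-- Hypothetical star-schema query: store_dim and product_dim are small
-- dimension tables held in RAM; sale_fact is scanned in a single pass.
SELECT d.region, p.category, SUM(f.amount) AS total
FROM sale_fact f
JOIN store_dim d ON f.storesk = d.storesk
JOIN product_dim p ON f.productsk = p.productsk
GROUP BY d.region, p.category;
```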
Observation 1:
Sorting by the join key makes joins easy.
All possible matches reside in the same area on disk.
Observation 2:
Hash-bucketing a join key ensures all matching values reside on the same node.
Equi-joins can then run with no shuffle.
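The two observations combine into the sort-merge-bucket join pattern; a sketch with made-up table names:

```sql
-- Both sides bucketed AND sorted on the join key, with the same bucket
-- count, so matching values land in corresponding buckets on the same node.
CREATE TABLE orders (orderid INT, custid INT, amount DECIMAL)
CLUSTERED BY (custid) SORTED BY (custid) INTO 32 BUCKETS;

CREATE TABLE customers (custid INT, name STRING)
CLUSTERED BY (custid) SORTED BY (custid) INTO 32 BUCKETS;

SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;

-- This equi-join can now run bucket-against-bucket with no shuffle.
SELECT o.orderid, c.name
FROM orders o JOIN customers c ON o.custid = c.custid;
```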
Note: for several common join patterns in MapReduce, with sample code, see:
http://my.oschina.net/leejun2005/blog/82523
http://my.oschina.net/leejun2005/blog/111963
http://my.oschina.net/leejun2005/blog/95186
• Bucketing:
– Hash partition values into a configurable number of buckets.
– Usually coupled with sorting.
• Skews:
– Split values out into separate files.
– Used when certain values are frequently seen.
• Replication Factor:
– Increase replication factor to accelerate reads.
– Controlled at the HDFS layer.
• Sorting:
– Sort the values within given columns.
– Greatly accelerates queries when used with ORCFile filter pushdown.
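Bucketing, sorting, and skews can all be declared at table-creation time. A sketch with made-up names (SKEWED BY requires Hive 0.10+):

```sql
-- Bucketed and sorted on userid; the very common url value is split
-- into its own files so scans for other values can skip it.
CREATE TABLE clicks (userid INT, url STRING)
SKEWED BY (url) ON ('http://example.com/home')
CLUSTERED BY (userid) SORTED BY (userid) INTO 16 BUCKETS
STORED AS orc;
```

The replication factor is set outside Hive, at the HDFS layer, e.g. `hadoop fs -setrep 5 /apps/hive/warehouse/clicks`.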
Note: on Hive local-mode MapReduce, see:
http://superlxw1234.iteye.com/blog/1703546
• Built-in Formats:
– ORCFile
– RCFile
– Avro
– Delimited Text
– Regular Expression
– S3 Logfile
– Typed Bytes
• 3rd-Party Addons:
– JSON
– XML
PS:Hive allows mixed format.
• Use Case:
– Ingest data in a write-optimized format like JSON or delimited.
– Every night, run a batch job to convert to read-optimized ORCFile.
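That use case might look like the following; the table names are illustrative:

```sql
-- Write-optimized ingest table (delimited text)
CREATE TABLE events_staging (ts STRING, userid INT, action STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

-- Read-optimized table
CREATE TABLE events (ts STRING, userid INT, action STRING)
STORED AS orc;

-- Nightly batch job: rewrite the staged data into ORCFile
INSERT OVERWRITE TABLE events SELECT * FROM events_staging;
```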
• High Compression
– Many tricks used out-of-the-box to ensure high compression rates.
– RLE, dictionary encoding, etc.
• High Performance
– Inline indexes record value ranges within blocks of ORCFile data.
– Filter pushdown allows efficient scanning during precise queries.
• Flexible Data Model
– All Hive types, including maps, structs and unions.
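The complex types go straight into the DDL; an illustrative sketch:

```sql
CREATE TABLE session (
  id INT,
  tags MAP<STRING, STRING>,               -- map type
  device STRUCT<os:STRING, model:STRING>, -- struct type
  payload UNIONTYPE<INT, STRING>          -- union type
) STORED AS orc;
```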
CREATE TABLE sale (
  id int, timestamp timestamp,
  productsk int, storesk int,
  amount decimal, state string
) STORED AS orc;

CREATE TABLE sale (
  id int, timestamp timestamp,
  productsk int, storesk int,
  amount decimal, state string
) STORED AS orc tblproperties ("orc.compress"="NONE");
CREATE TABLE sale (
id int, timestamp timestamp,
productsk int, storesk int,
amount decimal, state string
) STORED AS orc;
INSERT INTO TABLE sale SELECT * FROM staging SORT BY productsk;
ORCFile skipping speeds queries like
WHERE productsk = X, productsk IN (Y, Z);
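Filter pushdown is toggled with a session setting; a sketch against the sale table defined above:

```sql
-- Push predicates into the ORC reader, so whole blocks whose inline
-- min/max range excludes the value are skipped entirely.
SET hive.optimize.index.filter=true;

SELECT * FROM sale WHERE productsk = 42;
SELECT * FROM sale WHERE productsk IN (17, 42);
```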
• Traditional solution to all RDBMS problems:
– Put an index on it!
• Doing this in Hadoop == #fail
Indexes can speed up GROUP BY queries.
Since version 0.8, Hive has provided a bitmap index, mainly suited to columns with only a few distinct values. Details:
http://flyingdutchman.iteye.com/blog/1869876
• Hadoop:
– Really good at coordinated sequential scans.
– No random I/O. Traditional index pretty much useless.
• Keys to speed in Hadoop:
– Sorting and skipping take the place of indexing.
– Minimizing data shuffle the other key consideration.
• Skipping data:
– Divide data among different files which can be pruned out.
– Partitions, buckets and skews.
– Skip records during scans using small embedded indexes.
– Automatic when you use the ORCFile format.
– Sort data ahead of time.
– Simplifies joins and skipping becomes more effective.
• Partitioning makes queries go fast.
• You will almost always use some sort of partitioning.
• When partitioning you will use 1 or more virtual columns.
# Notice how xdate and state are not "real" column names.
CREATE TABLE sale ( id int, amount decimal, ... ) partitioned by (xdate string, state string);
• Virtual columns cause directories to be created in HDFS.
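Each distinct combination of virtual-column values becomes its own directory under the table's warehouse path. Assuming the default warehouse location, an illustrative layout:

```sql
-- /apps/hive/warehouse/sale/xdate=2013-03-01/state=CA/
-- /apps/hive/warehouse/sale/xdate=2013-03-01/state=NY/
-- /apps/hive/warehouse/sale/xdate=2013-03-02/state=CA/
-- A query filtering on xdate reads only the matching directories:
SELECT SUM(amount) FROM sale WHERE xdate = '2013-03-01';
```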
On column pruning and partition pruning, see:
http://my.oschina.net/leejun2005/blog/82529
http://my.oschina.net/leejun2005/blog/82065
• By default at least one virtual column must be hardcoded.
INSERT INTO TABLE sale PARTITION (xdate='2013-03-01', state='CA') SELECT * FROM staging_table WHERE xdate = '2013-03-01' AND state = 'CA';
• You can load all partitions in one shot:
– set hive.exec.dynamic.partition.mode=nonstrict;
– Warning: You can easily overwhelm your cluster this way.
set hive.exec.dynamic.partition.mode=nonstrict; INSERT INTO TABLE sale PARTITION (xdate, state) SELECT * FROM staging_table;
• Virtual columns must be last within the inserted data set.
• You can use the SELECT statement to re-order.
INSERT INTO TABLE sale PARTITION (xdate, state='CA') SELECT id, amount, other_stuff, xdate FROM staging_table WHERE state = 'CA';
• mapred.max.split.size and mapred.min.split.size
• Hive processes data in chunks subject to these bounds.
• min too large -> Too few mappers.
• max too small -> Too many mappers.
• Tune variables until mappers occupy:
– All map slots if you own the cluster.
– Reasonable number of map slots if you don’t.
• Example:
– set mapred.max.split.size=100000000;
– set mapred.min.split.size=1000000;
• Manual today, automatic in a future version of Hive.
• You will need to set these for most queries.
Note: on controlling the number of map and reduce tasks in a Hive job:
http://superlxw1234.iteye.com/blog/1582880
• Hive and Map/Reduce maintain some separate buffers.
• If Hive maps need lots of local memory you may need to shrink the map/reduce buffers.
• If your maps spill, try it out.
• Example:
– set io.sort.mb=100;
• All the time:
– set hive.optimize.mapjoin.mapreduce=true;
– set hive.optimize.bucketmapjoin=true;
– set hive.optimize.bucketmapjoin.sortedmerge=true;
– set hive.auto.convert.join=true;
– set hive.auto.convert.sortmerge.join=true;
– set hive.auto.convert.sortmerge.join.noconditionaltask=true;
• When bucketing data:
– set hive.enforce.bucketing=true;
– set hive.enforce.sorting=true;
• These and more are set by default in HDP 1.3.
– Check for them in hive-site.xml
– If not present, set them in your query script
• Preventing GROUP BY data skew:
– hive.groupby.skewindata=true
• Increase the reducers' JVM heap, or tune related parameters, e.g.:
mapred.child.java.opts=-Xmx1024m
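These two mitigations as session settings (the heap size is an example value, not a recommendation from the talk):

```sql
-- Rewrite skewed GROUP BYs into two jobs: the first spreads hot keys
-- randomly across reducers for partial aggregation, the second merges.
SET hive.groupby.skewindata=true;

-- Give each task JVM a larger heap
SET mapred.child.java.opts=-Xmx1024m;

SELECT state, COUNT(*) FROM sale GROUP BY state;
```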
• In the Hive shell:
CREATE TABLE fact_pos (
  txnid STRING, txntime STRING, givenname STRING, lastname STRING,
  postalcode STRING, storeid STRING, ind1 STRING, productid STRING,
  purchaseamount FLOAT, creditcard STRING
) PARTITIONED BY (part_dt STRING)
CLUSTERED BY (txnid) SORTED BY (txnid) INTO 24 BUCKETS
STORED AS ORC tblproperties("orc.compress"="SNAPPY");
The part_dt field is defined in the PARTITIONED BY clause and cannot share a name with any other column.
set hive.enforce.sorting=true;
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set mapreduce.reduce.input.limit=-1;
FROM pos_staging
INSERT OVERWRITE TABLE fact_pos PARTITION (part_dt)
SELECT txnid, txntime, givenname, lastname, postalcode, storeid, ind1,
       productid, purchaseamount, creditcard,
       concat(year(txntime), month(txntime)) as part_dt
SORT BY productid;
We use this command to load data from our staging table into our fact table.
hadoop fs -setrep -R -w 5 /apps/hive/warehouse/fact_pos
Increase the replication factor for the high-performance table.
dfs.block.local-path-access.user=hdfs
dfs.client.read.shortcircuit=true
dfs.client.read.shortcircuit.skip.checksum=false
Short-circuit reads allow the mappers to bypass the overhead of opening a port to the datanode if the data is local. The permissions for the local block files need to permit hdfs to read them (they should by default already).
See HDFS-2246 for more details.
set hive.mapred.reduce.tasks.speculative.execution=false;
set io.sort.mb=300;
set mapreduce.reduce.input.limit=-1;
select productid, ROUND(SUM(purchaseamount),2) as total
from fact_pos
where part_dt between '201210' and '201212'
group by productid
order by total desc
limit 100;
For more tuning techniques, see the Tuning chapter of "Programming Hive":
http://flyingdutchman.iteye.com/blog/1871983
(1)Join Optimizations
• Performance Improvements in Hive 0.11:
• New Join Types added or improved in Hive 0.11:
– In-memory Hash Join: Fast for fact-to-dimension joins.
– Sort-Merge-Bucket Join: Scalable for large-table to large-table joins.
• More Efficient Query Plan Generation
– Joins done in-memory when possible, saving map-reduce steps.
– Combine map/reduce jobs when GROUP BY and ORDER BY use the same key.
• More Than 30x Performance Improvement for Star Schema Join
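The GROUP BY / ORDER BY case mentioned above looks like this; when both clauses use the same key, Hive 0.11 can serve the query with one map/reduce job instead of two (table names are illustrative):

```sql
SELECT productid, SUM(amount) AS total
FROM sale_fact
GROUP BY productid
ORDER BY productid;  -- same key as the GROUP BY
```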
(2)Star Schema Join Improvements in 0.11
http://my.oschina.net/leejun2005/blog/140462#OSC_h3_2
A Brief Look at SQL-on-Hadoop Systems
http://kan.weibo.com/con/3564924249512540
Summary:
Highly recommended. It analyzes today's SQL-on-Hadoop systems from the essence of big-data query processing, and recalls the earlier debate among database researchers that "MapReduce is a major step backwards"!
Forty-odd years of database development have produced deep results and experience in data processing and optimization: advanced indexes, materialized views, cost-based optimization, and all kinds of nested queries.
Latest Developments in SQL-on-Hadoop Systems (1), (2)