在執行shuffle操做的時候,按照key,來進行values的數據的輸出,拉取和聚合.同一個key的values,必定是分配到一個Reduce task進行處理.html
假如多個key對應的values,總共是90萬,可是可能某個key對應了88萬條數據,key-88萬條values,分配到一個task上面去執行.sql
另外兩個task,可能各分配到了1萬條數據,多是數百個key,對應一萬條數據.session
1.你的大部分的task,都執行的特別特別快,在很短的時間就執行完了(用client模式,standalone client,yarn client,本地機器主要一執行spark-submit腳本,就會開始打印log),好比task88 finished,剩下幾個task,執行的特別慢,前面的task,通常1s能夠執行完5個,最後發現1000個task,998,999task,要執行1個小時,甚至兩個小時才能執行完一個task.app
2.運行的時候,一樣是其餘的task都很快執行完了,也沒什麼特別的問題,可是有的task,就是會忽然間,啪,報了一個OOM,JVM Out Of Memory,內存溢出了,task failed, task lost, resubmitting task.反覆提交執行了幾回都到了某個task就是跑不通,最後就會掛掉.某個task就直接OOM,那麼基本上也是由於數據傾斜了,task分配的數量實在是太大了!!!因此內存放不下,而後你的task每處理一條數據,還要建立大量的對象。內存爆掉了。dom
根據log去定位:分佈式
出現數據傾斜的緣由,基本只多是由於發生了shuffle操做,在shuffle的過程當中,出現了數據傾斜的問題,由於某個,或者某些key對應的數據,遠遠高於其餘的key.ide
1.在所寫的程序找找,哪些地方用到了回產生shuffle的算子,groupByKey、countByKey、reduceByKey、join,groupBy,repartition,cogroup,distinct,leftouterJoin函數
2.看log,log通常會報是在你的哪一行代碼,致使了OOM異常,或者呢,看log,看看是執行到了第幾個stage(stage劃分).性能
狀況一: 大數據
(避免聚合)spark算子聚合做業,其實就是groupByKey、reduceByKey,其實就是拿到每一個key對應的values;reduceByKey,其實就是對每一個key對應的values執行必定的計算。
這些有可能致使數據傾斜的操做,好比groupByKey和reduceByKey,包括以前說的join。都是在spark做業中執行的。
spark做業的數據來源一般(90%)的狀況下,數據來源都是hive表(hdfs,大數據分佈式存儲系統)。hdfs上存儲的大數據。
hive表,hive表中的數據,一般是怎麼出來的呢?有了spark之後,hive比較適合作什麼事情?hive就是適合作離線的,晚上凌晨跑的,ETL(extract transform load,數據的採集、清洗、導入),hive sql,去作這些事情,從而去造成一個完整的hive中的數據倉庫;說白了,數據倉庫,就是一堆表。
spark做業的源表(hive表),其實一般狀況下來講,也是經過某些hive etl生成的。hive etl多是晚上凌晨在那兒跑。今天跑昨天的數據。
數據傾斜,某個key對應的80萬數據,某些key對應幾百條,某些key對應幾十條;如今咱們直接在生成hive表的hive etl中,對數據進行聚合。好比按key來分組,將key對應的全部的values,所有用一種特殊的格式,拼接到一個字符串裏面去,好比「key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001」。
在hive的Hql中咱們能夠對key進行group,而後在spark中,拿到key=sessionid,values<Iterable>;hive etl中,已經對key進行了聚合。那麼也就意味着,每一個key就只對應一條數據。在spark中,就不須要再去執行groupByKey+map這種操做了。直接對每一個key對應的values字符串,map操做,進行你須要的操做便可。key,values串。
spark中,可能對這個操做,就不須要執行shffule操做了,也就根本不可能致使數據傾斜。 或者是,對每一個key在hive etl中進行聚合,對全部values聚合一下,不必定是拼接起來,多是直接進行計算。reduceByKey,計算函數,應用在hive etl中,每一個key的values。
狀況二:
(增粗聚合粒度)咱們可能沒有辦法對每一個key,就聚合出來一條數據; 那麼也能夠作一個妥協;對每一個key對應的數據,10萬條;有好幾個粒度,好比10萬條裏面包含了幾個城市、幾天、幾個地區的數據,如今放粗粒度;直接就按照城市粒度,作一下聚合,幾個城市,幾天、幾個地區粒度的數據,都給聚合起來。
好比說 city_id date area_id select ... from ... group by city_id
儘可能去聚合,減小每一個key對應的數量,也許聚合到比較粗的粒度以後,原先有10萬數據量的key,如今只有1萬數據量。減輕數據傾斜的現象和問題。
若是能夠接受某些數據,在spark做業中直接就摒棄掉,不使用,好比說,總共有100萬個key,只有2個key,是數據量達到10萬的,其餘全部的key,對應的數據量都是幾十.
這個時候,咱們本身能夠進行取捨,若是業務和需求能夠理解和接受的話,在咱們從hive表查詢元數據的時候,直接在sql中用where條件,過濾掉某幾個key.
這個幾個原先有大量數據,會致使數據傾斜的key,被過濾掉以後,在咱們的Spark做業中,天然就不會發生數據傾斜了.
將reduce task的數量,變多,就可讓每一個reduce task分配到更少的數據量,這樣的話,也許就能夠緩解,或者甚至是基本解決掉數據傾斜的問題。
具體方法就是在shuffle算子後面指定task分區數,好比val rdd2 = rdd1.reduceByKey(_+_,10)
提高shuffle reduce端並行度,怎麼來操做?
很簡單,主要給咱們全部的shuffle算子,好比groupByKey、countByKey、reduceByKey。在調用的時候,傳入進去一個參數。一個數字。那個數字,就表明了那個shuffle操做的reduce端的並行度。那麼在進行shuffle操做的時候,就會對應着建立指定數量的reduce task。
這樣的話,就可讓每一個reduce task分配到更少的數據。基本能夠緩解數據傾斜的問題。
好比說,本來某個task分配數據特別多,直接OOM,內存溢出了,程序無法運行,直接掛掉。按照log,找到發生數據傾斜的shuffle操做,給它傳入一個並行度數字,這樣的話,原先那個task分配到的數據,確定會變少。就至少能夠避免OOM的狀況,程序至少是能夠跑的。
提高shuffle reduce並行度的缺陷:
治標不治本的意思,由於,它沒有從根本上改變數據傾斜的本質和問題。不像第一個和第二個方案(直接避免了數據傾斜的發生)。原理沒有改變,只是說,儘量地去緩解和減輕shuffle reduce task的數據壓力,以及數據傾斜的問題。
一、若是最理想的狀況下,提高並行度之後,減輕了數據傾斜的問題,或者甚至可讓數據傾斜的現象忽略不計,那麼就最好。就不用作其餘的數據傾斜解決方案了。
二、不太理想的狀況下,就是好比以前某個task運行特別慢,要5個小時,如今稍微快了一點,變成了4個小時;或者是原先運行到某個task,直接OOM,如今至少不會OOM了,可是那個task運行特別慢,要5個小時才能跑完。
/** * 使用隨機key實現雙重聚合 */ /** * 第一步,給每一個key打上一個隨機數 */ JavaPairRDD<String, Long> mappedClickCategoryIdRDD = clickCategoryIdRDD.mapToPair( new PairFunction<Tuple2<Long,Long>, String, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception { Random random = new Random(); int prefix = random.nextInt(10); return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2); } }); /** * 第二步,執行第一輪局部聚合 */ JavaPairRDD<String, Long> firstAggrRDD = mappedClickCategoryIdRDD.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }); /** * 第三步,去除掉每一個key的前綴 */ JavaPairRDD<Long, Long> restoredRDD = firstAggrRDD.mapToPair( new PairFunction<Tuple2<String,Long>, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, Long> call(Tuple2<String, Long> tuple) throws Exception { long categoryId = Long.valueOf(tuple._1.split("_")[1]); return new Tuple2<Long, Long>(categoryId, tuple._2); } }); /** * 第四步,最第二輪全局的聚合 */ JavaPairRDD<Long, Long> clickCategoryId2CountRDD = restoredRDD.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } });
1.將一個熱點的key進行加鹽(就是加上隨機的前綴)
2.而後對進行加鹽處理後的key進行reduceByKey,groupByKey等算子操做
3.去掉key的前綴.
4.重複步驟2操做,避免了一個key對應的value過多的算子操做(數據傾斜).
原理圖:
/** * reduce join轉換爲map join */ List<Tuple2<Long, Row>> userInfos = userid2InfoRDD.collect(); final Broadcast<List<Tuple2<Long, Row>>> userInfosBroadcast = sc.broadcast(userInfos); JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2PartAggrInfoRDD.mapToPair( new PairFunction<Tuple2<Long,String>, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(Tuple2<Long, String> tuple) throws Exception { // 獲得用戶信息map List<Tuple2<Long, Row>> userInfos = userInfosBroadcast.value(); Map<Long, Row> userInfoMap = new HashMap<Long, Row>(); for(Tuple2<Long, Row> userInfo : userInfos) { userInfoMap.put(userInfo._1, userInfo._2); } // 獲取到當前用戶對應的信息 String partAggrInfo = tuple._2; Row userInfoRow = userInfoMap.get(tuple._1); String sessionid = StringUtils.getFieldFromConcatString( partAggrInfo, "\\|", Constants.FIELD_SESSION_ID); int age = userInfoRow.getInt(3); String professional = userInfoRow.getString(4); String city = userInfoRow.getString(5); String sex = userInfoRow.getString(6); String fullAggrInfo = partAggrInfo + "|" + Constants.FIELD_AGE + "=" + age + "|" + Constants.FIELD_PROFESSIONAL + "=" + professional + "|" + Constants.FIELD_CITY + "=" + city + "|" + Constants.FIELD_SEX + "=" + sex; return new Tuple2<String, String>(sessionid, fullAggrInfo); } });
reduce join轉換map join,適合在什麼樣的狀況下,能夠來使用?
若是兩個Rdd要進行join,其中一個RDD是比較小的,一個RDD是100萬條數據,一個RDD是1萬數據.
其中一個RDD必須是比較小的,broadcast出去的那個小RDD的數據之後,就會在每一個executor的block manager中都駐留一份.要確保你的內存足夠放那個小RDD的數據.
這種方式下,根本不會發生shuffle操做,確定也不會發生數據傾斜;從根本上杜絕了join操做可能致使的數據傾斜的問題;
對於join中有數據傾斜的狀況,你們儘可能第一時間先考慮這種方式,效果很是好;若是某個RDD比較小的狀況下。
不適合的狀況:
兩個RDD都比較大,那麼這個時候,你去將其中一個RDD作成broadcast,就很笨拙了。極可能致使內存不足。最終致使內存溢出,程序掛掉。 並且其中某些key(或者是某個key),還發生了數據傾斜;此時能夠採用最後兩種方式。
總結:
對於join這種操做,不光是考慮數據傾斜的問題;即便是沒有數據傾斜問題,也徹底能夠優先考慮,用咱們講的這種高級的reduce join轉map join的技術,不要用普通的join,去經過shuffle,進行數據的join;徹底能夠經過簡單的map,使用map join的方式,犧牲一點內存資源;在可行的狀況下,優先這麼使用。 不走shuffle,直接走map,是否是性能也會高不少?這是確定的。
當第五種方案不能解決,就是兩個RDD都比較大的狀況下的join操做,直接針對產生數據傾斜的Key的處理方案.
方案思路:
在要進行join操做的時候,隨機採樣出10%數據,使用countByKey計算出key的數量而後sortByKey(false)倒序取出第一個key,而後對取出來的key和普通的key分別進行join操做以後再進行join操做.
這個方案的實現思路,跟你們解析一下:其實關鍵之處在於,將發生數據傾斜的key,單獨拉出來,放到一個RDD中去;就用這個本來會傾斜的key RDD跟其餘RDD,單獨去join一下,這個時候,key對應的數據,可能就會分散到多個task中去進行join操做。 就不至於說是,這個key跟以前其餘的key混合在一個RDD中時,確定是會致使一個key對應的全部數據,都到一個task中去,就會致使數據傾斜。
這種方案何時適合使用?
優先對於join,確定是但願可以採用上一個方案的,reduce join轉換map join。兩個RDD數據都比較大,那麼就不要那麼搞了。
針對你的RDD的數據,你能夠本身把它轉換成一箇中間表,或者是直接用countByKey()的方式,你能夠看一下這個RDD各個key對應的數據量;
此時若是你發現整個RDD就一個,或者少數幾個key,是對應的數據量特別多;儘可能建議,好比就是一個key對應的數據量特別多。 此時能夠採用我們的這種方案,單拉出來那個最多的key;單獨進行join,儘量地將key分散到各個task上去進行join操做。
何時不適用呢?
若是一個RDD中,致使數據傾斜的key,特別多;那麼此時,最好仍是不要這樣了;仍是使用咱們最後一個方案,終極的join數據傾斜的解決方案。
升級版作法思路:
就是說,我們單拉出來了,一個或者少數幾個可能會產生數據傾斜的key,而後還能夠進行更加優化的一個操做; 對於那個key,從另一個要join的表中,也過濾出來一份數據,好比可能就只有一條數據。userid2infoRDD,一個userid key,就對應一條數據。 而後呢,採起對那個只有一條數據的RDD,進行flatMap操做,打上100個隨機數,做爲前綴,返回100條數據。 單獨拉出來的可能產生數據傾斜的RDD,給每一條數據,都打上一個100之內的隨機數,做爲前綴。 再去進行join,是否是性能就更好了。確定能夠將數據進行打散,去進行join。join完之後,能夠執行map操做,去將以前打上的隨機數,給去掉,而後再和另一個普通RDD join之後的結果,進行union操做。
代碼實現:
https://www.cnblogs.com/gentle-awen/p/10144882.html
方案思路:
針對上面方案都沒辦法解決數據傾斜,只能使用這種.當採用隨機數和擴容表進行join解決數據傾斜的時候,就表明着,你的以前的數據傾斜的解決方案,都無法使用。 這個方案是沒辦法完全解決數據傾斜的,更多的,是一種對數據傾斜的緩解。
方案步驟:
一、選擇一個RDD,要用flatMap,進行擴容,將每條數據,映射爲多條數據,每一個映射出來的數據,都帶了一個n之內的隨機數,一般來講,會選擇10。
二、將另一個RDD,作普通的map映射操做,每條數據,都打上一個10之內的隨機數。
三、最後,將兩個處理後的RDD,進行join操做。
方案侷限性:
一、由於你的兩個RDD都很大,因此你沒有辦法去將某一個RDD擴的特別大,通常我們就是10倍。
二、若是就是10倍的話,那麼數據傾斜問題,的確是只能說是緩解和減輕,不能說完全解決。
方案代碼:
https://www.cnblogs.com/gentle-awen/p/10144893.html