最完整的數據傾斜解決方案(spark)

一.瞭解數據傾斜

  數據傾斜的原理:

      

      在執行shuffle操做的時候,按照key,來進行values的數據的輸出,拉取和聚合.同一個key的values,必定是分配到一個Reduce task進行處理.html

      假如多個key對應的values,總共是90萬,可是可能某個key對應了88萬條數據,key-88萬條values,分配到一個task上面去執行.sql

      另外兩個task,可能各分配到了1萬條數據,多是數百個key,對應一萬條數據.session

 

  數據傾斜的現象:

    發生數據傾斜的兩種表現: 

      1.你的大部分的task,都執行的特別特別快,在很短的時間就執行完了(用client模式,standalone client,yarn client,本地機器主要一執行spark-submit腳本,就會開始打印log),好比task88 finished,剩下幾個task,執行的特別慢,前面的task,通常1s能夠執行完5個,最後發現1000個task,998,999task,要執行1個小時,甚至兩個小時才能執行完一個task.app

      2.運行的時候,一樣是其餘的task都很快執行完了,也沒什麼特別的問題,可是有的task,就是會忽然間,啪,報了一個OOM,JVM Out Of Memory,內存溢出了,task failed, task lost, resubmitting task.反覆提交執行了幾回都到了某個task就是跑不通,最後就會掛掉.某個task就直接OOM,那麼基本上也是由於數據傾斜了,task分配的數量實在是太大了!!!因此內存放不下,而後你的task每處理一條數據,還要建立大量的對象。內存爆掉了。dom

  數據傾斜的產生緣由與定位:

      根據log去定位:分佈式

      出現數據傾斜的緣由,基本只多是由於發生了shuffle操做,在shuffle的過程當中,出現了數據傾斜的問題,由於某個,或者某些key對應的數據,遠遠高於其餘的key.ide

      1.在所寫的程序找找,哪些地方用到了回產生shuffle的算子,groupByKey、countByKey、reduceByKey、join,groupBy,repartition,cogroup,distinct,leftouterJoin函數

      2.看log,log通常會報是在你的哪一行代碼,致使了OOM異常,或者呢,看log,看看是執行到了第幾個stage(stage劃分).性能

二.解決數據傾斜的方案

  方案一:聚合源數據

       狀況一:     大數據

          (避免聚合)spark算子聚合做業,其實就是groupByKey、reduceByKey,其實就是拿到每一個key對應的values;reduceByKey,其實就是對每一個key對應的values執行必定的計算。

          這些有可能致使數據傾斜的操做,好比groupByKey和reduceByKey,包括以前說的join。都是在spark做業中執行的。

          spark做業的數據來源一般(90%)的狀況下,數據來源都是hive表(hdfs,大數據分佈式存儲系統)。hdfs上存儲的大數據。

          hive表,hive表中的數據,一般是怎麼出來的呢?有了spark之後,hive比較適合作什麼事情?hive就是適合作離線的,晚上凌晨跑的,ETL(extract transform load,數據的採集、清洗、導入),hive sql,去作這些事情,從而去造成一個完整的hive中的數據倉庫;說白了,數據倉庫,就是一堆表。

          spark做業的源表(hive表),其實一般狀況下來講,也是經過某些hive etl生成的。hive etl多是晚上凌晨在那兒跑。今天跑昨天的數據。

           數據傾斜,某個key對應的80萬數據,某些key對應幾百條,某些key對應幾十條;如今咱們直接在生成hive表的hive etl中,對數據進行聚合。好比按key來分組,將key對應的全部的values,所有用一種特殊的格式,拼接到一個字符串裏面去,好比「key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001」。

           在hive的Hql中咱們能夠對key進行group,而後在spark中,拿到key=sessionid,values<Iterable>;hive etl中,已經對key進行了聚合。那麼也就意味着,每一個key就只對應一條數據。在spark中,就不須要再去執行groupByKey+map這種操做了。直接對每一個key對應的values字符串,map操做,進行你須要的操做便可。key,values串。

          spark中,可能對這個操做,就不須要執行shffule操做了,也就根本不可能致使數據傾斜。 或者是,對每一個key在hive etl中進行聚合,對全部values聚合一下,不必定是拼接起來,多是直接進行計算。reduceByKey,計算函數,應用在hive etl中,每一個key的values

          狀況二:     

          (增粗聚合粒度)咱們可能沒有辦法對每一個key,就聚合出來一條數據; 那麼也能夠作一個妥協;對每一個key對應的數據,10萬條;有好幾個粒度,好比10萬條裏面包含了幾個城市、幾天、幾個地區的數據,如今放粗粒度;直接就按照城市粒度,作一下聚合,幾個城市,幾天、幾個地區粒度的數據,都給聚合起來。

          好比說 city_id date area_id   select ... from ... group by city_id

           儘可能去聚合,減小每一個key對應的數量,也許聚合到比較粗的粒度以後,原先有10萬數據量的key,如今只有1萬數據量。減輕數據傾斜的現象和問題。

  方案二:過濾致使傾斜的key

       若是能夠接受某些數據,在spark做業中直接就摒棄掉,不使用,好比說,總共有100萬個key,只有2個key,是數據量達到10萬的,其餘全部的key,對應的數據量都是幾十.

      這個時候,咱們本身能夠進行取捨,若是業務和需求能夠理解和接受的話,在咱們從hive表查詢元數據的時候,直接在sql中用where條件,過濾掉某幾個key.

      這個幾個原先有大量數據,會致使數據傾斜的key,被過濾掉以後,在咱們的Spark做業中,天然就不會發生數據傾斜了.

  方案三:提升shuffle操做的reduce並行度

      

      

      將reduce task的數量,變多,就可讓每一個reduce task分配到更少的數據量,這樣的話,也許就能夠緩解,或者甚至是基本解決掉數據傾斜的問題。

      具體方法就是在shuffle算子後面指定task分區數,好比val rdd2 = rdd1.reduceByKey(_+_,10)      

      提高shuffle reduce端並行度,怎麼來操做?

          很簡單,主要給咱們全部的shuffle算子,好比groupByKey、countByKey、reduceByKey。在調用的時候,傳入進去一個參數。一個數字。那個數字,就表明了那個shuffle操做的reduce端的並行度。那麼在進行shuffle操做的時候,就會對應着建立指定數量的reduce task。

          這樣的話,就可讓每一個reduce task分配到更少的數據。基本能夠緩解數據傾斜的問題。

          好比說,本來某個task分配數據特別多,直接OOM,內存溢出了,程序無法運行,直接掛掉。按照log,找到發生數據傾斜的shuffle操做,給它傳入一個並行度數字,這樣的話,原先那個task分配到的數據,確定會變少。就至少能夠避免OOM的狀況,程序至少是能夠跑的。

      提高shuffle reduce並行度的缺陷:      

          治標不治本的意思,由於,它沒有從根本上改變數據傾斜的本質和問題。不像第一個和第二個方案(直接避免了數據傾斜的發生)。原理沒有改變,只是說,儘量地去緩解和減輕shuffle reduce task的數據壓力,以及數據傾斜的問題。 

           一、若是最理想的狀況下,提高並行度之後,減輕了數據傾斜的問題,或者甚至可讓數據傾斜的現象忽略不計,那麼就最好。就不用作其餘的數據傾斜解決方案了。

          二、不太理想的狀況下,就是好比以前某個task運行特別慢,要5個小時,如今稍微快了一點,變成了4個小時;或者是原先運行到某個task,直接OOM,如今至少不會OOM了,可是那個task運行特別慢,要5個小時才能跑完。

  方案四:使用隨機key實現雙重group聚合方案

 

		/**
		 * 使用隨機key實現雙重聚合
		 */
		
		/**
		 * 第一步,給每一個key打上一個隨機數
		 */
		JavaPairRDD<String, Long> mappedClickCategoryIdRDD = clickCategoryIdRDD.mapToPair(
				
				new PairFunction<Tuple2<Long,Long>, String, Long>() {

					private static final long serialVersionUID = 1L;
		
					@Override
					public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
							throws Exception {
						Random random = new Random();
						int prefix = random.nextInt(10);
						return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
					}
					
				});
		
		/**
		 * 第二步,執行第一輪局部聚合
		 */
		JavaPairRDD<String, Long> firstAggrRDD = mappedClickCategoryIdRDD.reduceByKey(
				
				new Function2<Long, Long, Long>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Long call(Long v1, Long v2) throws Exception {
						return v1 + v2;
					}
					
				});
		
		/**
		 * 第三步,去除掉每一個key的前綴
		 */
		JavaPairRDD<Long, Long> restoredRDD = firstAggrRDD.mapToPair(
				
				new PairFunction<Tuple2<String,Long>, Long, Long>() {

					private static final long serialVersionUID = 1L;
		
					@Override
					public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
							throws Exception {
						long categoryId = Long.valueOf(tuple._1.split("_")[1]);  
						return new Tuple2<Long, Long>(categoryId, tuple._2);  
					}
					
				});
		
		/**
		 * 第四步,最第二輪全局的聚合
		 */
		JavaPairRDD<Long, Long> clickCategoryId2CountRDD = restoredRDD.reduceByKey(
				
				new Function2<Long, Long, Long>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Long call(Long v1, Long v2) throws Exception {
						return v1 + v2;
					}
					
				});

 

       1.將一個熱點的key進行加鹽(就是加上隨機的前綴)

       2.而後對進行加鹽處理後的key進行reduceByKey,groupByKey等算子操做

      3.去掉key的前綴.

      4.重複步驟2操做,避免了一個key對應的value過多的算子操做(數據傾斜).

  方案五:將reduce join 轉換爲map join

      原理圖:

 

        /**
         * reduce join轉換爲map join
         */
        
        List<Tuple2<Long, Row>> userInfos = userid2InfoRDD.collect();
        final Broadcast<List<Tuple2<Long, Row>>> userInfosBroadcast = sc.broadcast(userInfos);
        
        JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2PartAggrInfoRDD.mapToPair(
                
                new PairFunction<Tuple2<Long,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                            throws Exception {
                        // 獲得用戶信息map
                        List<Tuple2<Long, Row>> userInfos = userInfosBroadcast.value();
                        
                        Map<Long, Row> userInfoMap = new HashMap<Long, Row>();
                        for(Tuple2<Long, Row> userInfo : userInfos) {
                            userInfoMap.put(userInfo._1, userInfo._2);
                        }
                        
                        // 獲取到當前用戶對應的信息
                        String partAggrInfo = tuple._2;
                        Row userInfoRow = userInfoMap.get(tuple._1);
                        
                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
                        
                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);
                        
                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD_SEX + "=" + sex;
                        
                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }
                    
                });

       reduce join轉換map join,適合在什麼樣的狀況下,能夠來使用?

          若是兩個Rdd要進行join,其中一個RDD是比較小的,一個RDD是100萬條數據,一個RDD是1萬數據.

          其中一個RDD必須是比較小的,broadcast出去的那個小RDD的數據之後,就會在每一個executor的block manager中都駐留一份.要確保你的內存足夠放那個小RDD的數據.

          這種方式下,根本不會發生shuffle操做,確定也不會發生數據傾斜;從根本上杜絕了join操做可能致使的數據傾斜的問題;

          對於join中有數據傾斜的狀況,你們儘可能第一時間先考慮這種方式,效果很是好;若是某個RDD比較小的狀況下。

      不適合的狀況:    

          兩個RDD都比較大,那麼這個時候,你去將其中一個RDD作成broadcast,就很笨拙了。極可能致使內存不足。最終致使內存溢出,程序掛掉。 並且其中某些key(或者是某個key),還發生了數據傾斜;此時能夠採用最後兩種方式。

      總結:   

          對於join這種操做,不光是考慮數據傾斜的問題;即便是沒有數據傾斜問題,也徹底能夠優先考慮,用咱們講的這種高級的reduce join轉map join的技術,不要用普通的join,去經過shuffle,進行數據的join;徹底能夠經過簡單的map,使用map join的方式,犧牲一點內存資源;在可行的狀況下,優先這麼使用。 不走shuffle,直接走map,是否是性能也會高不少?這是確定的。

  方案六:sample採樣傾斜key進行兩次join

      

      當第五種方案不能解決,就是兩個RDD都比較大的狀況下的join操做,直接針對產生數據傾斜的Key的處理方案.

      方案思路:

        在要進行join操做的時候,隨機採樣出10%數據,使用countByKey計算出key的數量而後sortByKey(false)倒序取出第一個key,而後對取出來的key和普通的key分別進行join操做以後再進行join操做.

        這個方案的實現思路,跟你們解析一下:其實關鍵之處在於,將發生數據傾斜的key,單獨拉出來,放到一個RDD中去;就用這個本來會傾斜的key RDD跟其餘RDD,單獨去join一下,這個時候,key對應的數據,可能就會分散到多個task中去進行join操做。 就不至於說是,這個key跟以前其餘的key混合在一個RDD中時,確定是會致使一個key對應的全部數據,都到一個task中去,就會致使數據傾斜。

      這種方案何時適合使用?

        優先對於join,確定是但願可以採用上一個方案的,reduce join轉換map join。兩個RDD數據都比較大,那麼就不要那麼搞了。

        針對你的RDD的數據,你能夠本身把它轉換成一箇中間表,或者是直接用countByKey()的方式,你能夠看一下這個RDD各個key對應的數據量;

        此時若是你發現整個RDD就一個,或者少數幾個key,是對應的數據量特別多;儘可能建議,好比就是一個key對應的數據量特別多。 此時能夠採用我們的這種方案,單拉出來那個最多的key;單獨進行join,儘量地將key分散到各個task上去進行join操做。

      何時不適用呢?

         若是一個RDD中,致使數據傾斜的key,特別多;那麼此時,最好仍是不要這樣了;仍是使用咱們最後一個方案,終極的join數據傾斜的解決方案。

       升級版作法思路:

        就是說,我們單拉出來了,一個或者少數幾個可能會產生數據傾斜的key,而後還能夠進行更加優化的一個操做; 對於那個key,從另一個要join的表中,也過濾出來一份數據,好比可能就只有一條數據。userid2infoRDD,一個userid key,就對應一條數據。 而後呢,採起對那個只有一條數據的RDD,進行flatMap操做,打上100個隨機數,做爲前綴,返回100條數據。 單獨拉出來的可能產生數據傾斜的RDD,給每一條數據,都打上一個100之內的隨機數,做爲前綴。 再去進行join,是否是性能就更好了。確定能夠將數據進行打散,去進行join。join完之後,能夠執行map操做,去將以前打上的隨機數,給去掉,而後再和另一個普通RDD join之後的結果,進行union操做。

      代碼實現:

        https://www.cnblogs.com/gentle-awen/p/10144882.html

  方案七:使用隨機數以及擴容表進行join

      

      方案思路:

        針對上面方案都沒辦法解決數據傾斜,只能使用這種.當採用隨機數和擴容表進行join解決數據傾斜的時候,就表明着,你的以前的數據傾斜的解決方案,都無法使用。 這個方案是沒辦法完全解決數據傾斜的,更多的,是一種對數據傾斜的緩解。

      方案步驟:

        一、選擇一個RDD,要用flatMap,進行擴容,將每條數據,映射爲多條數據,每一個映射出來的數據,都帶了一個n之內的隨機數,一般來講,會選擇10。

        二、將另一個RDD,作普通的map映射操做,每條數據,都打上一個10之內的隨機數。

        三、最後,將兩個處理後的RDD,進行join操做。

      方案侷限性:

        一、由於你的兩個RDD都很大,因此你沒有辦法去將某一個RDD擴的特別大,通常我們就是10倍。

        二、若是就是10倍的話,那麼數據傾斜問題,的確是只能說是緩解和減輕,不能說完全解決。

      方案代碼:

        https://www.cnblogs.com/gentle-awen/p/10144893.html

相關文章
相關標籤/搜索