Spark實戰--尋找5億次訪問中,訪問次數最多的人

問題描述

對於一個大型網站,用戶訪問量嚐嚐高達數十億。對於數十億是一個什麼樣的概念,咱們這裏能夠簡單的計算一下。對於一個用戶,單次訪問,咱們一般會記錄下哪些數據呢?算法

  • 一、用戶的id
  • 二、用戶訪問的時間
  • 三、用戶逗留的時間
  • 四、用戶執行的操做
  • 五、用戶的其他數據(好比IP等等)

咱們單單從用戶id來講,好比10011802330414,這個ID,那麼咱們一個id差很少就是一個long類型,由於在大量數據存儲的時候,咱們都是採用文本存儲。所以對於5億個用戶ID,徹底存儲在磁盤當中,大概是5G的大小,對於這個大小,並不能算是大數據。可是對於一個案例來講,已經很是足夠了。apache

咱們會產生一個5億條ID的數據集,咱們上面說到,這個數據集大小爲5G(不壓縮的狀況下),所以我不會在GitHub上上傳這樣一個數據集,可是咱們提供一個方法,來生成一個5億條數據。緩存

固然要解決這個問題,你能夠依然在local模式下運行項目,可是你得有足夠的磁盤空間和內存空間,大概8G磁盤空間(由於除了數據自己,spark運行過程還要產生一些臨時數據),5G內存(要進行reduceByKey)。爲了真正展現spark的特性,咱們這個案例,將會運行在spark集羣上。bash

關於如何搭建集羣,我準備在後續的章節補上。可是在網上有大量的集羣搭建教程,其中不乏一些詳細優秀的教程。固然,這節咱們不講如何搭建集羣,可是咱們仍然能夠開始咱們的案例。併發

問題分析

那麼如今咱們擁有了一個5億條數據(實際上這個數據並不以文本存儲,而是在運行的時候生成),從五億條數據中,找出訪問次數最多的人,這看起來並不難。但實際上咱們想要經過這個案例瞭解spark的真正優點。dom

5億條ID數據,首先能夠用map將其緩存到RDD中,而後對RDD進行reduceByKey,最後找出出現最多的ID。思路很簡單,所以代碼量也不會不少異步

實現

scala實現

首先是ID生成方法:大數據

RandomId.class優化

import scala.Serializable;

public class RandomId implements Serializable {

    private static final long twist(long u, long v) {
        return (((u & 0x80000000L) | (v & 0x7fffffffL)) >> 1) ^ ((v & 1) == 1 ? 0x9908b0dfL : 0);
    }
    private long[] state= new long[624];
    private int left = 1;
    public RandomId() {
        for (int j = 1; j < 624; j++) {
            state[j] = (1812433253L * (state[j - 1] ^ (state[j - 1] >> 30)) + j);
            state[j] &= 0xfffffffffL;
        }
    }
    public void next_state() {
        int p = 0;
        left = 624;
        for (int j = 228; --j > 0; p++)
            state[p] = state[p+397] ^ twist(state[p], state[p + 1]);

        for (int j=397;--j>0;p++)
            state[p] = state[p-227] ^ twist(state[p], state[p + 1]);

        state[p] = state[p-227] ^ twist(state[p], state[0]);
    }

    public long next() {
        if (--left == 0) next_state();
        return state[624-left];
    }

}

複製代碼

而後是用它生成5億條數據網站

import org.apache.spark.{SparkConf, SparkContext}

object ActiveVisitor {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")

    val sc = new SparkContext(conf)

    val list = 1 until 100000

    val id =new RandomId()

    var max = 0

    var maxId = 0L

    val lastNum = sc.parallelize(list).flatMap(num => {
      var list2 = List(id.next())
      for (i <- 1 to 50000){
        list2 = id.next() :: list2
      }
      println(num +"%")
      list2
    }).map((_,1)).reduceByKey(_+_).foreach(x => {
      if (x._2 > max){
        max = x._2
        maxId = x._1
        println(x)
      }
    })
  }

}
複製代碼

處理5億條數據

import org.apache.spark.{SparkConf, SparkContext}

object ActiveVisitor {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")

    val sc = new SparkContext(conf)

    //生成一個0-9999的列表
    val list = 1 until 10000

    val id =new RandomId()

    //這裏記錄最大的次數
    var max = 0

    //這裏記錄最大次數的ID
    var maxId = 0L

    val lastNum = sc.parallelize(list)
      //第一步生成5億條數據
      .flatMap(num => {
      //遍歷list列表
      //總共遍歷1萬次每次生成5萬個ID
      var list2 = List(id.next())
      for (i <- 1 to 50000){
        list2 = id.next() :: list2
      }
      //這裏記錄當前生成ID的百分比
      println(num/1000.0 +"%")
      
      //返回生成完成後的list
      //每次循環裏面都包含5萬個ID
      list2
    })
      //遍歷5億條數據
      //爲每條數據出現標記1
      .map((_,1))
      //對標記後的數據進行處理
      //獲得每一個ID出現的次數,即(ID,Count)
      .reduceByKey(_+_)
      //遍歷處理後的數據
      .foreach(x => {
      //將最大值存儲在max中
      if (x._2 > max){
        max = x._2
        maxId = x._1
        //若X比以前記錄的值大,則輸出該id和次數
        //最後一次輸出結果,則是出現次數最多的的ID和以及其出現的次數
        //固然出現次數最多的可能有多個ID
        //這裏只輸出一個
        println(x)
      }
    })
  }

}

複製代碼

運行獲得結果

將其提交到spark上運行,觀察日誌

1%
5000%
2%
5001%
3%
5002%
4%
5003%
5%
5004%
6%
5005%
7%
5006%
8%
5007%
9%
5008%
10%
5009%
11%
5010%
12%
5011%
5012%
13%
5013%
14%
15%
5014%

...
...
...

複製代碼

這裏是輸出的部分日誌,從日誌中,咱們顯然發現,程序是並行的。我採用的集羣由四個節點組成,每一個節點提供5G的內存空間,集羣在不一樣節點中運行,有節點分配到的分區是從1開始,而有節點則是從5000開始,所以程序並無按照咱們所想的從1%-9999%。好在未按照順序執行,也並不影響最終結果,畢竟最終要進行一個reduceByKey,纔是咱們真正須要獲得結果的地方。

再看日誌另外一部分

5634%
5635%
5636%
5637%
5638%
5639%
5640%
5641%
5642%
5643%
5644%
5645%
2019-03-05 11:52:14 INFO  ExternalSorter:54 - Thread 63 spilling in-memory map of 1007.3 MB to disk (2 times so far)
647%
648%
649%
650%
651%
652%
653%
654%
655%
656%
複製代碼

注意到這裏,spilling in-memory map of 1007.3 MB to disk,spilling操做將map中的 1007.3 MB的數據溢寫到磁盤中。這是因爲spark在處理的過程當中,因爲數據量過於龐大,所以將多的數據溢寫到磁盤,當再次用到時,會從磁盤讀取。對於實時性操做的程序來講,屢次、大量讀寫磁盤是絕對不被容許的。可是在處理大數據中,溢寫到磁盤是很是常見的操做。

事實上,在完整的日誌中,咱們能夠看到有至關一部分日誌是在溢寫磁盤的時候生成的,大概49次(這是我操做過程當中的總數)

如圖:

總共出現49條溢寫操做的日誌,每次大概是1G,這也印證了咱們5億條數據,佔據空間5G的一個說法。事實上,我曾將這5億條數據存儲在磁盤中,的確其佔據的空間是5G左右。

結果

最終,咱們能夠在日誌中看到結果。

整個過程持續了將近47min,固然在龐大的集羣中,時間可以大大縮短,要知道,咱們如今只採用了4個節點。

咱們看到了次數二、四、六、8竟然分別出現了兩次,這並不奇怪,由於集羣並行運行,異步操做,出現重複結果十分正常,固然咱們也能夠用併發機制,去處理這個現象。這個在後續的案例中,咱們會繼續優化結果。

從結果上看,咱們發現5億條數據中,出現最多的ID也僅僅出現了8次,這說明了在大量數據中,不少ID可能只出現了1次、2次。這也就是爲何最後我採用的是foreach方法去尋找最大值,而不採用以下的方法

import org.apache.spark.{SparkConf, SparkContext}

object ActiveVisitor {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")

    val sc = new SparkContext(conf)

    //生成一個0-9999的列表
    val list = 1 until 10000

    val id =new RandomId()

    //這裏記錄最大的次數
    var max = 0

    //這裏記錄最大次數的ID
    var maxId = 0L

    val lastNum = sc.parallelize(list)
      //第一步生成5億條數據
      .flatMap(num => {
      //遍歷list列表
      //總共遍歷1萬次每次生成5萬個ID
      var list2 = List(id.next())
      for (i <- 1 to 50000){
        list2 = id.next() :: list2
      }
      //這裏記錄當前生成ID的百分比
      println(num/1000.0 +"%")
      
      //返回生成完成後的list
      //每次循環裏面都包含5萬個ID
      list2
    })
      //遍歷5億條數據
      //爲每條數據出現標記1
      .map((_,1))
      //對標記後的數據進行處理
      //獲得每一個ID出現的次數,即(ID,Count)
      .reduceByKey(_+_)
      //爲數據進行排序
      //倒序
      .sortByKey(false)

    //次數最多的,在第一個,將其輸出
    println(lastNum.first())
  }

}
複製代碼

這個方法中,咱們對reduceByKey結果進行排序,輸出排序結果的第一個,即次數最大的ID。這樣作彷佛更符合咱們的要求。可是實際上,爲了獲得一樣的結果,這樣作,會消耗更多的資源。如咱們所說,不少ID啓其實只出現了一次,兩次,排序的過程當中,仍然要對其進行排序。要知道,因爲不少ID只出現一次,排序的數據集大小頗有多是數億的條目。

根據咱們對排序算法的瞭解,這樣一個龐大數據集進行排序,勢必要耗費大量資源。所以,咱們可以容忍輸出一些冗餘信息,但不影響咱們的獲得正確結果。

至此,咱們完成了5億數據中,找出最多出現次數的數據。若是感興趣,能夠嘗試用這個方法解決50億條數據,出現最多的數據條目。可是這樣作的話,你得準備好50G的空間。儘管用上述的程序,屬於閱後即焚,可是50億數據仍然會耗費大量的時間。

相關文章
相關標籤/搜索