Spark從入門到放棄---RDD

什麼是Spark?java

關於Spark具體的定義,你們能夠去閱讀官網或者百度關於Spark的詞條,在此再也不贅述。從一個野生程序猿的角度去理解,做爲大數據時代的一個準王者,Spark是一款主流的高性能分佈式計算大數據框架之一,和MapReduce,Hive,Flink等其餘大數據框架一塊兒支撐了大數據處理方案的一片天空。筆者所在的公司,集羣裏面有數千臺高配機器搭載了Spark(還有Hive和Flink),用來處理千億萬億級別的大數據。黑體字內容基本就是對Spark的一個歸納。git

什麼是RDD?sql

套用一段關於RDD的常規解釋,RDD 是 Spark 提供的最重要的抽象概念,它是一種有容錯機制的特殊數據集合,能夠分佈在集羣的結點上,以函數式操做集合的方式進行各類並行操做。通俗點來說,能夠將 RDD 理解爲一個分佈式對象集合,本質上是一個只讀的分區記錄集合。每一個 RDD 能夠分紅多個分區,每一個分區就是一個數據集片斷。一個 RDD 的不一樣分區能夠保存到集羣中的不一樣結點上,從而能夠在集羣中的不一樣結點上進行並行計算。你們聽懂了啵?apache

Again,用一個野生程序猿的話來講,RDD就是一個數據集,裏面包含着咱們要處理的千億萬億數據,相似於Java裏面的ArrayList,Python裏面的list。不一樣的是,Spark基於RDD提供了一大堆很好用的函數(算子),專門來處理大數據。編程

Next?app

做爲一我的狠話很少的野生程序猿,就喜歡生猛地直接上代碼。No BB, show you the code.框架

  

Wait.dom

思惟縝密的我,仍是得BB一句,工欲善其事必先利其器。想玩起來Spark,請先作好一下準備,如下以Windows舉例說明,Linux雷同。環境已經搭好的同窗們,請忽略這一步,直接往下看。maven

#1,備好IDE編程語言

Java/Scala,請安裝好宇宙 第二的IDE,IDEAL(全名 IntelliJ IDEA),社區版便可,無需破解。Scala須要在IDEAL的Plugins裏面,安裝Scala插件。

Python,也請安裝好世界第三的IDE,PyCharm,社區版便可,無需破解。

IDEA和PyCharm都出自於一個很厲害的軟件公司,JetBrains,這家公司以一己之力,扛起了編程界的好幾門主流語言的IDE。

 

#2,Spark

無論你們吃飯的傢伙是Java,Scacla,仍是Python,建議你們都去裝一個Python,宇宙第二的編程語言(宇宙第一的語言是PHP),太好用了。

---若是是Python,直接在命令行執行pip install pyspark,便可安裝Spark。裝好以後,Java/Scala也能夠用來操做Spark。

---若是是Java/Scala,若是你們電腦上有安裝Python,直接按照上一步操做裝好pyspark以後,Java/Scala就能夠共用。

若是老鐵們不肯意安裝Python,就須要自行去Spark官網下載相應版本,解壓後,把spark的bin路徑添加到Windows環境變量。(Windows下可能會報一個找不到null的錯誤,莫慌,須要自行下載Hadoop,以及對應版本的winutils,而後用winutils bin裏面的內容新覆蓋hadoop bin文件夾)

走到這一步,準備各作就緒。

 祭出代碼

Part I --- 測試數據

先準備點測試數據。數據包含2個字段,結構:name  score,每列用\t分割。代碼以下:

 Python版本測試數據,name長度能夠修改get_random_string參數,數據條數請根據本身電腦的配置修改loops參數。

import string
import random

file_data = 'seed'
file_save = 'result'
def get_random_string(size: int) -> str: stack = string.digits + string.ascii_letters rs = [stack[random.randrange(len(stack))] for _ in range(size)] # return ''.join(rs) def produce_seed(): loops = 100000 rs = ['{}\t{}'.format(get_random_string(4), random.randint(0, loops)) for _ in range(loops)] with open(file_data, 'w') as f: f.write('\n'.join(rs)) if __name__ == '__main__': produce_seed()

Scala/Java版本測試數據,一樣的,請老鐵們自行修改name長度和數據總行數。

import java.io.File
import java.util
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.RandomStringUtils
import scala.util.{Random}

object Course {

  val dataFile = "seed"
  val savePath = "result"

  def main(args: Array[String]): Unit = {
    produceSeed(100000)
  }

  def produceSeed(loops:Int):Unit = {
    val file = new File(dataFile)
    val data = new util.ArrayList[String]()
    var str = ""
    (1 to loops).foreach(_ => {
      str = RandomStringUtils.randomAlphanumeric(10)
      data.add(s"$str\t${Random.nextInt(loops)}")
      if (data.size()>0 && data.size() % 10000==0) {
        FileUtils.writeLines(file,"UTF-8",data,true)
        data.clear()
      }
    })
    if (data.size()>0) {
      FileUtils.writeLines(file,"UTF-8",data,true)
    }
  }
}

附上pom,

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>scala3</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.12</scala.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
    </dependencies>
</project>

 

Part II -- 生成RDD

爲了給你們儘量多展現一些算子,下面的示例部分算子可能有些冗餘,你們能夠根據需求自行修改。

#Python版本:

需求:從源數據中找出第二列大於等於500的數據,並保存

#encoding=utf-8
import shutil
from pyspark.sql import SparkSession

file_data = 'seed'
file_save = 'result'


def remove(filename: str):
    try:
        shutil.rmtree(filename)
    except:
        pass


def rdd_sample_1():
    '''
    選出第二列 >= 500的數據,並輸出file_save
    '''
    remove(file_save)

    def map1(row):
        parts = row.split('\t')
        return parts[0], int(parts[1])

    # 若是數據很大,能夠在textFile以後,用repartition進行重分區
    rdd = sc.textFile(file_data)

    rdd1 = rdd  \
        .map(map1) \
        .filter(lambda r: r[1] >= 500) \
        .map(lambda r: '{}\t{}'.format(r[0], r[1])) \
        .coalesce(1)

    # 或者直接在map partition階段就進行挑選,效率要高一些。可是須要調大對應的內存,不然容易形成內存溢出
    def map2(iter):
        for row in iter:
            try:
                parts = row.split('\t')
                if int(parts[1]) >= 500:
                    yield row
            except Exception as e:
                print(e)
    #
    rdd2 = rdd.mapPartitions(map2).coalesce(1)

    # 你們2種方式選其一便可。這裏選擇第1種
    rdd1.saveAsTextFile(file_save)


if __name__ == '__main__':
    spark = SparkSession.builder.appName('pyspark').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    #
    rdd_sample_1()
    #
    spark.stop()

 

Scala版本:

需求同Python版本

import java.io.File
import java.util

import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.RandomStringUtils
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.util.{Random, Try}

object Course {

  val dataFile = "seed"
  val savePath = "result"

  val spark = SparkSession
    .builder()
    .appName("scala-spark")
    .master("local")
    .config("spark.sql.shuffle.partitions", "1000")
    .config("mapreduce.job.reduces",5)
    .getOrCreate()
  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {
    rddSample1()
  }

  def rddSample1(): Unit = {
    delete(savePath)
    //
    val rdd = sc.textFile(dataFile)
    // 第一種方式
    rdd.filter(_.split("\t")(1).toInt >= 500) //.saveAsTextFile(savePath)

    // 第二種方式
    rdd.map(v => {
      val parts = v.split("\t")
      if (parts(1).toInt >= 500) {
        v
      } else {""}
    })
      .filter(!_.isEmpty)//.saveAsTextFile(savePath)

    // 第三種方式,若是你機器或者內存夠大,能夠用如下方式,效率更高
    rdd.mapPartitions(iterator => {
      val rs = ListBuffer[String]()
      var parts = Array[String]()
      iterator.foreach(v => {
        parts = v.split("\t")
        if (parts(1).toInt >= 500) rs += v
      })
      //
      rs.iterator
    })
      .saveAsTextFile(savePath)

    // 以上3種方式任選其一,最後用saveAsTextFile保存便可
  }

  def produceSeed(loops:Int):Unit = {
    val file = new File(dataFile)
    val data = new util.ArrayList[String]()
    var str = ""
    (1 to loops).foreach(_ => {
      str = RandomStringUtils.randomAlphanumeric(10)
      data.add(s"$str\t${Random.nextInt(loops)}")
      if (data.size()>0 && data.size() % 10000==0) {
        FileUtils.writeLines(file,"UTF-8",data,true)
        data.clear()
      }
    })
    if (data.size()>0) {
      FileUtils.writeLines(file,"UTF-8",data,true)
    }
  }

  def delete(path:String):Try[Unit] = {
    Try(FileUtils.deleteDirectory(new File(path)))
  }
}

相關文章
相關標籤/搜索