什麼是Spark?java
關於Spark具體的定義,你們能夠去閱讀官網或者百度關於Spark的詞條,在此再也不贅述。從一個野生程序猿的角度去理解,做爲大數據時代的一個準王者,Spark是一款主流的高性能分佈式計算大數據框架之一,和MapReduce,Hive,Flink等其餘大數據框架一塊兒支撐了大數據處理方案的一片天空。筆者所在的公司,集羣裏面有數千臺高配機器搭載了Spark(還有Hive和Flink),用來處理千億萬億級別的大數據。黑體字內容基本就是對Spark的一個歸納。git
什麼是RDD?sql
套用一段關於RDD的常規解釋,RDD 是 Spark 提供的最重要的抽象概念,它是一種有容錯機制的特殊數據集合,能夠分佈在集羣的結點上,以函數式操做集合的方式進行各類並行操做。通俗點來說,能夠將 RDD 理解爲一個分佈式對象集合,本質上是一個只讀的分區記錄集合。每一個 RDD 能夠分紅多個分區,每一個分區就是一個數據集片斷。一個 RDD 的不一樣分區能夠保存到集羣中的不一樣結點上,從而能夠在集羣中的不一樣結點上進行並行計算。你們聽懂了啵?apache
Again,用一個野生程序猿的話來講,RDD就是一個數據集,裏面包含着咱們要處理的千億萬億數據,相似於Java裏面的ArrayList,Python裏面的list。不一樣的是,Spark基於RDD提供了一大堆很好用的函數(算子),專門來處理大數據。編程
Next?app
做爲一我的狠話很少的野生程序猿,就喜歡生猛地直接上代碼。No BB, show you the code.框架
Wait.dom
思惟縝密的我,仍是得BB一句,工欲善其事必先利其器。想玩起來Spark,請先作好一下準備,如下以Windows舉例說明,Linux雷同。環境已經搭好的同窗們,請忽略這一步,直接往下看。maven
#1,備好IDE編程語言
Java/Scala,請安裝好宇宙 第二的IDE,IDEAL(全名 IntelliJ IDEA),社區版便可,無需破解。Scala須要在IDEAL的Plugins裏面,安裝Scala插件。
Python,也請安裝好世界第三的IDE,PyCharm,社區版便可,無需破解。
IDEA和PyCharm都出自於一個很厲害的軟件公司,JetBrains,這家公司以一己之力,扛起了編程界的好幾門主流語言的IDE。
#2,Spark
無論你們吃飯的傢伙是Java,Scacla,仍是Python,建議你們都去裝一個Python,宇宙第二的編程語言(宇宙第一的語言是PHP),太好用了。
---若是是Python,直接在命令行執行pip install pyspark,便可安裝Spark。裝好以後,Java/Scala也能夠用來操做Spark。
---若是是Java/Scala,若是你們電腦上有安裝Python,直接按照上一步操做裝好pyspark以後,Java/Scala就能夠共用。
若是老鐵們不肯意安裝Python,就須要自行去Spark官網下載相應版本,解壓後,把spark的bin路徑添加到Windows環境變量。(Windows下可能會報一個找不到null的錯誤,莫慌,須要自行下載Hadoop,以及對應版本的winutils,而後用winutils bin裏面的內容新覆蓋hadoop bin文件夾)
走到這一步,準備各作就緒。
祭出代碼
Part I --- 測試數據
先準備點測試數據。數據包含2個字段,結構:name score,每列用\t分割。代碼以下:
Python版本測試數據,name長度能夠修改get_random_string參數,數據條數請根據本身電腦的配置修改loops參數。
import string import random
file_data = 'seed'
file_save = 'result'
def get_random_string(size: int) -> str: stack = string.digits + string.ascii_letters rs = [stack[random.randrange(len(stack))] for _ in range(size)] # return ''.join(rs) def produce_seed(): loops = 100000 rs = ['{}\t{}'.format(get_random_string(4), random.randint(0, loops)) for _ in range(loops)] with open(file_data, 'w') as f: f.write('\n'.join(rs)) if __name__ == '__main__': produce_seed()
Scala/Java版本測試數據,一樣的,請老鐵們自行修改name長度和數據總行數。
import java.io.File import java.util import org.apache.commons.io.FileUtils import org.apache.commons.lang3.RandomStringUtils import scala.util.{Random} object Course { val dataFile = "seed" val savePath = "result" def main(args: Array[String]): Unit = { produceSeed(100000) } def produceSeed(loops:Int):Unit = { val file = new File(dataFile) val data = new util.ArrayList[String]() var str = "" (1 to loops).foreach(_ => { str = RandomStringUtils.randomAlphanumeric(10) data.add(s"$str\t${Random.nextInt(loops)}") if (data.size()>0 && data.size() % 10000==0) { FileUtils.writeLines(file,"UTF-8",data,true) data.clear() } }) if (data.size()>0) { FileUtils.writeLines(file,"UTF-8",data,true) } } }
附上pom,
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>groupId</groupId> <artifactId>scala3</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scala.version>2.11.12</scala.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.5</version> </dependency> </dependencies> </project>
Part II -- 生成RDD
爲了給你們儘量多展現一些算子,下面的示例部分算子可能有些冗餘,你們能夠根據需求自行修改。
#Python版本:
需求:從源數據中找出第二列大於等於500的數據,並保存
#encoding=utf-8 import shutil from pyspark.sql import SparkSession file_data = 'seed' file_save = 'result' def remove(filename: str): try: shutil.rmtree(filename) except: pass def rdd_sample_1(): ''' 選出第二列 >= 500的數據,並輸出file_save ''' remove(file_save) def map1(row): parts = row.split('\t') return parts[0], int(parts[1]) # 若是數據很大,能夠在textFile以後,用repartition進行重分區 rdd = sc.textFile(file_data) rdd1 = rdd \ .map(map1) \ .filter(lambda r: r[1] >= 500) \ .map(lambda r: '{}\t{}'.format(r[0], r[1])) \ .coalesce(1) # 或者直接在map partition階段就進行挑選,效率要高一些。可是須要調大對應的內存,不然容易形成內存溢出 def map2(iter): for row in iter: try: parts = row.split('\t') if int(parts[1]) >= 500: yield row except Exception as e: print(e) # rdd2 = rdd.mapPartitions(map2).coalesce(1) # 你們2種方式選其一便可。這裏選擇第1種 rdd1.saveAsTextFile(file_save) if __name__ == '__main__': spark = SparkSession.builder.appName('pyspark').master('local[*]').getOrCreate() sc = spark.sparkContext sc.setLogLevel("ERROR") # rdd_sample_1() # spark.stop()
Scala版本:
需求同Python版本
import java.io.File import java.util import org.apache.commons.io.FileUtils import org.apache.commons.lang3.RandomStringUtils import org.apache.spark.sql.SparkSession import scala.collection.mutable.ListBuffer import scala.util.{Random, Try} object Course { val dataFile = "seed" val savePath = "result" val spark = SparkSession .builder() .appName("scala-spark") .master("local") .config("spark.sql.shuffle.partitions", "1000") .config("mapreduce.job.reduces",5) .getOrCreate() val sc = spark.sparkContext def main(args: Array[String]): Unit = { rddSample1() } def rddSample1(): Unit = { delete(savePath) // val rdd = sc.textFile(dataFile) // 第一種方式 rdd.filter(_.split("\t")(1).toInt >= 500) //.saveAsTextFile(savePath) // 第二種方式 rdd.map(v => { val parts = v.split("\t") if (parts(1).toInt >= 500) { v } else {""} }) .filter(!_.isEmpty)//.saveAsTextFile(savePath) // 第三種方式,若是你機器或者內存夠大,能夠用如下方式,效率更高 rdd.mapPartitions(iterator => { val rs = ListBuffer[String]() var parts = Array[String]() iterator.foreach(v => { parts = v.split("\t") if (parts(1).toInt >= 500) rs += v }) // rs.iterator }) .saveAsTextFile(savePath) // 以上3種方式任選其一,最後用saveAsTextFile保存便可 } def produceSeed(loops:Int):Unit = { val file = new File(dataFile) val data = new util.ArrayList[String]() var str = "" (1 to loops).foreach(_ => { str = RandomStringUtils.randomAlphanumeric(10) data.add(s"$str\t${Random.nextInt(loops)}") if (data.size()>0 && data.size() % 10000==0) { FileUtils.writeLines(file,"UTF-8",data,true) data.clear() } }) if (data.size()>0) { FileUtils.writeLines(file,"UTF-8",data,true) } } def delete(path:String):Try[Unit] = { Try(FileUtils.deleteDirectory(new File(path))) } }