新手入門:Spark部署實戰入門

Spark簡介
總體認識
Apache Spark是一個圍繞速度、易用性和複雜分析構建的大數據處理框架。最初在2009年由加州大學伯克利分校的AMPLab開發,並於2010年成爲Apache的開源項目之一。
Spark在整個大數據系統中處於中間偏上層的地位,以下圖,對hadoop起到了補充做用:
圖片描述
基本概念
Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。
圖片描述
第一步分割任務。首先咱們須要有一個fork類來把大任務分割成子任務,有可能子任務仍是很大,因此還須要不停的分割,直到分割出的子任務足夠小。html

第二步執行任務併合並結果。分割的子任務分別放在雙端隊列裏,而後幾個啓動線程分別從雙端隊列裏獲取任務執行。子任務執行完的結果都統一放在一個隊列裏,啓動一個線程從隊列裏拿數據,而後合併這些數據。 node

具體可參考Fork/Joingit

核心概念sql

RDD(Resilient Distributed Dataset) 彈性分佈數據集介紹 數據庫

彈性分佈式數據集(基於Matei的研究論文)或RDD是Spark框架中的核心概念。能夠將RDD視做數據庫中的一張表。其中能夠保存任何類型的數據。Spark將數據存儲在不一樣分區上的RDD之中。
RDD能夠幫助從新安排計算並優化數據處理過程。 apache

此外,它還具備容錯性,由於RDD知道如何從新建立和從新計算數據集。 編程

RDD是不可變的。你能夠用變換(Transformation)修改RDD,可是這個變換所返回的是一個全新的RDD,而原有的RDD仍然保持不變。 api

RDD支持兩種類型的操做:
o 變換(Transformation)
o 行動(Action) 緩存

變換:變換的返回值是一個新的RDD集合,而不是單個值。調用一個變換方法,不會有任何求值計算,它只獲取一個RDD做爲參數,而後返回一個新的RDD。變換函數包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。 app

行動:行動操做計算並返回一個新的值。當在一個RDD對象上調用行動函數時,會在這一時刻計算所有的數據處理查詢並返回結果值。

行動操做包括:reduce,collect,count,first,take,countByKey以及foreach。
• 共享變量(Shared varialbes)
o 廣播變量(Broadcast variables)
o 累加器(Accumulators)
• Master/Worker/Driver/Executor
圖片描述
o Master:1. 接受Worker的註冊請求,統籌記錄全部Worker的CPU、Memory等資源,並跟蹤Worker結點的活動狀態;2. 接受Driver中App的註冊請求(這個請求由Driver端的Client發出),爲App在Worker上分配CPU、Memory資源,生成後臺Executor進程;以後跟蹤Executor和App的活動狀態。

o Worker:負責接收Master的指示,爲App建立Executor進程。Worker在Master和Executor之間起着橋樑做用,實際不會參與計算工做。

o Driver:負責用戶側邏輯處理。

o Executor:負責計算,接受並執行由App劃分的Task任務,並將結果緩存在本地內存或磁盤。

Spark部署

關於Spark的部署網上相關資料不少,這裏進行概括整理

部署環境
• Ubuntu 14.04LTS
• Hadoop:2.7.0
• Java JDK 1.8
• Spark 1.6.1
• Scala 2.11.8

Hadoop安裝

因爲Spark會利用HDFS和YARN,因此須要提早配置Hadoop,配置教程能夠參考:
Setting up a Apache Hadoop 2.7 single node on Ubuntu 14.04
Hadoop安裝教程_單機/僞分佈式配置_Hadoop2.6.0/Ubuntu14.04

Spark安裝

在安裝好Hadoop的基礎上,搭建Spark,配置教程參考:

Spark快速入門指南 – Spark安裝與基礎使用

scala安裝

Scala做爲編寫Spark的源生語言,更新速度和支持狀況確定是最好的,而另外一方面Scala自己語言中對於面向對象和函數式編程兩種思想的糅合,使得該語言具備不少炫酷的語法糖,因此在使用Spark的過程當中我採用了Scala語言進行開發。

• Scala最終編譯成字節碼須要運行在JVM中,因此須要依託於jdk,須要部署jdk
• Eclipse做爲一款開發Java的IDE神器,在Scala中固然也可使用,有兩種方式:
o Eclipse->Help->Install New Software安裝Scala Plugins
o 下載官網已經提供的集成好的Scala IDE

基於以上兩步已經能夠進行Scala開發,須要用到Scala自帶的SBT編譯的同窗能夠裝下Scala官網下載地址,本人一直使用Maven進行包管理就延續Maven的使用
簡單示例:WordCount(Spark Scala)
• 開發IDE:Eclipse Scala
• 包管理:Maven
• 開發語言:Scala

建立Maven項目
圖片描述

  1. 跳過archetype項目模板的選擇

  2. 下載模板pom.xml

  3. 對maven項目添加Scala屬性:
    Right click on project -> configure - > Add Scala Nature.

  4. 調整下Scala編譯器的版本,與Spark版本對應:
    Right click on project- > Go to properties -> Scala compiler -> update Scala installation version to 2.10.5

  5. 從Build Path中移除Scala Library(因爲在Maven中添加了Spark Core的依賴項,而Spark是依賴於Scala的,Scala的jar包已經存在於Maven Dependency中):
    Right click on the project -> Build path -> Configure build path and remove Scala Library Container.

  6. 添加package包com.spark.sample
    圖片描述

  7. 建立Object WordCount和SimpleCount,用來做爲Spark的兩個簡單示例
    Spark Sample

SimpleCount.scala

package com.spark.sample

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SimpleCount {

def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TrySparkStreaming").setMaster("local[2]") // Create spark context
    val sc = new SparkContext(conf)
    //        val ssc = new StreamingContext(conf, Seconds(1)) // create streaming context

    val txtFile = "test"
    val txtData = sc.textFile(txtFile)
    txtData.cache()

    txtData.count()
    val wcData = txtData.flatMap { line => line.split(",") }.map { word => (word, 1) }.reduceByKey(_ + _)
    wcData.collect().foreach(println)
    
    sc.stop
}

}

WordCount.scala

package com.spark.sample
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object WordCount {

def main(args: Array[String]) = {

    //Start the Spark context
    val conf = new SparkConf()
        .setAppName("WordCount")
        .setMaster("local")
    val sc = new SparkContext(conf)

    //Read some example file to a test RDD
    val test = sc.textFile("input.txt")

    test.flatMap { line => //for each line
        line.split(" ") //split the line in word by word.
    }.map { word => //for each word
        (word, 1) //Return a key/value tuple, with the word as key and 1 as value
    }.reduceByKey(_ + _) //Sum all of the value with same key
        .saveAsTextFile("output.txt") //Save to a text file

    //Stop the Spark context
    sc.stop
}

}

原理以下圖:
圖片描述
參考文獻:

  1. http://km.oa.com/group/2430/articles/show/181711?kmref=search&from_page=1&no=1&is_from_iso=1

  2. http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds

  3. http://www.infoq.com/cn/articles/apache-spark-introduction?utm_source=infoq_en&utm_medium=link_on_en_item&utm_campaign=item_in_other_langs

  4. http://www.infoq.com/cn/articles/apache-spark-sql

  5. http://www.infoq.com/cn/articles/apache-spark-streaming

  6. http://www.devinline.com/2016/01/apache-spark-setup-in-eclipse-scala-ide.html

  7. https://databricks.gitbooks.io/databricks-spark-reference-applications/content/

  8. http://wuchong.me/blog/2015/04/06/spark-on-hbase-new-api/

  9. http://colobu.com/2015/01/05/kafka-spark-streaming-integration-summary/

  10. http://www.devinline.com/2016/01/apache-spark-setup-in-eclipse-scala-ide.html

做者:張景龍 暢移(上海)信息科技有限公司CTO,CCFYOCSEF上海委員,京東今夜酒店特價APP技術奠定人和首任CTO,中國第一代智能手機開發者。

相關文章
相關標籤/搜索