Spark小試牛刀

Spark小試牛刀

隨着項目的運營,收集了不少的用戶數據。最近業務上想作些社交圖譜相關的產品,但由於數據不少、很雜,傳統的數據庫查詢已經知足不了業務的需求。 試着用Spark來作,權當練練手了。java

安裝Spark

由於有Scala的開發經驗,因此就不用官方提供的二進制包了,自編譯scala 2.11版本。mongodb

下載Spark:http://ftp.cuhk.edu.hk/pub/packages/apache.org/spark/spark-1.5.0/spark-1.5.0.tgzshell

tar zxf spark-1.5.0.tgz
cd spark-1.5.0
./dev/change-scala-version.sh 2.11
mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package

以上命令完成Spark基於scala 2.11版本的編譯。能夠運行自帶的一個示例程序來驗證安裝是否成功。數據庫

./bin/run-example SparkPi

編寫Standalone application

使用sbt來構建一個可提交的簡單Spark程序,功能是計算每一個用戶加入的羣組,並把結果保存下來。project/Build.scala配置文件以下:apache

import _root_.sbt.Keys._
import _root_.sbt._
import sbtassembly.AssemblyKeys._

object Build extends Build {

  override lazy val settings = super.settings :+ {
    shellPrompt := (s => Project.extract(s).currentProject.id + " > ")
  }

  lazy val root = Project("spark-mongodb", file("."))
    .settings(
      scalaVersion := "2.11.7",
      assemblyJarName in assembly := "spark-mongodb.jar",
      assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false),
      libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % verSpark % "scopeProvidedTest,
        "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.0" excludeAll(
            ExclusionRule(organization = "javax.servlet"), 
            ExclusionRule(organization = "commons-beanutils"), 
            ExclusionRule(organization = "org.apache.hadoop")))
    )
  
  private val scopeProvidedTest = "provided,test"
  private val verSpark = "1.5.0"
}

數據存儲在MongoDB數據庫中,因此咱們還須要使用mongo-hadoop鏈接器來訪問MongoDB數據庫。app

示例程序

示例程序很是的簡單,把數據從數據庫裏所有讀出,使用map來把每條記錄裏用戶ID對應加入的羣組ID轉換成一個Set,再使用 reduceByKey來把相同用戶ID的set合併到一塊兒,存入數據庫便可。ide

import com.mongodb.BasicDBObject
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject

import scala.collection.JavaConverters._

object QQGroup {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("QQGroup")
    val sc = new SparkContext(sparkConf)

    val inputConfig = new Configuration()
    inputConfig.set("mongo.input.uri", "mongodb://192.168.31.121:27017/db.userGroup")
    inputConfig.set("mongo.input.fields", """{"userId":1, "groupId":1, "_id":0}""")
    inputConfig.set("mongo.input.noTimeout", "true")

    val documentRDD = sc.newAPIHadoopRDD(
      inputConfig,
      classOf[MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])

    val userRDD = documentRDD.map { case (_, doc) =>
      (getValue(doc, "userId"), getValue(doc, "groupId"))
    }.reduceByKey(_ ++ _)

    val resultRDD = userRDD.map { case (userId, groupIds) =>
      val o = new BasicDBObject()
      o.put("groupIds", groupIds.asJava)
      userId -> o
    }

    val outputConfig = new Configuration()
    outputConfig.set("mongo.output.uri", "mongodb://192.168.31.121:27017/db_result.userGroup")

    resultRDD.saveAsNewAPIHadoopFile(
      "file://this-is-completely-unused",
      classOf[Object],
      classOf[BSONObject],
      classOf[MongoOutputFormat[Object, BSONObject]],
      outputConfig)
  }

  def getValue(dbo: BSONObject, key: String) = {
    val value = dbo.get(key)
    if (value eq null) "" else value.asInstanceOf[String]
  }
}

MongoDB官方提供了Hadoop鏈接器,Spark可使用mongo-hadoop鏈接器來讀、寫MongoDB數據庫。 主要的輸入配置薦有:oop

  • mongo.input.uri: MongoDB的鏈接URI
  • mongo.input.fields: 指定返回哪些數據,與db.query裏的第2個參數功能同樣
  • mongo.input.query: MongoDB的查詢參數

相應的MongoDB也提供了一系列的輸出參數,如:ui

  • mongo.output.uri: MongoDB的鏈接URI

sc.newAPIHadoopRDD()方法有4個參數,分別爲:配置、輸入格式化類、待映射數據主鍵類型、待映射數據類型。this

主要的操做代碼:

val userRDD = documentRDD.map { case (_, doc) =>
      (getValue(doc, "userId"), Set(getValue(doc, "groupId")))
    }.reduceByKey(_ ++ _)

    val resultRDD = userRDD.map { case (userId, groupIds) =>
      val o = new BasicDBObject()
      o.put("groupIds", groupIds.asJava)
      userId -> o
    }

先使用map方法獲取userIdgroupId,並把groupId轉換爲一個Set

在把數據轉換成Tuple2,就是一個KV的形式之後,咱們就能夠調用一系列的轉換方法來對RDD進行操做,這裏使用reduceByKey方法來將同一個userId的因此value都合併在一塊兒。這樣咱們就有了全部用戶對應加入的羣組 的一個RDD集了。

(RDD上有兩種類型的操做。一種是「變換」,它只是描述了待進行的操做指令,並不會觸發實際的計算;另外一種是「動做」, 它將觸發實際的計算動做,這時候系統纔會實際的從數據源讀入數據,操做內存,保存數據等)

最後使用resultRDD.saveAsNewAPIHadoopFile()方法來把計算結果存入MongoDB,這裏的一個參數:用於指定 HDFS的存儲位置並不會使用到,由於mongo-hadoop將會使用mongo.output.uri指定的存儲URI鏈接地址來保存數據。

相關文章
相關標籤/搜索