記得學習編程語言時,老師直接讓我打印Hello World!。這種直接動手操做,而後看到效果的方式;比先講一大堆語法、概念更容易讓人理解,接受。java
天然而然的,詞頻統計(WordCount)就是學習分佈式計算的第一步。mysql
val master = "local"
val conf = new SparkConf().setMaster(master).setAppName("WordCount")
val sc = new SparkContext(conf)
sc.textFile("realRatings.txt")
.flatMap(_.split(","))
.map(word=>(word,1))
.reduceByKey(_+_)
.sortBy(_._1.toInt)
.collect()
.foreach(println) sql
其中數據文件是:apache
1001,1,4
1001,3,3
1001,5,4
1003,1,5
1003,3,4
1002,2,2
1002,4,3
1002,5,4
1004,2,2
1004,4,3編程
運行結果是:架構
(1,2)
(2,4)
(3,5)
(4,6)
(5,3)
(1001,3)
(1002,3)
(1003,2)
(1004,2)app
運行時出現錯誤框架
class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package;
參考解決方案http://stackoverflow.com/questions/28086520/spark-application-throws-javax-servlet-filterregistration;本地spark-hive_2.11現依賴
<artifactId>javax.servlet</artifactId> <groupId>org.eclipse.jetty.orbit</groupId>
而初始化org.apache.spark.ui.WebUI須要eclipse
<dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> <version>9.3.6.v20151106</version> </dependency>
看下任務執行流程及相關的基本概念。Application、Job、Driver、Stage、Task、RDD機器學習
RDD兩類操做方式:transformation和action
Task任務類型:ShuffleMapTask和ResultTask
伯克利大學AMPLab實驗室從2009年針對當時的Map-Reduce框架執行速度的問題。進行研發改進,到2010年開源;2013年開源到apache。如今比較成熟的組件;他的優勢就是統一一套API;可以完成離線、在線、機器學習和圖計算不一樣場景的數據處理。減低的學習成本。
而spark架構使咱們熟悉的主從結構;master節點負責接收提交了任務,管理、分配資源;worker節點負責實際任務執行。
如今在回到例子。這是實際執行流程圖:
spark load data to hive and sparkSQL(目前使用典型場景)
數據解析模板:
1 val sqlContext = new HiveContext(sc) 2 import sqlContext.implicits._ 3 4 val filePath = s"${fsdir}/flume/${event}/$today" 5 sc.textFile(filePath) 6 .map(EventInfo(_)) 7 .coalesce(load_parallelism) 8 .toDF() 9 .write.parquet(tmpPath.toUri().toString()) 10 11 sqlContext.sql("load data inpath '"+tmpPath.toUri()+s"' into table ${event_minute} partition (date='$today')")
統計邏輯模板:
1 val connectionProperties = new Properties 2 connectionProperties.put("user", user) 3 connectionProperties.put("password", pwd) 4 5 sqlContext.sql(s"select $dateNum as click_date ,eventid as event_name,label,acc,cast(count(1) as int) as click_times,cast(count(distinct deviceid) as int) as click_users "+ 6 s"from $event where date='$statDay' and (eventid in ('m_banner','m_news','m_well','m_help','m_points','m_star','m_more') or (eventid='recreate_click' and label='5' ) ) "+ 7 s"group by eventid,label,acc ") 8 .toDF() 9 .coalesce(load_parallelism) 10 .write.mode(SaveMode.Append) 11 .jdbc(url, mysqlTableName, connectionProperties)
UC Berkeley AMPLabl:https://zhuanlan.zhihu.com/p/21350352?refer=bittiger