Spark MapR Lab: Auction Data Analysis

1. Environment setup

1.1 Install Hadoop

See http://my.oschina.net/u/204498/blog/519789

1.2 Install Spark


1.3 Start Hadoop

1.4 Start Spark

2. Testing

2.1 Prepare the data

Download the dataset DEV360DATA.zip from the MapR website and upload it to the server.

[hadoop@hftclclw0001 spark-1.5.1-bin-hadoop2.6]$ pwd
/home/hadoop/spark-1.5.1-bin-hadoop2.6

[hadoop@hftclclw0001 spark-1.5.1-bin-hadoop2.6]$ cd test-data/DEV360Data/

[hadoop@hftclclw0001 DEV360Data]$ pwd
/home/hadoop/spark-1.5.1-bin-hadoop2.6/test-data/DEV360Data

[hadoop@hftclclw0001 DEV360Data]$ ll
total 337940
-rwxr-xr-x 1 hadoop root    575014 Jun 24 16:18 auctiondata.csv        => the data used in this test
-rw-r--r-- 1 hadoop root  57772855 Aug 18 20:11 sfpd.csv
-rwxrwxrwx 1 hadoop root 287692676 Jul 26 20:39 sfpd.json

[hadoop@hftclclw0001 DEV360Data]$ more auctiondata.csv 
8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3
8213034705,115,2.943484,davidbresler2,1,95,117.5,xbox,3
8213034705,100,2.951285,gladimacowgirl,58,95,117.5,xbox,3
8213034705,117.5,2.998947,daysrus,10,95,117.5,xbox,3
8213060420,2,0.065266,donnie4814,5,1,120,xbox,3
8213060420,15.25,0.123218,myreeceyboy,52,1,120,xbox,3
...
...

# The columns are as follows
auctionid,bid,bidtime,bidder,bidrate,openbid,price,itemtype,daystolive
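To make the column layout concrete, here is a minimal sketch that parses one line into a typed record. The `Auction` case class and the `parseAuction` helper are hypothetical names introduced for illustration, and the field types are inferred from the sample rows shown above. The sketch assumes every field is present and well formed, which may not hold for the whole file.

```scala
// Hypothetical record type; field names follow the column list above.
case class Auction(
  auctionid: String, bid: Double, bidtime: Double, bidder: String,
  bidrate: Int, openbid: Double, price: Double,
  itemtype: String, daystolive: Int)

// Parse one comma-separated line into an Auction.
// Assumes exactly nine well-formed fields; no error handling.
def parseAuction(line: String): Auction = {
  val f = line.split(",")
  Auction(f(0), f(1).toDouble, f(2).toDouble, f(3),
          f(4).toInt, f(5).toDouble, f(6).toDouble, f(7), f(8).toInt)
}

// First sample row from auctiondata.csv.
val first = parseAuction("8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3")
println(first.itemtype)  // xbox
println(first.price)     // 117.5
```

The rest of this walkthrough works directly on Array[String] and indexes fields by position, so this record type is optional.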

# Upload the data to HDFS
[hadoop@hftclclw0001 DEV360Data]$ hdfs dfs -mkdir -p /spark/exer/mapr
[hadoop@hftclclw0001 DEV360Data]$ hdfs dfs -put auctiondata.csv /spark/exer/mapr
[hadoop@hftclclw0001 DEV360Data]$ hdfs dfs -ls /spark/exer/mapr
Found 1 items
-rw-r--r--   2 hadoop supergroup     575014 2015-10-29 06:17 /spark/exer/mapr/auctiondata.csv

2.2 Run spark-shell (I use Scala) and work through the following tasks

tasks:

a. How many items were sold?

b. How many bids per item type?

c. How many different item types are there?

d. What was the minimum number of bids?

e. What was the maximum number of bids?

f. What was the average number of bids?

[hadoop@hftclclw0001 spark-1.5.1-bin-hadoop2.6]$ pwd
/home/hadoop/spark-1.5.1-bin-hadoop2.6

[hadoop@hftclclw0001 spark-1.5.1-bin-hadoop2.6]$ ./bin/spark-shell 
...
...
scala>

# First, load the data from HDFS into an RDD
scala> val originalRDD = sc.textFile("/spark/exer/mapr/auctiondata.csv")
...
...
scala> originalRDD      ==> The type of originalRDD is RDD[String]. Think of it as a collection of lines, much like an Array[String]
res26: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

# Split each line on "," using map
scala> val auctionRDD = originalRDD.map(_.split(","))
scala> auctionRDD        ==> The type of auctionRDD is RDD[Array[String]]: a collection whose elements are themselves arrays of String, so think of it as Array[Array[String]]
res17: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:23
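The type change from RDD[String] to RDD[Array[String]] can be tried without a cluster. The sketch below uses a local `Seq` as a stand-in for the RDD (an assumption made for illustration; `lines` and `rows` are hypothetical names) and applies the same `map(_.split(","))` to two rows copied from auctiondata.csv.

```scala
// Two sample rows from auctiondata.csv; a local Seq stands in for originalRDD.
val lines: Seq[String] = Seq(
  "8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3",
  "8213060420,2,0.065266,donnie4814,5,1,120,xbox,3")

// Same transformation as originalRDD.map(_.split(",")):
// each String becomes an Array[String] of its fields.
val rows: Seq[Array[String]] = lines.map(_.split(","))

println(rows.head.length)           // 9 fields per row
println(rows.head.mkString(" | "))  // fields of the first row
println(rows(1)(3))                 // donnie4814 (bidder of the second row)
```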

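a. How many items were sold?

 ==> val count = auctionRDD.map(bid => bid(0)).distinct().count()

Deduplicating by auctionid is enough: split each record on ",", take the distinct auction IDs, and count them.

# Take the first column, i.e. the auctionid, again using map
# One way to read the next line: since auctionRDD behaves like Array[Array[String]], each element passed to map is an Array[String]. auctionid is the first field of that array, so we take element 0 with arr(0). Note that Scala indexes with (), not []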
scala> val auctionidRDD = auctionRDD.map(_(0))
...
...

scala> auctionidRDD        ==> The type of auctionidRDD is RDD[String]. Think of it as an Array[String] holding every auctionid
res27: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at map at <console>:26

# Deduplicate auctionidRDD
scala> val auctionidDistinctRDD = auctionidRDD.distinct()

# Count
scala> auctionidDistinctRDD.count()
...
...
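The three steps of task a (take field 0, deduplicate, count) can be checked by hand on the six sample rows shown earlier. This is a small sketch on a local `Seq` rather than an RDD; `sample`, `auctionIds` and `itemsSold` are hypothetical names. On a local collection the calls are `distinct` and `size`, while the RDD version above uses `distinct()` and `count()`.

```scala
// The six sample rows shown earlier from auctiondata.csv.
val sample: Seq[String] = Seq(
  "8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3",
  "8213034705,115,2.943484,davidbresler2,1,95,117.5,xbox,3",
  "8213034705,100,2.951285,gladimacowgirl,58,95,117.5,xbox,3",
  "8213034705,117.5,2.998947,daysrus,10,95,117.5,xbox,3",
  "8213060420,2,0.065266,donnie4814,5,1,120,xbox,3",
  "8213060420,15.25,0.123218,myreeceyboy,52,1,120,xbox,3")

// Split each row and keep only field 0, the auctionid.
val auctionIds: Seq[String] = sample.map(_.split(",")).map(_(0))

// Deduplicate and count: each distinct auctionid is one item.
val itemsSold: Int = auctionIds.distinct.size

println(itemsSold)  // 2
```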

b. How many bids per item type?

===> auctionRDD.map(bid => (bid(7),1)).reduceByKey((x,y) => x + y).collect()

# Map each row to (itemtype, 1); itemtype is the field at index 7, i.e. the eighth column
# The result can be viewed as a collection of (String, Int) pairs
scala> auctionRDD.map(bid=>(bid(7),1))
res30: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[26] at map at <console>:26
...

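# reduceByKey reduces the values that share the same key
# Conceptually, for one key (the actual combine order is not guaranteed, which is fine because + is associative and commutative):
# (xbox,1)(xbox,1)(xbox,1)(xbox,1)...(xbox,1) ==> reduceByKey ==> (xbox, (((1 + 1) + 1) + ... + 1))
scala> auctionRDD.map(bid=>(bid(7),1)).reduceByKey((x,y) => x + y)
# The type is still a collection of (String, Int): String is the itemtype, and Int is now the total bid count for that itemtype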
res31: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:26
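To see what reduceByKey computes, the following sketch imitates it on a local collection. `reduceByKeyLocal` is a hypothetical helper written for this illustration, not part of Spark, and the input pairs are made up. It groups pairs by key and reduces each group's values with the supplied function, which gives the same result as Spark's version whenever that function is associative and commutative.

```scala
// Made-up (itemtype, 1) pairs, shaped like the output of map(bid => (bid(7), 1)).
val pairs: Seq[(String, Int)] =
  Seq(("xbox", 1), ("palm", 1), ("xbox", 1), ("cartier", 1), ("xbox", 1))

// Local stand-in for reduceByKey: group the pairs by key,
// then reduce the values of each group with f.
def reduceByKeyLocal[K, V](kvs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
  kvs.groupBy(_._1).map { case (k, group) => k -> group.map(_._2).reduce(f) }

val counts: Map[String, Int] = reduceByKeyLocal(pairs)((x, y) => x + y)

println(counts("xbox"))     // 3
println(counts("palm"))     // 1
println(counts("cartier"))  // 1
```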

# collect() returns the result to the driver as an Array
scala> auctionRDD.map(bid=>(bid(7),1)).reduceByKey((x,y) => x + y).collect()

res32: Array[(String, Int)] = Array((palm,5917), (cartier,1953), (xbox,2784))
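Tasks c to f are not worked through above. One possible approach, sketched here on a local `Seq` with the six sample rows instead of the full RDD, is to count distinct item types for c and to compute the number of bids per auction for d, e and f. Reading "number of bids" as bids per auctionid is an assumption, and all names in the block are hypothetical. The numbers printed apply only to the six sample rows, not to the whole file.

```scala
// Six sample rows from auctiondata.csv, already split; stands in for auctionRDD.
val bids: Seq[Array[String]] = Seq(
  "8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3",
  "8213034705,115,2.943484,davidbresler2,1,95,117.5,xbox,3",
  "8213034705,100,2.951285,gladimacowgirl,58,95,117.5,xbox,3",
  "8213034705,117.5,2.998947,daysrus,10,95,117.5,xbox,3",
  "8213060420,2,0.065266,donnie4814,5,1,120,xbox,3",
  "8213060420,15.25,0.123218,myreeceyboy,52,1,120,xbox,3").map(_.split(","))

// c. Number of distinct item types (field index 7).
val itemTypeCount: Int = bids.map(_(7)).distinct.size

// Bids per auction: number of rows for each auctionid (field index 0).
val bidsPerAuction: Map[String, Int] =
  bids.groupBy(_(0)).map { case (id, rows) => id -> rows.size }

val bidCounts: Seq[Int] = bidsPerAuction.values.toSeq
val minBids: Int = bidCounts.min                               // d
val maxBids: Int = bidCounts.max                               // e
val avgBids: Double = bidCounts.sum.toDouble / bidCounts.size  // f

println(itemTypeCount)  // 1 (only xbox appears in the sample)
println(minBids)        // 2
println(maxBids)        // 4
println(avgBids)        // 3.0
```

On the RDD itself, the per-auction counts could be built the same way as in task b, with the auctionid (index 0) as the key instead of the itemtype.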