【讀取JSON數據源】
開啓Hadoop和YARN
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.shsql
關閉Hadoop和YARN
cd /usr/local/hadoop
./sbin/stop-dfs.sh
./sbin/stop-yarn.shshell
cat /usr/local/spark/examples/src/main/resources/people.jsonapache
啓動Spark
cd /usr/local/spark
./bin/spark-shell編程
執行以下命令導入數據源
val df = sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df.show()json
DataFrames 處理結構化數據的一些基本操做
df.select("name").show() // 只顯示 "name" 列
df.select(df("name"), df("age") + 1).show() // 將 "age" 加 1
df.filter(df("age") > 21).show() # 條件語句
df.groupBy("age").count().show() // groupBy 操做oop
使用 SQL 語句來進行操做
df.registerTempTable("people") // 將 DataFrame 註冊爲臨時表 people
val result = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // 執行 SQL 查詢
result.show() // 輸出結果
————————————————————————————————————————————————————————————
【讀取txt文件】
http://www.infoq.com/cn/articles/apache-spark-sql/編碼
下述代碼片斷展現了能夠在Spark Shell終端執行的Spark SQL命令
cd /usr/local/spark
./bin/spark-shellspa
// 首先用已有的Spark Context對象建立SQLContext對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)code
// 導入語句,能夠隱式地將RDD轉化成DataFrame
import sqlContext.implicits._對象
// 建立一個表示客戶的自定義類
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)
val dfCustomers = sc.textFile("file:///home/hadoop/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF() // 用數據集文本文件建立一個Customer對象的DataFrame
dfCustomers.registerTempTable("customers") // 將DataFrame註冊爲一個表
dfCustomers.show() // 顯示DataFrame的內容
// 打印DF模式
dfCustomers.printSchema()
// 選擇客戶名稱列
dfCustomers.select("name").show()
// 選擇客戶名稱和城市列
dfCustomers.select("name", "city").show()
// 根據id選擇客戶
dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()
// 根據郵政編碼統計客戶數量
dfCustomers.groupBy("zip_code").count().show()
————————————————————————————————————————————————————————————
模式是經過反射而得來的。咱們也能夠經過編程的方式指定數據集的模式。這種方法在因爲數據的結構以字符串的形式編碼而沒法提早定義定製類的狀況下很是實用
// 用編程的方式指定模式
// 用已有的Spark Context對象建立SQLContext對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 建立RDD對象
val rddCustomers = sc.textFile("data/customers.txt")
// 用字符串編碼模式
val schemaString = "customer_id name city state zip_code"
// 導入Spark SQL數據類型和Row
import org.apache.spark.sql._
import org.apache.spark.sql.types._;
// 用模式字符串生成模式對象
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// 將RDD(rddCustomers)記錄轉化成Row。
val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))
// 將模式應用於RDD對象。
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)
// 將DataFrame註冊爲表
dfCustomers.registerTempTable("customers")
// 用sqlContext對象提供的sql方法執行SQL語句。
val custNames = sqlContext.sql("SELECT name FROM customers")
// SQL查詢的返回結果爲DataFrame對象,支持全部通用的RDD操做。
// 能夠按照順序訪問結果行的各個列。
custNames.map(t => "Name: " + t(0)).collect().foreach(println)
// 用sqlContext對象提供的sql方法執行SQL語句。
val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")
// SQL查詢的返回結果爲DataFrame對象,支持全部通用的RDD操做。
// 能夠按照順序訪問結果行的各個列。
customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
————————————————————————————————————————————————————————————
【user_behavior.txt】
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.sh
cd /usr/local/spark
./bin/spark-shell
cat /home/hadoop/test/user_behavior.txt
val df = sqlContext.read.json("file:///home/hadoop/test/user_behavior.txt")
df.show()
df.select("LBS").show()
df.groupBy("LBS").count().show()
df.groupBy("Behavior").count().show()
//使用 SQL 語句來進行操做
df.registerTempTable("behavior") // 將 DataFrame 註冊爲臨時表 behavior
val result = sqlContext.sql("SELECT Behavior FROM behavior WHERE Behavior != '{}'") // 執行 SQL 查詢埋點Behavior
result.show() // 輸出結果
val result = sqlContext.sql("SELECT * FROM behavior WHERE Behavior != '{}'") // 執行 SQL 查詢
result.show() // 輸出結果
————————————————————————————————————————————————————————————