Course link:
https://www.shiyanlou.com/courses/809
First, switch to the hadoop user (the password is hadoop):
su hadoop
Change into the /opt directory:
cd /opt
When using the environment for the first time, it is best to change the tmp directory location in core-site.xml, and then format HDFS:
hadoop-2.6.1/bin/hdfs namenode -format
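As a minimal sketch of that configuration change, the relevant property in core-site.xml looks like the following; /home/hadoop/tmp is only an example path, not from the original, and should point at a writable location in your environment:
<property>
  <name>hadoop.tmp.dir</name>
  <value>/home/hadoop/tmp</value>  <!-- example path; adjust to your environment -->
</property>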
Start the Hadoop cluster (use jps to check whether it started successfully), then create person.json, upload it to HDFS, and view it:
sudo vi person.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}jvm
start-all.sh
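After start-all.sh, jps should list the Hadoop daemons in addition to Jps itself; the exact set depends on configuration, but for a Hadoop 2.x pseudo-distributed setup it is roughly:
jps
# expected daemons: NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager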
hadoop fs -mkdir /testdata
hadoop fs -put person.json /testdata
hadoop fs -cat /testdata/person.json
Start Spark and spark-shell (Spark Standalone mode, since the master URL below is spark://...; jps should now also show Master and Worker):
spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh
spark-2.1.0-bin-hadoop2.6/bin/spark-shell --master spark://db74499714f9:7077
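Here db74499714f9 is just this container's hostname; the actual master URL is shown at the top of the Master web UI (http://localhost:8080 by default). A sketch that avoids hard-coding it, assuming the Master runs on the local machine:
spark-2.1.0-bin-hadoop2.6/bin/spark-shell --master spark://$(hostname):7077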
Using DataFrame:
Read the JSON file and construct an untyped (weakly typed) DataFrame:
val df = spark.read.json("hdfs://localhost:9000/testdata/person.json")
df.show() // print the data
df.printSchema() // print the schema
df.select($"name", $"age" + 1).show() // use column expressions; in the Scala syntax, columns are prefixed with $
df.select("name").show() // a select operation, a typical untyped operation
df.createOrReplaceTempView("person") // create a temporary view from the DataFrame
spark.sql("SELECT * FROM person").show() // SparkSession's sql() executes SQL statements, by default against the temporary views created above
Using Dataset:
val sqlDS = Seq(1, 2, 3, 4, 5).toDS()
sqlDS.map(_*2).show()
Construct a Dataset from an existing structured data file:
case class Person(name: String, age: Long)
val pds = spark.read.json("hdfs://localhost:9000/testdata/person.json").as[Person]
pds.show()
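One caveat: person.json has a record without an age, and a Scala Long field cannot hold null, so typed transformations that deserialize the whole Person (for example pds.map(_.name)) will fail on that row. A sketch that handles this with Option (SafePerson is a hypothetical variant, not in the original):
case class SafePerson(name: String, age: Option[Long])
val safeDS = spark.read.json("hdfs://localhost:9000/testdata/person.json").as[SafePerson]
safeDS.map(p => p.age.getOrElse(0L)).show() // missing ages default to 0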
Construct a Dataset directly from JVM objects:
val caseDS = Seq(Person("Zhudy", 28)).toDS()
caseDS.show()
Exit spark-shell:
:quit
Comprehensive case study
Write department.json and employee.json, and upload them to HDFS.
department.json
{"id": 1, "name": "Tech Department"}
{"id": 2, "name": "Fina Department"}
{"id": 3, "name": "HR Department"}
employee.json
{"name": "zhangsan", "age": 26, "depId": 1, "gender": "male", "salary": 20000}
{"name": "lisi", "age": 36, "depId": 2, "gender": "female", "salary": 8500}
{"name": "wangwu", "age": 23, "depId": 1, "gender": "male", "salary": 5000}
{"name": "zhaoliu", "age": 25, "depId": 3, "gender": "male", "salary": 7000}
{"name": "marry", "age": 19, "depId": 2, "gender": "female", "salary": 6600}
{"name": "Tom", "age": 36, "depId": 1, "gender": "female", "salary": 5000}
{"name": "kitty", "age": 43, "depId": 2, "gender": "female", "salary": 6000}
hadoop fs -put department.json /testdata
hadoop fs -put employee.json /testdata
hadoop fs -cat hdfs://localhost:9000/testdata/department.json
hadoop fs -cat hdfs://localhost:9000/testdata/employee.json
Load the data:
val emp = spark.read.json("hdfs://localhost:9000/testdata/employee.json")
val dep = spark.read.json("hdfs://localhost:9000/testdata/department.json")
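Optionally, verify the schemas Spark inferred before joining (not in the original steps):
emp.printSchema() // age, depId, salary should be inferred as long; name and gender as string
dep.printSchema()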
Compute the average salary and average age of employees of each gender in each department. The two tables must be joined so that the rows can be grouped by department name and employee gender and then aggregated:
import org.apache.spark.sql.functions.avg // already auto-imported in spark-shell; included for completeness
emp.join(dep, $"id" === $"depId").groupBy(dep("name"), emp("gender")).agg(avg(emp("salary")), avg(emp("age"))).show()