<iframe width="800" height="500" src="//player.bilibili.com/player.html?aid=38193405&cid=68636905&page=5" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"> </iframe>html
spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet").show //+------+--------------+----------------+ //| name|favorite_color|favorite_numbers| //+------+--------------+----------------+ //|Alyssa| null| [3, 9, 15, 20]| //| Ben| red| []| //+------+--------------+----------------+
val usersDF = spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet")
usersDF.select("name", "favorite_color").write.save("hdfs://standalone.com:9000/home/liuwen/data/parquest/namesAndFavColors.parquet")

spark.read.load("hdfs://m0:9000/home/liuwen/data/parquest/namesAndFavColors.parquet").show
//+------+--------------+
//|  name|favorite_color|
//+------+--------------+
//|Alyssa|          null|
//|   Ben|           red|
//+------+--------------+
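By default, save fails if the target path already exists (ErrorIfExists). A write mode can be set before saving; a minimal sketch, where the output path is just an illustrative example:

// Overwrite the target path if it already exists; other modes are "append", "ignore" and "error" (the default)
usersDF.select("name", "favorite_color").write.mode("overwrite").save("hdfs://standalone.com:9000/home/liuwen/data/parquest/namesAndFavColors_overwrite.parquet")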
spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/data/json/people.json").show //+----+-------+ //| age| name| //+----+-------+ //|null|Michael| //| 30| Andy| //| 19| Justin| //+----+-------+
spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/data/json/people.json").show //+----+-------+ //| age| name| //+----+-------+ //|null|Michael| //| 30| Andy| //| 19| Justin| //+----+-------+ //保存json格式數據到hdfs上面 ds.select("name", "age").write.format("json").save("hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json") //讀取保存的數據 spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json").show //+----+-------+ //| age| name| //+----+-------+ //|null|Michael| //| 30| Andy| //| 19| Justin| //+----+-------+
hdfs dfs -ls -R hdfs://standalone.com:9000/home/liuwen/output/json
//drwxr-xr-x   - liuwen supergroup          0 2018-12-18 17:44 hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json
//-rw-r--r--   1 liuwen supergroup          0 2018-12-18 17:44 hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json/_SUCCESS
//-rw-r--r--   1 liuwen supergroup         71 2018-12-18 17:44 hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json/part-00000-6690fee8-33d3-413c-8364-927f02593ff2-c000.json

hdfs dfs -cat hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json/*
// The data is in namesAndAges.json/part-00000-6690fee8-33d3-413c-8364-927f02593ff2-c000.json
//{"name":"Michael"}
//{"name":"Andy","age":30}
//{"name":"Justin","age":19}
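As the listing shows, each save produces a directory containing a _SUCCESS marker and one part file per partition of the DataFrame. If a single output file is preferred, the DataFrame can be collapsed to one partition before writing; a sketch, with an illustrative output path:

// coalesce(1) forces a single partition, so a single part file is written
ds.select("name", "age").coalesce(1).write.format("json").mode("overwrite").save("hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges_single.json")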
val peopleDFCsv = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("hdfs://m0:9000/home/liuwen/data/csv/people.csv")
//peopleDFCsv: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

peopleDFCsv.show
//+-----+---+---------+
//| name|age|      job|
//+-----+---+---------+
//|Jorge| 30|Developer|
//|  Bob| 32|Developer|
//+-----+---+---------+
// Save the data as CSV on HDFS
peopleDFCsv.select("name", "age").write.format("csv").save("hdfs://standalone.com:9000/home/liuwen/output/csv/people.csv")

// The CSV was written without a header line, so with header=true the first data row ("Jorge,30") becomes the column names
spark.read.format("csv").option("sep", ",").option("inferSchema", "true").option("header", "true").load("hdfs://standalone.com:9000/home/liuwen/output/csv/people.csv").show
//+-----+---+
//|Jorge| 30|
//+-----+---+
//|  Bob| 32|
//+-----+---+
hdfs dfs -ls -R hdfs://m0:9000/home/liuwen/output/csv/
//drwxr-xr-x   - liuwen supergroup          0 2018-12-18 18:04 hdfs://m0:9000/home/liuwen/output/csv/people.csv
//-rw-r--r--   1 liuwen supergroup          0 2018-12-18 18:04 hdfs://m0:9000/home/liuwen/output/csv/people.csv/_SUCCESS
//-rw-r--r--   1 liuwen supergroup         16 2018-12-18 18:04 hdfs://m0:9000/home/liuwen/output/csv/people.csv/part-00000-d6ad5563-5908-4c0e-8e6f-f13cd0ff445e-c000.csv

hdfs dfs -text hdfs://m0:9000/home/liuwen/output/csv/people.csv/part-00000-d6ad5563-5908-4c0e-8e6f-f13cd0ff445e-c000.csv
//Jorge,30
//Bob,32
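The round trip above loses the column names because the CSV was saved without a header line. Writing with the header option keeps them; a sketch, with an illustrative output path:

// Write a header line so the column names survive the round trip
peopleDFCsv.select("name", "age").write.format("csv").option("header", "true").mode("overwrite").save("hdfs://standalone.com:9000/home/liuwen/output/csv/people_with_header.csv")
spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("hdfs://standalone.com:9000/home/liuwen/output/csv/people_with_header.csv").show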
val usersDF = spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet")
usersDF.show
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+

usersDF.write.format("orc").option("orc.bloom.filter.columns", "favorite_color").option("orc.dictionary.key.threshold", "1.0").save("hdfs://standalone.com:9000/home/liuwen/output/orc/users_with_options.orc")
spark.read.format("orc").load("hdfs://standalone.com:9000/home/liuwen/output/orc/users_with_options.orc").show //+------+--------------+----------------+ //| name|favorite_color|favorite_numbers| //+------+--------------+----------------+ //|Alyssa| null| [3, 9, 15, 20]| //| Ben| red| []| //+------+--------------+----------------+
val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet`") sqlDF.show //+------+--------------+----------------+ //| name|favorite_color|favorite_numbers| //+------+--------------+----------------+ //|Alyssa| null| [3, 9, 15, 20]| //| Ben| red| []| //+------+--------------+----------------+
val sqlDF = spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/output/json/employ.json")
sqlDF.show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+

sqlDF.write.saveAsTable("people_bucketed")
val sqlDF2 = spark.sql("select * from people_bucketed")
val sqlDF = spark.sql("select * from people_bucketed")
val sqlDF = spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/output/json/employ.json")
sqlDF.show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+

sqlDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed3")
val sqlDF2 = spark.sql("select * from people_bucketed3")
sqlDF2.show
val sqlDF = spark.sql("select * from people_bucketed3")
val spark = sparkSession(true)
val usersDF = spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet")
usersDF.show()
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+

// Saved on HDFS at hdfs://standalone.com:9000/user/liuwen/namesPartByColor.parquet
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
val sqlDF = spark.sql("select * from namesPartByColor.parquet")
import spark.implicits._

val df1 = Seq(1, 2, 3, 5).map(x => (x, x * x)).toDF("a", "b")
val df2 = Seq(10, 20, 30, 50).map(x => (x, x * x)).toDF("a", "b")

df1.write.parquet("data/test_table/key=1")
df1.show()
// +---+---+
// |  a|  b|
// +---+---+
// |  1|  1|
// |  2|  4|
// |  3|  9|
// |  5| 25|
// +---+---+

df2.write.parquet("data/test_table/key=2")
df2.show()
// +---+----+
// |  a|   b|
// +---+----+
// | 10| 100|
// | 20| 400|
// | 30| 900|
// | 50|2500|
// +---+----+

val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// root
// |-- a: integer (nullable = true)
// |-- b: integer (nullable = true)
// |-- key: integer (nullable = true)

mergedDF.show()
// +---+----+---+
// |  a|   b|key|
// +---+----+---+
// | 10| 100|  2|
// | 20| 400|  2|
// | 30| 900|  2|
// | 50|2500|  2|
// |  1|   1|  1|
// |  2|   4|  1|
// |  3|   9|  1|
// |  5|  25|  1|
// +---+----+---+
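Schema merging can also be switched on globally instead of per read, through the spark.sql.parquet.mergeSchema option:

spark.conf.set("spark.sql.parquet.mergeSchema", "true")
spark.read.parquet("data/test_table").printSchema()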
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "admin")
connectionProperties.put("password", "000000")

val jdbcDF = spark.read.jdbc("jdbc:mysql://mysql.com:3306/test", "test.test2", connectionProperties)
jdbcDF.show()
import java.util.Properties
import org.apache.spark.sql.SaveMode

val connectionProperties = new Properties()
connectionProperties.put("user", "admin")
connectionProperties.put("password", "000000")

val jdbcDF = spark.read.jdbc("jdbc:mysql://macbookmysql.com:3306/test", "test.test", connectionProperties)
jdbcDF.show()

jdbcDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://macbookmysql.com:3306/test", "test.test3", connectionProperties)
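The same JDBC source is also reachable through the generic format/option API, which exposes extra options such as the driver class; a sketch against the same illustrative MySQL instance (the driver class name depends on the connector version on the classpath):

val jdbcDF2 = spark.read.format("jdbc").
  option("url", "jdbc:mysql://macbookmysql.com:3306/test").
  option("dbtable", "test.test").
  option("user", "admin").
  option("password", "000000").
  option("driver", "com.mysql.jdbc.Driver").
  load()
jdbcDF2.show()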
import java.io.File
import org.apache.spark.sql.SparkSession

val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .master("local")
  // .master("spark://standalone.com:7077")
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.sql

sql("CREATE DATABASE IF NOT EXISTS test_tmp")
sql("USE test_tmp")
sql("CREATE TABLE IF NOT EXISTS student(name VARCHAR(64), age INT)")
sql("INSERT INTO TABLE student VALUES ('小王', 35), ('小李', 50)")
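Once the rows are inserted, the table can be queried through the same sql entry point, for example:

sql("SELECT * FROM student").show()
sql("SELECT COUNT(*) AS cnt FROM student").show()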