<iframe width="800" height="500" src="//player.bilibili.com/player.html?aid=38193405&cid=67137841&page=3" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"> </iframe>
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not support the Dataset API, but because of Python's dynamic nature many of its benefits are already available (i.e. you can access a row's fields naturally by name, row.columnName). The situation for R is similar.
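As a minimal sketch of that idea (not one of the original examples below, which all read from files), a Dataset can also be built directly from in-memory JVM objects with `toDS()` and then transformed with typed operations; the object name `DatasetSketch` and the sample data here are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical, self-contained example: construct a strongly typed
// Dataset[Person] from JVM objects and apply typed transformations.
object DatasetSketch {
  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("dataset-sketch")
      .getOrCreate()
    import spark.implicits._

    // toDS() builds a Dataset[Person] from a local Scala collection
    val people = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19)).toDS()

    // transformations keep the static type: filter on Person, map to String
    val adultNames = people.filter(_.age >= 20).map(_.name)
    adultNames.show()
    // +-------+
    // |  value|
    // +-------+
    // |Michael|
    // |   Andy|
    // +-------+

    spark.stop()
  }
}
```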
```scala
def sparkSession(isLocal: Boolean = false): SparkSession = {
  if (isLocal) {
    master = "local"
    val spark = SparkSession.builder
      .master(master)
      .appName(appName)
      .getOrCreate()
    //spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
    //import spark.implicits._
    spark
  } else {
    val spark = SparkSession.builder
      .master(master)
      .appName(appName)
      .config("spark.eventLog.enabled", "true")
      .config("spark.history.fs.logDirectory", "hdfs://standalone.com:9000/spark/log/historyEventLog")
      .config("spark.eventLog.dir", "hdfs://standalone.com:9000/spark/log/historyEventLog")
      .getOrCreate()
    //spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
    //import spark.implicits._
    spark
  }
}
```
```scala
val spark = sparkSession(true)

// read a local text file; textFile returns a Dataset[String]
val df = spark.read.textFile("file:///" + getProjectPath + "/src/main/resource/data/text/people.txt")
df.show()
// +-----------+
// |      value|
// +-----------+
// |Michael, 29|
// |   Andy, 30|
// | Justin, 19|
// |  Think, 30|
// +-----------+
```
```scala
val spark = sparkSession(true)

// read a text file from HDFS; textFile returns a Dataset[String]
val df = spark.read.textFile("hdfs://standalone.com:9000/home/liuwen/data/people.txt")
df.show()
// +-----------+
// |      value|
// +-----------+
// |Michael, 29|
// |   Andy, 30|
// | Justin, 19|
// |  Think, 30|
// +-----------+

spark.stop()
```
```scala
val spark = sparkSession(true)

// read a local text file; text returns a DataFrame
val df = spark.read.text("file:///" + getProjectPath + "/src/main/resource/data/text/people.txt")
df.show()
// +-----------+
// |      value|
// +-----------+
// |Michael, 29|
// |   Andy, 30|
// | Justin, 19|
// |  Think, 30|
// +-----------+
```
```scala
object Run extends BaseSparkSession {

  def main(args: Array[String]): Unit = {
    val spark = sparkSession(true)

    // read a text file from HDFS; text returns a DataFrame
    val df = spark.read.text("hdfs://standalone.com:9000/home/liuwen/data/people.txt")
    df.show()
    // +-----------+
    // |      value|
    // +-----------+
    // |Michael, 29|
    // |   Andy, 30|
    // | Justin, 19|
    // |  Think, 30|
    // +-----------+

    spark.stop()
  }
}
```
```scala
object Run1 extends BaseSparkSession {

  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    val spark = sparkSession(true)
    import spark.implicits._

    // map each "name, age" line to a strongly typed Person
    spark.read.textFile("file:///" + getProjectPath + "/src/main/resource/data/text/people.txt")
      .map(line => Person(line.split(",")(0), line.split(",")(1).trim.toLong))
      .foreach(person => println(s"name:${person.name}\t age:${person.age}"))

    spark.stop()
  }
}
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
println(dataSet.first()) // first() internally calls head()

spark.stop()
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
println(dataSet.head()) // first() internally calls head()
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
println(dataSet.head(5)) // head(n) returns the first n rows
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
println(dataSet.count())
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
println(dataSet.collect().mkString("\n"))
```
```scala
val spark = sparkSession(true)

val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
println(dataSet.collectAsList())

import scala.collection.JavaConversions._
for (v <- dataSet.collectAsList()) println(v)

spark.stop()
```
```scala
val spark = sparkSession(true)

val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
dataSet.foreach(println(_))
```
```scala
object Run1 extends BaseSparkSession {

  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    val spark = sparkSession(true)
    import spark.implicits._

    // map each "name, age" line to a strongly typed Person
    spark.read.textFile("file:///" + getProjectPath + "/src/main/resource/data/text/people.txt")
      .map(line => Person(line.split(",")(0), line.split(",")(1).trim.toLong))
      .foreach(person => println(s"name:${person.name}\t age:${person.age}"))

    spark.stop()
  }
}
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/a.text")

import spark.implicits._
val lineWordLength = dataSet.map(line => line.split(" ").size)
println(lineWordLength.collect().mkString("\n"))
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/word.text")

/**
  * Count the total number of words across all lines
  */
import spark.implicits._
val lineWordLength = dataSet.map(line => line.split(" ").size)
val result = lineWordLength.reduce((a, b) => a + b)
println(result)
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/word.big.cn.text")
dataSet.show()
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/word.big.cn.text")

/**
  * Display the first 3 rows of data in tabular form
  * numRows is the number of leading rows to display
  */
dataSet.show(3)
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/word.big.text")

/**
  * Display the first 10 rows of data in tabular form
  * numRows is the number of leading rows to display
  * false: do not truncate the returned row data
  */
dataSet.show(10, false)
```
```scala
val spark = sparkSession()

val dataSet = spark.read.textFile("/home/liuwen/data/word.big.txt")
val result = dataSet.take(10) // equivalent to head(n)
println(result.mkString("\n"))
```
```scala
val spark = sparkSession()

val dataSet = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/json/people.json")
dataSet.describe("name", "age").show()
// +-------+-------+------------------+
// |summary|   name|               age|
// +-------+-------+------------------+
// |  count|      3|                 2|
// |   mean|   null|              24.5|
// | stddev|   null|7.7781745930520225|
// |    min|   Andy|                19|
// |    max|Michael|                30|
// +-------+-------+------------------+
```