Getting Started with Apache Spark (DataFrame + Hive + SparkSQL)

http://spark.apache.org/docs/latest/quick-start.html

Spark Shell

➜ spark-1.4.0-bin-hadoop2.6 ./bin/spark-shell

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25)
15/06/28 10:36:07 INFO ui.SparkUI: Started SparkUI at http://127.0.0.1:4040
15/06/28 10:36:07 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
15/06/28 10:36:08 INFO hive.HiveContext: Initializing execution hive, version 0.13.1
15/06/28 10:36:23 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

Basic RDD Operation

First example: count the number of lines in a text file.
Calling sc.textFile(fileName) creates a MapPartitionsRDD.

scala> val textFile = sc.textFile("README.md")
15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(63424) called with curMem=0, maxMem=278019440
15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.9 KB, free 265.1 MB)
15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(20061) called with curMem=63424, maxMem=278019440
15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.6 KB, free 265.1 MB)
15/06/28 10:36:45 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58638 (size: 19.6 KB, free: 265.1 MB)
15/06/28 10:36:45 INFO spark.SparkContext: Created broadcast 0 from textFile at <console>:21
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

Calling count() on the textFile RDD created above triggers an action.

scala> textFile.count()
java.net.ConnectException: Call From hadoop/127.0.0.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    ...
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    ...

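Hadoop is already installed on this machine in pseudo-distributed mode, so Spark reads the Hadoop configuration and tries to resolve README.md on HDFS at localhost:9000.
We are not starting Hadoop for now. To read the text file in local mode, add the file:/// prefix by hand and use an absolute path.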

scala> textFile
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

Re-create the textFile RDD so that it reads the local text file

scala> val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
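
Why the first attempt went to localhost:9000: a path without a URI scheme is resolved against the default filesystem from the Hadoop configuration, while an explicit file:/// scheme pins it to the local filesystem. The sketch below only inspects the scheme with java.net.URI. It is plain Scala, does not touch Spark or Hadoop, and the object name, the method name and the hdfs:// path are made up for illustration.

```scala
import java.net.URI

object PathSchemeSketch {
  // Returns the URI scheme of a path, or None when the path carries no scheme.
  def schemeOf(path: String): Option[String] = Option(new URI(path).getScheme)

  def main(args: Array[String]): Unit = {
    // No scheme: left to the default filesystem of the Hadoop configuration.
    println(schemeOf("README.md"))
    // Explicit local filesystem, as used in the shell session above.
    println(schemeOf("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md"))
    // Hypothetical HDFS path, shown only for comparison.
    println(schemeOf("hdfs://localhost:9000/user/hadoop/README.md"))
  }
}
```

A path for which schemeOf returns None is the case that triggered the ConnectException while Hadoop was not running.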

Trigger an RDD action: count

scala> textFile.count()
15/06/28 10:44:07 INFO scheduler.DAGScheduler: Job 0 finished: count at <console>:24, took 0.275609 s
res2: Long = 98

Another RDD action: return the first line of the text file

scala> textFile.first()
15/06/28 10:44:27 INFO scheduler.DAGScheduler: Job 1 finished: first at <console>:24, took 0.017917 s
res3: String = # Apache Spark

More RDD Operations

1. Count how many lines contain the word Spark

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:23
scala> textFile.filter(line => line.contains("Spark")).count()

2. Find the largest number of words on a single line of the text file

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
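
The same map-then-reduce shape can be tried on an ordinary Scala collection, which makes it easy to check the logic without a SparkContext. This is a sketch on a local Seq rather than an RDD, and the object and method names are hypothetical.

```scala
object LongestLineSketch {
  // For each line, count the space-separated tokens, then keep the largest count.
  // Like RDD.reduce, Seq.reduce fails on an empty input.
  def maxWordsPerLine(lines: Seq[String]): Int =
    lines
      .map(line => line.split(" ").size)
      .reduce((a, b) => if (a > b) a else b)

  def main(args: Array[String]): Unit = {
    val lines = Seq("# Apache Spark", "Spark is a fast and general cluster computing system", "")
    println(maxWordsPerLine(lines))
  }
}
```

Note that an empty line still counts as one token, because "".split(" ") yields an array holding a single empty string.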

3. MapReduce WordCount

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:23

scala>  wordCounts.collect()
res6: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (systems.,1...
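
To see what the flatMap / map / reduceByKey chain computes, here is a rough equivalent on a local Scala collection. Scala collections have no reduceByKey, so the sketch swaps in groupBy followed by a per-key reduce. It is not how Spark executes the job, and the object and method names are made up.

```scala
object WordCountSketch {
  // Split lines into words, pair each word with 1, then sum the 1s per word.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .groupBy { case (word, _) => word }
      .map { case (word, pairs) => (word, pairs.map(_._2).reduce((a, b) => a + b)) }

  def main(args: Array[String]): Unit = {
    wordCount(Seq("a b a", "b c")).foreach(println)
  }
}
```

The result is a Map from word to count, which plays the role of the Array[(String, Int)] returned by collect() above.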

4. Cache

scala>  linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[4] at filter at <console>:23

scala> linesWithSpark.count()
15/06/28 10:47:11 INFO scheduler.DAGScheduler: Job 5 finished: count at <console>:26, took 0.054036 s
res8: Long = 19

scala> linesWithSpark.count()
15/06/28 10:47:14 INFO scheduler.DAGScheduler: Job 6 finished: count at <console>:26, took 0.016638 s
res9: Long = 19

Recap of the commands run so far. The circled numbers mark the actions, in the order they were executed:

val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md")
〇 textFile.count()
① textFile.first()
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
② textFile.filter(line => line.contains("Spark")).count()
③ textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
④ wordCounts.collect()
linesWithSpark.cache()
⑤ linesWithSpark.count()
⑥ linesWithSpark.count()
⑦ linesWithSpark.count()

Spark Shell UI

http://127.0.0.1:4040

Jobs, Stages, Storage

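Jobs: the number of each action in the list above corresponds to a Job Id on the Jobs page.

Stages: there are 8 jobs above, but one more stage than jobs, because the collect in ④ has two stages.

Storage: only shows entries once an RDD has been cached.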

Inspecting a Stage

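On the Jobs page, click the collect job with Job Id=4 (the one that outputs the WordCount result). The list below it shows 2 stages.
Look closely at the last two columns of the list, Shuffle Read and Shuffle Write.
The map stage performs the Shuffle Write, and the collect stage performs the Shuffle Read.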
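
The write/read split can be pictured with a small model in plain Scala: each input partition buckets its (word, 1) pairs by a hash of the key (the "shuffle write"), and each bucket then gathers its pairs from every map output and sums them (the "shuffle read"). This is a simplified sketch of hash partitioning under our own assumptions, not Spark's implementation, and all names in it are hypothetical.

```scala
object ShuffleSketch {
  // Non-negative bucket index for a key.
  def bucketOf(key: String, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Map side: one input partition groups its (word, 1) pairs by target bucket.
  def shuffleWrite(partition: Seq[String], numPartitions: Int): Map[Int, Seq[(String, Int)]] =
    partition
      .map(word => (word, 1))
      .groupBy { case (word, _) => bucketOf(word, numPartitions) }

  // Reduce side: one bucket collects its pairs from every map output and sums per key.
  def shuffleRead(mapOutputs: Seq[Map[Int, Seq[(String, Int)]]], bucket: Int): Map[String, Int] =
    mapOutputs
      .flatMap(output => output.getOrElse(bucket, Seq.empty[(String, Int)]))
      .groupBy { case (word, _) => word }
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  // Runs both phases; every key lands in exactly one bucket, so merging the buckets is safe.
  def run(partitions: Seq[Seq[String]], numPartitions: Int): Map[String, Int] = {
    val mapOutputs = partitions.map(p => shuffleWrite(p, numPartitions))
    (0 until numPartitions)
      .map(bucket => shuffleRead(mapOutputs, bucket))
      .foldLeft(Map.empty[String, Int])((acc, part) => acc ++ part)
  }
}
```

Because all pairs for a given word must meet in one bucket before they can be summed, the job cannot stay in a single stage, which is why the collect job shows two.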

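Click the map stage with Stage Id=4. Its DAG visualization is the same as the left side of the job's overview graph.

The Spark web UI also provides an Event Timeline, which clearly shows how much time each phase takes.

Go back and click the collect stage with Stage Id=5.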


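Spark Standalone Cluster Installation

Preparation:

1. Passwordless ssh from the master to the slaves (append the master's public key to authorized_keys on every slave)
2. Turn off the firewall on every node (chkconfig iptables off)
3. Install scala-2.10 and set it up in ~/.bashrc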

cd $SPARK_HOME
vi conf/spark-env.sh

export JAVA_HOME=/usr/java/jdk1.7.0_51
export SCALA_HOME=/usr/install/scala-2.10.5
export HADOOP_HOME=/usr/install/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_MASTER_IP=dp0652
export MASTER=spark://dp0652:7077
#export SPARK_LOCAL_IP=dp0652
export SPARK_LOCAL_DIRS=/usr/install/spark-1.4.0-bin-hadoop2.6
export SPARK_MASTER_WEBUI_PORT=8082
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=8g

vi conf/slaves

dp0652
dp0653
dp0655
dp0656
dp0657

Distribute the spark directory to the other nodes in the cluster

cd ..
scp -r $SPARK_HOME dp0653:/usr/install
scp -r $SPARK_HOME dp0655:/usr/install
scp -r $SPARK_HOME dp0656:/usr/install
scp -r $SPARK_HOME dp0657:/usr/install

Because dp0652 and dp0653 have more memory than the other nodes, we changed spark-env.sh on these two nodes

export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_MEMORY=20g
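
SPARK_WORKER_CORES and SPARK_WORKER_MEMORY apply to each worker instance, so the cluster totals follow from simple arithmetic over the per-node settings. The sketch below does that sum in plain Scala. The case class and object names are made up; the numbers are the ones from the two spark-env.sh variants above.

```scala
object ClusterCapacitySketch {
  // Per-node settings: worker instances, cores per worker, memory per worker in GB.
  case class NodeConf(host: String, instances: Int, coresPerWorker: Int, memGbPerWorker: Int)

  // dp0652 and dp0653 run 2 workers with 20g each; the others run 1 worker with 8g.
  val nodes: Seq[NodeConf] = Seq(
    NodeConf("dp0652", 2, 1, 20),
    NodeConf("dp0653", 2, 1, 20),
    NodeConf("dp0655", 1, 1, 8),
    NodeConf("dp0656", 1, 1, 8),
    NodeConf("dp0657", 1, 1, 8)
  )

  def totalWorkers(ns: Seq[NodeConf]): Int = ns.map(_.instances).sum
  def totalCores(ns: Seq[NodeConf]): Int = ns.map(n => n.instances * n.coresPerWorker).sum
  def totalMemGb(ns: Seq[NodeConf]): Int = ns.map(n => n.instances * n.memGbPerWorker).sum

  def main(args: Array[String]): Unit = {
    println(s"workers=${totalWorkers(nodes)} cores=${totalCores(nodes)} memGb=${totalMemGb(nodes)}")
  }
}
```

With these settings the cluster should end up with 7 workers, 7 cores and 104 GB in total, which is what the "Registering worker" lines in the master log further down add up to.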

Start the cluster. Running the script on the master is enough.

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.master.Master-1-dp0652.out
dp0656: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0656.out
dp0655: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0655.out
dp0657: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0657.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0653.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0653.out

Check the Spark processes on the master and on the slaves

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ jps -lm
40708 org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
41095 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
40926 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ ssh dp0653
Last login: Thu Jul  2 09:07:17 2015 from 192.168.6.140
[qihuang.zheng@dp0653 ~]$ jps -lm
27153 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
27029 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0653 ~]$ exit
logout
Connection to dp0653 closed.
[qihuang.zheng@dp0652 logs]$ ssh dp0655
Last login: Thu Jul  2 08:55:05 2015 from 192.168.6.140
[qihuang.zheng@dp0655 ~]$ jps -lm
8766 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0655 ~]$

View the web UI on the master: http://dp0652:8082/

Problems encountered

1. If SPARK_LOCAL_IP is set but not changed to each slave's own IP, the Worker fails with:

15/07/02 09:04:08 ERROR netty.NettyTransport: failed to bind to /192.168.6.52:0, shutting down Netty transport
Exception in thread "main" java.net.BindException: Failed to bind to: /192.168.6.52:0: Service 'sparkWorker' failed after 16 retries!
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
        at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
        at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
        at scala.util.Try$.apply(Try.scala:161)
        at scala.util.Success.map(Try.scala:206)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/07/02 09:04:09 INFO util.Utils: Shutdown hook called

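Cause: SPARK_LOCAL_IP is the IP address of the local machine, so after the configuration is distributed to the other nodes, it has to be changed on each node to that node's own IP address.
With many nodes this is tedious. Our configuration leaves SPARK_LOCAL_IP commented out and only sets SPARK_LOCAL_DIRS. Note that SPARK_LOCAL_DIRS is Spark's scratch directory setting, so it is leaving SPARK_LOCAL_IP unset, not SPARK_LOCAL_DIRS itself, that avoids the bind error.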
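
A quick way to see why the bind fails is to ask the JVM whether an IP address belongs to any network interface of the machine it runs on. The sketch below does this with java.net.NetworkInterface. It is only a diagnostic idea, not how Spark validates the address, and the object and method names are hypothetical.

```scala
import java.net.{InetAddress, NetworkInterface}

object LocalIpSketch {
  // True when some network interface on this machine owns the given IP address.
  def isLocalAddress(ip: String): Boolean =
    NetworkInterface.getByInetAddress(InetAddress.getByName(ip)) != null

  def main(args: Array[String]): Unit = {
    // 192.168.6.52 is the master's address from the error above.
    // On any node other than the master this is expected to print false.
    println(isLocalAddress("192.168.6.52"))
    // The loopback address is normally present on every machine.
    println(isLocalAddress("127.0.0.1"))
  }
}
```

A worker that is told to bind to an address for which this check is false cannot open its listening socket, which matches the "failed to bind" error above.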

2. If export MASTER is not configured, the workers report:

15/07/02 08:40:51 INFO worker.Worker: Retrying connection to master (attempt # 12)
15/07/02 08:40:51 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 08:40:51 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@dp0652:7077].
Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
Reason: Connection refused: dp0652/192.168.6.52:7077
15/07/02 08:41:23 ERROR worker.Worker: RECEIVED SIGNAL 15: SIGTERM
15/07/02 08:41:23 INFO util.Utils: Shutdown hook called

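As a result, the Worker processes do start on the slaves (visible with jps), but no workers show up on the Master. At this point, check the log on the Master.
A successful master start logs spark://dp0652:7077, whereas the worker output above shows sparkMaster@dp0652:7077, so we export MASTER manually.

3. Once the cluster finally started successfully, the log on the Master: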

Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
========================================
15/07/02 09:27:49 INFO master.Master: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:50 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:51 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:51 INFO Remoting: Starting remoting
15/07/02 09:27:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@dp0652:7077]
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'sparkMaster' on port 7077.
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@dp0652:6066
15/07/02 09:27:51 INFO util.Utils: Successfully started service on port 6066.
15/07/02 09:27:51 INFO rest.StandaloneRestServer: Started REST server for submitting applications on port 6066
15/07/02 09:27:51 INFO master.Master: Starting Spark master at spark://dp0652:7077
15/07/02 09:27:51 INFO master.Master: Running Spark version 1.4.0
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8082
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'MasterUI' on port 8082.
15/07/02 09:27:51 INFO ui.MasterWebUI: Started MasterWebUI at http://192.168.6.52:8082
15/07/02 09:27:52 INFO master.Master: I have been elected leader! New state: ALIVE
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.52:35398 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.56:60106 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.55:50995 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.57:34020 with 1 cores, 8.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.52:55912 with 1 cores, 20.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.53:35846 with 1 cores, 20.0 GB RAM

The log of one of the Workers on dp0653:

Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
========================================
15/07/02 09:27:52 INFO worker.Worker: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:52 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:53 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:53 INFO Remoting: Starting remoting
15/07/02 09:27:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@192.168.6.53:55994]
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'sparkWorker' on port 55994.
15/07/02 09:27:54 INFO worker.Worker: Starting Spark worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO worker.Worker: Running Spark version 1.4.0
15/07/02 09:27:54 INFO worker.Worker: Spark home: /usr/install/spark-1.4.0-bin-hadoop2.6
15/07/02 09:27:54 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:54 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'WorkerUI' on port 8081.
15/07/02 09:27:54 INFO ui.WorkerWebUI: Started WorkerWebUI at http://192.168.6.53:8081
15/07/02 09:27:54 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 09:27:54 INFO worker.Worker: Successfully registered with master spark://dp0652:7077

Spark Shell

[qihuang.zheng@dp0653 spark-1.4.0-bin-hadoop2.6]$ bin/spark-shell --master spark://dp0652:7077 --executor-memory 4g
[qihuang.zheng@dp0653 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --master spark://dp0652:7077 --class org.apache.spark.examples.SparkPi --executor-memory 4g --total-executor-cores 2 lib/spark-examples-1.4.0-hadoop2.6.0.jar 1000
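
The SparkPi example submitted above estimates π. For readers who want to see the idea without a cluster, here is a plain-Scala Monte Carlo sketch: sample points in the square [-1, 1] x [-1, 1] and count how many fall inside the unit circle. It is our own illustration of the technique, not the source of the Spark example, and the object name, method name and seed parameter are assumptions.

```scala
import scala.util.Random

object PiSketch {
  // Fraction of random points inside the unit circle, times 4, approximates pi.
  def estimatePi(samples: Int, seed: Long): Double = {
    val rnd = new Random(seed)
    val inside = (1 to samples).count { _ =>
      val x = rnd.nextDouble() * 2 - 1
      val y = rnd.nextDouble() * 2 - 1
      x * x + y * y <= 1
    }
    4.0 * inside / samples
  }

  def main(args: Array[String]): Unit = {
    println(estimatePi(100000, 42L))
  }
}
```

On a cluster the sampling loop is what gets spread over the executors; only the per-partition counts need to be added up at the end.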

SparkSQL

SparkSQL Table Operation with ParquetFile

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/user/qihuang.zheng/sparktest/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
// saveAsParquetFile is deprecated as of Spark 1.4.0; write.parquet is its replacement.
people.saveAsParquetFile("/user/qihuang.zheng/sparktest/people.parquet")
people.write.parquet("/user/qihuang.zheng/sparktest/people2.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("/user/qihuang.zheng/sparktest/people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// JOIN TABLE
val jointbls = sqlContext.sql("SELECT people.name FROM people join parquetFile where people.name=parquetFile.name")
jointbls.map(t => "Name: " + t(0)).collect().foreach(println)
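
The parsing and filtering steps of the session above can be checked on a local collection before running them on the cluster. The sketch below mirrors the Person case class, the split(",") parsing and the age filter in plain Scala, with no SQLContext involved. The three sample lines are assumed test data (the session's own comment only tells us about Justin, 19), and the object and method names are hypothetical.

```scala
object PeopleSketch {
  case class Person(name: String, age: Int)

  // Same parsing as in the shell session: "name, age" split on the comma.
  def parse(line: String): Person = {
    val p = line.split(",")
    Person(p(0), p(1).trim.toInt)
  }

  // Plain-Scala counterpart of: SELECT name FROM people WHERE age >= 13 AND age <= 19
  def teenagers(people: Seq[Person]): Seq[String] =
    people
      .filter(p => p.age >= 13 && p.age <= 19)
      .map(p => "Name: " + p.name)

  def main(args: Array[String]): Unit = {
    // Assumed sample content for people.txt.
    val lines = Seq("Michael, 29", "Andy, 30", "Justin, 19")
    teenagers(lines.map(parse)).foreach(println)
  }
}
```

A malformed line (no comma, or a non-numeric age) makes parse throw, which is also what would fail the map step of the Spark job.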

If the JVM runs out of memory (PermGen) while cache is executed, the current shell exits. The workaround is to prefix the spark-shell command with SPARK_SUBMIT_OPTS="-XX:MaxPermSize=1g"

Spark Hive

If Spark has not been rebuilt with Hive support and hive-site.xml is simply copied into the conf directory of the Spark cluster, starting spark-sql directly fails:

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-sql
Exception in thread "main" java.lang.RuntimeException: java.io.IOException: Permission denied
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:330)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:109)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Permission denied
    at java.io.UnixFileSystem.createFileExclusively(Native Method)
    at java.io.File.createNewFile(File.java:1006)
    at java.io.File.createTempFile(File.java:1989)
    at org.apache.hadoop.hive.ql.session.SessionState.createTempFile(SessionState.java:432)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:328)
    ... 11 more
15/07/03 08:42:33 INFO util.Utils: Shutdown hook called
15/07/03 08:42:33 INFO util.Utils: Deleting directory /tmp/spark-831ff199-cf80-4d49-a22f-824736065289

This is because every Worker in the Spark cluster needs Hive support, and Hive is not installed on all of the Worker nodes. Spark also needs a build that supports Hive.
Rebuilding Spark with Hive support takes a long time, though. Can we simply use the Hive that is already installed in the cluster? Yes:
http://lxw1234.com/archives/2015/06/294.htm
http://shiyanjun.cn/archives/1113.html
http://www.cnblogs.com/hseagle/p/3758922.html

1. Add the following to spark-env.sh

export HIVE_HOME=/usr/install/apache-hive-0.13.1-bin
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.34.jar:$SPARK_CLASSPATH

2. Distribute apache-hive-0.13.1-bin to every node in the cluster (the nodes that run a Spark Worker)

cd install
scp -r apache-hive-0.13.1-bin dp0653:/usr/install/

3. Copy apache-hive-0.13.1-bin/conf/hive-site.xml to $SPARK_HOME/conf

scp apache-hive-0.13.1-bin/conf/hive-site.xml dp0653:/usr/install/spark-1.4.0-bin-hadoop2.6/conf

4. Restart the Spark cluster

sbin/stop-all.sh
sbin/start-all.sh

5. Test spark-sql

SPARK_CLASSPATH was detected (set to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:').
This is deprecated in Spark 1.0+.
Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.
15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.

15/07/03 10:01:00 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.6.53:9083
15/07/03 10:01:00 INFO hive.metastore: Connected to metastore.
15/07/03 10:01:00 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr.
SET spark.sql.hive.version=0.13.1
SET spark.sql.hive.version=0.13.1
spark-sql> show databases;
default
test
spark-sql> use test;
Time taken: 2.045 seconds
spark-sql> show tables;
koudai    false
...
spark-sql> select count(*) from koudai;
311839
Time taken: 12.443 seconds, Fetched 1 row(s)
spark-sql>

Tips & TODO

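  1. For metric analysis on a daily or weekly interval, the results can be saved as a persistent table or written to HDFS for others to use.

  2. Production data sets are usually large, so use partitions when querying.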
