http://spark.apache.org/docs/latest/quick-start.html
➜ spark-1.4.0-bin-hadoop2.6 ./bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25)
15/06/28 10:36:07 INFO ui.SparkUI: Started SparkUI at http://127.0.0.1:4040
15/06/28 10:36:07 INFO repl.SparkILoop: Created spark context.. Spark context available as sc.
15/06/28 10:36:08 INFO hive.HiveContext: Initializing execution hive, version 0.13.1
15/06/28 10:36:23 INFO repl.SparkILoop: Created sql context (with Hive support).. SQL context available as sqlContext.
First example: count the words in a text file.
Calling sc.textFile(fileName) creates a MapPartitionsRDD.
scala> val textFile = sc.textFile("README.md")
15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(63424) called with curMem=0, maxMem=278019440
15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.9 KB, free 265.1 MB)
15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(20061) called with curMem=63424, maxMem=278019440
15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.6 KB, free 265.1 MB)
15/06/28 10:36:45 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58638 (size: 19.6 KB, free: 265.1 MB)
15/06/28 10:36:45 INFO spark.SparkContext: Created broadcast 0 from textFile at <console>:21
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
Calling count() on the textFile RDD created above triggers an Action.
scala> textFile.count()
java.net.ConnectException: Call From hadoop/127.0.0.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  ...
Caused by: java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  ...
Because Hadoop is installed on this machine in pseudo-distributed mode, Spark reads Hadoop's configuration and tries to go through HDFS.
Since we are not starting Hadoop here, we use local mode instead: manually add the file:/// prefix and use an absolute path to read the text file.
scala> textFile
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
Rebuild the textFile RDD, this time reading the local text file:
scala> val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
Trigger the RDD Action count:
scala> textFile.count()
15/06/28 10:44:07 INFO scheduler.DAGScheduler: Job 0 finished: count at <console>:24, took 0.275609 s
res2: Long = 98
Another RDD Action: output the first line of the text file.
scala> textFile.first()
15/06/28 10:44:27 INFO scheduler.DAGScheduler: Job 1 finished: first at <console>:24, took 0.017917 s
res3: String = # Apache Spark
1. Count how many lines contain the word Spark
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:23

scala> textFile.filter(line => line.contains("Spark")).count()
2. How many words does the longest line in the file contain?
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
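The map/reduce chain above can be sketched with plain Scala collections, using a hypothetical in-memory list of lines in place of the RDD; the shape of the computation is identical, only the data source differs.

```scala
// Hypothetical sample lines standing in for README.md
val lines = List("# Apache Spark", "Spark is a fast engine", "")

// map: number of space-separated words per line
val wordsPerLine = lines.map(line => line.split(" ").size)

// reduce: keep the larger of each pair, i.e. the maximum
val maxWords = wordsPerLine.reduce((a, b) => if (a > b) a else b)

println(maxWords) // 5 for the sample lines above
```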
3. MapReduce WordCount
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:23

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (systems.,1...
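The same WordCount pipeline can be expressed over plain Scala collections as a sketch of what flatMap/map/reduceByKey do on an RDD; here groupBy plus a per-key sum plays the role of reduceByKey, and the input lines are made up for illustration.

```scala
val lines = List("a b a", "b c")

val wordCounts = lines
  .flatMap(line => line.split(" "))          // split every line into words
  .map(word => (word, 1))                    // pair each word with a count of 1
  .groupBy(_._1)                             // gather pairs by word (the shuffle step)
  .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum counts per word

println(wordCounts) // Map(a -> 2, b -> 2, c -> 1)
```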
4. Cache
scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[4] at filter at <console>:23

scala> linesWithSpark.count()
15/06/28 10:47:11 INFO scheduler.DAGScheduler: Job 5 finished: count at <console>:26, took 0.054036 s
res8: Long = 19

scala> linesWithSpark.count()
15/06/28 10:47:14 INFO scheduler.DAGScheduler: Job 6 finished: count at <console>:26, took 0.016638 s
res9: Long = 19
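A rough plain-Scala analogy for what cache() buys: without caching, every count recomputes the filter; after materializing the result once, later counts reuse it. filterCost is a hypothetical counter that just makes the recomputation visible.

```scala
var filterCost = 0
def linesWithSpark(lines: List[String]): List[String] = {
  filterCost += 1                       // every call recomputes the filter
  lines.filter(_.contains("Spark"))
}

val lines = List("# Apache Spark", "hello", "Spark SQL")

// without caching: two counts, two recomputations
linesWithSpark(lines).size
linesWithSpark(lines).size
println(filterCost) // 2

// with "caching": materialize once, count twice
val cached = linesWithSpark(lines)
cached.size
cached.size
println(filterCost) // 3 in total: only one more recomputation
```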
val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md")
〇 textFile.count()
① textFile.first()
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
② textFile.filter(line => line.contains("Spark")).count()
③ textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
④ wordCounts.collect()
linesWithSpark.cache()
⑤ linesWithSpark.count()
⑥ linesWithSpark.count()
⑦ linesWithSpark.count()
http://127.0.0.1:4040
Jobs: the circled number of each Action above corresponds to a Job Id in the figure below.
Stages: there are 8 Jobs above, but one more Stage. That is because ④, the collect, has two stages.
Storage: only shows entries when something has been cached.
In Jobs, click the collect job with Job Id=4 (the one that outputs the WordCount result). The list below it shows 2 Stages.
Look closely at the last two columns of the list: Shuffle Read and Shuffle Write.
The map performs the Shuffle Write, and the collect performs the Shuffle Read.
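A toy sketch of that shuffle boundary: the map side writes each (word, 1) pair into a bucket chosen by hashing the key (Shuffle Write), and the downstream side reads each bucket back to finish the per-key sums (Shuffle Read). numBuckets and the pairs are made up for illustration.

```scala
val numBuckets = 2
val pairs = List(("a", 1), ("b", 1), ("a", 1))

// map side: partition pairs by key hash, one bucket per downstream task
val buckets = pairs.groupBy { case (word, _) => math.abs(word.hashCode) % numBuckets }

// reduce side: every bucket holds all pairs for its keys, so each key can be
// summed entirely within one bucket
val counts = buckets.values.flatMap { bucket =>
  bucket.groupBy(_._1).map { case (word, ps) => (word, ps.map(_._2).sum) }
}.toMap

println(counts) // Map(a -> 2, b -> 2)
```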
Click the map stage with Stage Id=4. Its DAG visualization matches the left side of the overview figure above.
The Spark WebUI also provides an Event Timeline, which makes it easy to see how much time each phase took.
Go back and click the collect stage with Stage Id=5.
Preparation:
1. Passwordless ssh from master to slaves (append the master's public key to every slave's authorized_keys)
2. Disable the firewall on all nodes (chkconfig iptables off)
3. Install scala-2.10 and configure ~/.bashrc
cd $SPARK_HOME
vi conf/spark-env.sh
export JAVA_HOME=/usr/java/jdk1.7.0_51
export SCALA_HOME=/usr/install/scala-2.10.5
export HADOOP_HOME=/usr/install/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_MASTER_IP=dp0652
export MASTER=spark://dp0652:7077
#export SPARK_LOCAL_IP=dp0652
export SPARK_LOCAL_DIRS=/usr/install/spark-1.4.0-bin-hadoop2.6
export SPARK_MASTER_WEBUI_PORT=8082
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=8g
vi conf/slaves
dp0652
dp0653
dp0655
dp0656
dp0657
Distribute the spark directory to the other nodes of the cluster:
cd ..
scp -r $SPARK_HOME dp0653:/usr/install
scp -r $SPARK_HOME dp0655:/usr/install
scp -r $SPARK_HOME dp0656:/usr/install
scp -r $SPARK_HOME dp0657:/usr/install
Because dp0652 and dp0653 have more memory than the other nodes, we modified spark-env.sh on those two nodes:
export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_MEMORY=20g
Start the cluster; starting it on the master is sufficient.
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.master.Master-1-dp0652.out
dp0656: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0656.out
dp0655: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0655.out
dp0657: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0657.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0653.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0653.out
Check the Spark processes on the master and slaves:
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ jps -lm
40708 org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
41095 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
40926 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ ssh dp0653
Last login: Thu Jul 2 09:07:17 2015 from 192.168.6.140
[qihuang.zheng@dp0653 ~]$ jps -lm
27153 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
27029 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0653 ~]$ exit
logout
Connection to dp0653 closed.
[qihuang.zheng@dp0652 logs]$ ssh dp0655
Last login: Thu Jul 2 08:55:05 2015 from 192.168.6.140
[qihuang.zheng@dp0655 ~]$ jps -lm
8766 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0655 ~]$
View the web UI on the master: http://dp0652:8082/
1. If SPARK_LOCAL_IP is configured but is not changed to each slave's own IP on the slaves, this error occurs:
15/07/02 09:04:08 ERROR netty.NettyTransport: failed to bind to /192.168.6.52:0, shutting down Netty transport
Exception in thread "main" java.net.BindException: Failed to bind to: /192.168.6.52:0: Service 'sparkWorker' failed after 16 retries!
  at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
  at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
  at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
  at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
  at scala.util.Try$.apply(Try.scala:161)
  at scala.util.Success.map(Try.scala:206)
  at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
  at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
  at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
  at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
  at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
  at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
  at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
  at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/07/02 09:04:09 INFO util.Utils: Shutdown hook called
Root cause: SPARK_LOCAL_IP is the IP address of the local node, so after distributing the configuration across the cluster you must edit it on every node to that node's own IP.
With many nodes that is tedious; SPARK_LOCAL_DIRS can be used instead.
2. If export MASTER is not configured, the workers report this error:
15/07/02 08:40:51 INFO worker.Worker: Retrying connection to master (attempt # 12)
15/07/02 08:40:51 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 08:40:51 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@dp0652:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: dp0652/192.168.6.52:7077
15/07/02 08:41:23 ERROR worker.Worker: RECEIVED SIGNAL 15: SIGTERM
15/07/02 08:41:23 INFO util.Utils: Shutdown hook called
The consequence is that although every slave starts a Worker process (check with jps), no workers appear on the Master. In that case, check the logs on the Master.
On a successful start, the master's log shows spark@dp0652:7077, while the message above shows sparkMaster@dp0652:7077, so MASTER should be exported manually.
3. The cluster finally starts successfully. The log on the Master:
Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
========================================
15/07/02 09:27:49 INFO master.Master: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:50 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:51 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:51 INFO Remoting: Starting remoting
15/07/02 09:27:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@dp0652:7077]
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'sparkMaster' on port 7077.
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@dp0652:6066
15/07/02 09:27:51 INFO util.Utils: Successfully started service on port 6066.
15/07/02 09:27:51 INFO rest.StandaloneRestServer: Started REST server for submitting applications on port 6066
15/07/02 09:27:51 INFO master.Master: Starting Spark master at spark://dp0652:7077
15/07/02 09:27:51 INFO master.Master: Running Spark version 1.4.0
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8082
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'MasterUI' on port 8082.
15/07/02 09:27:51 INFO ui.MasterWebUI: Started MasterWebUI at http://192.168.6.52:8082
15/07/02 09:27:52 INFO master.Master: I have been elected leader! New state: ALIVE
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.52:35398 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.56:60106 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.55:50995 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.57:34020 with 1 cores, 8.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.52:55912 with 1 cores, 20.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.53:35846 with 1 cores, 20.0 GB RAM
The log of one of the two Workers on dp0653:
Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
========================================
15/07/02 09:27:52 INFO worker.Worker: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:52 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:53 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:53 INFO Remoting: Starting remoting
15/07/02 09:27:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@192.168.6.53:55994]
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'sparkWorker' on port 55994.
15/07/02 09:27:54 INFO worker.Worker: Starting Spark worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO worker.Worker: Running Spark version 1.4.0
15/07/02 09:27:54 INFO worker.Worker: Spark home: /usr/install/spark-1.4.0-bin-hadoop2.6
15/07/02 09:27:54 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:54 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'WorkerUI' on port 8081.
15/07/02 09:27:54 INFO ui.WorkerWebUI: Started WorkerWebUI at http://192.168.6.53:8081
15/07/02 09:27:54 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 09:27:54 INFO worker.Worker: Successfully registered with master spark://dp0652:7077
[qihuang.zheng@dp0653 spark-1.4.0-bin-hadoop2.6]$ bin/spark-shell --master spark://dp0652:7077 --executor-memory 4g
[qihuang.zheng@dp0653 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --master spark://dp0652:7077 --class org.apache.spark.examples.SparkPi --executor-memory 4g --total-executor-cores 2 lib/spark-examples-1.4.0-hadoop2.6.0.jar 1000
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/user/qihuang.zheng/sparktest/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.saveAsParquetFile("/user/qihuang.zheng/sparktest/people.parquet")
people.write.parquet("/user/qihuang.zheng/sparktest/people2.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("/user/qihuang.zheng/sparktest/people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// JOIN TABLE
val jointbls = sqlContext.sql("SELECT people.name FROM people join parquetFile where people.name=parquetFile.name")
jointbls.map(t => "Name: " + t(0)).collect().foreach(println)
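The SQL query above ("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") boils down to a filter over Person rows, which can be sketched in plain Scala; the sample rows mirror the people.txt shipped with the Spark examples.

```scala
case class Person(name: String, age: Int)

// Rows as they would be parsed from people.txt
val people = List(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19))

// The WHERE clause as a collection filter
val teenagers = people.filter(p => p.age >= 13 && p.age <= 19)

teenagers.map(t => "Name: " + t.name).foreach(println) // Name: Justin
```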
If memory runs out while executing cache, the current shell exits. The workaround is to prefix the spark-shell command with SPARK_SUBMIT_OPTS="-XX:MaxPermSize=1g".
If Spark was not built with Hive support and hive-site.xml is simply distributed to the conf directory of the Spark cluster, starting spark-sql directly fails:
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-sql
Exception in thread "main" java.lang.RuntimeException: java.io.IOException: Permission denied
  at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:330)
  at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:109)
  at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Permission denied
  at java.io.UnixFileSystem.createFileExclusively(Native Method)
  at java.io.File.createNewFile(File.java:1006)
  at java.io.File.createTempFile(File.java:1989)
  at org.apache.hadoop.hive.ql.session.SessionState.createTempFile(SessionState.java:432)
  at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:328)
  ... 11 more
15/07/03 08:42:33 INFO util.Utils: Shutdown hook called
15/07/03 08:42:33 INFO util.Utils: Deleting directory /tmp/spark-831ff199-cf80-4d49-a22f-824736065289
This is because every Worker in the Spark cluster needs Hive support, but Hive is not installed on all Worker nodes; moreover, Spark must be built with Hive support.
But rebuilding Spark with Hive support takes a lot of time. Can we just use the Hive that is already installed on the cluster instead? YES!
http://lxw1234.com/archives/2015/06/294.htm
http://shiyanjun.cn/archives/1113.html
http://www.cnblogs.com/hseagle/p/3758922.html
1. Add to spark-env.sh:
export HIVE_HOME=/usr/install/apache-hive-0.13.1-bin
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.34.jar:$SPARK_CLASSPATH
2. Distribute apache-hive-0.13.1-bin to every node in the cluster (every node running a Spark Worker):
cd install
scp -r apache-hive-0.13.1-bin dp0653:/usr/install/
3. Copy apache-hive-0.13.1-bin/conf/hive-site.xml to $SPARK_HOME/conf:
scp apache-hive-0.13.1-bin/conf/hive-site.xml dp0653:/usr/install/spark-1.4.0-bin-hadoop2.6/conf
4. Restart the spark cluster:
sbin/stop-all.sh
sbin/start-all.sh
5. Test spark-sql:
SPARK_CLASSPATH was detected (set to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:').
This is deprecated in Spark 1.0+.
Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.
15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.
15/07/03 10:01:00 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.6.53:9083
15/07/03 10:01:00 INFO hive.metastore: Connected to metastore.
15/07/03 10:01:00 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr.
SET spark.sql.hive.version=0.13.1
SET spark.sql.hive.version=0.13.1
spark-sql> show databases;
default
test
spark-sql> use test;
Time taken: 2.045 seconds
spark-sql> show tables;
koudai false
...
spark-sql> select count(*) from koudai;
311839
Time taken: 12.443 seconds, Fetched 1 row(s)
spark-sql>
When computing metrics on a daily or weekly schedule, the analysis results can be saved as a Persistent Table or written to HDFS for others to use.
Production data is usually large; use partitions when querying.