To be clear, this time we are still not using Spark Streaming. Instead, we read a file from Hadoop HDFS, run a computation over it, and write the result back to HDFS.
First, in the project from before (for how it was set up, see the earlier post "IDEA全程搭建第一個Scala Spark streaming maven工程"), modify the pom to add the Hadoop version property:
<hadoop.version>2.7.6</hadoop.version>
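For context, this property sits in the pom's properties block next to the Spark version the earlier project already defines. A minimal sketch is shown below; the 2.2.0 value is inferred from the Spark version appearing later in the logs and is only illustrative:

<properties>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.7.6</hadoop.version>
</properties>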
Add two dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
Then modify the packaging configuration:
<build>
    <pluginManagement>
        <plugins>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Shade plugin for building the fat jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
With this we can build a fat jar (one that bundles all third-party jars); the resulting file can be fairly large.
Now let's write a word-count object (implemented in Scala):
import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("ScalaWorkCount")
    val scc = new SparkContext(conf)
    // read the input file from Hadoop HDFS
    val lines = scc.textFile(args(0))
    // count the occurrences of each word in the file
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // write the result back to Hadoop HDFS
    result.saveAsTextFile(args(1))
    scc.stop()
  }
}
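Before submitting to the cluster, the same transformation chain can be sanity-checked locally against an in-memory collection. This is only a sketch; the object name and sample data below are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object LocalWordCountCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LocalWordCountCheck")
    val sc = new SparkContext(conf)
    // parallelize an in-memory sample instead of reading from HDFS
    val sample = sc.parallelize(Seq("ice park dog", "dog park"))
    // same flatMap / map / reduceByKey chain as ScalaWordCount
    val counts = sample.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.collect().foreach(println)  // expected: (dog,2), (park,2), (ice,1)
    sc.stop()
  }
}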
After packaging with Maven, we get a set of files; the one we need is the large fat jar.
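If you package from the command line rather than from IDEA's Maven panel, a standard invocation, run from the project root, would be (shown here only as an assumed alternative to the IDE workflow):

mvn clean package

The shaded (fat) jar is then written to the project's target directory.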
On the Linux machine, create an arbitrary file, say a.txt, containing a few words:
ice park dog fish dinsh cark balana apple fuck fool my him cry
Then upload it to Hadoop HDFS:
[root@host2 bin]# ./hdfs dfs -put ./a.txt /usr/file
[root@host2 bin]# ./hdfs dfs -lsr /
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr
drwxr-xr-x - root supergroup 0 2018-11-03 16:06 /usr/file
-rw-r--r-- 3 root supergroup 63 2018-11-03 16:06 /usr/file/a.txt
-rw-r--r-- 3 root supergroup 173271626 2018-09-14 13:50 /usr/file/jdk-8u45-linux-x64.tar.gz
We can check its contents:
[root@host2 bin]# ./hdfs dfs -cat /usr/file/a.txt
ice park dog fish dinsh cark balana apple fuck fool my him cry
At this point, also upload the jar we built to the Linux machine.
Run spark-submit, which produces the following output:
./spark-submit --master spark://host2:7077,host1:7077 --class com.guanjian.ScalaWordCount ./jar/sparknew-1.0-SNAPSHOT.jar hdfs://host2:8020/usr/file/a.txt hdfs://host2:8020/usr/file/wcount
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/11/03 16:20:21 INFO SparkContext: Running Spark version 2.2.0
18/11/03 16:20:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/11/03 16:20:22 INFO SparkContext: Submitted application: ScalaWorkCount
18/11/03 16:20:22 INFO SecurityManager: Changing view acls to: root
18/11/03 16:20:22 INFO SecurityManager: Changing modify acls to: root
18/11/03 16:20:22 INFO SecurityManager: Changing view acls groups to:
18/11/03 16:20:22 INFO SecurityManager: Changing modify acls groups to:
18/11/03 16:20:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
18/11/03 16:20:22 INFO Utils: Successfully started service 'sparkDriver' on port 42065.
18/11/03 16:20:22 INFO SparkEnv: Registering MapOutputTracker
18/11/03 16:20:22 INFO SparkEnv: Registering BlockManagerMaster
18/11/03 16:20:22 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/11/03 16:20:22 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/11/03 16:20:22 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7aa12077-6316-47b4-97e9-f65b3009ac79
18/11/03 16:20:22 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
18/11/03 16:20:22 INFO SparkEnv: Registering OutputCommitCoordinator
18/11/03 16:20:23 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/11/03 16:20:23 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.5.182:4040
18/11/03 16:20:23 INFO SparkContext: Added JAR file:/usr/local/spark2.2/bin/./jar/sparknew-1.0-SNAPSHOT.jar at spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar with timestamp 1541233223259
18/11/03 16:20:23 INFO Executor: Starting executor ID driver on host localhost
18/11/03 16:20:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44491.
18/11/03 16:20:23 INFO NettyBlockTransferService: Server created on 192.168.5.182:44491
18/11/03 16:20:23 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/11/03 16:20:23 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.5.182, 44491, None)
18/11/03 16:20:23 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.5.182:44491 with 366.3 MB RAM, BlockManagerId(driver, 192.168.5.182, 44491, None)
18/11/03 16:20:23 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.5.182, 44491, None)
18/11/03 16:20:23 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.5.182, 44491, None)
18/11/03 16:20:24 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 236.5 KB, free 366.1 MB)
18/11/03 16:20:24 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 366.0 MB)
18/11/03 16:20:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.5.182:44491 (size: 22.9 KB, free: 366.3 MB)
18/11/03 16:20:24 INFO SparkContext: Created broadcast 0 from textFile at ScalaWordCount.scala:13
18/11/03 16:20:24 INFO FileInputFormat: Total input paths to process : 1
18/11/03 16:20:25 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
18/11/03 16:20:25 INFO SparkContext: Starting job: saveAsTextFile at ScalaWordCount.scala:15
18/11/03 16:20:25 INFO DAGScheduler: Registering RDD 3 (map at ScalaWordCount.scala:14)
18/11/03 16:20:25 INFO DAGScheduler: Got job 0 (saveAsTextFile at ScalaWordCount.scala:15) with 2 output partitions
18/11/03 16:20:25 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at ScalaWordCount.scala:15)
18/11/03 16:20:25 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
18/11/03 16:20:25 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
18/11/03 16:20:25 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at ScalaWordCount.scala:14), which has no missing parents
18/11/03 16:20:25 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 366.0 MB)
18/11/03 16:20:25 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.8 KB, free 366.0 MB)
18/11/03 16:20:25 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.5.182:44491 (size: 2.8 KB, free: 366.3 MB)
18/11/03 16:20:25 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
18/11/03 16:20:25 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at ScalaWordCount.scala:14) (first 15 tasks are for partitions Vector(0, 1))
18/11/03 16:20:25 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
18/11/03 16:20:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 4840 bytes)
18/11/03 16:20:25 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 4840 bytes)
18/11/03 16:20:25 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
18/11/03 16:20:25 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/11/03 16:20:25 INFO Executor: Fetching spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar with timestamp 1541233223259
18/11/03 16:20:25 INFO TransportClientFactory: Successfully created connection to /192.168.5.182:42065 after 51 ms (0 ms spent in bootstraps)
18/11/03 16:20:25 INFO Utils: Fetching spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar to /tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4/userFiles-5bb37b1d-3c33-4f7b-acdb-ff85603d088f/fetchFileTemp5672377923687175291.tmp
18/11/03 16:20:26 INFO Executor: Adding file:/tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4/userFiles-5bb37b1d-3c33-4f7b-acdb-ff85603d088f/sparknew-1.0-SNAPSHOT.jar to class loader
18/11/03 16:20:26 INFO HadoopRDD: Input split: hdfs://host2:8020/usr/file/a.txt:0+31
18/11/03 16:20:26 INFO HadoopRDD: Input split: hdfs://host2:8020/usr/file/a.txt:31+32
18/11/03 16:20:26 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1155 bytes result sent to driver
18/11/03 16:20:26 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1026 bytes result sent to driver
18/11/03 16:20:26 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 824 ms on localhost (executor driver) (1/2)
18/11/03 16:20:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 810 ms on localhost (executor driver) (2/2)
18/11/03 16:20:26 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/11/03 16:20:26 INFO DAGScheduler: ShuffleMapStage 0 (map at ScalaWordCount.scala:14) finished in 0.858 s
18/11/03 16:20:26 INFO DAGScheduler: looking for newly runnable stages
18/11/03 16:20:26 INFO DAGScheduler: running: Set()
18/11/03 16:20:26 INFO DAGScheduler: waiting: Set(ResultStage 1)
18/11/03 16:20:26 INFO DAGScheduler: failed: Set()
18/11/03 16:20:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at ScalaWordCount.scala:15), which has no missing parents
18/11/03 16:20:26 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 72.0 KB, free 366.0 MB)
18/11/03 16:20:26 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 25.9 KB, free 365.9 MB)
18/11/03 16:20:26 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.5.182:44491 (size: 25.9 KB, free: 366.2 MB)
18/11/03 16:20:26 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
18/11/03 16:20:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at ScalaWordCount.scala:15) (first 15 tasks are for partitions Vector(0, 1))
18/11/03 16:20:26 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
18/11/03 16:20:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 4621 bytes)
18/11/03 16:20:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 4621 bytes)
18/11/03 16:20:26 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
18/11/03 16:20:26 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
18/11/03 16:20:26 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.5.182:44491 in memory (size: 2.8 KB, free: 366.3 MB)
18/11/03 16:20:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
18/11/03 16:20:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
18/11/03 16:20:26 INFO FileOutputCommitter: Saved output of task 'attempt_20181103162025_0001_m_000000_2' to hdfs://host2:8020/usr/file/wcount/_temporary/0/task_20181103162025_0001_m_000000
18/11/03 16:20:26 INFO FileOutputCommitter: Saved output of task 'attempt_20181103162025_0001_m_000001_3' to hdfs://host2:8020/usr/file/wcount/_temporary/0/task_20181103162025_0001_m_000001
18/11/03 16:20:26 INFO SparkHadoopMapRedUtil: attempt_20181103162025_0001_m_000001_3: Committed
18/11/03 16:20:26 INFO SparkHadoopMapRedUtil: attempt_20181103162025_0001_m_000000_2: Committed
18/11/03 16:20:26 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1267 bytes result sent to driver
18/11/03 16:20:26 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1267 bytes result sent to driver
18/11/03 16:20:26 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 325 ms on localhost (executor driver) (1/2)
18/11/03 16:20:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 330 ms on localhost (executor driver) (2/2)
18/11/03 16:20:26 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/11/03 16:20:26 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at ScalaWordCount.scala:15) finished in 0.332 s
18/11/03 16:20:26 INFO DAGScheduler: Job 0 finished: saveAsTextFile at ScalaWordCount.scala:15, took 1.653739 s
18/11/03 16:20:26 INFO SparkUI: Stopped Spark web UI at http://192.168.5.182:4040
18/11/03 16:20:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/11/03 16:20:26 INFO MemoryStore: MemoryStore cleared
18/11/03 16:20:26 INFO BlockManager: BlockManager stopped
18/11/03 16:20:26 INFO BlockManagerMaster: BlockManagerMaster stopped
18/11/03 16:20:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/11/03 16:20:26 INFO SparkContext: Successfully stopped SparkContext
18/11/03 16:20:26 INFO ShutdownHookManager: Shutdown hook called
18/11/03 16:20:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4
Now let's look at the result saved in Hadoop HDFS.
[root@host2 bin]# ./hdfs dfs -lsr /
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr
drwxr-xr-x - root supergroup 0 2018-11-03 16:20 /usr/file
-rw-r--r-- 3 root supergroup 63 2018-11-03 16:06 /usr/file/a.txt
-rw-r--r-- 3 root supergroup 173271626 2018-09-14 13:50 /usr/file/jdk-8u45-linux-x64.tar.gz
drwxr-xr-x - root supergroup 0 2018-11-03 16:20 /usr/file/wcount
-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS
-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000
-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001
[root@host2 bin]# ./hdfs dfs -cat /usr/file/wcount/part-00000
(him,1)
(park,1)
(fool,1)
(dinsh,1)
(fish,1)
(dog,1)
(apple,1)
(cry,1)
(my,1)
[root@host2 bin]# ./hdfs dfs -cat /usr/file/wcount/part-00001
(ice,1)
(cark,1)
(balana,1)
(fuck,1)
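Incidentally, both result partitions can be printed with a single command, since hdfs dfs -cat accepts a glob over the same paths:

[root@host2 bin]# ./hdfs dfs -cat /usr/file/wcount/part-*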
This gives us the result we wanted: a word count of the text file a.txt. Of course, this is essentially offline (batch) processing and has little to do with web applications; for real-time processing we still need Spark Streaming.
Spark also supports interactive, command-line use, much like the Scala REPL. Below, we allocate 5 GB of executor memory and 16 executor cores to this session:
[root@host2 bin]# ./spark-shell --master spark://host2:7077,host1:7077 --executor-memory 5g --total-executor-cores 16
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/11/03 20:59:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/11/03 21:00:01 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.5.182:4040
Spark context available as 'sc' (master = spark://host2:7077,host1:7077, app id = app-20181103205956-0001).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
For example, to count how many words a.txt contains:
scala> val r1 = sc.textFile("hdfs://host2:8020/usr/file/a.txt")
r1: org.apache.spark.rdd.RDD[String] = hdfs://host2:8020/usr/file/a.txt MapPartitionsRDD[1] at textFile at <console>:24
We can see that the return value is an RDD. What is an RDD? In short, a Resilient Distributed Dataset: Spark's basic abstraction, an immutable, partitioned collection of elements that can be operated on in parallel.
scala> r1.count
res0: Long = 1
scala> r1.flatMap(_.split(" ")).map((_,1)).count
res1: Long = 13
咱們對這個RDD操做,行數爲1行,單詞數爲13個單詞
Finally, let's look at the Spark web UI on port 8091.
The jobs we have run are all recorded there.