The core of MapReduce is the Shuffle. The whole shuffle process incurs at least six disk I/O operations.
Intermediate result output: MapReduce-based compute engines generally write intermediate results to disk for storage and fault tolerance. In addition, when a query (for example, in Hive) is translated into MapReduce jobs, it often produces multiple Stages, and these chained Stages rely on the underlying file system (such as HDFS) to store each Stage's output. Because this I/O is relatively slow, it drags down the overall speed of MapReduce.
tar -zxvf .tar.gz -C <target directory>
mv spark-2.1.0-bin-hadoop2.7/ spark-2.1.0
mv spark-env.sh.template spark-env.sh
vi spark-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata01
export SPARK_MASTER_PORT=7077
mv slaves.template slaves
vi slaves
bigdata02
bigdata03
vi /etc/profile
export SPARK_HOME=<Spark installation path>
export PATH=$PATH:$SPARK_HOME/bin
export PATH=$PATH:$SPARK_HOME/sbin
source /etc/profile
scp -r spark-2.1.0/ bigdata02:$PWD
scp -r spark-2.1.0/ bigdata03:$PWD
start-master.sh
start-slaves.sh
spark-shell
stop-master.sh
stop-slaves.sh
Mainly used for development or test environments.
Spark provides a directory to save the registration information of Spark Applications and Workers and writes their recovery state into it. If the Master fails, the registration information of the running Spark Applications and Workers can be restored by restarting the Master process (sbin/start-master.sh).
Filesystem-based single-point recovery is configured mainly through SPARK_DAEMON_JAVA_OPTS in spark-env.sh:
mkdir /opt/module/spark-2.1.0/recovery
vi spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/opt/module/spark-2.1.0/recovery"
Suitable for real production environments.
ZooKeeper provides a Leader Election mechanism, which guarantees that although the cluster has multiple Masters, only one is Active while the others are Standby. When the Active Master fails, a Standby Master is elected. Because the cluster information, including Worker, Driver, and Application information, has already been persisted to ZooKeeper, the failover only affects the submission of new Jobs; running Jobs are not affected at all. The overall architecture of a cluster with ZooKeeper is shown in the figure below:
Modify the configuration:
vi spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir=/spark"
Also comment out export SPARK_MASTER_HOST and export SPARK_MASTER_PORT.
Send the new configuration file to the other nodes in the cluster:
scp spark-env.sh bigdata02:$PWD
scp spark-env.sh bigdata03:$PWD
spark-submit --master spark://XXXX:7077 --class org.apache.spark.examples.SparkPi /XXXX/spark/examples/jars/spark-examples_2.11-2.1.0.jar 100
(--master gives the master address, --class the fully qualified main class, followed by the path to the jar and 100, the number of iterations)
Commonly used options:
--master spark://XXXX:7077 specifies the Master address
--executor-memory 2g gives each worker 2 GB of usable memory
--total-executor-cores 2 limits the whole cluster to 2 CPU cores
spark-shell (with no arguments) runs in local mode; spark-shell --master spark://XXXX:7077 (specifying the master address) connects to the cluster.
Word count in spark-shell:
sc.textFile("/XXXX/WordCount.txt")   // local file path
  .flatMap(_.split(" "))             // split on spaces
  .map((_,1))                        // map each word to (word, 1)
  .reduceByKey(_+_)                  // count per word
  .collect
spark-shell --master spark://XXXX:7077 (specifying the master address):
scala> val rdd1 = sc.textFile("/root/sp_wc.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /root/sp_wc.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd1.collect
res0: Array[String] = Array(I love Scala, I love Skark, 2019/5/8)
scala> val rdd2 = rdd1.flatMap(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26
scala> rdd2.collect
res1: Array[String] = Array(I, love, Scala, I, love, Skark, 2019/5/8)
scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28
scala> rdd3.collect
res2: Array[(String, Int)] = Array((I,1), (love,1), (Scala,1), (I,1), (love,1), (Skark,1), (2019/5/8,1))
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30
scala> rdd4.collect
res3: Array[(String, Int)] = Array((2019/5/8,1), (love,2), (I,2), (Skark,1), (Scala,1))
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCount {
def main(args: Array[String]): Unit = {
//create the Spark configuration
val conf = new SparkConf().setAppName("Scala WordCount").setMaster("local")
//create the SparkContext
val sc = new SparkContext(conf)
sc.textFile(args(0))
.flatMap(_.split(" "))        // split each line on spaces
.map((_, 1))                  // map each word to (word, 1)
.reduceByKey(_ + _)           // sum the counts per word
.saveAsTextFile(args(1))      // write the result to the output path
sc.stop()
}
}
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class WordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("JavaWordCount")
.setMaster("local") ;
//create the JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf) ;
//read the input data
JavaRDD<String> lines = sc.textFile("hdfs://XXXX:9000/WordCount.txt") ;
//split into words: the first type parameter is the input type, the second is the output element type
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String input) throws Exception {
return Arrays.asList(input.split(" ")).iterator() ;
}
}) ;
//map each word to a count of 1
/*
* type parameters <String, String, Integer>:
* input type, output key type, output value type
*/
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String input) throws Exception {
return new Tuple2<String, Integer>(input, 1) ;
}
}) ;
//reduce the counts by key
/*
* type parameters <Integer, Integer, Integer>:
* the two input values and the return value
*/
JavaPairRDD<String,Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0 + arg1 ;
}
}) ;
//print the results
List<Tuple2<String, Integer>> output = counts.collect() ;
for (Tuple2<String, Integer> tuple :output) {
System.out.println(tuple._1 + " : " + tuple._2) ;
}
sc.stop() ;
}
}
val rdd1 = sc.textFile("hdfs://XXXX:9000/data.txt")
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
RDD dependencies
An RDD and the parent RDD(s) it depends on have two different kinds of relationships: narrow dependencies and wide dependencies.
A narrow dependency means that each partition of the parent RDD is used by at most one partition of the child RDD.
A wide dependency means that multiple partitions of the child RDD depend on the same partition of the parent RDD.
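A quick way to see both kinds of dependencies (and the stage boundary that a shuffle creates) is to inspect an RDD's dependencies and lineage in spark-shell; a minimal sketch, with illustrative variable names:

```scala
// Narrow dependency: map keeps a one-to-one partition mapping (OneToOneDependency)
val src = sc.parallelize(1 to 10, 2)
val mapped = src.map(_ * 2)
println(mapped.dependencies)    // List(org.apache.spark.OneToOneDependency@...)

// Wide dependency: reduceByKey shuffles data across partitions (ShuffleDependency)
val pairs = src.map(x => (x % 3, x))
val reduced = pairs.reduceByKey(_ + _)
println(reduced.dependencies)   // List(org.apache.spark.ShuffleDependency@...)

// toDebugString prints the lineage; each shuffle boundary starts a new Stage
println(reduced.toDebugString)
```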
Stages in a Spark job
Exercise 1:
//create an RDD by parallelizing a collection
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//multiply every element of rdd1 by 2, then sort
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//keep only the elements greater than or equal to 10
val rdd3 = rdd2.filter(_ >= 10)
//collect the elements to the client as an array
rdd3.collect
Exercise 2:
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//split each element of rdd1, then flatten
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect
Exercise 3:
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//union
val rdd3 = rdd1.union(rdd2)
//intersection
val rdd4 = rdd1.intersection(rdd2)
//deduplicate
rdd3.distinct.collect
rdd4.collect
Exercise 4:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//join
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//union
val rdd4 = rdd1 union rdd2
//group by key
rdd4.groupByKey.collect
rdd4.collect
Exercise 5:
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//note the difference between cogroup and groupByKey (see the sketch after this exercise)
rdd3.collect
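To make the difference between cogroup and groupByKey concrete, here is a small spark-shell sketch (the RDD contents are illustrative and the outputs in the comments are indicative):

```scala
val a = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3)))
val b = sc.parallelize(List(("tom", 10), ("shuke", 2)))

// groupByKey works on a single RDD: union first, then group all values per key,
// e.g. (tom, Iterable(1, 2, 10)), (jerry, Iterable(3)), (shuke, Iterable(2))
(a union b).groupByKey.collect

// cogroup keeps the two RDDs apart: each key maps to a pair of Iterables,
// e.g. (tom, (Iterable(1, 2), Iterable(10))), (jerry, (Iterable(3), Iterable()))
a.cogroup(b).collect
```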
Exercise 6:
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce aggregation: reduce is an action and returns an Int, not an RDD
val rdd2 = rdd1.reduce(_ + _)
rdd2
練習7:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//aggregate by key
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//sort by value in descending order
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
}
rdd1.mapPartitionsWithIndex(func1).collect
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
View the elements in each partition:
scala> rdd1.mapPartitionsWithIndex(func1).collect
res4: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ],
[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
Sum the maximum value of each partition; note that the initial value is 0:
scala> rdd1.aggregate(0)(math.max(_,_),_+_)
res6: Int = 7
scala> rdd1.aggregate(100)(math.max(_,_),_+_)
res8: Int = 300
For a plain sum, note that the initial value is 0:
scala> rdd1.aggregate(0)(_+_,_+_)
res9: Int = 15
scala> rdd1.aggregate(10)(_+_,_+_)
res10: Int = 45
e.g. with strings:
val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
[partID:0, val: a], [partID:0, val: b], [partID:0, val: c],
[partID:1, val: d], [partID:1, val: e], [partID:1, val: f]
e.g.:
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
The result may be "24" or "42", because the order in which the two partition results are concatenated is not fixed.
val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
The result is "10" or possibly "01". Reason: note the initial value "", whose length is 0; 0.toString then becomes the string "0".
val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
The result is "11", for the same reason as above.
Prepare the data:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
The elements in the two partitions:
e.g.:
scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect
res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
scala> pairRDD.reduceByKey(_+_).collect
res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
The following two statements are equivalent:
val rdd2 = rdd1.repartition(3)
val rdd3 = rdd1.coalesce(3,true)   // if the shuffle flag is false, the number of partitions stays at 2
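A quick way to check this in spark-shell is to look at the resulting partition counts; a minimal sketch:

```scala
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

rdd1.repartition(3).partitions.length       // 3: repartition always shuffles
rdd1.coalesce(3, true).partitions.length    // 3: equivalent to repartition(3)
rdd1.coalesce(3, false).partitions.length   // still 2: without a shuffle the partition count cannot grow
```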
A Tomcat access log looks like this:
Requirement: find the two most visited pages, and display the page name and the visit count.
Step analysis:
Code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object TomcatLogCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("TomcatLogCount")
val sc = new SparkContext(conf)
/*
* read in the log and parse it
*
* 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
* */
val rdd1 = sc.textFile(" ").map(
line => {
//parse the string to get the jsp name
//1. extract the string between the two quotes
val index1 = line.indexOf("\"")
val index2 = line.lastIndexOf("\"")
//line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
val line1 = line.substring(index1 + 1, index2)
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
//line2 = /MyDemoWeb/oracle.jsp
val line2 = line1.substring(index3 + 1, index4)
//get the jsp name, e.g. oracle.jsp
val jspName = line2.substring(line2.lastIndexOf("/"))
(jspName, 1)
}
)
//count the hits for each jsp
val rdd2 = rdd1.reduceByKey(_+_)
//sort by value (hit count) in descending order
val rdd3 = rdd2.sortBy(_._2, false)
//take the two most visited jsps
rdd3.take(2).foreach(println)
sc.stop()
}
}
The generated partition files:
For example, the part-00000 file contains only the access logs for web.jsp.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner
import scala.collection.mutable.HashMap
object TomcatLogPartitioner {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("TomcatLogPartitioner")
val sc = new SparkContext(conf)
/*
* read in the log and parse it
*
* 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
* */
val rdd1 = sc.textFile(" ").map(
line => {
//parse the string to get the jsp name
//1. extract the string between the two quotes
val index1 = line.indexOf("\"")
val index2 = line.lastIndexOf("\"")
//line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
val line1 = line.substring(index1 + 1, index2)
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
//line2 = /MyDemoWeb/oracle.jsp
val line2 = line1.substring(index3 + 1, index4)
//get the jsp name, e.g. oracle.jsp
val jspName = line2.substring(line2.lastIndexOf("/"))
(jspName, line)
}
)
//get the distinct jsp names
val rdd2 = rdd1.map(_._1).distinct().collect()
//create the partitioning rule
val wepPartitioner = new WepPartitioner(rdd2)
val rdd3 = rdd1.partitionBy(wepPartitioner)
//save rdd3
rdd3.saveAsTextFile(" ")
}
//define the partitioning rule
class WepPartitioner(jspList : Array[String]) extends Partitioner {
/*
* a map that stores the partition assignment:
* String is the jsp name
* Int is the partition number
* */
val partitionMap = new HashMap[String, Int]()
//initial partition number
var partID = 0
//fill the map
for (jsp <- jspList) {
partitionMap.put(jsp, partID)
partID += 1
}
//return the number of partitions
override def numPartitions : Int = partitionMap.size
//return the partition for the given jsp name
override def getPartition(key : Any) : Int = partitionMap.getOrElse(key.toString(), 0)
}
}
JdbcRDD parameter description:
From the parameter description above, JdbcRDD has the following two drawbacks:
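For reference, here is a minimal JdbcRDD sketch (the table, columns, and connection details are illustrative, and the MySQL driver jar must be on the classpath). The SQL has to contain exactly two '?' placeholders, which JdbcRDD fills with the numeric lower and upper bounds:

```scala
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

// Assumes a table emp(empno INT, ename VARCHAR, sal INT) in the company database
val jdbcRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/company?serverTimezone=UTC", "root", "123456"),
  "select ename, sal from emp where empno >= ? and empno <= ?",  // exactly two ? bounds required
  7000,                                    // lower bound
  8000,                                    // upper bound
  2,                                       // number of partitions
  rs => (rs.getString(1), rs.getInt(2))    // map each ResultSet row
)
jdbcRDD.collect.foreach(println)
```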
Code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
/*
* write the Spark results into a MySQL database
*
*/
object TomcatLogCountToMysql {
def main(args: Array[String]): Unit = {
//create the SparkContext
val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogCountToMysql")
val sc = new SparkContext(conf)
/*
*
* read in the log and parse it:
*
* 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
*/
val rdd1 = sc.textFile("H:\\tmp_files\\localhost_access_log.txt")
.map(
line => {
//parse the string to get the jsp name
//1. extract the string between the two quotes
val index1 = line.indexOf("\"")
val index2 = line.lastIndexOf("\"")
val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/oracle.jsp HTTP/1.1
//find the positions of the two spaces
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/oracle.jsp
//get the jsp name
val jspName = line2.substring(line2.lastIndexOf("/")) // oracle.jsp
(jspName, 1)
})
//
// try {
// /*
// * create table mydata(jsname varchar(50),countNumber Int)
// *
// * foreach has no return value; here we only need to write to the database, not return a new RDD, so foreach is sufficient
// *
// *
// * running this version throws: Task not serializable
// */
// conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
// pst = conn.prepareStatement("insert into mydata values(?,?)")
//
// rdd1.foreach(f => {
// pst.setString(1, f._1)
// pst.setInt(2, f._2)
//
// pst.executeUpdate()
// })
// } catch {
// case t: Throwable => t.printStackTrace()
// } finally {
// if (pst != null) pst.close()
// if (conn != null) conn.close()
// }
//
// sc.stop()
// //write into the database
// var conn: Connection = null
// var pst: PreparedStatement = null
// //first attempted fix
// /*
// * idea behind the fix:
// * conn and pst are used on every node, so they would have to be shipped to the nodes and implement the Serializable interface
// */
// try {
// rdd1.foreach(f => {
// conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
// pst = conn.prepareStatement("insert into mydata values(?,?)")
//
// pst.setString(1, f._1)
// pst.setInt(2, f._2)
//
// pst.executeUpdate()
// })
// } catch {
// case t: Throwable => t.printStackTrace()
// } finally {
// if (pst != null) pst.close()
// if (conn != null) conn.close()
// }
//
// sc.stop()
/*
* the first fix works functionally, but it opens a connection for every record, which puts heavy pressure on the database
*
* operate per partition instead: one connection per partition is enough
*/
rdd1.foreachPartition(saveToMysql)
sc.stop()
}
def saveToMysql(it: Iterator[(String, Int)]) = {
var conn: Connection = null
var pst: PreparedStatement = null
try {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
pst = conn.prepareStatement("insert into mydata values(?,?)")
it.foreach(f => {
pst.setString(1, f._1)
pst.setInt(2, f._2)
pst.executeUpdate()
})
} catch {
case t: Throwable => t.printStackTrace()
} finally {
if (pst != null) pst.close()
if (conn != null) conn.close()
}
}
}
<1>. DataFrame
A DataFrame is a dataset organized into named columns. It is conceptually equivalent to a table in a relational database, but with richer optimizations under the hood. DataFrames can be built from a variety of sources,
for example:
The DataFrame API is available in Scala, Java, Python, and R.
As the figure above shows, a DataFrame carries structural information about the data, i.e. a schema. An RDD is a distributed collection of Java objects, while a DataFrame is a distributed collection of Row objects. Besides providing richer operators than RDDs, DataFrames improve execution efficiency, reduce data reads, and enable execution-plan optimization.
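A small spark-shell sketch of the point above: a DataFrame carries a schema, and df.rdd exposes the underlying distributed collection of Row objects (the column names are illustrative; spark-shell already imports the implicits needed for toDF):

```scala
// Build a DataFrame from a plain RDD of tuples
val df = sc.parallelize(Seq(("Tom", 23), ("Mary", 19))).toDF("name", "age")

df.printSchema   // the schema (column names and types) travels with the data
df.rdd           // org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
df.rdd.collect   // Array([Tom,23], [Mary,19])
```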
<2>. Datasets
<1>. Creating DataFrames
case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
val lines = sc.textFile("/XXXX/emp.csv").map(_.split(","))
val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
val df1 = allEmp.toDF
df1.show
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val myschema = StructType(
List(
StructField("empno",DataTypes.IntegerType),
StructField("ename",DataTypes.StringType),
StructField("job",DataTypes.StringType),
StructField("mgr",DataTypes.IntegerType),
StructField("hiredate",DataTypes.StringType),
StructField("sal",DataTypes.IntegerType),
StructField("comm",DataTypes.IntegerType),
StructField("deptno",DataTypes.IntegerType),
))
val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
import org.apache.spark.sql.Row
val df2 = spark.createDataFrame(allEmp,myschema)
val df3 = spark.read // returns a DataFrameReader; the default file format is Parquet
val df3 = spark.read.json("/XXXX/people.json") // read a JSON file
df3.show
val df4 = spark.read.format("json").load("/XXXX/people.json")
<2>. DataFrame operations
DataFrame operations are also called untyped Dataset operations.
a. DSL statements (see the sketch after the SQL examples below)
b. SQL statements
df.createOrReplaceTempView("emp")
spark.sql("select * from emp").show
spark.sql("select * from emp where deptno=10").show
spark.sql("select deptno,sum(sal) from emp group by deptno").show
<1>. Ordinary (local, session-scoped) temporary view: createOrReplaceTempView
<2>. Global view: createGlobalTempView
e.g.: a newly created session cannot read the emp temporary view: spark.newSession.sql("select * from emp")
Both of the following can read the data in a global view:
df1.createGlobalTempView("emp1")
spark.newSession.sql("select * from global_temp.emp1").show
spark.sql("select * from global_temp.emp1").show
case class MyData(a:Int,b:String)
val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
ds.show
case class Person(name: String, gender: String)
val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))
df.as[Person].show
df.as[Person].collect
val linesDS = spark.read.text("hdfs://XXXX:9000/XXXX/data.txt").as[String]
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count
result.show
Sort: result.orderBy($"value").show
val empDF = spark.read.json("/XXXX/emp.json")
Query the employees with a salary of at least 3000:
empDF.where($"sal" >= 3000).show
case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)
val empDS = empDF.as[Emp]
Query the employees with a salary greater than 3000:
empDS.filter(_.sal > 3000).show
View the employees in department 10:
empDS.filter(_.deptno == 10).show
val deptRDD=sc.textFile("/XXXX/dept.csv").map(_.split(","))
case class Dept(deptno:Int,dname:String,loc:String)
val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
val empRDD = sc.textFile("/XXXX/emp.csv").map(_.split(","))
val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS
val result = deptDS.join(empDS,"deptno")
Another way to write it (note the three equals signs):
val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))
The difference between joinWith and join is that the schema of the resulting Dataset is different (see the sketch below).
result.explain
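To see the schema difference, printSchema can be compared on both results; a minimal sketch:

```scala
// join flattens the columns of both Datasets into a single row
deptDS.join(empDS, "deptno").printSchema

// joinWith keeps each side as a nested struct, i.e. a Dataset[(Dept, Emp)]
deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno")).printSchema
```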
Read the users.parquet file (a sample file that ships with Spark):
val userDF = spark.read.load("/root/users.parquet")
View the schema:
userDF.printSchema
View the contents:
userDF.show
Read a JSON file:
val userDF = spark.read.load("/root/emp.json") --> fails, because load expects Parquet by default
The correct ways:
val userDF = spark.read.format("json").load("/root/emp.json")
val userDF = spark.read.json("/root/emp.json")
Save a Parquet file to a local path:
userDF.select($"name",$"favorite_color").write.save("/root/parquet")
Read the file that was just written:
val userDF1 = spark.read.load("/root/parquet/part-00000-888d505a-7d51-4a50-aaf5-2bbdb56e67a1.snappy.parquet") --> not recommended
In production, read the directory directly:
val userDF2 = spark.read.load("/usr/local/tmp_files/parquet")
When calling the save function you can specify a save mode: append, overwrite, and so on.
The save mode is expressed with SaveMode, which defines how existing data is handled. Note that these save modes do not use any locking and are not atomic. Also, with Overwrite, the existing data is deleted before the new data is written. SaveMode is described in the following table:
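The mode can be given as a string (as below) or through the SaveMode enum; a short sketch of the options (the path is illustrative, and only one mode would be used at a time):

```scala
import org.apache.spark.sql.SaveMode

// String equivalents: "append", "overwrite", "ignore", "error"
userDF2.write.mode(SaveMode.Append).save("/root/parquet")     // append to existing data
userDF2.write.mode(SaveMode.Overwrite).save("/root/parquet")  // delete existing data first, then write
// SaveMode.Ignore: silently do nothing if data already exists
// SaveMode.ErrorIfExists (the default): throw an exception if data already exists
```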
userDF2.write.save("/root/parquet") --> fails, because data already exists at that path
Overwrite when saving:
userDF2.write.mode("overwrite").save("/root/parquet")
Save the result as a table:
userDF2.select($"name").write.saveAsTable("table1")
View the data:
spark.sql("select * from table1").show
Partitioning and bucketing are also supported when writing: partitionBy, bucketBy (see the sketch below).
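A minimal sketch of both (the column and table names are illustrative; bucketBy currently has to be combined with saveAsTable):

```scala
// Partition the output files by the value of a column
userDF2.write.mode("overwrite").partitionBy("favorite_color").parquet("/root/parquet_by_color")

// Bucket by a column into a fixed number of buckets; must be saved as a table
userDF2.write.mode("overwrite").bucketBy(4, "name").saveAsTable("users_bucketed")
```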
Read JSON data, convert it to Parquet, and create a table so it can be queried with SQL (read the data in, write it out again, and the result is Parquet files).
Read the file:
val empDF = spark.read.json("/root/emp.json")
Write out the files:
empDF.write.mode("overwrite").save("/root/parquet")
empDF.write.mode("overwrite").parquet("/root/parquet")
Create a table and query it:
val emp1 = spark.read.parquet("/root/parquet")
emp1.createOrReplaceTempView("emp1")
spark.sql("select * from emp1").show
Create a DataFrame from an RDD:
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF("single","double") // "single" and "double" are the column names
df1.show
df1.write.mode("overwrite").save("/root/test_table/key=1")
val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF("single","triple")
df2.show
df2.write.mode("overwrite").save("/root/test_table/key=2")
Merge the two parts (schema merging):
val df3 = spark.read.parquet("/root/tmp_files/test_table")
val df3 = spark.read.option("mergeSchema",true).parquet("/root/tmp_files/test_table")
Read a JSON file and create a DataFrame:
val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
Print the schema:
peopleDF.printSchema
Create a temporary view:
peopleDF.createOrReplaceTempView("peopleView")
Run a query:
spark.sql("select * from peopleView").show
Spark SQL provides a unified access interface: for different data sources, once the data is read in and a DataFrame has been created, the operations are exactly the same.
spark-shell --master spark://XXXX:7077 --jars /XXXX/.jar --driver-class-path /XXXX/.jar
val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://XXXX:3306/company?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","123456").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","emp").load
mysqlDF.show
import java.util.Properties
val mysqlProps = new Properties()
mysqlProps.setProperty("user","root")
mysqlProps.setProperty("password","123456")
val mysqlDF1 = spark.read.jdbc("jdbc:mysql://XXXX:3306/company?serverTimezone=UTC&characterEncoding=utf-8","emp",mysqlProps)
mysqlDF1.show
spark.sql("create table spark.emp1(empno Int,ename String,job String,mgr String,hiredate String,sal Int,comm String,deptno Int)row format delimited fields terminated by ','")
spark.sql("load data local inpath '/root/emp.csv' overwrite into table spark.emp1")
spark.sql("select * from spark.emp1").show
package Demo
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.log4j.Logger
import org.apache.log4j.Level
/* create a DataFrame using the StructType approach */
object Demo01 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// create the SparkSession
val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()
// create an RDD from the given path
/*1 Tom 12
*2 Mary 13
*3 Lily 15
* */
val personRDD = spark.sparkContext.textFile("/Users/apple/Documents/student.txt").map(_.split("\t"))
// specify the schema with StructType
val schema = StructType(
List(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("age", IntegerType)))
// map the RDD to an RDD of Rows matching the schema
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))
val personDataFrame = spark.createDataFrame(rowRDD, schema)
// register a temporary view
personDataFrame.createOrReplaceTempView("t_person")
//run the SQL query
val df = spark.sql("select * from t_person order by age desc")
df.show()
spark.stop()
}
}
package Demo
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.log4j.Logger
import org.apache.log4j.Level
/* create a DataFrame using a case class */
object Demo02 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//create the SparkSession
val spark = SparkSession.builder().master("local").appName("Demo2").getOrCreate()
//create an RDD from the given path
val lineRDD = spark.sparkContext.textFile("").map(_.split("\t"))
//map the data onto the case class
val studentRDD = lineRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))
//create the DataFrame
import spark.sqlContext.implicits._
val studentDF = studentRDD.toDF
//register a view and run SQL
studentDF.createOrReplaceTempView("student")
spark.sql("select * from student").show
spark.stop()
}
}
//define the case class
case class Student(stuId: Int, stuName: String, stuAge: Int)
package Demo
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.log4j.Logger
import org.apache.log4j.Level
import java.util.Properties
/* write the results to MySQL */
object Demo03 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//create the SparkSession
val spark = SparkSession.builder().master("local").appName("Demo3").getOrCreate()
//create an RDD from the given path
val lineRDD = spark.sparkContext.textFile("").map(_.split("\t"))
//specify the schema with StructType
val schema = StructType(
List(
//the fields must match the columns of the MySQL table
StructField("personID", IntegerType),
StructField("personName", StringType),
StructField("personAge", IntegerType)))
//map the RDD to an RDD of Rows matching the schema
val rowRDD = lineRDD.map(p => Row(p(0).toInt,p(1),p(2).toInt))
val personDataFrame = spark.createDataFrame(rowRDD, schema)
personDataFrame.createOrReplaceTempView("myperson")
val result = spark.sql("select * from myperson")
result.show()
//save the result into MySQL
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
//append mode
result.write.mode("append").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)
spark.stop()
}
}
package Demo
import org.apache.spark.sql.SparkSession
import java.util.Properties
/* use Spark SQL to read data from Hive and save the result into MySQL */
//command: ./bin/spark-submit --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --class day0410.Demo4 /usr/local/tmp_files/Demo4.jar
object Demo4 {
def main(args: Array[String]): Unit = {
//create the SparkSession
val spark = SparkSession.builder().appName("Demo4").enableHiveSupport().getOrCreate()
//run the SQL against Hive
val result = spark.sql("select deptno,count(1) from company.emp group by deptno")
//save the result into MySQL
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
result.write.mode("append").jdbc("jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "emp_stat", props)
spark.stop()
}
}
<1>. Caching data in memory
To work with MySQL, spark-shell has to be started with the MySQL driver:
./bin/spark-shell --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar
val mysqlDF = spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","emp").load
mysqlDF.show
mysqlDF.createOrReplaceTempView("emp")
spark.sqlContext.cacheTable("emp") ----> marks the table as cacheable; the data is not actually cached yet
spark.sql("select * from emp").show ----> still reads from MySQL (and fills the cache)
spark.sql("select * from emp").show ----> reads from the cache
spark.sqlContext.clearCache
After the cache is cleared, running the query again goes back to the MySQL database.
<2>. Performance tuning parameters
a. Parameters for caching data in memory (see the sketch below):
spark.sql.inMemoryColumnarStorage.compressed
spark.sql.inMemoryColumnarStorage.batchSize
b. Other performance-related options (manual changes are not recommended, since later versions may tune them adaptively)
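A short sketch of adjusting the caching parameters above at runtime (the values are illustrative, not recommendations):

```scala
// Compress the in-memory columnar cache (default: true)
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
// Rows per cached column batch (default: 10000); larger batches use more memory per batch
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 20000L)
```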
yum install nc.x86_64
nc -l 1234 (-l: listen mode; 1234: the port number)
run-example streaming.NetworkWordCount localhost 1234
/**
*
* @ClassName: MyNetworkWordCount
* @Description
* @Author: YBCarry
* @Date2019-05-13 20:49
* @Version: V1.0
*
**/
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.internal.Logging
/*
* A hand-written streaming program
*
* Key points:
* 1. Create a StreamingContext object --> the core is creating a DStream
* 2. A DStream is represented as RDDs
* 3. The DStream turns the continuous data stream into discrete RDDs
*
* This is the core of Spark Streaming
*/
object MyNetworkWordCount {
def main(args: Array[String]): Unit = {
//create the StreamingContext
//local[2] means two threads are used
val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//Seconds(3) is the batch interval
val ssc = new StreamingContext(conf, Seconds(3))
//create a DStream that receives data from the netcat server
val lines = ssc.socketTextStream("172.16.194.128", 1234, StorageLevel.MEMORY_ONLY)
//lines contains the data sent by the netcat server
//split into words
val words = lines.flatMap(_.split(" "))
//map each word to a count of 1
val wordPair = words.transform(x => x.map(x => (x, 1)))
//print the results
wordPair.print()
//start the StreamingContext
ssc.start()
//wait for the computation to terminate
ssc.awaitTermination()
}
}
transform(func)
updateStateByKey(func)
Rewrite the NetworkWordCount program so that it accumulates the total frequency of each word (note: cumulative).
package test.Network
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
* @ClassName: TotalNetworkWordCount
* @Description: cumulative word count
* @Author: YBCarry
* @Date2019-05-15 16:05
* @Version: V1.0
*
**/
object TotalNetworkWordCount {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//create the StreamingContext
//local[2] means two threads are used
val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//Seconds(3) is the batch interval
val ssc = new StreamingContext(conf, Seconds(3))
//set the checkpoint directory, which stores the previous state
ssc.checkpoint("")
//create the DStream
val lines = ssc.socketTextStream("bigdata01", 1234, StorageLevel.MEMORY_ONLY)
//split into words
val words = lines.flatMap(_.split(" "))
//map each word to a count of 1
// val wordPair = words.map((_, 1))
val wordPair = words.transform( x => x.map(x => (x, 1)))
//define the update function for cumulative counting
/*
* it takes two parameters:
* currentValues: the values from the current batch
* previousValues: the previous (historical) total
* */
val addFunc = (currentValues : Seq[Int], previousValues : Option[Int]) => {
//sum the current batch
val currentTotal = currentValues.sum
//add the historical total
Some(currentTotal + previousValues.getOrElse(0))
}
//apply the cumulative update
val total = wordPair.updateStateByKey(addFunc)
total.print()
ssc.start()
ssc.awaitTermination()
}
}
Spark Streaming also provides window operations, which let you apply transformations over a sliding window of data. The figure below illustrates how a sliding window works:
As shown in the figure, every time the window slides over the original DStream, the source RDDs that fall inside the window are combined and operated on to produce the RDDs of the windowed DStream. In the example above, the operation is applied over the last 3 time units of data and slides by 2 time units. This means every window operation needs two parameters: the window length and the sliding interval.
e.g.: extend the earlier word-count example to compute, every 10 seconds, a word count over the last 30 seconds of data. In the last 30 seconds of the (word, 1) pairs DStream, apply reduceByKey. This is done with the reduceByKeyAndWindow operation.
package test.NetworkByWindow
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
* @ClassName: NetworkWordCountByWindow
* @Description: every 10 seconds, process the data from the last 30 seconds
* @Author: YBCarry
* @Date2019-05-15 17:00
* @Version: V1.0
*
**/
object NetworkWordCountByWindow {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//create the StreamingContext
//local[2] means two threads are used
val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//Seconds(3) is the batch interval
val ssc = new StreamingContext(conf, Seconds(3))
//set the checkpoint directory, which stores the previous state
ssc.checkpoint("")
//create the DStream
val lines = ssc.socketTextStream("bigdata01", 1234, StorageLevel.MEMORY_ONLY)
//split and map each word to a count of 1
val words = lines.flatMap(_.split(" ")).map((_, 1))
/*
* window operation
* parameters: the reduce function, the window length (30s), and the slide interval (12s),
* which must be a multiple of the batch interval (3s)
* */
val result = words.reduceByKeyAndWindow((x : Int, y : Int) => (x + y), Seconds(30), Seconds(12))
result.print()
ssc.start()
ssc.awaitTermination()
}
}
<1>. File streams: monitor changes in a file system; when new files are added, read them in as a data stream
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
* @ClassName: FileStreaming
* @Description
* @Author: YBCarry
* @Date2019-05-16 09:24
* @Version: V1.0
*
**/
object FileStreaming {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//create the StreamingContext
//local[2] means two threads are used
val conf = new SparkConf().setAppName("MyFileStreaming").setMaster("local[2]")
//Seconds(10) is the batch interval
val ssc = new StreamingContext(conf, Seconds(10))
//monitor the directory and read newly created files
val lines = ssc.textFileStream("\\Users\\apple\\學習\\SparkFiles")
lines.print()
ssc.start()
ssc.awaitTermination()
}
}
<2>. RDD queue streams
package test.RDDQueue
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.Queue
/**
*
* @ClassName: RDDQueueStream
* @Description: RDD queue stream
* @Author: YBCarry
* @Date2019-05-16 10:48
* @Version: V1.0
*
**/
object RDDQueueStream {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//create the StreamingContext
//local[2] means two threads are used
val conf = new SparkConf().setAppName("MyRDDQueueStream").setMaster("local[2]")
//Seconds(3) is the batch interval
val ssc = new StreamingContext(conf, Seconds(3))
//create a queue of RDD[Int]
val rddQueue = new Queue[RDD[Int]]()
//add data to the queue (create the data source)
for (i <- 1 to 3) {
rddQueue += ssc.sparkContext.makeRDD(1 to 10)
//sleep so the batches are easier to observe
Thread.sleep(1000)
}
//create a DStream that receives data from the queue
val inputDStream = ssc.queueStream(rddQueue)
//process the data
val result = inputDStream.map(x => (x, x * 2))
result.print()
ssc.start()
ssc.awaitTermination()
}
}
<3>. Socket streams: receive data by listening on a socket port
#define the agent name and the names of the source, channel, and sink
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#define the source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /usr/local/tmp_files/logs
#define the channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
#define the sink
a4.sinks = k1
a4.sinks.k1.type = avro
a4.sinks.k1.channel = c1
a4.sinks.k1.hostname = bigdata01
a4.sinks.k1.port = 1234
#wire up the source, channel, and sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
package test.Flume
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
* @ClassName: MyFlumeStream
* @Description: Flume pushes data to Spark Streaming (push mode)
* @Author: YBCarry
* @Date2019-05-16 14:01
* @Version: V1.0
*
**/
object MyFlumeStream01 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//create the StreamingContext
//local[2] means two threads are used
val conf = new SparkConf().setAppName("MyRDDQueueStream").setMaster("local[2]")
//Seconds(3) is the batch interval
val ssc = new StreamingContext(conf, Seconds(3))
//connect to Flume
//create a FlumeEvent DStream that receives the data pushed from Flume
//Flume pushes the data to bigdata01:1234, where Spark Streaming listens
val flumeEventDStream = FlumeUtils.createStream(ssc, "bigdata01", 1234)
//convert each FlumeEvent body to a String
val lineDStream = flumeEventDStream.map(e => {
new String(e.event.getBody.array)
})
//print the results
lineDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
flume-ng agent -n a4 -f Spark/MyFlumeStream01.conf -c conf -Dflume.root.logger=INFO,console
**Step 1:** the Flume configuration file
a1.channels = c1
a1.sinks = k1
a1.sources = r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/tmp_files/logs
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = bigdata01
a1.sinks.k1.port = 1234
#wire up the source, channel, and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
**Step 2:** the Spark Streaming program
package test.Flume
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.flume.FlumeUtils
/*
 * Spark Streaming pulls the data from Flume (pull mode)
 */
object FlumeLogPull {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//create the StreamingContext
//local[2] means two threads are used
val conf = new SparkConf().setAppName("FlumeLogPull").setMaster("local[2]")
//Seconds(3) is the batch interval
val ssc = new StreamingContext(conf, Seconds(3))
//create a FlumeEvent DStream using the pull approach
val flumeEvent = FlumeUtils.createPollingStream(ssc, "172.16.194.128", 1234, StorageLevel.MEMORY_ONLY)
//convert each FlumeEvent body to a String
val lineDStream = flumeEvent.map( e => {
new String(e.event.getBody.array)
})
//print the results
lineDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
**Step 3:** the required jar package
spark-streaming-flume-sink_2.11-2.1.0.jar
Copy it into Flume's lib directory.
**Step 4:** test
double error = 1.0
while ( error >= 0.00001 ) {
    build the model; error -= some value
}
model computation finished
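A hedged Scala sketch of the same idea in Spark: cache the data once, then iterate until the error falls below the threshold (the update step is a placeholder, not a real model):

```scala
// Illustrative only: the model-building step is a stand-in for a real iterative algorithm
val trainingData = sc.parallelize(1 to 1000).cache()   // cached once, reused by every iteration

var error = 1.0
while (error >= 0.00001) {
  // "build the model": one pass over the cached data
  val step = trainingData.map(_ * 1e-7).sum()          // some value derived from the data
  error -= step                                        // error -= some value
}
println("model computation finished, error = " + error)
```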