RDD持久化
---------------
    memory disk off-heap serial replication
    Memory_ONLY(true , false ,false , true ,1)

廣播變量
---------------
    driver端切成小塊,存放到blockmanager,executor廣播變量
    的小塊,首先從本身的blockmgr中提取,若是提取不到,再從其餘
    節點(driver + executor)提取,一旦提取到存放在本身的blockmgr。
    RDD + dep 附加在task中。
    scala的lazy延遲計算機制。

累加器
----------------
    只能累加,只能在driver讀取value,executor不能讀取。
    不能在map中調用。累加器的update只應該在action中執行。

自定義累加器,實現氣溫雙聚合
-----------------------------
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * Accumulator demo: computes the highest and lowest temperature of a text
  * file in a single pass by means of a custom accumulator.
  */
object AccTestScala {

    /**
      * Custom accumulator that takes Int temperatures as input and exposes
      * the pair (max, min) of everything added so far as its value.
      */
    class MyAcc extends AccumulatorV2[Int, (Int, Int)] {
        // Highest temperature seen so far; Int.MinValue marks "nothing added yet".
        var max: Int = Int.MinValue
        // Lowest temperature seen so far; Int.MaxValue marks "nothing added yet".
        var min: Int = Int.MaxValue

        /** True while both fields still hold their initial sentinel values. */
        override def isZero: Boolean = {
            max == Int.MinValue && min == Int.MaxValue
        }

        /** Returns a new accumulator carrying the same max/min as this one. */
        override def copy(): AccumulatorV2[Int, (Int, Int)] = {
            val copy = new MyAcc()
            copy.max = max
            copy.min = min
            copy
        }

        /** Puts both fields back to their initial sentinel values. */
        override def reset(): Unit = {
            max = Int.MinValue
            min = Int.MaxValue
        }

        /** Folds one temperature into the running max and min. */
        override def add(v: Int): Unit = {
            max = math.max(max, v)
            min = math.min(min, v)
        }

        /** Combines another accumulator's (max, min) value into this one. */
        override def merge(other: AccumulatorV2[Int, (Int, Int)]): Unit = {
            max = math.max(max, other.value._1)
            min = math.min(min, other.value._2)
        }

        /** Current result as (max, min). */
        override def value: (Int, Int) = {
            (max, min)
        }
    }

    /**
      * Reads d:/mr/temp.dat, takes the second space-separated field of every
      * line as an Int temperature, feeds it to the accumulator inside an
      * action (foreach) and prints the resulting (max, min) on the driver.
      *
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
        // 1. Create the Spark configuration object.
        val conf = new SparkConf()
        conf.setAppName("wcApp")
        conf.setMaster("local[4]")
        val sc = new SparkContext(conf)
        try {
            val acc = new MyAcc()
            sc.register(acc, "myacc")
            val rdd1 = sc.textFile("file:///d:/mr/temp.dat")
            val rdd2 = rdd1.map(line => {
                val arr = line.split(" ")
                arr(1).toInt
            })
            // The accumulator is updated inside an action, not a transformation.
            rdd2.foreach(temp => {
                acc.add(temp)
            })
            println(acc.value)
        } finally {
            // Release the context even when the job fails.
            sc.stop()
        }
    }
}

Spark模塊
------------------
    1.core RDD job sparkSQL spark Streaming spark ml spark graphx

Spark SQL模塊
------------------
    0.介紹
        DataFrame,數據框至關於表 DataFrame。
        引入spark-sql依賴。
        DataFrame是特殊的DataSet,DataSet[Row].
0'.spark操縱hive出錯問題 1)不能實例化客戶端 a)緣由 版本不一致問題。 b)降級hive到hive1.2 c)複製hive jar到spark的jars下 d)或者關閉hive-site.xml schema版本檢查 <property> <name>hive.metastore.schema.verification</name> <value>false</value> </property> 1.集成hive 0.說明 spark sql操縱hive,使用spark做爲執行引擎。只是從hdfs上讀取hive的數據,放到spark上執行。 1.整合步驟 複製hive的hive-site.xml文件到spark conf目錄下。 複製mysql的驅動到spark/jars/下 2.啓動zk,hdfs。 3.啓動spark集羣 $>spark/sbin/start-all.sh 4.進入spark-shell $>spark-shell --master spark://s101:7077 $scala>spark.sql("select * from mydb.custs").show() 2.編程實現spark sql的hive訪問 2.1)scala版 a)複製hive-site.xml + core-site.xml + hdfs-site.xml到resources目錄下 b)添加maven支持 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.it18zhang</groupId> <artifactId>my-spark</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.24</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.17</version> </dependency> <!--*************************************************--> <!--****** 注意:必定要引入該依賴,不然hive很差使****--> <!--*************************************************--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.1.0</version> </dependency> </dependencies> </project> d)編程 val conf = new SparkConf() conf.setAppName("SparkSQLScala") conf.setMaster("local") conf.set("spark.sql.warehouse.dir", "hdfs://mycluster/user/hive/warehouse") //啓用hive支持 
val sess = SparkSession.builder()
                    .config(conf)
                    .enableHiveSupport()    //必定要啓用hive支持。
                    .getOrCreate()
import sess._
sess.sql("select * from mydb.custs").show()

    2.2)java版
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Runs a query against the Hive table mydb.custs through a Hive-enabled
 * SparkSession and prints the result.
 */
public class MySparkSQLJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("MySparkSQLJava");
        conf.setMaster("local[*]");
        SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
        try {
            Dataset<Row> df = sess.sql("select * from mydb.custs");
            df.show();
        } finally {
            // Release the session even when the query fails.
            sess.stop();
        }
    }
}

註冊RDD成爲DataFrame
----------------------
import org.apache.spark.sql.SparkSession

/**
  * Turns an RDD of (year, temp) pairs into a DataFrame, registers it as the
  * temporary view "temps" and queries the max/min temperature per year.
  */
object SparkSQLRDDScala {
    /**
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
        // Create the Spark session.
        val sess = SparkSession.builder().appName("sparksql").master("local").enableHiveSupport().getOrCreate()
        try {
            // Brings toDF into scope for RDDs of tuples.
            import sess.implicits._

            val rdd1 = sess.sparkContext.textFile("file:///d:/mr/temp.dat")
            // Each line is split on a single space; fields 0 and 1 are parsed as Int.
            val rdd2 = rdd1.map(line => {
                val arr = line.split(" ")
                (arr(0).toInt, arr(1).toInt)
            })
            val df1 = rdd2.toDF("year", "temp")
            df1.createOrReplaceTempView("temps")
            val sql = "select year , max(temp) max ,min(temp) min from temps group by year order by year asc limit 200"
            sess.sql(sql).show(1000, false)
        } finally {
            // Release the session even when the job fails.
            sess.stop()
        }
    }
}

Java版RDD和DataFrame之間轉換
-----------------------------
package com.oldboy.spark.java;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import
org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import scala.tools.nsc.typechecker.StructuredTypeStrings$class;

/**
 * Spark SQL from Java: builds a DataFrame from an RDD of beans, then turns a
 * DataFrame back into an RDD and reduces it by key.
 */
public class SparkSQRDDLava {
    public static void main(String[] args) {
        SparkSession sess = SparkSession.builder().appName("sparksql").master("local").enableHiveSupport().getOrCreate();
        try {
            JavaSparkContext sc = new JavaSparkContext(sess.sparkContext());
            JavaRDD<String> rdd1 = sc.textFile("file:///d:/mr/temp.dat");

            // Alternative kept for reference: convert an RDD<Row> into a Dataset<Row>
            // using an explicit schema.
            // JavaRDD<Row> rdd2 = rdd1.map(new Function<String, Row>() {
            //     public Row call(String v1) throws Exception {
            //         String[] arr = v1.split(" ");
            //         return RowFactory.create(Integer.parseInt(arr[0]) , Integer.parseInt(arr[1]));
            //     }
            // }) ;
            //
            // // Build the struct type.
            // StructField[] fields = new StructField[2];
            // fields[0] = new StructField("year", DataTypes.IntegerType, false, Metadata.empty());
            // fields[1] = new StructField("temp", DataTypes.IntegerType, true, Metadata.empty());
            // StructType type = new StructType(fields);
            //
            // Dataset<Row> df1 = sess.createDataFrame(rdd2 , type) ;
            // df1.createOrReplaceTempView("temps");
            // sess.sql("select * from temps").show();

            // Parse every line into a TempData bean (TempData is declared elsewhere
            // in the project; only setYear/setTemp are used here).
            JavaRDD<TempData> rdd2 = rdd1.map(new Function<String, TempData>() {
                public TempData call(String v1) throws Exception {
                    String[] arr = v1.split(" ");
                    TempData data = new TempData();
                    data.setYear(Integer.parseInt(arr[0]));
                    data.setTemp(Integer.parseInt(arr[1]));
                    return data;
                }
            });

            Dataset<Row> df1 = sess.createDataFrame(rdd2, TempData.class);
            df1.show();

            System.out.println("==================================");

            // DataFrame back to RDD.
            Dataset<Row> df2 = sess.sql("select * from big10.emp2");
            JavaRDD<Row> rdd3 = df2.toJavaRDD();
            // NOTE(review): assumes big10.emp2 declares deptno as int and salary as
            // float; getInt/getFloat fail on other column types -- confirm the schema.
            JavaPairRDD<Integer, Float> rdd4 = rdd3.mapToPair(new PairFunction<Row, Integer, Float>() {
                public Tuple2<Integer, Float> call(Row row) throws Exception {
                    int depno = row.getInt(row.fieldIndex("deptno"));
                    float sal = row.getFloat(row.fieldIndex("salary"));
                    return new Tuple2<Integer, Float>(depno, sal);
                }
            });
            // Keep the highest salary per department number.
            JavaPairRDD<Integer, Float> rdd5 = rdd4.reduceByKey(new Function2<Float, Float, Float>() {
                public Float call(Float v1, Float v2) throws Exception {
                    return Math.max(v1, v2);
                }
            });
            rdd5.foreach(new VoidFunction<Tuple2<Integer, Float>>() {
                public void call(Tuple2<Integer, Float> t) throws Exception {
                    System.out.println(t);
                }
            });
        } finally {
            // Release the session (and its SparkContext) even when a step fails.
            sess.stop();
        }
    }
}

spark打印數據結構
---------------------
    df.printSchema()

spark sql訪問json文件
----------------------------
    1.建立json文件
        [d:/java/custs.json]
        {"id":1,"name":"tom","age":12}
        {"id":2,"name":"tomas","age":13}
        {"id":3,"name":"tomasLee","age":14}
        {"id":4,"name":"tomson","age":15}
        {"id":5,"name":"tom2","age":16}

    2.加載文件[scala]
        val conf = new SparkConf()
        conf.setAppName("SparkSQLScala")
        conf.setMaster("local")
        conf.set("spark.sql.warehouse.dir", "hdfs://mycluster/user/hive/warehouse")

        //啓用hive支持
        val sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        val df = sess.read.json("file:///d:/java/custs.json")
        df.show(1000,false)

    3.[java]版
        Dataset<Row> df = spark.read().json("file:///d:/java/custs.json");

    4.保存成json
        df1.write.json(path) ;

spark sql訪問parquet文件
----------------------------
    //保存成parquet
    df1.write.parquet(path) ;

    //讀取
    spark.read.parquet(path) ;

spark sql訪問jdbc文件
----------------------------
    //保存到jdbc
    val prop = new java.util.Properties()
    prop.put("driver" , "com.mysql.jdbc.Driver")
    prop.put("user" , "root")
    prop.put("password" , "root")
    //表不須要存在
    df1.write.jdbc("jdbc:mysql://192.168.231.1:3306/big10" , "emp" ,prop ) ;

    //讀取
    spark.read.jdbc("jdbc:mysql://192.168.231.1:3306/big10" , "emp" ,prop ) ;

Spark sql DataFrame API編程
----------------------------
    DataFrame.select("id" , "name")
    DataFrame.select($"id" , $"name")
    DataFrame.where(" id > 3")
    DataFrame.groupBy("id").agg(max("age"),min("age")) ;
    ...

spark臨時視圖
----------------------------
    1.createOrReplaceTempView
        生命週期僅限本session
    2.createGlobalTempView
        全局,跨session.
Spark SQL做爲分佈式查詢引擎
----------------------------
    1.描述
        終端用戶/應用程序能夠直接同spark sql交互,而不須要寫其餘代碼。
    2.啓動spark的thrift-server進程
        spark/sbin/start-thriftserver.sh --master spark://s101:7077
    3.檢測
        a)webui
        b)端口
            netstat -anop|grep 10000
    4.使用spark的beeline程序測試
        $>spark/bin/beeline
        $beeline>!conn jdbc:hive2://s101:10000/mydb
        $beeline>select * from customers ;

Spark Streaming
----------------------------
    持續計算,沒有中止。
    不是實時計算,小批量計算。

體驗流計算
-----------------
    1.編寫流計算代碼
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Streaming word count: reads text lines from a socket and prints the
  * per-batch count of every word.
  *
  * Created by Administrator on 2018/5/18.
  */
object SparkStreamingDemo1 {
    /**
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("NetworkWordCount")
        // One batch per second.
        val streamingContext = new StreamingContext(sparkConf, Seconds(1))

        // Socket text stream -> words -> (word, 1) -> summed per word.
        val wordCounts = streamingContext
            .socketTextStream("localhost", 9999)
            .flatMap(line => line.split(" "))
            .map(word => (word, 1))
            .reduceByKey((left, right) => left + right)
        wordCounts.print()

        // Start the context and block until it terminates.
        streamingContext.start()
        streamingContext.awaitTermination()
    }
}

    2.啓動nc服務器
        [win7]
        nc -l -L -p 9999

    3.啓動scala程序
        /resources/log4j.properties
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# # Set everything to be logged to the console log4j.rootCategory=warn, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Set the default spark-shell log level to WARN. When running the spark-shell, the # log level for this class is used to overwrite the root logger's log level, so that # the user can have different defaults for the shell and regular Spark apps. log4j.logger.org.apache.spark.repl.Main=WARN # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=WARN log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR 4.在nc的終端輸入 hello world spark streaming java版 ---------------------------- package com.oldboy.spark.java; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import 
java.sql.Statement; import java.util.Arrays; import java.util.Iterator; import java.util.List; /** * SparkStreaming java 版 */ public class SparkStreamingJava { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf() ; conf.setAppName("ssc") ; conf.setMaster("local[2]") ; //建立SparkStreaming上下文 JavaStreamingContext ssc = new JavaStreamingContext(conf , Durations.seconds(2)) ; //建立離散流 JavaDStream<String> ds1 = ssc.socketTextStream("s101" , 8888); //壓扁 JavaDStream<String> ds2 = ds1.flatMap(new FlatMapFunction<String, String>() { public Iterator<String> call(String s) throws Exception { String[] arr = s.split(" "); return Arrays.asList(arr).iterator(); } }) ; //標1成對 JavaPairDStream<String, Integer> ds3 = ds2.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }) ; //聚合 JavaPairDStream<String, Integer> ds4 = ds3.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }) ; // ds4.print(); ds4.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() { public void call(JavaPairRDD<String, Integer> rdd) throws Exception { System.out.println("--------------------"); List list = rdd.take(100); for(Object o : list){ System.out.println(o); } } }); ssc.start(); ssc.awaitTermination(); } } SparkStreaming注意事項 ------------------------- 1.上下文啓動後,不能再添加新的計算工做 2.上下文中止後,不能從新啓動。 3.在一個JVM中,只有一個上下文時活躍的。 4.中止Streaming上下文,能夠有選擇性的中止SparkContext. 
5.SparkContext能夠重用建立多個Streaming,前提是上一個須要stop掉。 DStream離散流內部是連續的RDD,DStream的操做轉換成對RDD的操做。 離散流和接受者 ------------------------- socket文本流都和Receiver關聯,接受者從source接受數據,存放到spark內存用於計算。 源類型 1.基本源 內置 2.高級源 第三方支持 注意事項: 本地執行流計算,不能夠local == local[1] ,只有一個線程執行本地任務,須要有一個單獨的 線程運行接受者,沒有線程執行計算工做。 Socket sock = new Socket("localhost" , 8888) ; Spark Streaming API ---------------------- 1.StreamingContext spark流計算入口點,建立離散流,可使用多種方式建立。 start()/stop()/awaitTermination(); 2.SocketReceiver 建立Socket對象,孵化分線程,接受數據,存儲到內存中。 3.ReceiverTracker 接受者跟蹤器,管理接收器的執行。 4.JobSchduler 調度job在spark上執行,使用JobGenerator生成器生成job,並使用線程池執行他們。 使用spark sql實現taggen(scala版) --------------------------------- import org.apache.spark.SparkConf import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} /** */ object MySparkSQLScalaTaggen { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("SparkSQLScala") conf.setMaster("local") conf.set("spark.sql.warehouse.dir", "hdfs://mycluster/user/hive/warehouse") //啓用hive支持 val sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() //1.加載文件造成rdd val rdd1 = sess.sparkContext.textFile("file:///d:/temptags.txt") //2.變換過濾,去除無效行 val rdd2 = rdd1.map(_.split("\t")).filter(_.length > 1) //3.json解析 val rdd3 = rdd2.map(arr=>(arr(0) , JSONUtil.parseTag(arr(1)))) //4.過濾,去除空評論 val rdd4 = rdd3.filter(t=>t._2.size() > 0) // val rdd44 = rdd4.map(t=>{ val busid = t._1 val list = t._2 var arr = new Array[String](list.size()) var i:Int = 0 import scala.collection.JavaConversions._ for(x <- list){ arr(i) = x i +=1 } (busid , arr) }) //5.變換rdd成爲rdd[Row] val rdd5 = rdd44.map(t=>{ Row(t._1,t._2) }) //6.數據結構定義 val mytype = StructType(List( StructField("busid" , DataTypes.StringType,false) , StructField("tags" , DataTypes.createArrayType(DataTypes.StringType),false) )) //7.建立數據框 val df = sess.createDataFrame(rdd5, mytype) //8.註冊臨時表 df.createOrReplaceTempView("_tags") //9.炸開tags字段 
//val df2 = sess.sql("select busid , explode(tags) tag from _tags") //OK //使用hive的橫向視圖完成炸裂數據的組合 val df2 = sess.sql("select busid , tag from _tags lateral view explode(tags) xx as tag") //10.註冊臨時表 df2.createOrReplaceTempView("_tags2") //11.統計每一個商家每條評論的個數. val sql1 = "select busid, tag , count(*) cnt from _tags2 group by busid , tag order by busid , cnt desc" ; //12.聚合每一個商家的全部評論,busid, List((tag,count),...,涉及子查詢 //val sql2 = "select t.busid , collect_list(struct(t.tag , t.cnt)) st from (" + sql1 + ") as t group by t.busid order by st[0].col2 desc " val sql2 = "select t.busid , collect_list(named_struct('tag' , t.tag , 'cnt' , t.cnt)) st from (" + sql1 + ") as t group by t.busid order by st[0].cnt desc " sess.sql(sql2).show(10000, false) // val sql2 = "select t.busid , collect_list(named_struct('tag' , t.tag , 'cnt' , t.cnt)) st from (" + sql1 + ") as t group by t.busid " // sess.sql(sql2).createOrReplaceTempView("_tags3") // //13.對全部商家按照評論個數的最大值進行倒排序 // val sql3 = "select * from _tags3 order by st[0].cnt desc" // sess.sql(sql3).show(10000,false) } }