能夠採用多種方式建立Pair RDD,其中一種主要方式是使用map()函數來實現java
惰性機制,即便輸入了錯誤的語句spark-shell也不會立刻報錯。python
給出路徑名稱,TextFile會把路徑下面的全部文件都讀進來,生成一個RDD shell
當只有一個分區時,單線程纔會出現part-0000apache
若是分了兩個分區,寫完以後會生成part-0000和part-0001編程
JSON(JavaScript Object Notation) 是一種輕量級的數據交換格式json
Scala中有一個自帶的JSON庫——scala.util.parsing.json.JSON,能夠實現對JSON數據的解析數組
JSON.parseFull(jsonString:String)函數,以一個JSON字符串做爲輸入並進行解析,若是解析成功則返回一個Some(map: Map[String, Any]),若是解析失敗則返回None。分佈式
將整個應用程序打包成 JAR包,經過 spark-submit 運行程序函數
執行後能夠在屏幕上的大量輸出信息中找到以下結果:oop
HBase是Google BigTable的開源實現
先切水平(學號1-10000,10001-20000,...),再切豎直(按列族分),每一種顏色獲得小方格內都是一個分區,一個分區就是屬於負載分發的基本單位,這樣就能夠實現分佈式存儲。有個元數據表能夠存儲每一個分區對應的位置。
第一步:安裝配置HBase,將HBase安裝成僞分佈式
第二步:啓動Hadoop和HBase
HBase也有shell環境,能夠輸入相似SQL的語句
把HBase的lib目錄下的一些jar文件拷貝到Spark中,這些都是編程時須要引入的jar包,須要拷貝的jar文件包括:全部hbase開頭的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar
若是要讓Spark讀取HBase,就須要使用SparkContext提供的newAPIHadoopRDD這個API將表的內容以RDD的形式加載到Spark中。
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf // 在SparkOperateHBase.scala文件中輸入如下代碼: object SparkOperateHBase { def main(args: Array[String]) { val conf = HBaseConfiguration.create() val sc = new SparkContext(new SparkConf()) //設置查詢的表名 conf.set(TableInputFormat.INPUT_TABLE, "student") val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) val count = stuRDD.count() println("Students RDD Count:" + count) stuRDD.cache() //遍歷輸出 stuRDD.foreach({ case (_,result) => val key = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes)) val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes)) val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes)) println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age) }) } }
下面編寫應用程序把表中的兩個學生信息插入到HBase的student表中
在SparkWriteHBase.scala文件中輸入下面代碼:
import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.spark._ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes object SparkWriteHBase { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local") val sc = new SparkContext(sparkConf) val tablename = "student" sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename) val job = new Job(sc.hadoopConfiguration) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //構建兩行記錄 val rdd = indataRDD.map(_.split(',')).map{arr=>{ val put = new Put(Bytes.toBytes(arr(0))) //行健的值 put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) //info:name列的值 put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2))) //info:gender列的值 put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt)) //info:age列的值 (new ImmutableBytesWritable, put) }} rdd.saveAsNewAPIHadoopDataset(job.getConfiguration()) } }
切換到HBase Shell中,執行以下命令查看student表