5.3 RDD編程---數據讀寫

1、文件數據讀寫

1.本地文件系統的數據讀寫

能夠採用多種方式建立Pair RDD,其中一種主要方式是使用map()函數來實現java

惰性機制,即便輸入了錯誤的語句spark-shell也不會立刻報錯。python

(1)讀

給出路徑名稱,TextFile會把路徑下面的全部文件都讀進來,生成一個RDD shell

(2)寫

當只有一個分區時,單線程纔會出現part-0000apache

若是分了兩個分區,寫完以後會生成part-0000和part-0001編程

2.分佈式文件系統HDFS的數據讀寫

(1)讀

(2)寫

3.JSON文件的數據讀寫

JSON(JavaScript Object Notation) 是一種輕量級的數據交換格式json

(1)讀

Scala中有一個自帶的JSON庫——scala.util.parsing.json.JSON,能夠實現對JSON數據的解析數組

JSON.parseFull(jsonString:String)函數,以一個JSON字符串做爲輸入並進行解析,若是解析成功則返回一個Some(map: Map[String, Any]),若是解析失敗則返回None。分佈式

將整個應用程序打包成 JAR包,經過 spark-submit 運行程序函數

 執行後能夠在屏幕上的大量輸出信息中找到以下結果:oop

2、讀寫HBase數據

1.HBase簡介

HBase是Google BigTable的開源實現

  1. HBase是一個稀疏、多維度、排序的映射表,這張表的索引是行鍵、列族、列限定符和時間戳;
  2. 每一個值是一個未經解釋的字符串,沒有數據類型;
  3. 用戶在表中存儲數據,每一行都有一個可排序的行鍵和任意多的列;
  4. 表在水平方向由一個或者多個列族組成,一個列族中能夠包含任意多個列,同一個列族裏面的數據存儲在一塊兒;
  5. 列族支持動態擴展,能夠很輕鬆地添加一個列族或列,無需預先定義列的數量以及類型,全部列均以字符串形式存儲,用戶須要自行進行數據類型轉換;
  6. HBase中執行更新操做時,並不會刪除數據舊的版本,而是生成一個新的版本,舊有的版本仍然保留(這是和HDFS只容許追加不容許修改的特性相關的)

  1. 表:HBase採用表來組織數據,表由行和列組成,列劃分爲若干個列族
  2. 行:每一個HBase表都由若干行組成,每一個行由行鍵(row key)來標識。
  3. 列族:一個HBase表被分組成許多「列族」(Column Family)的集合,它是基本的訪問控制單元
  4. 列限定符:列族裏的數據經過列限定符(或列)來定位
  5. 單元格:在HBase表中,經過行、列族和列限定符肯定一個「單元格」(cell),單元格中存儲的數據沒有數據類型,總被視爲字節數組byte[]
  6. 時間戳:每一個單元格都保存着同一份數據的多個版本,這些版本採用時間戳進行索引

 

先切水平(學號1-10000,10001-20000,...),再切豎直(按列族分),每一種顏色獲得小方格內都是一個分區,一個分區就是屬於負載分發的基本單位,這樣就能夠實現分佈式存儲。有個元數據表能夠存儲每一個分區對應的位置。

2.建立一個HBase表

第一步:安裝配置HBase,將HBase安裝成僞分佈式

第二步:啓動Hadoop和HBase

HBase也有shell環境,能夠輸入相似SQL的語句

3.配置Spark

把HBase的lib目錄下的一些jar文件拷貝到Spark中,這些都是編程時須要引入的jar包,須要拷貝的jar文件包括:全部hbase開頭的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar

4.編寫程序讀取HBase數據

若是要讓Spark讀取HBase,就須要使用SparkContext提供的newAPIHadoopRDD這個API將表的內容以RDD的形式加載到Spark中。

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

// 在SparkOperateHBase.scala文件中輸入如下代碼:

object SparkOperateHBase {
def main(args: Array[String]) {
    val conf = HBaseConfiguration.create()
    val sc = new SparkContext(new SparkConf())
    //設置查詢的表名
    conf.set(TableInputFormat.INPUT_TABLE, "student")
    val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
    val count = stuRDD.count()
    println("Students RDD Count:" + count)
    stuRDD.cache()
    //遍歷輸出
    stuRDD.foreach({ case (_,result) =>
        val key = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
        val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
        val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
        println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
    })
}
}

 

5.編寫程序向HBase寫入數據

下面編寫應用程序把表中的兩個學生信息插入到HBase的student表中

在SparkWriteHBase.scala文件中輸入下面代碼:

import org.apache.hadoop.hbase.HBaseConfiguration  
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
import org.apache.spark._  
import org.apache.hadoop.mapreduce.Job  
import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
import org.apache.hadoop.hbase.client.Result  
import org.apache.hadoop.hbase.client.Put  
import org.apache.hadoop.hbase.util.Bytes

object SparkWriteHBase {  
  def main(args: Array[String]): Unit = {  
    val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")  
    val sc = new SparkContext(sparkConf)        
    val tablename = "student"        
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
    val job = new Job(sc.hadoopConfiguration)  
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
    job.setOutputValueClass(classOf[Result])    
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    
    val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //構建兩行記錄
    val rdd = indataRDD.map(_.split(',')).map{arr=>{  
      val put = new Put(Bytes.toBytes(arr(0))) //行健的值 
      put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  //info:name列的值
      put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))  //info:gender列的值
      put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))  //info:age列的值
      (new ImmutableBytesWritable, put)   
    }}        
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
  }    
} 

切換到HBase Shell中,執行以下命令查看student表 

相關文章
相關標籤/搜索