【spark】鏈接Hbase

0.咱們有這樣一個表,表名爲Studentjava

1.在Hbase中建立一個表sql

代表爲student,列族爲infoapache

2.插入數據編程

咱們這裏採用put來插入數據ide

格式以下   put  ‘表命’,‘行鍵’,‘列族:列’,‘值’  oop

咱們知道Hbase 四個鍵肯定一個值,spa

通常查詢的時候咱們須要提供  表名、行鍵、列族:列名、時間戳纔會有一個肯定的值。命令行

可是這裏插入的時候,時間戳自動被生成,咱們並不用額外操做。3d

咱們不用表的時候能夠這樣刪除orm

注意,必定要先disable 再drop,不能像RDMS同樣直接drop

3.配置spark

咱們須要把Hbase的lib目錄下的一些jar文件拷貝到Spark中,這些都是編程中須要引進的jar包。

須要拷貝的jar包包括:全部hbase開頭的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar

咱們將文件拷貝到Spark目錄下的jars文件中

4.編寫程序

(1)讀取數據

咱們程序中須要的jar包以下

咱們這裏使用Maven來導入相關jar包

咱們須要導入hadoop和spark相關的jar包

spark方面須要導入的依賴

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

hadoop方面須要導入的依賴

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>

hbase方面須要導入的依賴

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>

咱們使用的org.apache.hadoop.hbase.mapreduce是經過hbase-server導入的。

具體的程序以下

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
object SparkOperateHbase{
    def main(args:Array[String]): Unit ={
        //創建Hbase的鏈接
        val conf = HBaseConfiguration.create();
        //設置查詢的表名student
        conf.set(TableInputFormat.INPUT_TABLE,"student")
        //經過SparkContext將student表中數據建立一個rdd
        val sc = new SparkContext(new SparkConf());
        val stuRdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],
            classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
            classOf[org.apache.hadoop.hbase.client.Result]);
        stuRdd.cache();//持久化
        //計算數據條數
        val count = stuRdd.count();
        println("Student rdd count:"+count);
        //遍歷輸出
        //當咱們創建Rdd的時候,前邊所有是參數信息,後邊的result纔是保存數據的數據集
        stuRdd.foreach({case (_,result) =>
            //經過result.getRow來獲取行鍵
            val key = Bytes.toString(result.getRow);
            //經過result.getValue("列族","列名")來獲取值
            //注意這裏須要使用getBytes將字符流轉化成字節流
            val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes));
            val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes));
            val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes));
            //打印結果
            println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age);
        });
    }
}

(2)存入數據

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
object HbasePut{
    def main(args:Array[String]): Unit = {
        //創建sparkcontext
        val sparkConf = new SparkConf().setAppName("HbasePut").setMaster("local")
        val sc = new SparkContext(sparkConf)
        //與hbase的student表創建鏈接
        val tableName = "student"
        sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,tableName)
        //創建任務job
        val job = new Job(sc.hadoopConfiguration)
        //配置job參數
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Result])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        //要插入的數據,這裏的makeRDD是parallelize的擴展版
        val indataRdd = sc.makeRDD(Array("3,zhang,M,26","4,yue,M,27"))
        val rdd = indataRdd.map(_.split(",")).map(arr=>{
            val put = new Put(Bytes.toBytes(arr(0))) //行鍵的值
            //依次給列族info的列添加值
            put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
            put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))
            put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3)))
            //必須有這兩個返回值,put爲要傳入的數據
            (new ImmutableBytesWritable,put)
        })
        rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
    }
} 

5.Maven打包

咱們用命令行打開到項目的根目錄,輸入mvn clean package -DskipTests=true

打包成功後咱們到項目目錄下的target文件下就會找到相應的jar包

6.提交任務

相關文章
相關標籤/搜索