0.咱們有這樣一個表,表名爲Studentjava
1.在Hbase中建立一個表sql
代表爲student,列族爲infoapache
2.插入數據編程
咱們這裏採用put來插入數據ide
格式以下 put ‘表命’,‘行鍵’,‘列族:列’,‘值’ oop
咱們知道Hbase 四個鍵肯定一個值,spa
通常查詢的時候咱們須要提供 表名、行鍵、列族:列名、時間戳纔會有一個肯定的值。命令行
可是這裏插入的時候,時間戳自動被生成,咱們並不用額外操做。3d
咱們不用表的時候能夠這樣刪除orm
注意,必定要先disable 再drop,不能像RDMS同樣直接drop
3.配置spark
咱們須要把Hbase的lib目錄下的一些jar文件拷貝到Spark中,這些都是編程中須要引進的jar包。
須要拷貝的jar包包括:全部hbase開頭的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar
咱們將文件拷貝到Spark目錄下的jars文件中
4.編寫程序
(1)讀取數據
咱們程序中須要的jar包以下
咱們這裏使用Maven來導入相關jar包
咱們須要導入hadoop和spark相關的jar包
spark方面須要導入的依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
hadoop方面須要導入的依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
hbase方面須要導入的依賴
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
咱們使用的org.apache.hadoop.hbase.mapreduce是經過hbase-server導入的。
具體的程序以下
import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.{SparkConf, SparkContext} object SparkOperateHbase{ def main(args:Array[String]): Unit ={ //創建Hbase的鏈接 val conf = HBaseConfiguration.create(); //設置查詢的表名student conf.set(TableInputFormat.INPUT_TABLE,"student") //經過SparkContext將student表中數據建立一個rdd val sc = new SparkContext(new SparkConf()); val stuRdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]); stuRdd.cache();//持久化 //計算數據條數 val count = stuRdd.count(); println("Student rdd count:"+count); //遍歷輸出 //當咱們創建Rdd的時候,前邊所有是參數信息,後邊的result纔是保存數據的數據集 stuRdd.foreach({case (_,result) => //經過result.getRow來獲取行鍵 val key = Bytes.toString(result.getRow); //經過result.getValue("列族","列名")來獲取值 //注意這裏須要使用getBytes將字符流轉化成字節流 val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes)); val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes)); val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes)); //打印結果 println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age); }); } }
(2)存入數據
import org.apache.hadoop.hbase.client.{Put, Result} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} object HbasePut{ def main(args:Array[String]): Unit = { //創建sparkcontext val sparkConf = new SparkConf().setAppName("HbasePut").setMaster("local") val sc = new SparkContext(sparkConf) //與hbase的student表創建鏈接 val tableName = "student" sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,tableName) //創建任務job val job = new Job(sc.hadoopConfiguration) //配置job參數 job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) //要插入的數據,這裏的makeRDD是parallelize的擴展版 val indataRdd = sc.makeRDD(Array("3,zhang,M,26","4,yue,M,27")) val rdd = indataRdd.map(_.split(",")).map(arr=>{ val put = new Put(Bytes.toBytes(arr(0))) //行鍵的值 //依次給列族info的列添加值 put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2))) put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3))) //必須有這兩個返回值,put爲要傳入的數據 (new ImmutableBytesWritable,put) }) rdd.saveAsNewAPIHadoopDataset(job.getConfiguration) } }
5.Maven打包
咱們用命令行打開到項目的根目錄,輸入mvn clean package -DskipTests=true
打包成功後咱們到項目目錄下的target文件下就會找到相應的jar包
6.提交任務