Reading HBase with Spark

Reading HBase through the HBase API

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Fetch data through the HBase API
 */
public class DataAchieveFromHbaseApi {
    public static void main(String[] args) throws IOException {
        // HBase configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181"); // ZooKeeper client port
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");    // HBase ZooKeeper quorum
        // Scan configuration
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf"));                     // column family; more than one can be added
        // HBase table
        HTable hTable = new HTable(conf, Bytes.toBytes("test")); // table name
        // Fetch the scan results
        ResultScanner rs = hTable.getScanner(scan);
        // Column-family information of the table
        HColumnDescriptor[] hColDes = hTable.getTableDescriptor().getColumnFamilies();
        for (HColumnDescriptor hColDe : hColDes) {
            System.out.println(Bytes.toString(hColDe.getName()));
        }
        // Print each row's column value (this example has only one column)
        for (Result r : rs) {
            byte[] bytes = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("SSID")); // column family and qualifier
            if (bytes == null) { // the column may be missing for some rows
                continue;
            }
            String str = new String(bytes, "UTF-8");
            if (str.trim().length() > 0) {
                System.out.println(str.trim());
            }
        }
        rs.close();
        hTable.close();
        System.out.println("end<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
    }
}

Reading HBase through the interface Spark provides

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.io.IOException;

/**
 * Fetch data via Spark's newAPIHadoopRDD (TableInputFormat)
 */
public class DataAchieveFromHfile {
    private static JavaPairRDD<ImmutableBytesWritable, Result> rdd;

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");
        conf.set(TableInputFormat.INPUT_TABLE, "test");
        SparkConf conf1 = new SparkConf().setAppName("test").setMaster("local"); // Spark app name and run mode (local here)
        JavaSparkContext sc = new JavaSparkContext(conf1);
        // Load the data
        rdd = sc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        System.out.println("Number of rows read: " + rdd.count());
        rdd.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {
            @Override
            public void call(Tuple2<ImmutableBytesWritable, Result> result) throws Exception {
                byte[] bytes = result._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("SSID")); // column family and qualifier
                if (bytes == null) { // the column may be missing for some rows
                    return;
                }
                String str = new String(bytes, "UTF-8");
                if (str.trim().length() > 0) {
                    System.out.println(str.trim());
                }
            }
        });
    }
}

With the second approach, reading the HBase table directly means scanning the whole table; reading a snapshot instead does not go through the RegionServers.

Spark reading an HBase snapshot: https://www.cnblogs.com/kwzblog/p/9007713.html
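A minimal sketch of that snapshot route, not from the original post: it assumes an existing snapshot named my_snapshot (e.g. created in the HBase shell with snapshot 'test', 'my_snapshot') and a writable HDFS scratch directory /tmp/snapshot_restore; both names are placeholders. TableSnapshotInputFormat replaces TableInputFormat, so the splits are read from the snapshot's files instead of going through the RegionServers.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;

/**
 * Sketch: read an existing HBase snapshot with Spark, bypassing the RegionServers.
 */
public class DataAchieveFromSnapshot {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");

        // Point the input format at the snapshot; the restore directory is a scratch
        // location on HDFS where the snapshot's files are re-linked for reading.
        Job job = Job.getInstance(conf);
        TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore")); // assumed names

        SparkConf sparkConf = new SparkConf().setAppName("snapshot-read").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Same RDD shape as with TableInputFormat, but the splits come from the snapshot files.
        JavaPairRDD<ImmutableBytesWritable, Result> rdd = sc.newAPIHadoopRDD(
                job.getConfiguration(), TableSnapshotInputFormat.class,
                ImmutableBytesWritable.class, Result.class);
        System.out.println("Rows in snapshot: " + rdd.count());
        sc.close();
    }
}

Because the job reads the snapshot's HFiles directly from HDFS, the user running it needs read access to the HBase data directories.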

Spark generating HFiles and writing them into HBase: https://www.cnblogs.com/luckuan/p/5142203.html
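For that write direction, here is a rough sketch against the same older HTable-based API used in the examples above: rows are sorted by key, written out as HFiles with HFileOutputFormat2, then handed to the RegionServers with LoadIncrementalHFiles. The test table, the cf:SSID column, the toy input pairs and the /tmp/hfile_out directory are assumptions, and newer HBase versions replace the HTable overloads with Table/RegionLocator variants.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Sketch: write (rowKey, value) pairs out as HFiles and bulk-load them into the "test" table.
 */
public class BulkLoadToHbase {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");

        SparkConf sparkConf = new SparkConf().setAppName("bulk-load").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Toy input: row key -> SSID value; in practice this comes from a file or another RDD.
        JavaPairRDD<String, String> input = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("row1", "ssid-a"),
                new Tuple2<String, String>("row2", "ssid-b")));

        // HFiles must be written in row-key order; sorting the string keys first keeps the
        // KeyValue pairs in order (assuming plain ASCII row keys).
        JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileRdd = input.sortByKey().mapToPair(
                new PairFunction<Tuple2<String, String>, ImmutableBytesWritable, KeyValue>() {
                    @Override
                    public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, String> t) {
                        byte[] row = Bytes.toBytes(t._1());
                        KeyValue kv = new KeyValue(row, Bytes.toBytes("cf"),
                                Bytes.toBytes("SSID"), Bytes.toBytes(t._2()));
                        return new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(row), kv);
                    }
                });

        // Configure the output format against the target table's region boundaries.
        HTable hTable = new HTable(conf, "test");
        Job job = Job.getInstance(conf);
        HFileOutputFormat2.configureIncrementalLoad(job, hTable);
        String out = "/tmp/hfile_out"; // assumed scratch directory on HDFS
        hfileRdd.saveAsNewAPIHadoopFile(out, ImmutableBytesWritable.class, KeyValue.class,
                HFileOutputFormat2.class, job.getConfiguration());

        // Hand the generated HFiles over to the RegionServers.
        new LoadIncrementalHFiles(conf).doBulkLoad(new Path(out), hTable);
        hTable.close();
        sc.close();
    }
}

This toy version is adequate for a table with a single region; with multiple regions the RDD has to be partitioned to match the region boundaries, and sorted within each partition, before saveAsNewAPIHadoopFile writes the files.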

https://blog.csdn.net/wl044090432/article/details/50821313
