[Hadoop] HBase

BHase基本知識


基本概念

 自我介紹 

HBase是一個分佈式的、面向列的開源數據庫,該技術來源於 Fay Chang 所撰寫的Google論文「Bigtable:一個結構化數據的分佈式存儲系統」。python

HBase在Hadoop之上提供了相似於Bigtable的能力。sql

HBase不一樣於通常的關係數據庫,它是一個適合於非結構化數據存儲的數據庫shell

另外一個不一樣的是HBase基於列的而不是基於行的模式。數據庫

做爲獨立的體系,有其所屬的資源:HBase教程apache

 

 必要性 

Ref: How and When should you use HBase NoSQL DByii

HBase is a NoSQL database and it works on top of HDFS.nosql

Data volume: peta bytes of data (1024 TB) 級別的數據纔有必要。分佈式

Application Types: 不適合分析,畢竟sql天生適合分析。函數

Hardware environment: 硬件得好。oop

No requirement of relational features: 適合不須要怎麼分析的數據。

Quick access to data: 適合隨機實時訪問。

做爲比對:HBase is to real-time querying and Hive is to analytical queries.

 

 

四維定位 

每一個單元格都有版本控制的屬性,也就是「時間戳」。

列被分爲了 「列族「 和 」列限定符「。

 

 

存儲原理

邏輯視圖多是稀疏矩陣。

在物理視圖中,會切割行分組保存。

 

 

 

讀寫HBase數據


啓動環境

先啓動Hadoop。

再啓動HBase,以下:

cd /usr/local/hbase
./bin/start-hbase.sh
./bin/hbase shell  # 支持交互式操做

 

 

建立表

首先,確保此表以前沒有。

disable 'student'
drop 'student' 

而後,建立以下表。

create 'student', 'info'  # 意味着 'info'裏包含:name, gender, age.

put 'student', '1', 'info: name', 'Xueqian'
put 'student', '1', 'info: gender', 'F'
put 'student', '1', 'info: age', '23'

 

 

配置Spark支持HBase

 

 

讀取HBase數據

1、代碼實例

#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf = conf)

host  = 'localhost'
table = 'student'
conf= {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv   = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv= "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apache.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", \        keyConverter=keyConv, valueConverter=valueConv, conf=conf) count = hbase_rdd.count() hbase_rdd.cache() output = hbase_rdd.collect() for (k, v) in output:   print (k, v)

 

2、執行代碼

 

 

寫入HBase數據

1、代碼示例

#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf = conf)

host  = 'localhost'
table = 'student'

keyConv   = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf      = {"hbase.zookeeper.quorum"         :host,
        "hbase.mapred.outputtable"        :table,
        "mapreduce.outputformat.class"    :"org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class"  :"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class":"org.apache.hadoop.io.Writable"}

rawData = ['3,info,name,Rongcheng','3,info,gender,M','3,info,age,26','4,info,name,Guanhua','4,info,gender,M','4,info,age,27']
sc.parallelize(rawData).map(lambda x:(x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

 

2、運行結果

 

 

 

 

案例分析


求Top值

對RDD中某一列排序。

#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext
conf  = SparkConf().setMaster("local").setAppName("ReadHBase")
sc    = SparkContext(conf = conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")
# 內存中生成了一個rdd

# 預處理,取出壞數據 result1
= lines.filter(lambda line: (len(line.strip()) > 0) and (len(line.split(","))== 4))
# 取出第三個元素,也就是第三列 result2
= result1.map(lambda x:x.split(",")[2])
# 轉化爲能夠處理的數字形式 result3
= result2.map(lambda x:(int(x),""))
# 全局排序,因此只要一個分區 result4
= result3.repartition(1)
# 降序排列 result5
= result4.sortByKey(False) result6 = result5.map(lambda x:x[0]) result7 = result6.take(5) for a in result7: print(a)

lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")

 

 

若干文件內數字排序

原理相似上一個例子。

#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext
index
= 0 def getindex(): global index index += 1 return index
def main(): conf = SparkConf().setMaster("local[1]").setAppName("FileSort") sc = SparkContext(conf = conf) lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt")
# 讀取了若干文件,構成了一個RDD
index
= 0 result1 = lines.filter(lambda line:(len(line.strip()) > 0)) result2 = result1.map(lambda x:(int(x.strip()),"")) result3 = result2.repartition(1)
result4
= result3.sortByKey(True) result5 = result4.map(lambda x:x[0]) result6 = result5.map(lambda x:(getindex(),x)) result6.foreach(print) result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")
if __name__ == '__main__': main() lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt") result1 = lines.filter(lambda line:(len(line.strip()) > 0))

 

 

二次排序

根據多個屬性去排序,好比總分同樣的話就繼續比較數學成績。

重點是:構建一個可排序的,而且是能夠二次排序的屬性。 

def main():
  conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
  sc = SparkContext(conf=conf)
  file="file:///usr/local/spark/mycode/rdd/secondarysort/file4.txt"
  rdd1 = sc.textFile(file)
rdd2
= rdd1.filter(lambda x:(len(x.strip()) > 0)) rdd3 = rdd2.map(lambda x:((int(x.split(" ")[0]),int(x.split(" ")[1])),x))

# 寫兩個排序所需的key,生成」可排序「的rdd4 rdd4
= rdd3.map(lambda x: (SecondarySortKey(x[0]),x[1])) rdd5 = rdd4.sortByKey(False)
rdd6
= rdd5.map(lambda x:x[1]) rdd6.foreach(print)
if __name__ == '__main__': main() rdd1 = sc.textFile(file) rdd2 = rdd1.filter(lambda x:(len(x.strip()) > 0))

二次排序函數定義:

#!/usr/bin/env python3
from operator import gt
from pyspark import SparkContext, SparkConf

class SecondarySortKey(): def __init__(self, k): self.column1 = k[0] self.column2 = k[1]
def __gt__(self, other): if other.column1 == self.column1: return gt(self.column2,other.column2) else: return gt(self.column1, other.column1)

 

End.

相關文章
相關標籤/搜索