HBase is a distributed, column-oriented, open-source database. The technology comes from the Google paper by Fay Chang, "Bigtable: A Distributed Storage System for Structured Data".
HBase provides Bigtable-like capabilities on top of Hadoop.
Unlike a typical relational database, HBase is a database suited to storing unstructured data.
Another difference is that HBase uses a column-based rather than a row-based model.
As a standalone system, HBase has its own body of resources: HBase Tutorial
Ref: How and When should you use HBase NoSQL DB
HBase is a NoSQL database and it works on top of HDFS.
Data volume: only worthwhile at the petabyte scale (1024 TB) of data.
Application Types: not suited to analytics; SQL is naturally better at analysis.
Hardware environment: good hardware is required.
No requirement of relational features: suited to data that does not need much analysis.
Quick access to data: suited to random, real-time access.
For comparison: HBase is for real-time querying, while Hive is for analytical queries.
Every cell carries a versioning attribute, namely a "timestamp".
Columns are organized into "column families" and "column qualifiers".
The logical view may be a sparse matrix.
In the physical view, rows are split up and the data is stored grouped by column family.
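To make this concrete, here is a toy sketch in plain Python (not an HBase API; the extra version and the timestamps are hypothetical) of one logical row: each cell is addressed as "columnfamily:qualifier" and keeps timestamped versions, and qualifiers that were never written simply have no entry, which is why the logical view is sparse.

# Toy model of HBase's logical view (plain Python, not an HBase API):
# one row of the 'student' table, keyed by "columnfamily:qualifier",
# with each cell holding a list of (timestamp, value) versions.
row = {
    "info:name":   [(1700000002000, "Xueqian"), (1700000001000, "XQ")],  # two versions
    "info:gender": [(1700000001000, "F")],
    "info:age":    [(1700000001000, "23")],
    # no "info:email" entry at all: missing cells take no space (sparse)
}

# A read returns the newest version, i.e. the one with the largest timestamp.
latest_name = max(row["info:name"], key=lambda cell: cell[0])[1]
print(latest_name)   # Xueqian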
First, start Hadoop.
Then start HBase, as follows:
cd /usr/local/hbase
./bin/start-hbase.sh
./bin/hbase shell    # supports interactive operation
First, make sure the table does not already exist:
disable 'student'
drop 'student'
Then, create the table as follows.
create 'student', 'info'    # the 'info' column family will hold: name, gender, age
put 'student', '1', 'info:name', 'Xueqian'
put 'student', '1', 'info:gender', 'F'
put 'student', '1', 'info:age', '23'
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf = conf)
host = 'localhost'
table = 'student'
# Hadoop configuration handed to newAPIHadoopRDD: where ZooKeeper is and which table to read
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                               "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                               "org.apache.hadoop.hbase.client.Result",
                               keyConverter=keyConv, valueConverter=valueConv, conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)
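As a quick follow-up (a sketch appended to the script above, not part of the original example): both converters hand strings back to Python, so one record can be inspected like this.

# Inspect one record returned through the converters: key and value both arrive as strings.
first_key, first_value = hbase_rdd.first()
print(type(first_key), type(first_value))
print(first_key)    # e.g. the row key '1' inserted earlier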
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("WriteHBase")
sc = SparkContext(conf = conf)
host = 'localhost'
table = 'student'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
# Hadoop configuration for the write: target table, output format, key/value classes
conf = {"hbase.zookeeper.quorum": host,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
rawData = ['3,info,name,Rongcheng', '3,info,gender,M', '3,info,age,26',
           '4,info,name,Guanhua', '4,info,gender,M', '4,info,age,27']
sc.parallelize(rawData).map(lambda x: (x[0], x.split(','))) \
  .saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)
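The map() above turns each comma-separated string into the (row key, [row, family, qualifier, value]) pair that the two converters expect; a tiny standalone check in plain Python (no Spark needed) shows what the lambda produces for one element of rawData.

# What the lambda x: (x[0], x.split(',')) produces for one rawData element:
record = '3,info,name,Rongcheng'
print((record[0], record.split(',')))   # ('3', ['3', 'info', 'name', 'Rongcheng'])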
Sort an RDD by a particular column.
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("SortColumn")
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")
# an RDD has been created in memory

# pre-processing: drop malformed lines
result1 = lines.filter(lambda line: (len(line.strip()) > 0) and (len(line.split(",")) == 4))

# take the third element, i.e. the third column
result2 = result1.map(lambda x: x.split(",")[2])

# convert to a numeric form that can be sorted
result3 = result2.map(lambda x: (int(x), ""))

# global sort, so a single partition is enough
result4 = result3.repartition(1)

# sort in descending order
result5 = result4.sortByKey(False)
result6 = result5.map(lambda x: x[0])
result7 = result6.take(5)
for a in result7:
    print(a)
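As an aside (an alternative sketch, not part of the original example, assuming result2 from above is in scope): when only the top five values are needed, takeOrdered avoids both the repartition(1) step and the (value, "") key trick.

# Alternative sketch: top 5 values of the third column in descending order,
# without collapsing the data into a single partition.
top5 = result2.map(lambda x: int(x)).takeOrdered(5, key=lambda x: -x)
print(top5)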
The idea is similar to the previous example.
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext

index = 0

def getindex():
    global index
    index += 1
    return index
def main():
    conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
    sc = SparkContext(conf = conf)
    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt")
    # several files have been read and combined into one RDD

    index = 0
    result1 = lines.filter(lambda line: (len(line.strip()) > 0))
    result2 = result1.map(lambda x: (int(x.strip()), ""))
    result3 = result2.repartition(1)

    result4 = result3.sortByKey(True)
    result5 = result4.map(lambda x: x[0])
    result6 = result5.map(lambda x: (getindex(), x))
    result6.foreach(print)
    result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")

if __name__ == '__main__':
    main()
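A small aside (an alternative sketch, not part of the original code, assuming result5 from main() is in scope): the getindex() counter only behaves correctly because everything runs in a single partition on local[1]; zipWithIndex assigns a per-record index without a global counter.

# Alternative sketch: number the sorted values with zipWithIndex instead of a global counter.
# zipWithIndex starts at 0, so add 1 to match the ranking produced by getindex().
numbered = result5.zipWithIndex().map(lambda x: (x[1] + 1, x[0]))
numbered.foreach(print)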
Sort by multiple attributes: for example, if the total scores are equal, fall back to comparing the math scores.
The key point is to build a key that is sortable and supports a secondary sort.
def main():
    conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
    sc = SparkContext(conf=conf)
    file = "file:///usr/local/spark/mycode/rdd/secondarysort/file4.txt"
    rdd1 = sc.textFile(file)

    rdd2 = rdd1.filter(lambda x: (len(x.strip()) > 0))
    rdd3 = rdd2.map(lambda x: ((int(x.split(" ")[0]), int(x.split(" ")[1])), x))

    # wrap the two sort fields in a key, producing a "sortable" rdd4
    rdd4 = rdd3.map(lambda x: (SecondarySortKey(x[0]), x[1]))
    rdd5 = rdd4.sortByKey(False)

    rdd6 = rdd5.map(lambda x: x[1])
    rdd6.foreach(print)

if __name__ == '__main__':
    main()
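A side observation (a sketch, not part of the original example, assuming rdd3 from main() is in scope): the keys built in rdd3 are already (int, int) tuples, and Python compares tuples element by element, so for this particular ordering sortByKey(False) on rdd3 itself gives the same result; the custom key class below earns its keep when the two fields must be ordered in different directions or compared in a non-default way.

# Sketch: plain tuple keys already sort descending by the first field, then the second.
rdd5_alt = rdd3.sortByKey(False)
rdd5_alt.map(lambda x: x[1]).foreach(print)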
Definition of the secondary-sort key class:
#!/usr/bin/env python3
from operator import gt
from pyspark import SparkContext, SparkConf
class SecondarySortKey():
    def __init__(self, k):
        self.column1 = k[0]
        self.column2 = k[1]

    def __gt__(self, other):
        if other.column1 == self.column1:
            return gt(self.column2, other.column2)
        else:
            return gt(self.column1, other.column1)
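A quick sanity check of the comparison logic (a sketch assuming the class above is in scope; the two keys are made up for illustration):

# Same column1, so column2 decides: 3 > 7 is False.
a = SecondarySortKey((5, 3))
b = SecondarySortKey((5, 7))
print(a > b)    # False
print(b > a)    # True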
End.