本項目主要解決 check 和 opinion2 張歷史數據表(歷史數據是指當業務發生過程當中的完整中間流程和結果數據)的在線查詢。原實現基於 Oracle 提供存儲查詢服務,隨着數據量的不斷增長,在寫入和讀取過程當中面臨性能問題,且歷史數據僅供業務查詢參考,並不影響實際流程,從系統結構上來講,放在業務鏈條上游比較重。本項目將其置於下游數據處理 Hadoop 分佈式平臺來實現此需求。下面列一些具體的需求指標:html
從數據量及查詢要求來看,分佈式平臺上具有大數據量存儲,且提供實時查詢能力的組件首選 HBase。根據需求作了初步的調研和評估後,大體肯定 HBase 做爲主要存儲組件。將需求拆解爲寫入和讀取 HBase 兩部分。java
讀取 HBase 相對來講方案比較肯定,基本根據需求設計 RowKey,而後根據 HBase 提供的豐富 API(get,scan 等)來讀取數據,知足性能要求便可。git
寫入 HBase 的方法大體有如下幾種:github
本文采用第 3 種方式,Spark + Bulk Load 寫入 HBase。該方法相對其餘 2 種方式有如下優點:web
圖示以下:spring
Hadoop 2.5-2.7 HBase 0.98.6 Spark 2.0.0-2.1.1 Sqoop 1.4.6
本段的重點在於討論 HBase 表的設計,其中 RowKey 是最重要的部分。爲了方便說明問題,咱們先來看看數據格式。如下以 check 舉例,opinion 同理。sql
check 表(原表字段有 18 個,爲方便描述,本文截選 5 個字段示意)數據庫
如上圖所示,主鍵爲 id,32 位字母和數字隨機組成,業務查詢字段 check_id 爲不定長字段(不超過 32 位),字母和數字組成,同一 check_id 可能對應多條記錄,其餘爲相關業務字段。衆所周知,HBase 是基於 RowKey 提供查詢,且要求 RowKey 是惟一的。RowKey 的設計主要考慮的是數據將怎樣被訪問。初步來看,咱們有 2 種設計方法。apache
第一種方法優勢是表結構簡單,RowKey 容易設計,缺點爲 1)數據寫入時,一行原始數據須要寫入到 2 張表,且索引表寫入前須要先掃描該 RowKey 是否存在,若是存在,則加入一列,不然新建一行,2)讀取的時候,即使是採用 List, 也至少須要讀取 2 次表。第二種設計方法,RowKey 設計較爲複雜,可是寫入和讀取都是一次性的。綜合考慮,咱們採用第二種設計方法。api
熱點問題
HBase 中的行是以 RowKey 的字典序排序的,其熱點問題一般發生在大量的客戶端直接訪問集羣的一個或極少數節點。默認狀況下,在開始建表時,表只會有一個 region,並隨着 region 增大而拆分紅更多的 region,這些 region 才能分佈在多個 regionserver 上從而使負載均分。對於咱們的業務需求,存量數據已經較大,所以有必要在一開始就將 HBase 的負載均攤到每一個 regionserver,即作 pre-split。常見的防治熱點的方法爲加鹽,hash 散列,自增部分(如時間戳)翻轉等。
RowKey 設計
Step1:肯定預分區數目,建立 HBase Table
不一樣的業務場景及數據特色肯定數目的方式不同,我我的認爲應該綜合考慮數據量大小和集羣大小等因素。好比 check 表大小約爲 11G,測試集羣大小爲 10 臺機器,hbase.hregion.max.filesize=3G(當 region 的大小超過這個數時,將拆分爲 2 個),因此初始化時儘可能使得一個 region 的大小爲 1~2G(不會一上來就 split),region 數據分到 11G/2G=6 個,但爲了充分利用集羣資源,本文中 check 表劃分爲 10 個分區。若是數據量爲 100G,且不斷增加,集羣狀況不變,則 region 數目增大到 100G/2G=50 個左右較合適。Hbase check 表建表語句以下:
create 'tinawang:check', { NAME => 'f', COMPRESSION => 'SNAPPY',DATA_BLOCK_ENCODING => 'FAST_DIFF',BLOOMFILTER=>'ROW'}, {SPLITS => [ '1','2','3', '4','5','6','7','8','9']}
其中,Column Family =‘f’,越短越好。
COMPRESSION => 'SNAPPY',HBase 支持 3 種壓縮 LZO, GZIP and Snappy。GZIP 壓縮率高,可是耗 CPU。後二者差很少,Snappy 稍微勝出一點,cpu 消耗的比 GZIP 少。通常在 IO 和 CPU 均衡下,選擇 Snappy。
DATA_BLOCK_ENCODING => 'FAST_DIFF',本案例中 RowKey 較爲接近,經過如下命令查看 key 長度相對 value 較長。
./hbase org.apache.hadoop.hbase.io.hfile.HFile -m -f /apps/hbase/data/data/tinawang/check/a661f0f95598662a53b3d8b1ae469fdf/f/a5fefc880f87492d908672e1634f2eed_SeqId_2_
Step2:RowKey 組成
Salt
讓數據均衡的分佈到各個 Region 上,結合 pre-split,咱們對查詢鍵即 check 表的 check_id 求 hashcode 值,而後 modulus(numRegions) 做爲前綴,注意補齊數據。
StringUtils.leftPad(Integer.toString(Math.abs(check_id.hashCode() % numRegion)),1,’0’)
說明:若是數據量達上百 G 以上,則 numRegions 天然到 2 位數,則 salt 也爲 2 位。
Hash 散列
由於 check_id 自己是不定長的字符數字串,爲使數據散列化,方便 RowKey 查詢和比較,咱們對 check_id 採用 SHA1 散列化,並使之 32 位定長化。
MD5Hash.getMD5AsHex(Bytes.toBytes(check_id))
惟一性
以上 salt+hash 做爲 RowKey 前綴,加上 check 表的主鍵 id 來保障 RowKey 惟一性。綜上,check 表的 RowKey 設計以下:(check_id=A208849559)
爲加強可讀性,中間還能夠加上自定義的分割符,如’+’,’|’等。
以上設計能保證對每次查詢而言,其 salt+hash 前綴值是肯定的,而且落在同一個 region 中。須要說明的是 HBase 中 check 表的各列同數據源 Oracle 中 check 表的各列存儲。
RowKey 設計與查詢息息相關,查詢方式決定 RowKey 設計,反之基於以上 RowKey 設計,查詢時經過設置 Scan 的 [startRow,stopRow], 便可完成掃描。以查詢 check_id=A208849559 爲例,根據 RowKey 的設計原則,對其進行 salt+hash 計算,得前綴。
startRow = 7+7c9498b4a83974da56b252122b9752bf stopRow = 7+7c9498b4a83974da56b252122b9752bg
Step0: prepare work
由於是從上游系統承接的業務數據,存量數據採用 sqoop 抽到 hdfs;增量數據每日以文件的形式從 ftp 站點獲取。由於業務數據字段中包含一些換行符,且 sqoop1.4.6 目前只支持單字節,因此本文選擇’0x01’做爲列分隔符,’0x10’做爲行分隔符。
Step1: Spark read hdfs text file
SparkContext.textfile() 默認行分隔符爲」\n」,此處咱們用「0x10」,須要在 Configuration 中配置。應用配置,咱們調用 newAPIHadoopFile 方法來讀取 hdfs 文件,返回 JavaPairRDD,其中 LongWritable 和 Text 分別爲 Hadoop 中的 Long 類型和 String 類型(全部 Hadoop 數據類型和 java 的數據類型都很相像,除了它們是針對網絡序列化而作的特殊優化)。咱們須要的數據文件放在 pairRDD 的 value 中,即 Text 指代。爲後續處理方便,可將 JavaPairRDD轉換爲 JavaRDD< String >。
Step2: Transfer and sort RDD
① 將 avaRDD< String>轉換成 JavaPairRDD<tuple2,String>,其中參數依次表示爲,RowKey,col,value。作這樣轉換是由於 HBase 的基本原理是基於 RowKey 排序的,而且當採用 bulk load 方式將數據寫入多個預分區(region)時,要求 Spark 各 partition 的數據是有序的,RowKey,column family(cf),col name 均須要有序。在本案例中由於只有一個列簇,因此將 RowKey 和 col name 組織出來爲 Tuple2格式的 key。請注意本來數據庫中的一行記錄(n 個字段),此時會被拆成 n 行。
② 基於 JavaPairRDD<tuple2,String>進行 RowKey,col 的二次排序。若是不作排序,會報如下異常:
java.io.IOException: Added a key notlexically larger than previous key
③ 將數據組織成 HFile 要求的 JavaPairRDDhfileRDD。
Step3:create hfile and bulk load to HBase
①主要調用 saveAsNewAPIHadoopFile 方法:
hfileRdd.saveAsNewAPIHadoopFile(hfilePath,ImmutableBytesWritable.class, KeyValue.class,HFileOutputFormat2.class,config);
② hfilebulk load to HBase
final Job job = Job.getInstance(); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); HFileOutputFormat2.configureIncrementalLoad(job,htable); LoadIncrementalHFiles bulkLoader = newLoadIncrementalHFiles(config); bulkLoader.doBulkLoad(newPath(hfilePath),htable);
注:若是集羣開啓了 kerberos,step4 須要放置在 ugi.doAs()方法中,在進行以下驗證後實現
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(keyUser,keytabPath); UserGroupInformation.setLoginUser(ugi);
訪問 HBase 集羣的 60010 端口 web,能夠看到 region 分佈狀況。
本文基於 spring boot 框架來開發 web 端訪問 HBase 內數據。
use connection pool(使用鏈接池)
建立鏈接是一個比較重的操做,在實際 HBase 工程中,咱們引入鏈接池來共享 zk 鏈接,meta 信息緩存,region server 和 master 的鏈接。
HConnection connection = HConnectionManager.createConnection(config); HTableInterface table = connection.getTable("table1"); try { // Use the table as needed, for a single operation and a single thread } finally { table.close(); }
也能夠經過如下方法,覆蓋默認線程池。
HConnection createConnection(org.apache.hadoop.conf.Configuration conf,ExecutorService pool);
process query
Step1: 根據查詢條件,肯定 RowKey 前綴
根據 3.3 RowKey 設計介紹,HBase 的寫和讀都遵循該設計規則。此處咱們採用相同的方法,將 web 調用方傳入的查詢條件,轉化成對應的 RowKey 前綴。例如,查詢 check 表傳遞過來的 check_id=A208849559,生成前綴 7+7c9498b4a83974da56b252122b9752bf。
Step2:肯定 scan 範圍
A208849559 對應的查詢結果數據即在 RowKey 前綴爲 7+7c9498b4a83974da56b252122b9752bf 對應的 RowKey 及 value 中。
scan.setStartRow(Bytes.toBytes(rowkey_pre)); //scan, 7+7c9498b4a83974da56b252122b9752bf byte[] stopRow = Bytes.toBytes(rowkey_pre); stopRow[stopRow.length-1]++; scan.setStopRow(stopRow);// 7+7c9498b4a83974da56b252122b9752bg
Step3:查詢結果組成返回對象
遍歷 ResultScanner 對象,將每一行對應的數據封裝成 table entity,組成 list 返回。
從原始數據中隨機抓取 1000 個 check_id,用於模擬測試,連續發起 3 次請求數爲 2000(200 個線程併發,循環 10 次),平均響應時間爲 51ms,錯誤率爲 0。
如上圖,經歷 N 次累計測試後,各個 region 上的 Requests 數較爲接近,符合負載均衡設計之初。
若是集羣開啓了安全認證,那麼在進行 Spark 提交做業以及訪問 HBase 時,均須要進行 kerberos 認證。
本文采用 yarn cluster 模式,像提交普通做業同樣,可能會報如下錯誤。
ERROR StartApp: job failure, java.lang.NullPointerException at com.tinawang.spark.hbase.utils.HbaseKerberos.<init>(HbaseKerberos.java:18) at com.tinawang.spark.hbase.job.SparkWriteHbaseJob.run(SparkWriteHbaseJob.java:60)
定位到 HbaseKerberos.java:18,代碼以下:
this.keytabPath = (Thread.currentThread().getContextClassLoader().getResource(prop.getProperty("hbase.keytab"))).getPath();
這是由於 executor 在進行 HBase 鏈接時,須要從新認證,經過 --keytab 上傳的 tina.keytab 並未被 HBase 認證程序塊獲取到,因此認證的 keytab 文件須要另外經過 --files 上傳。示意以下
--keytab /path/tina.keytab \ --principal tina@GNUHPC.ORG \ --files "/path/tina.keytab.hbase"
其中 tina.keytab.hbase 是將 tina.keytab 複製並重命名而得。由於 Spark 不容許同一個文件重複上傳。
org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2101) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) ... org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext Serialization stack: - object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@24a16d8c) - field (class: com.tinawang.spark.hbase.processor.SparkReadFileRDD, name: sc, type: class org.apache.spark.api.java.JavaSparkContext) ...
解決方法一:
若是 sc 做爲類的成員變量,在方法中被引用,則加 transient 關鍵字,使其不被序列化。
private transient JavaSparkContext sc;
解決方法二:
將 sc 做爲方法參數傳遞,同時使涉及 RDD 操做的類 implements Serializable。 代碼中採用第二種方法。詳見代碼。
Exception in thread "http-nio-8091-Acceptor-0" java.lang.NoClassDefFoundError: org/apache/tomcat/util/ExceptionUtils
或者
Exception in thread "http-nio-8091-exec-34" java.lang.NoClassDefFoundError: ch/qos/logback/classic/spi/ThrowableProxy
查看下面 issue 以及一次排查問題的過程,多是 open file 超過限制。
https://github.com/spring-projects/spring-boot/issues/1106
http://mp.weixin.qq.com/s/34GVlaYDOdY1OQ9eZs-iXg
使用 ulimit-a 查看每一個用戶默認打開的文件數爲 1024。
在系統文件 /etc/security/limits.conf 中修改這個數量限制,在文件中加入如下內容, 便可解決問題。
汪婷,中國民生銀行大數據開發工程師,專一於 Spark 大規模數據處理和 Hbase 系統設計。
http://hbase.apache.org/book.html#perf.writing
http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/
http://hbasefly.com/2016/03/23/hbase_writer/
https://github.com/spring-projects/spring-boot/issues/1106
http://mp.weixin.qq.com/s/34GVlaYDOdY1OQ9eZs-iXg