Based on OpenTSDB 2.4.0, this article begins our exploration of OpenTSDB, mainly covering its read/write behavior and a few other details. The next article will move on to OpenTSDB optimization: performance tuning ideas and recommendations, summarizing current pain points, optimization approaches, and solutions. Better ideas and solutions from readers are of course welcome.
Note: this article assumes a basic familiarity with HBase, including concepts such as rowkey, region, store, ColumnFamily, and ColumnQualifier, as well as a rough understanding of HBase's logical structure and physical storage layout.
The figure above is taken from the official overview: http://opentsdb.net/overview.html. The TSD in it (the actual process name is TSDMain) is the OpenTSDB component. Every TSD instance is independent; there is no master and no shared state, so a production deployment will typically run multiple TSD instances behind something like nginx + Consul for load balancing.
Each TSD uses the open source database HBase or hosted Google Bigtable service to store and retrieve time-series data
Most deployments, including ours, still use HBase as the data store.
The installation and deployment article mentioned creating the table schema in HBase. Here is a brief introduction to those 4 tables. As we dig deeper we will build a much better understanding of the tsdb and tsdb-uid tables; tsdb-meta and tsdb-tree are not the focus here, and a quick look is enough. Related docs: http://opentsdb.net/docs/build/html/user_guide/backends/index.html
All of OpenTSDB's time-series data is stored in the tsdb table, which has a single column family named "t". This table is therefore very large, and in most cases the read/write performance bottleneck is closely tied to it, so optimization is likely to revolve around it as well. Its rowkey is "an optional salt, the metric UID, a base timestamp and the UID for tagk/v pairs", i.e. [optional salt + metric UID + hour-aligned base timestamp + ordered tagk/tagv UID pairs]:

[salt]<metric_uid><timestamp><tagk1><tagv1>[...<tagkN><tagvN>]
Ignore the salt for now; a dedicated section below looks at the design and implementation of salting. Here is the rowkey of an unsalted data point with two tags:
```
00000150E22700000001000001000002000004
'----''------''----''----''----''----'
metric  time    tagk  tagv  tagk  tagv
```
Why the rowkey is designed this way, and how it is implemented, will be covered in detail later; a basic picture is enough for now.
To keep the rowkey short, OpenTSDB maps every metric, tagk, and tagv to a UID. The mapping is bidirectional: given a tagk you can find its UID, and given a UID you can find the corresponding tagk. These mappings are stored in the tsdb-uid table, which has two ColumnFamilies, name and id, and each of them has three columns: metric, tagk, and tagv. As shown below:

RowKey | id:metric | id:tagk | id:tagv | name:metric | name:tagk | name:tagv |
---|---|---|---|---|---|---|
metric01 | 0x01 | | | | | |
metric02 | 0x02 | | | | | |
tagk01 | | 0x01 | | | | |
tagv01 | | | 0x01 | | | |
tagv02 | | | 0x02 | | | |
0x01 | | | | metric01 | tagk01 | tagv01 |
0x02 | | | | metric02 | | tagv02 |
As the table shows, the UID mappings for metric, tagk, and tagv do not interfere with one another, which is why the UID 0x01 means different things for different types. How UIDs are roughly allocated will be covered later from the source-code angle.
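To make the bidirectional mapping concrete, here is a minimal sketch of resolving a name in both directions through a running TSDB instance. TSDB#getUID also appears in the rowkey snippet later in this article; TSDB#getUidName and its exact signature are my assumption here, so treat this as illustrative rather than authoritative.

```java
import net.opentsdb.core.TSDB;
import net.opentsdb.uid.UniqueId.UniqueIdType;

public class UidLookupSketch {
  /** Resolves a metric name to its UID and back, assuming `tsdb` is a live instance. */
  static void lookup(final TSDB tsdb) throws Exception {
    // name -> UID: served from the "id" column family of tsdb-uid (and the TSD's cache)
    final byte[] metricUid = tsdb.getUID(UniqueIdType.METRIC, "metric01");
    // UID -> name: served from the "name" column family of tsdb-uid
    final String metricName = tsdb.getUidName(UniqueIdType.METRIC, metricUid)
        .joinUninterruptibly();
    System.out.println(metricName);  // expected to print "metric01"
  }
}
```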
Whether OpenTSDB records metadata for a time series in the tsdb-meta table is decided by the instance configuration; see the tsd.core.meta.enable_tsuid_tracking option in opentsdb.conf. tsd.core.meta.enable_tsuid_tracking (default false): when enabled, every DataPoint written also writes a record into the tsdb-meta table whose rowkey is that series' tsuid (covered below; it is the full rowkey minus the salt and the timestamp) and whose value is 1. Each point then costs two HBase writes, which adds a certain amount of pressure on the HBase cluster. The relevant code is in TSDB#storeIntoDB()#WriteCB#call():
```java
// if the meta cache plugin is instantiated then tracking goes through it
if (meta_cache != null) {
  meta_cache.increment(tsuid);
} else {
  // tsd.core.meta.enable_tsuid_tracking
  if (config.enable_tsuid_tracking()) {
    // tsd.core.meta.enable_realtime_ts
    if (config.enable_realtime_ts()) {
      // tsd.core.meta.enable_tsuid_incrementing
      if (config.enable_tsuid_incrementing()) {
        TSMeta.incrementAndGetCounter(TSDB.this, tsuid);
      } else {
        TSMeta.storeIfNecessary(TSDB.this, tsuid);
      }
    } else {
      // write a record whose rowkey is the tsuid and whose value is 1
      final PutRequest tracking = new PutRequest(meta_table, tsuid,
          TSMeta.FAMILY(), TSMeta.COUNTER_QUALIFIER(), Bytes.fromLong(1));
      client.put(tracking);
    }
  }
}
```
The tsdb-tree table is described at http://opentsdb.net/docs/build/html/user_guide/trees.html. We won't go into detail here; the Examples in that link make it immediately clear what the table is for. Related docs:
http://opentsdb.net/docs/build/html/user_guide/writing/index.html
The following points summarize how data points are laid out in the tsdb table:

- There is only one column family, named "t".
- The metric, tagk, and tagv strings are all converted to UIDs, so however long the original strings are, they are represented in the rowkey by fixed-width UIDs, which greatly shortens the rowkey.
- The timestamp in the rowkey is not the data point's actual timestamp; it is normalized to the hour (the so-called base_time), so the base_time in a rowkey tells you which hour the data falls into. When a point is written, the offset of its actual timestamp from base_time is used as the ColumnQualifier (a small sketch of this encoding follows the table below).

rowkey | t: +1 | t: +2 | t: +3 | t: ... | t: +3600 |
---|---|---|---|---|---|
salt+metric_uid+base_time+tagk1+tagv1+...+tagkN+tagvN | 10 | 9 | 12 | ... | 8 |
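To make the base_time and offset idea concrete, here is a minimal sketch (mine, not from the OpenTSDB source) of how a second-resolution column qualifier can be derived, assuming the 2-byte layout documented in the storage internals: 12 bits of offset from base_time plus 4 flag bits (value length minus 1, with 0x8 marking floats).

```java
import java.util.Arrays;

public class QualifierSketch {
  static final int MAX_TIMESPAN = 3600;  // one row covers one hour
  static final int FLAG_BITS = 4;        // the low 4 bits of the qualifier hold format flags
  static final int LONG_FLAGS = 0x7;     // 8-byte integer value => length - 1 = 7, float bit unset

  /** Builds the 2-byte qualifier for a second-resolution 8-byte long value. */
  static byte[] secondQualifier(final long timestampSeconds) {
    final long baseTime = timestampSeconds - (timestampSeconds % MAX_TIMESPAN);
    final int offset = (int) (timestampSeconds - baseTime);            // 0..3599
    final int qualifier = (offset << FLAG_BITS) | LONG_FLAGS;
    return new byte[] { (byte) (qualifier >>> 8), (byte) qualifier };  // big-endian
  }

  public static void main(String[] args) {
    // a point 30 seconds past the top of the hour => offset 30, qualifier 0x01E7
    System.out.println(Arrays.toString(secondQualifier(1357002030L)));
  }
}
```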
With salting disabled, I have pieced the rowkey-generation logic together into the following code (note: this exact snippet does not appear in the source tree):
```java
public byte[] generateRowKey(String metricName, long timestamp, Map<String, String> tags) {
  // fetch the metric UID
  byte[] metricUid = tsdb.getUID(UniqueId.UniqueIdType.METRIC, metricName);
  // convert the timestamp to seconds
  if ((timestamp & Const.SECOND_MASK) != 0L) {
    timestamp /= 1000L;
  }
  final long timestamp_offset = timestamp % Const.MAX_TIMESPAN; // 3600
  // the top of the hour the timestamp falls into
  final long basetime = timestamp - timestamp_offset;
  // store <tagkUid, tagvUid> in a TreeMap sorted with memcmp(), introduced below
  Map<byte[], byte[]> tagsUidMap = new org.hbase.async.Bytes.ByteMap<>();
  tags.forEach((k, v) -> tagsUidMap.put(
      tsdb.getUID(UniqueId.UniqueIdType.TAGK, k),
      tsdb.getUID(UniqueId.UniqueIdType.TAGV, v)));
  // unsalted rowkey: metricUid + hour-aligned timestamp + all tagk/tagv UIDs
  byte[] rowkey = new byte[metricUid.length + Const.TIMESTAMP_BYTES
      + tags.size() * (TSDB.tagk_width() + TSDB.tagv_width())];
  // copy each piece into its position in the rowkey byte array
  System.arraycopy(metricUid, 0, rowkey, 0, metricUid.length);
  Bytes.setInt(rowkey, (int) basetime, metricUid.length);
  int startOffset = metricUid.length + Const.TIMESTAMP_BYTES;
  for (Map.Entry<byte[], byte[]> entry : tagsUidMap.entrySet()) {
    System.arraycopy(entry.getKey(), 0, rowkey, startOffset, TSDB.tagk_width());
    startOffset += TSDB.tagk_width();
    System.arraycopy(entry.getValue(), 0, rowkey, startOffset, TSDB.tagv_width());
    startOffset += TSDB.tagv_width();
  }
  return rowkey;
}
```
The ByteMap here is simply a TreeMap; see org.hbase.async.Bytes.ByteMap:
```java
/** A convenient map keyed with a byte array. */
public static final class ByteMap<V> extends TreeMap<byte[], V>
  implements Iterable<Map.Entry<byte[], V>> {

  public ByteMap() {
    super(MEMCMP);
  }
}
```
Multiple tags are ordered by comparing the tag UID bytes, via org.hbase.async.Bytes#memcmp(final byte[] a, final byte[] b), shown below:
```java
/**
 * {@code memcmp} in Java, hooray.
 * @param a First non-{@code null} byte array to compare.
 * @param b Second non-{@code null} byte array to compare.
 * @return 0 if the two arrays are identical, otherwise the difference
 * between the first two different bytes, otherwise the different between
 * their lengths.
 */
public static int memcmp(final byte[] a, final byte[] b) {
  final int length = Math.min(a.length, b.length);
  if (a == b) {  // Do this after accessing a.length and b.length
    return 0;    // in order to NPE if either a or b is null.
  }
  for (int i = 0; i < length; i++) {
    if (a[i] != b[i]) {
      return (a[i] & 0xFF) - (b[i] & 0xFF);  // "promote" to unsigned.
    }
  }
  return a.length - b.length;
}
```
Related docs:
http://opentsdb.net/docs/build/html/user_guide/definitions.html#compaction
An OpenTSDB compaction takes multiple columns in an HBase row and merges them into a single column to reduce disk space. This is not to be confused with HBase compactions where multiple edits to a region are merged into one. OpenTSDB compactions can occur periodically for a TSD after data has been written, or during a query.
tsd.storage.enable_compaction: whether OpenTSDB compaction is enabled (default true, i.e. enabled)
To reduce storage space (and arguably it helps queries too), OpenTSDB puts each rowkey into a ConcurrentSkipListMap as time-series data is written. A daemon thread keeps checking whether data older than System.currentTimeMillis()/1000-3600-1 can be compacted; when the compaction condition is met, the data points within that hour (they share the same rowkey) are read back, compacted in memory into a single column, written back to HBase, and then the original cells are deleted. A query may also trigger a compaction. See CompactionQueue:
```java
final class CompactionQueue extends ConcurrentSkipListMap<byte[], Boolean> {

  public CompactionQueue(final TSDB tsdb) {
    super(new Cmp(tsdb));
    // tsd.storage.enable_compaction
    if (tsdb.config.enable_compactions()) {
      // when compaction is enabled, start a daemon thread
      startCompactionThread();
    }
  }

  /**
   * Helper to sort the byte arrays in the compaction queue.
   * <p>
   * This comparator sorts things by timestamp first, this way we can find
   * all rows of the same age at once.
   */
  private static final class Cmp implements Comparator<byte[]> {

    /** The position with which the timestamp of metric starts. */
    private final short timestamp_pos;

    public Cmp(final TSDB tsdb) {
      timestamp_pos = (short) (Const.SALT_WIDTH() + tsdb.metrics.width());
    }

    @Override
    public int compare(final byte[] a, final byte[] b) {
      // sort on the base_time embedded in the rowkey
      final int c = Bytes.memcmp(a, b, timestamp_pos, Const.TIMESTAMP_BYTES);
      // If the timestamps are equal, sort according to the entire row key.
      return c != 0 ? c : Bytes.memcmp(a, b);
    }
  }
}
```
Now let's see what that daemon thread does; see CompactionQueue#Thrd:
```java
/**
 * Background thread to trigger periodic compactions.
 */
final class Thrd extends Thread {
  public Thrd() {
    super("CompactionThread");
  }

  @Override
  public void run() {
    while (true) {
      final int size = size();
      // once the minimum flush threshold is reached, trigger flush()
      if (size > min_flush_threshold) {
        final int maxflushes = Math.max(min_flush_threshold,
            size * flush_interval * flush_speed / Const.MAX_TIMESPAN);
        final long now = System.currentTimeMillis();
        // check whether the previous hour's data can be compacted
        flush(now / 1000 - Const.MAX_TIMESPAN - 1, maxflushes);
      }
    }
  }
}
```
Next, CompactionQueue#flush(final long cut_off, int maxflushes):
```java
private Deferred<ArrayList<Object>> flush(final long cut_off, int maxflushes) {
  final ArrayList<Deferred<Object>> ds =
      new ArrayList<Deferred<Object>>(Math.min(maxflushes, max_concurrent_flushes));
  int nflushes = 0;
  int seed = (int) (System.nanoTime() % 3);
  for (final byte[] row : this.keySet()) {
    final long base_time = Bytes.getUnsignedInt(row, Const.SALT_WIDTH() + metric_width);
    if (base_time > cut_off) {
      // the base_time is too close to the current time, stop here
      break;
    } else if (nflushes == max_concurrent_flushes) {
      break;
    }
    // send a get request to HBase for the row's data points;
    // the compaction happens in the callback
    ds.add(tsdb.get(row).addCallbacks(compactcb, handle_read_error));
  }
  return group;
}
```
Finally, what does the compaction itself do? See CompactionQueue#Compaction#compact():
```java
public Deferred<Object> compact() {
  // merge the datapoints, ordered by timestamp and removing duplicates
  final ByteBufferList compacted_qual = new ByteBufferList(tot_values);
  final ByteBufferList compacted_val = new ByteBufferList(tot_values);
  mergeDatapoints(compacted_qual, compacted_val);

  // build the compacted columns
  final KeyValue compact = buildCompactedColumn(compacted_qual, compacted_val);
  final boolean write = updateDeletesCheckForWrite(compact);
  final byte[] key = compact.key();
  deleted_cells.addAndGet(to_delete.size());  // We're going to delete this.
  if (write) {
    // write the compacted column back to the tsdb table
    Deferred<Object> deferred = tsdb.put(key, compact.qualifier(), compact.value(),
        compactedKVTimestamp);
    if (!to_delete.isEmpty()) {
      // once the compacted column is written, delete the cells that were read
      deferred = deferred.addCallbacks(new DeleteCompactedCB(to_delete), handle_write_error);
    }
    return deferred;
  }
}

// callback that deletes the compacted cells
private final class DeleteCompactedCB implements Callback<Object, Object> {

  /** What we're going to delete. */
  private final byte[] key;
  private final byte[][] qualifiers;

  @Override
  public Object call(final Object arg) {
    return tsdb.delete(key, qualifiers).addErrback(handle_delete_error);
  }

  @Override
  public String toString() {
    return "delete compacted cells";
  }
}
```
Tracing the whole compaction flow, it is easy to see that it adds quite a few extra get, write, and delete requests, which quietly puts considerable pressure on HBase when data volumes are very large. Keep this in mind; it may well be one of the main places we optimize.
Related docs:
http://opentsdb.net/docs/build/html/user_guide/writing/index.html#appends
Also in 2.2, writing to HBase columns via appends is now supported. This can improve both read and write performance in that TSDs will no longer maintain a queue of rows to compact at the end of each hour, thus preventing a massive read and re-write operation in HBase. However due to the way appends operate in HBase, an increase in CPU utilization, store file size and HDFS traffic will occur on the region servers. Make sure to monitor your HBase servers closely.
tsd.storage.enable_appends: default false
In append mode, when OpenTSDB writes, the values of points that share the same rowkey are written into a single ColumnQualifier (0x050000). This is compatible with the regular write mode, which means append mode can be enabled or disabled at any time.
```java
/** The prefix ID of append columns */
public static final byte APPEND_COLUMN_PREFIX = 0x05;

/** The full column qualifier for append columns */
public static final byte[] APPEND_COLUMN_QUALIFIER = new byte[] { APPEND_COLUMN_PREFIX, 0x00, 0x00 };
```
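As a rough illustration of how I understand the append layout, here is a minimal sketch of the bytes that end up in such an append cell. The assumption (based on my reading of AppendDataPoints, not an official statement) is that the cell's value is a sequence of [qualifier][value] entries, one per data point, accumulated through HBase append operations against the 0x050000 qualifier.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AppendPayloadSketch {
  /**
   * Builds the bytes appended to the 0x050000 column for a single data point,
   * assuming the append cell value is simply [qualifier][value] per point.
   */
  static byte[] appendPayload(final byte[] offsetQualifier, final byte[] value) throws IOException {
    final ByteArrayOutputStream buf = new ByteArrayOutputStream();
    // the same 2- or 4-byte offset+flags qualifier used by the normal write path
    buf.write(offsetQualifier);
    // followed by the encoded value bytes; successive points for the same rowkey
    // keep accumulating inside the one append cell on the HBase side
    buf.write(value);
    return buf.toByteArray();
  }
}
```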
Obviously this is exactly the compacted layout we want, without the whole dance of pulling already-written data back from HBase, compacting it in OpenTSDB's memory, rewriting it, and deleting the originals; of course, the pressure is presumably shifted onto HBase instead.
Append mode consumes more HBase cluster resources (that is what the docs say; how much more remains to be measured). I also suspect that with massive, highly concurrent writes there may be lock contention, and purely in terms of instantaneous write throughput, append mode is probably not as fast as the regular write path.
Related docs:
http://opentsdb.net/docs/build/html/user_guide/uids.html#uid
By now you should have a reasonable understanding of UIDs; using UIDs saves a great deal of storage space.
Within the storage system there is a counter that is incremented for each metric, tagk and tagv. When you create a new tsdb-uid table, this counter is set to 0 for each type.
Very much like an auto-increment primary key in MySQL. See UniqueId#allocateUid():
```java
private Deferred<Long> allocateUid() {
  // randomize_id defaults to false; there are two modes: random UIDs or incrementing UIDs
  // (tagk and tagv are not configurable and always use incrementing UIDs;
  //  metric can be randomized via tsd.core.uid.random_metrics, default false)
  if (randomize_id) {
    return Deferred.fromResult(RandomUniqueId.getRandomUID());
  } else {
    // the usual path: ask HBase's tsdb-uid table for the next incrementing UID
    return client.atomicIncrement(new AtomicIncrementRequest(table, MAXID_ROW, ID_FAMILY, kind));
  }
}
```
The cell at rowkey 0x00 in the tsdb-uid table stores the current maximum UID for each of the three types.
Here we can see again that the UID mappings for metric, tagk, and tagv are independent. Also note two related configuration options:

- tsd.core.auto_create_metrics (default false): whether to assign a UID to a metric that does not yet exist in the tsdb-uid table; when false, writing data points for a new metric throws an exception.
- tsd.core.preload_uid_cache (default false): whether to load UIDs from the tsdb-uid table and cache them locally at startup; see TSDB#TSDB(final HBaseClient client, final Config config):
```java
if (config.getBoolean("tsd.core.preload_uid_cache")) {
  final ByteMap<UniqueId> uid_cache_map = new ByteMap<UniqueId>();
  uid_cache_map.put(METRICS_QUAL.getBytes(CHARSET), metrics);
  uid_cache_map.put(TAG_NAME_QUAL.getBytes(CHARSET), tag_names);
  uid_cache_map.put(TAG_VALUE_QUAL.getBytes(CHARSET), tag_values);
  UniqueId.preloadUidCache(this, uid_cache_map);
}
```
From this we can also see that with the incrementing UID scheme, a tagk registered earlier necessarily gets a numerically smaller UID and a later one gets a larger UID. Combined with the fact that, at write time, the tags in the rowkey are sorted by the tagk UID bytes, it follows that the earliest-registered tagk sits closest to the front of the rowkey. Knowing this rule, could it help with query optimization in certain cases?
Related docs:
http://opentsdb.net/docs/build/html/user_guide/query/index.html
Queries are discussed at this point because only once we understand how the data is stored can we understand how to read it. From the previous sections we know that at write time the tags in the rowkey are sorted by the tagk UID bytes, so when reading from HBase the same ordering should, in principle, apply. Let's look at QueryUtil#setDataTableScanFilter().
However, for a normal scan (unless explicit_tags is set to true in the query), tag filtering is not encoded directly into the rowkey; instead it is applied via scanner.setFilter(regex_filter):
```java
final byte[] start_row = new byte[metric_salt_width + Const.TIMESTAMP_BYTES];
final byte[] end_row = new byte[metric_salt_width + Const.TIMESTAMP_BYTES];
scanner.setStartKey(start_row);
scanner.setStopKey(end_row);

// a simple example of the generated regex_filter is shown below
if (!(explicit_tags && enable_fuzzy_filter)) {
  scanner.setFilter(regex_filter);
  return;
}
```
QueryUtil#getRowKeyUIDRegex():
```java
// Generate a regexp for our tags.  Say we have 2 tags: { 0 0 1 0 0 2 }
// and { 4 5 6 9 8 7 }, the regexp will be:
// "^.{7}(?:.{6})*\\Q\000\000\001\000\000\002\\E(?:.{6})*\\Q\004\005\006\011\010\007\\E(?:.{6})*$"
```
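To make the regex-based tag filtering more tangible, here is a minimal, self-contained sketch (mine, not OpenTSDB code) that shows how such a pattern matches a rowkey. It treats the rowkey bytes as an ISO-8859-1 string the way the scanner filter does, and assumes salt width 0, a 3-byte metric UID, and a 4-byte timestamp (the 7-byte prefix) followed by 3+3-byte tag pairs:

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

public class RowKeyRegexSketch {
  public static void main(String[] args) {
    // rowkey = 3-byte metric UID + 4-byte base time + two 6-byte tagk/tagv UID pairs
    final byte[] rowKey = {
        0, 0, 1,                        // metric UID
        0x50, (byte) 0xE2, 0x27, 0x00,  // base time
        0, 0, 1, 0, 0, 2,               // first  tagk/tagv UID pair
        4, 5, 6, 9, 8, 7                // second tagk/tagv UID pair
    };
    // Same shape as the regex in the comment above: skip the 7-byte prefix, then any
    // number of 6-byte pairs may sit between the two pairs we require.
    final String regex = "^.{7}(?:.{6})*\\Q\000\000\001\000\000\002\\E"
        + "(?:.{6})*\\Q\004\005\006\011\010\007\\E(?:.{6})*$";
    final Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
    // the filter compares raw bytes, so decode with ISO-8859-1 (one char per byte)
    final String rowAsString = new String(rowKey, StandardCharsets.ISO_8859_1);
    System.out.println(pattern.matcher(rowAsString).matches());  // prints true
  }
}
```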
The official description of setting explicit_tags to true in a query:
http://opentsdb.net/docs/build/html/user_guide/query/filters.html#explicit-tags
It means: I already know that the metric I am querying has exactly these tags, and the series I want will not carry any other tag. In that case OpenTSDB splices the user's tag filters directly into the rowkey, which optimizes the query to some extent. See the code:
```java
if (explicit_tags && enable_fuzzy_filter) {
  fuzzy_key = new byte[prefix_width + (row_key_literals.size() * (name_width + value_width))];
  fuzzy_mask = new byte[prefix_width + (row_key_literals.size() * (name_width + value_width))];
  System.arraycopy(scanner.getCurrentKey(), 0, fuzzy_key, 0, scanner.getCurrentKey().length);
  // only because the tags are known to be exhaustive can they be spliced directly into the start key
  scanner.setStartKey(fuzzy_key);
}
```
When explicit_tags is true, a FuzzyRowFilter is used; here is the description from the source:
```java
/**
 * FuzzyRowFilter is a server-side fast-forward filter that allows skipping
 * whole range of rows when scanning. The feature is available in HBase
 * 0.94.5 and above.
 * <p>
 * It takes two byte array to match a rowkey, one to hold the fixed value
 * and one to hold a mask indicating which bytes of the rowkey must match the
 * fixed value. The two arrays must have the same length.
 * <p>
 * Bytes in the mask can take two values, 0 meaning that the corresponding byte
 * in the rowkey must match the corresponding fixed byte and 1 meaning that the
 * corresponding byte in the rowkey can take any value.
 * <p>
 * One can combine several {@link FuzzyFilterPair} to match multiple patterns at
 * once.
 * <p>
 * Example :
 * You store logs with this rowkey design :
 *   group(3bytes)timestamp(4bytes)severity(1byte)
 *
 * You want to get all FATAL("5") logs :
 *   * Build a FuzzyFilterPair with
 *     - rowkey : "????????5"
 *     - fuzzy mask : "111111110"
 * And CRITICAL("4") logs only for web servers :
 *   * Add another FuzzyFilterPair with
 *     - rowkey : "web????4"
 *     - fuzzy mask : "00011110"
 *
 * @since 1.7
 */
public final class FuzzyRowFilter extends ScanFilter {
  // ...
}
```
To sum up: if you know exactly which tags the data you want carries, it is recommended to set explicit_tags to true in your query, which helps the query run faster.
```
# Example 1:
http://host:4242/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=web01}

# Example 2:
http://host:4242/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=*}{dc=*}

# Example 3:
{
    "start": 1584408560754,
    "end": 1584409460754,
    "msResolution": false,
    "queries": [
        {
            "aggregator": "avg",
            "metric": "metric.test",
            "downsample": "5m-avg",
            "explicitTags": true,
            "filters": [
                {
                    "type": "literal_or",
                    "tagk": "instance",
                    "filter": "total",
                    "groupBy": true
                },
                {
                    "type": "literal_or",
                    "tagk": "ip",
                    "filter": "192.168.1.1",
                    "groupBy": true
                }
            ]
        }
    ]
}
```
The tsd.storage.use_otsdb_timestamp option relates to an HBase feature. It will be discussed in the next article on optimization; it is only mentioned here for completeness. See TsdbQuery#getScanner(final int salt_bucket):
```java
// tsd.storage.use_otsdb_timestamp
if (tsdb.getConfig().use_otsdb_timestamp()) {
  long stTime = (getScanStartTimeSeconds() * 1000);
  long endTime = end_time == UNSET ? -1 : (getScanEndTimeSeconds() * 1000);
  if (tsdb.getConfig().get_date_tiered_compaction_start() <= stTime && rollup_query == null) {
    // TODO - we could set this for rollups but we also need to write
    // the rollup columns at the proper time.
    scanner.setTimeRange(stTime, endTime);
  }
}
```
Related docs:
http://opentsdb.net/docs/build/html/user_guide/writing/index.html#salting
For time-series writes, hot-spotting is an unavoidable concern: when a metric has a large number of data points, it easily becomes a write hotspot, with everything going to one region server or even one region. If that happens, reads and writes for that data all land on a single machine in the HBase cluster, the cluster's capacity goes unused, and the region server may even be crushed. Salting exists to scatter the time-series rowkeys so they are spread across different regions, balancing the load.
When enabled, a configured number of bytes are prepended to each row key. Each metric and combination of tags is then hashed into one "bucket", the ID of which is written to the salt bytes
Starting with 2.2, OpenTSDB allows salting the metric; with salting enabled, a bucket index is prepended to the rowkey.
To enable salting you must modify the config file parameter tsd.storage.salt.width and optionally tsd.storage.salt.buckets. We recommend setting the salt width to 1 and determine the number of buckets based on a factor of the number of region servers in your cluster. Note that at query time, the TSD will fire tsd.storage.salt.buckets number of scanners to fetch data. The proper number of salt buckets must be determined through experimentation as at some point query performance may suffer due to having too many scanners open and collating the results. In the future the salt width and buckets may be configurable but we didn't want folks changing settings on accident and losing data.
To spell out the description above:

- tsd.storage.salt.width: how many salt bytes to prepend to the rowkey (default 0, i.e. salting disabled; 1 is recommended if you enable it)
- tsd.storage.salt.buckets: the number of buckets (default 20; base it on the number of region servers)
When salting is enabled, the salt is derived from the byte array formed by the metric UID plus all of the [tagk+tagv] UIDs: its hash code is computed and taken modulo the bucket count, which yields the salt value. See RowKey#prefixKeyWithSalt (note: only the key code is shown, distractions removed):
```java
public static void prefixKeyWithSalt(final byte[] row_key) {
  // tsd.storage.salt.width
  if (Const.SALT_WIDTH() > 0) {
    final int tags_start = Const.SALT_WIDTH() + TSDB.metrics_width() + Const.TIMESTAMP_BYTES;

    // we want the metric and tags, not the timestamp
    final byte[] salt_base = new byte[row_key.length - Const.SALT_WIDTH() - Const.TIMESTAMP_BYTES];
    System.arraycopy(row_key, Const.SALT_WIDTH(), salt_base, 0, TSDB.metrics_width());
    System.arraycopy(row_key, tags_start, salt_base, TSDB.metrics_width(), row_key.length - tags_start);
    // the salt value is the hash code modulo the bucket count
    int modulo = Arrays.hashCode(salt_base) % Const.SALT_BUCKETS(); // tsd.storage.salt.buckets
    final byte[] salt = getSaltBytes(modulo);
    // fill in the salt bytes at the front of the rowkey
    System.arraycopy(salt, 0, row_key, 0, Const.SALT_WIDTH());
  }
}
```
With a salt prefixed to the rowkey, how does querying work? When OpenTSDB queries a series, it splits the request into as many HBase queries as there are buckets and then merges the bucket-count result sets back at the OpenTSDB layer; the number of concurrent requests hitting HBase grows by the same factor. See TsdbQuery#findSpans():
```java
if (Const.SALT_WIDTH() > 0) {
  final List<Scanner> scanners = new ArrayList<Scanner>(Const.SALT_BUCKETS());
  for (int i = 0; i < Const.SALT_BUCKETS(); i++) {
    // build as many scanners as there are salt buckets
    scanners.add(getScanner(i));
  }
  scan_start_time = DateTime.nanoTime();
  return new SaltScanner(tsdb, metric, scanners, spans, scanner_filters,
      delete, rollup_query, query_stats, query_index, null,
      max_bytes, max_data_points).scan();
}
```
Each scanner's rowkey is prefixed with its bucket index as the salt byte; only then does scanning HBase yield the complete result. See QueryUtil#getMetricScanner():
```java
public static Scanner getMetricScanner(final TSDB tsdb, final int salt_bucket,
    final byte[] metric, final int start, final int stop,
    final byte[] table, final byte[] family) {
  if (Const.SALT_WIDTH() > 0) {
    final byte[] salt = RowKey.getSaltBytes(salt_bucket);
    // fill the salt_bucket into the rowkey
    System.arraycopy(salt, 0, start_row, 0, Const.SALT_WIDTH());
    System.arraycopy(salt, 0, end_row, 0, Const.SALT_WIDTH());
  }
  return scanner;
}
```
Related docs:
OpenTSDB configuration: http://opentsdb.net/docs/build/html/user_guide/configuration.html
AsyncHBase client configuration: http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html
The HBase client used by OpenTSDB is AsyncHBase: http://opentsdb.github.io/asynchbase/
```java
public TSDB(final HBaseClient client, final Config config) {
  this.config = config;
  if (client == null) {
    final org.hbase.async.Config async_config;
    if (config.configLocation() != null && !config.configLocation().isEmpty()) {
      try {
        // the AsyncHBase client reads the same file as OpenTSDB, so AsyncHBase
        // client settings can simply be written into the OpenTSDB config file
        // and they will take effect
        async_config = new org.hbase.async.Config(config.configLocation());
      } catch (final IOException e) {
        throw new RuntimeException("Failed to read the config file: " + config.configLocation(), e);
      }
    } else {
      async_config = new org.hbase.async.Config();
    }
    async_config.overrideConfig("hbase.zookeeper.znode.parent",
        config.getString("tsd.storage.hbase.zk_basedir"));
    async_config.overrideConfig("hbase.zookeeper.quorum",
        config.getString("tsd.storage.hbase.zk_quorum"));
    this.client = new HBaseClient(async_config);
  } else {
    this.client = client;
  }
}
```
One aspect of performance work is parameter tuning: some of it concerns startup flags or operating-system settings, while other parameters simply live in the config file (for example maximum connection counts, timeouts, and so on). Here are two OpenTSDB-related settings not covered earlier.
tsd.query.skip_unresolved_tagvs (default false): whether to skip a nonexistent tagv encountered during a query; true skips it, false throws an exception. Personally I find the false default rather unfriendly. See TagVFilter#resolveTags()#TagVErrback:
```java
/**
 * Allows the filter to avoid killing the entire query when we can't resolve
 * a tag value to a UID.
 */
class TagVErrback implements Callback<byte[], Exception> {
  @Override
  public byte[] call(final Exception e) throws Exception {
    if (config.getBoolean("tsd.query.skip_unresolved_tagvs")) {
      LOG.warn("Query tag value not found: " + e.getMessage());
      return null;
    } else {
      // by default the exception is simply rethrown
      throw e;
    }
  }
}
```
hbase.rpc.timeout in the AsyncHBase configuration: "How long, in milliseconds, to wait for a response to an RPC from a region server before failing the RPC with a RpcTimedOutException. This value can be overridden on a per-RPC basis. A value of 0 will not allow RPCs to timeout."

Related docs:
http://opentsdb.net/docs/build/html/api_http/index.html
Commonly used endpoints (a small /api/put example follows this list):
http://opentsdb.net/docs/build/html/api_http/put.html
http://opentsdb.net/docs/build/html/api_http/query/index.html
http://opentsdb.net/docs/build/html/api_http/uid/index.html
http://opentsdb.net/docs/build/html/api_http/stats/index.html
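As a quick illustration of the write API, here is a minimal sketch (mine, not from the docs) that pushes one data point to /api/put over plain HTTP, assuming a TSD listening on localhost:4242; the JSON body follows the format described in the put documentation above.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PutExample {
  public static void main(String[] args) throws Exception {
    // a single data point in the JSON format accepted by /api/put
    final String body = "{\"metric\":\"sys.cpu.nice\",\"timestamp\":1346846400,"
        + "\"value\":18,\"tags\":{\"host\":\"web01\"}}";
    final HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:4242/api/put").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(body.getBytes(StandardCharsets.UTF_8));
    }
    // 204 No Content means the point was accepted
    System.out.println(conn.getResponseCode());
  }
}
```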
We also note that work on OpenTSDB 3.0 is in progress (work-in-progress); details: http://opentsdb.net/docs/3x/build/html/index.html
Related docs:
http://opentsdb.github.io/asynchbase/docs/build/html/authentication.html
http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html
(search for the kerberos keyword)
Related issue discussion:
https://github.com/OpenTSDB/opentsdb/issues/491
For reference, a project with a Dockerfile for an HBase Docker image with Kerberos authentication:
https://github.com/Knappek/docker-phoenix-secure
In that project, bootstrap-phoenix.sh, docker-compose.yml, and the configuration files under config_files are well worth a look.
Append the following to the end of /etc/opentsdb/opentsdb.conf:

```
hbase.security.auth.enable=true
hbase.security.authentication=kerberos
hbase.sasl.clientconfig=Client
hbase.kerberos.regionserver.principal=hbase/_HOST@EXAMPLE.COM
```
Create an hbase-client.jaas file with contents roughly like the following:

```
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  keyTab="/etc/security/keytabs/hbase.keytab"
  principal="hbase/phoenix.docker.com";
};
```
Edit /usr/share/opentsdb/etc/init.d/opentsdb and adjust the startup arguments:

```
# At the "start command" location (around line 78), add
#   -Djava.security.auth.login.config=<path to the hbase-client.jaas file>
# Note: if Zookeeper does not use Kerberos authentication, also add
#   -Dzookeeper.sasl.client=false
JVMARGS="-Djava.security.auth.login.config=/.../jaas.conf"
```
Restart OpenTSDB; if it works, you should see logs like the following:
```
13:31:55.045 INFO  [ZooKeeperSaslClient.run] - Client will use GSSAPI as SASL mechanism.
13:31:55.062 INFO  [Login.getRefreshTime] - TGT valid starting at: Fri Apr 03 13:31:54 CST 2020
13:31:55.062 INFO  [Login.getRefreshTime] - TGT expires: Sat Apr 04 13:31:54 CST 2020
13:31:55.255 INFO  [KerberosClientAuthProvider.run] - Client will use GSSAPI as SASL mechanism.
13:31:55.269 INFO  [RegionClient.channelConnected] - Initialized security helper: org.hbase.async.SecureRpcHelper96@6471f1e for region client: RegionClient@63709091(chan=null, #pending_rpcs=2, #batched=0, #rpcs_inflight=0)
13:31:55.276 INFO  [SecureRpcHelper96.handleResponse] - SASL client context established. Negotiated QoP: auth on for: RegionClient@63709091(chan=null, #pending_rpcs=2, #batched=0, #rpcs_inflight=0)
```
Reading and exploring like this is tiring, and it can be bewildering when something does not quite make sense, but the joy when things suddenly click, like reaching the summit and seeing all the small hills below, is hard to put into words. Writing it up is also fairly tedious; since I spent the time, I have tried to make sure interested readers can take something away from it. Of course, the process also sharpened my own way of learning and solving problems. Given my limited ability and incomplete understanding, if there are mistaken interpretations or poorly chosen descriptions in this article, I sincerely hope readers will point them out so we can discuss them together!