opentsdb探索之路——部分設計與實現

基於opentsdb-2.4.0版本,本篇開啓opentsdb探索之路(主要涉及讀寫特性以及一些其餘細節),下一篇將開啓opentsdb優化之路——性能優化思路與建議(總結當前痛點問題、優化思路和解決方案,同時也歡迎朋友提出更好的思路與方案)。
注意:閱讀本篇文章應該要對HBase有最基本的認識,好比rowkeyregionstore ColumnFamilyColumnQualifier等概念以及HBase邏輯結構、物理存儲結構有大體的認知。html

opentsdb 概覽(overview)

opentsdb整體架構圖

上圖取自官方http://opentsdb.net/overview.html。其中的TSD(對應實際進程名是TSDMain)就是opentsdb組件。每一個實例TSD都是獨立的。沒有master,沒有共享狀態(shared state),所以實際生產部署可能會經過nginx+Consul運行多個TSD實例以實現負載均衡java

Each TSD uses the open source database HBase or hosted Google Bigtable service to store and retrieve time-series datanode

咱們大多應該仍是用HBase做爲數據存儲。
安裝部署一文中提到過在HBase中建立表結構,這裏先簡單介紹一下這4張表(table),隨着探究的深刻會對tsdbtsdb-uid這兩張表有更深入的認識,至於tsdb-metatsdb-tree兩張表不是這裏討論的重點,簡單瞭解一下便可。相關文檔:http://opentsdb.net/docs/build/html/user_guide/backends/index.htmlmysql

  • tsdb: opentsdb所有的時序數據都存在這張表中,該表只有一個名爲"t"的列族(ColumnFamily)。因此這張表的數據很是大,大多狀況下讀寫性能瓶頸也就與這張表密切相關,進而優化也可能與它相關。
    rowkey的設計爲an optional salt, the metric UID, a base timestamp and the UID for tagk/v pairs,即[可選的salt位+metric的UID+小時級別的時間戳+依次有序的tagk、tagv組成的UID鍵值對],以下:
[salt]<metric_uid><timestamp><tagk1><tagv1>[...<tagkN><tagvN>]

暫不考慮salt位,關於加salt下面有章節單獨拿出來看它的設計與實現。來看一個不加salt且含有兩個tag的時序數據的rowkey組成:nginx

00000150E22700000001000001000002000004
'----''------''----''----''----''----'
metric  time   tagk  tagv  tagk  tagv

至於rowkey爲何要這樣設計以及具體實現,後面詳細介紹,這裏先有個基本認知。git

  • tsdb-uid: 爲了減小rowkey的長度,opentsdb會將metrictagktagv都映射成UID,映射是雙向的,好比說既能夠根據tagk找到對應的UID,也能夠根據UID直接找到相應的tagk。而這些映射關係就記錄在tsdb-uid表中。該表有兩個ColumnFamily,分別是nameid,另外這兩個ColumnFamily下都有三列,分別是metrictagktagv。以下圖所示:
RowKey id:metric id:tagk id:tagv name:metric name:tagk name:tagv
metric01 0x01
metric02 0x02
tagk01 0x01
tagv01 0x01
tagv02 0x02
0x01 metric01
0x01 tagk01
0x01 tagv01
0x02 metric02
0x02 tagv02

從上面能夠看出,metrictagktagv三種類型的UID映射互不干擾,這也就使得0x01這個UID在不一樣類型中有着不一樣的含義。後面會從源碼角度講一下uid大體的分配。github

  • tsdb-meta: 在完成時序數據的寫入以後,會根據當前opentsdb實例的配置決定是否爲相關時序記錄元數據信息。看一下opentsdb.conf配置文件中tsd.core.meta.enable_tsuid_tracking配置項便可。
    tsd.core.meta.enable_tsuid_tracking(默認false): 若是開啓該選項,每次寫入一個DataPoint(時序數據)的同時還會向tsdb-meta表中寫入rowkey爲該時序數據的tsuid(下面會講到它,即完整的rowkey除去salttimestamp後的數據), value爲1的記錄。這樣,每一個點就對應兩次HBase的寫入,必定程度上加大了HBase集羣的壓力。相關代碼見TSDB#storeIntoDB()#WriteCB#call()
// if the meta cache plugin is instantiated then tracking goes through it
if (meta_cache != null) {
  meta_cache.increment(tsuid);
} else {
// tsd.core.meta.enable_tsuid_tracking
  if (config.enable_tsuid_tracking()) {
  // tsd.core.meta.enable_realtime_ts
    if (config.enable_realtime_ts()) {
    // tsd.core.meta.enable_tsuid_incrementing
      if (config.enable_tsuid_incrementing()) {
        TSMeta.incrementAndGetCounter(TSDB.this, tsuid);
      } else {
        TSMeta.storeIfNecessary(TSDB.this, tsuid);
      }
    } else {
    // 寫入rowkey爲tsuid,value爲1的記錄
      final PutRequest tracking = new PutRequest(meta_table, tsuid,
          TSMeta.FAMILY(), TSMeta.COUNTER_QUALIFIER(), Bytes.fromLong(1));
      client.put(tracking);
    }
  }
}
  • tsdb-tree: 做用,可按照樹形層次結構組織時序,就像瀏覽文件系統同樣瀏覽時序。相關介紹http://opentsdb.net/docs/build/html/user_guide/trees.html。這裏就不細說了,有興趣的話看下上面連接中官方介紹的Examples,就能秒懂是幹嗎的。

opentsdb 存儲細節(Writing)

相關文檔:
http://opentsdb.net/docs/build/html/user_guide/writing/index.htmlweb

rowkey的設計

只有一個名爲"t"的列族sql

  • 時序數據的metrictagktagv三部分字符串都會被轉成UID,這樣再長的字符串在rowkey中也會由UID代替,大大縮短了rowkey的長度
  • rowkey中的時序數據的timestamp並不是實際的時序數據時間,是格式化成以小時爲單位的時間戳(所謂的base_time),也就是說該rowkey中的base_time表示的是該時序數據發生在哪一個整點(小時)。每一個數據寫入的時候,會用該時序數據實際時間戳相對base_time的偏移量(offset)做爲ColumnQualifier寫入。
    結合下面的圖以及以後的代碼,就一目瞭然。
rowkey t: +1 t: +2 t: +3 t: ... t: +3600
salt+metric_uid+base_time+tagk1+tagv1+...+tagkN+tagvN 10 9 12 ... 8

rowkey的具體實現

在沒有啓用salt的狀況下,我整理出來生成rowkey的代碼以下(注意一下:源碼中並無這段代碼哦):docker

public byte[] generateRowKey(String metricName, long timestamp, Map<String, String> tags) {
        // 獲取metricUid
        byte[] metricUid = tsdb.getUID(UniqueId.UniqueIdType.METRIC, metricName);

        // 將時間戳轉爲秒
        if ((timestamp & Const.SECOND_MASK) != 0L) {
            timestamp /= 1000L;
        }

        final long timestamp_offset = timestamp % Const.MAX_TIMESPAN;//3600
        // 提取出時間戳所在的整點(小時)時間
        final long basetime = timestamp - timestamp_offset;

        // 用TreeMap存儲<tagkUid,tagvUid>, 排序用的是memcmp()方法,下面會有介紹
        Map<byte[], byte[]> tagsUidMap = new org.hbase.async.Bytes.ByteMap<>();

        tags.forEach((k, v) -> tagsUidMap.put(
                tsdb.getUID(UniqueId.UniqueIdType.TAGK, k),
                tsdb.getUID(UniqueId.UniqueIdType.TAGV, v)));

        // 不加salt的rowkey,metricUid+整點時間戳+全部的tagK、tagV
        byte[] rowkey = new byte[metricUid.length + Const.TIMESTAMP_BYTES +
                tags.size() * (TSDB.tagk_width() + TSDB.tagv_width())];

        // 下面拷貝相應的數據到rowkey字節數組中的相應位置
        System.arraycopy(metricUid, 0, rowkey, 0, metricUid.length);
        Bytes.setInt(rowkey, (int) basetime, metricUid.length);

        int startOffset = metricUid.length + Const.TIMESTAMP_BYTES;
        for (Map.Entry<byte[], byte[]> entry : tagsUidMap.entrySet()) {
            System.arraycopy(entry.getKey(), 0, rowkey, startOffset, TSDB.tagk_width());
            startOffset += TSDB.tagk_width();

            System.arraycopy(entry.getValue(), 0, rowkey, startOffset, TSDB.tagv_width());
            startOffset += TSDB.tagv_width();
        }

        return rowkey;
    }

其中的ByteMap就是TreeMap,見org.hbase.async.Bytes.ByteMap

/** A convenient map keyed with a byte array.  */
public static final class ByteMap<V> extends TreeMap<byte[], V>
  implements Iterable<Map.Entry<byte[], V>> {

  public ByteMap() {
    super(MEMCMP);
  }
}

多個tag的排序規則是對tag_idbytes進行排序,調用的是org.hbase.async.Bytes#memcmp(final byte[] a, final byte[] b)方法,以下

/**
   * {@code memcmp} in Java, hooray.
   * @param a First non-{@code null} byte array to compare.
   * @param b Second non-{@code null} byte array to compare.
   * @return 0 if the two arrays are identical, otherwise the difference
   * between the first two different bytes, otherwise the different between
   * their lengths.
   */
public static int memcmp(final byte[] a, final byte[] b) {
  final int length = Math.min(a.length, b.length);
  if (a == b) {  // Do this after accessing a.length and b.length
    return 0;    // in order to NPE if either a or b is null.
  }
  for (int i = 0; i < length; i++) {
    if (a[i] != b[i]) {
      return (a[i] & 0xFF) - (b[i] & 0xFF);  // "promote" to unsigned.
    }
  }
  return a.length - b.length;
}

壓縮(compaction)

相關文檔:
http://opentsdb.net/docs/build/html/user_guide/definitions.html#compaction

An OpenTSDB compaction takes multiple columns in an HBase row and merges them into a single column to reduce disk space. This is not to be confused with HBase compactions where multiple edits to a region are merged into one. OpenTSDB compactions can occur periodically for a TSD after data has been written, or during a query.

tsd.storage.enable_compaction:是否開啓壓縮(默認爲true,開啓壓縮)

爲了減小存儲空間(講道理對查詢也有好處),opentsdb在寫入時序數據的同時會把rowkey放到ConcurrentSkipListMap中,一個daemon線程不斷檢查System.currentTimeMillis()/1000-3600-1以前的數據可否被壓縮,知足壓縮條件則會把一小時內的時序數據(它們的rowkey是相同的)查出來在內存壓縮(compact)成一列回寫(write)到HBase中,而後delete以前的原始數據。或者是查詢(query)操做可能也會觸發compaction操做。代碼見CompactionQueue

final class CompactionQueue extends ConcurrentSkipListMap<byte[], Boolean> {

  public CompactionQueue(final TSDB tsdb) {
      super(new Cmp(tsdb));
      // tsd.storage.enable_appends
      if (tsdb.config.enable_compactions()) {
        // 啓用了壓縮則會啓一個daemon的線程
        startCompactionThread();
      }
  }

  /**
     * Helper to sort the byte arrays in the compaction queue.
     * <p>
     * This comparator sorts things by timestamp first, this way we can find
     * all rows of the same age at once.
     */
    private static final class Cmp implements Comparator<byte[]> {

      /** The position with which the timestamp of metric starts.  */
      private final short timestamp_pos;

      public Cmp(final TSDB tsdb) {
        timestamp_pos = (short) (Const.SALT_WIDTH() + tsdb.metrics.width());
      }

      @Override
      public int compare(final byte[] a, final byte[] b) {
      // 取rowkey中的base_time進行排序
        final int c = Bytes.memcmp(a, b, timestamp_pos, Const.TIMESTAMP_BYTES);
        // If the timestamps are equal, sort according to the entire row key.
        return c != 0 ? c : Bytes.memcmp(a, b);
      }
  }
}

看看上面啓動的daemon線程在作啥CompactionQueue#Thrd

/**
   * Background thread to trigger periodic compactions.
   */
  final class Thrd extends Thread {
    public Thrd() {
      super("CompactionThread");
    }

    @Override
    public void run() {
      while (true) {
        final int size = size();
        // 達到最小壓縮閾值則觸發flush()
          if (size > min_flush_threshold) {
            final int maxflushes = Math.max(min_flush_threshold,
              size * flush_interval * flush_speed / Const.MAX_TIMESPAN);
            final long now = System.currentTimeMillis();
            // 檢查上個整點的數據可否被壓縮
            flush(now / 1000 - Const.MAX_TIMESPAN - 1, maxflushes);
          }
      }
    }
}

再看CompactionQueue#flush(final long cut_off, int maxflushes)

private Deferred<ArrayList<Object>> flush(final long cut_off, int maxflushes) {
    final ArrayList<Deferred<Object>> ds =
      new ArrayList<Deferred<Object>>(Math.min(maxflushes, max_concurrent_flushes));
    int nflushes = 0;
    int seed = (int) (System.nanoTime() % 3);
    for (final byte[] row : this.keySet()) {
      final long base_time = Bytes.getUnsignedInt(row,
          Const.SALT_WIDTH() + metric_width);
      if (base_time > cut_off) {
        // base_time比較靠近當前時間,則直接跳出
        break;
      } else if (nflushes == max_concurrent_flushes) {
        break;
      }
      // 這裏會發向hbase發get請求獲取時序數據,在callback中進行壓縮操做
      ds.add(tsdb.get(row).addCallbacks(compactcb, handle_read_error));
    }
    return group;
}

最後看一下compaction具體作了啥,見CompactionQueue#Compaction#compact()

public Deferred<Object> compact() {
  // merge the datapoints, ordered by timestamp and removing duplicates
  final ByteBufferList compacted_qual = new ByteBufferList(tot_values);
  final ByteBufferList compacted_val = new ByteBufferList(tot_values);

  mergeDatapoints(compacted_qual, compacted_val);

  // build the compacted columns
  final KeyValue compact = buildCompactedColumn(compacted_qual, compacted_val);

  final boolean write = updateDeletesCheckForWrite(compact);

  final byte[] key = compact.key();
  
  deleted_cells.addAndGet(to_delete.size());  // We're going to delete this.

  if (write) {
    // 把壓縮後的結果回寫到tsdb表
    Deferred<Object> deferred = tsdb.put(key, compact.qualifier(), compact.value(), compactedKVTimestamp);

    if (!to_delete.isEmpty()) {
      // 壓縮結果寫入成功後 delete查詢出來的cells
      deferred = deferred.addCallbacks(new DeleteCompactedCB(to_delete), handle_write_error);
    }
    return deferred;
  }  
}

// delete compacted cells的回調
private final class DeleteCompactedCB implements Callback<Object, Object> {

  /** What we're going to delete.  */
  private final byte[] key;
  private final byte[][] qualifiers;

  @Override
  public Object call(final Object arg) {
    return tsdb.delete(key, qualifiers).addErrback(handle_delete_error);
  }

  @Override
  public String toString() {
    return "delete compacted cells";
  }
}

追蹤整個compaction過程,咱們不難發現其中多了很多getwritedelete請求,數據量很是大的狀況下無形給HBase帶來不小壓力。留意一下,這裏可能也是咱們重點優化的地方。

追加模式(appends)

相關文檔:
http://opentsdb.net/docs/build/html/user_guide/writing/index.html#appends

Also in 2.2, writing to HBase columns via appends is now supported. This can improve both read and write performance in that TSDs will no longer maintain a queue of rows to compact at the end of each hour, thus preventing a massive read and re-write operation in HBase. However due to the way appends operate in HBase, an increase in CPU utilization, store file size and HDFS traffic will occur on the region servers. Make sure to monitor your HBase servers closely.

tsd.storage.enable_appends:默認是false
在追加模式下,opentsdb寫入的時候,會將rowkey相同的點的value值寫到一個單獨的ColumnQualifier(0x050000)中。因此與以前的直接寫入模式是兼容的,這就意味着能夠隨時啓用或者禁用追加模式。

/** The prefix ID of append columns */
public static final byte APPEND_COLUMN_PREFIX = 0x05;

/** The full column qualifier for append columns */
public static final byte[] APPEND_COLUMN_QUALIFIER = new byte[]{APPEND_COLUMN_PREFIX, 0x00, 0x00};

顯然這就是咱們想要的壓縮後的效果。少了把已經寫入HBase的數據拉過來在opentsdb內存壓縮,回寫數據,再刪除原數據的一系列操做,固然了壓力應該是丟給了HBase

追加模式會消耗更多的HBase集羣的資源(官方是這麼說的,究竟多大,有待研究),另外本人猜想對於大量高併發的寫入可能有鎖的同步問題,講道理單從瞬間寫入性能考慮,追加模式下的性能應該是不及以前的直接寫入。

opentsdb UID的分配(UID Assignment)

相關文檔:
http://opentsdb.net/docs/build/html/user_guide/uids.html#uid

相信到這裏應該已經到UID有必定的認識了,使用UID大大節省了存儲空間。

Within the storage system there is a counter that is incremented for each metric, tagk and tagv. When you create a new tsdb-uid table, this counter is set to 0 for each type.

很相似mysql中的自增主鍵。見UniqueId#allocateUid()

private Deferred<Long> allocateUid() {
// randomize_id默認是false,兩種方式:一種是隨機數uid,另一種是遞增uid
// tagk和tagv目前沒法配置,用的是遞增uid(metric卻是可配 tsd.core.uid.random_metrics默認false)
    if (randomize_id) {
    return Deferred.fromResult(RandomUniqueId.getRandomUID());
    } else { //實際走這裏,會去hbase的tsdb-uid表請求遞增uid
    return client.atomicIncrement(new AtomicIncrementRequest(table,
                                    MAXID_ROW, ID_FAMILY, kind));
    }
}

tsdb-uid表中rowkey0x00cell中存有目前三種類型的最大UID

查看metric、tagk、tagv目前的最大uid

這裏咱們看到metrictagktagv三種類型的UID映射是獨立的。另外,注意兩個與此相關的配置項

  • tsd.core.auto_create_metrics:默認爲false,是否給tsdb-uid表中不存在的metric分配UIDfalse的狀況下,寫入新的metric時序數據會拋出異常
  • tsd.core.preload_uid_cache:默認爲false,是否程序啓動時就從tsdb-uid表獲取UID並緩存在本地,見TSDB#TSDB(final HBaseClient client, final Config config)
if (config.getBoolean("tsd.core.preload_uid_cache")) {
    final ByteMap<UniqueId> uid_cache_map = new ByteMap<UniqueId>();
    uid_cache_map.put(METRICS_QUAL.getBytes(CHARSET), metrics);
    uid_cache_map.put(TAG_NAME_QUAL.getBytes(CHARSET), tag_names);
    uid_cache_map.put(TAG_VALUE_QUAL.getBytes(CHARSET), tag_values);
    UniqueId.preloadUidCache(this, uid_cache_map);
}

從這裏咱們也能夠看到使用這種遞增UID分配方式,先來的tagk必然會分配到數值較小的UID,後來的tagk會分配到數值較大的UID,如此一來結合上文寫入的時候rowkey中的tags會按照tagk_uid的byte數組進行排序,就能得出最早寫入的tagk是排在rowkey中較爲靠前的位置,那麼知道了這種規則,在某些狀況下對於查詢優化有沒有幫助呢?

opentsdb 查詢細節(Reading)

相關文檔:
http://opentsdb.net/docs/build/html/user_guide/query/index.html

查詢放在這個地方講是由於咱們只有弄清楚數據是怎麼存的,纔會明白如何取。經過前文咱們知道寫入的時候rowkey中的tags會按照tagk_uid的byte數組進行排序,那麼一樣從HBase讀數據的時候講道理也應該這樣排序是否是。來看QueryUtil#setDataTableScanFilter()
可是,正常狀況下的scan(除非查詢的時候設置explicit_tagstrue),對於tag的過濾並非直接拼在rowkey中,而是放在scanner.setFilter(regex_filter)

final byte[] start_row = new byte[metric_salt_width + Const.TIMESTAMP_BYTES];
final byte[] end_row = new byte[metric_salt_width + Const.TIMESTAMP_BYTES];
scanner.setStartKey(start_row);
scanner.setStopKey(end_row);
 
// 關於regex_filter生成下面有簡單例子
if (!(explicit_tags && enable_fuzzy_filter)) {
    scanner.setFilter(regex_filter);
    return;
}

QueryUtil#getRowKeyUIDRegex()

// Generate a regexp for our tags.  Say we have 2 tags: { 0 0 1 0 0 2 }
// and { 4 5 6 9 8 7 }, the regexp will be:
// "^.{7}(?:.{6})*\\Q\000\000\001\000\000\002\\E(?:.{6})*\\Q\004\005\006\011\010\007\\E(?:.{6})*$"

官方對查詢設置explicit_tagstrue的介紹:
http://opentsdb.net/docs/build/html/user_guide/query/filters.html#explicit-tags
意思我已經知道了要查詢的metric明確只有這些tags,想查詢的時序數據不會出現其餘tag,這樣opentsdb就會把用戶過濾的tag直接拼到rowkey中,必定程度上優化了查詢。見代碼

if (explicit_tags && enable_fuzzy_filter) {
    fuzzy_key = new byte[prefix_width + (row_key_literals.size() *
        (name_width + value_width))];
    fuzzy_mask = new byte[prefix_width + (row_key_literals.size() *
        (name_width + value_width))];
    System.arraycopy(scanner.getCurrentKey(), 0, fuzzy_key, 0,
        scanner.getCurrentKey().length);
 
    // 由於已經明確了只有哪些指定的tag,這個時候纔會把tags直接拼到startKey中
    scanner.setStartKey(fuzzy_key);
}

explicit_tagstrue的狀況下,會用FuzzyRowFilter,看一下源碼中的描述

/**
 * FuzzyRowFilter is a server-side fast-forward filter that allows skipping
 * whole range of rows when scanning. The feature is available in HBase
 * 0.94.5 and above.
 * <p>
 * It takes two byte array to match a rowkey, one to hold the fixed value
 * and one to hold a mask indicating which bytes of the rowkey must match the
 * fixed value. The two arrays must have the same length.
 * <p>
 * Bytes in the mask can take two values, 0 meaning that the corresponding byte
 * in the rowkey must match the corresponding fixed byte and 1 meaning that the
 * corresponding byte in the rowkey can take any value.
 * <p>
 * One can combine several {@link FuzzyFilterPair} to match multiple patterns at
 * once.
 * <p>
 * Example :
 * You store logs with this rowkey design :
 *   group(3bytes)timestamp(4bytes)severity(1byte)
 *
 * You want to get all FATAL("5") logs :
 *   * Build a FuzzyFilterPair with
 *     - rowkey     : "????????5"
 *     - fuzzy mask : "111111110"
 * And CRITICAL("4") logs only for web servers :
 *   * Add another FuzzyFilterPair with
 *     - rowkey     : "web????4"
 *     - fuzzy mask : "00011110"
 *
 * @since 1.7
 */
public final class FuzzyRowFilter extends ScanFilter {
// ...
}

總結一下就是,若是你明確你要查的數據有哪幾個tag,建議查詢的時候指定explicit_tagstrue,有助於查詢優化。

# Example 1: 
http://host:4242/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=web01}

# Example 2:
http://host:4242/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=*}{dc=*}

# Example 3:
{
    "start":1584408560754,
    "end":1584409460754,
    "msResolution":false,
    "queries":[
        {
            "aggregator":"avg",
            "metric":"metric.test",
            "downsample":"5m-avg",
            "explicitTags":true,
            "filters":[
                {
                    "type":"literal_or",
                    "tagk":"instance",
                    "filter":"total",
                    "groupBy":true
                },
                {
                    "type":"literal_or",
                    "tagk":"ip",
                    "filter":"192.168.1.1",
                    "groupBy":true
                }
            ]
        }
    ]
}

關於tsd.storage.use_otsdb_timestamp這個配置與HBase特性有關。下篇寫優化的時候再講,這裏提出來放在這裏。TsdbQuery#getScanner(final int salt_bucket)

// tsd.storage.use_otsdb_timestamp
if (tsdb.getConfig().use_otsdb_timestamp()) {
      long stTime = (getScanStartTimeSeconds() * 1000);
      long endTime = end_time == UNSET ? -1 : (getScanEndTimeSeconds() * 1000);
      if (tsdb.getConfig().get_date_tiered_compaction_start() <= stTime &&
          rollup_query == null) {
        // TODO - we could set this for rollups but we also need to write
        // the rollup columns at the proper time.
        scanner.setTimeRange(stTime, endTime);
      }
}

rowkey中加salt的狀況(Salting)

相關文檔:
http://opentsdb.net/docs/build/html/user_guide/writing/index.html#salting

時序數據的寫入,寫熱點是一個不可規避的問題,當某個metric下數據點不少時,則該metric很容易形成寫入熱點,即往一個region server寫,甚至同一個region,若是這樣,對這部分數據的讀寫都會落到HBase集羣中的一臺機器上,沒法發揮集羣的處理能力,甚至直接將某個region server壓垮。加salt就是爲了將時序數據的rowkey打散,從而分配到不一樣的region中,以均衡負載。

When enabled, a configured number of bytes are prepended to each row key. Each metric and combination of tags is then hashed into one "bucket", the ID of which is written to the salt bytes
從2.2開始,OpenTSDB採起了容許將metricsalt,加salt後的變化就是在rowkey前會拼上一個桶編號(bucket index)。

To enable salting you must modify the config file parameter tsd.storage.salt.width and optionally tsd.storage.salt.buckets. We recommend setting the salt width to 1 and determine the number of buckets based on a factor of the number of region servers in your cluster. Note that at query time, the TSD will fire tsd.storage.salt.buckets number of scanners to fetch data. The proper number of salt buckets must be determined through experimentation as at some point query performance may suffer due to having too many scanners open and collating the results. In the future the salt width and buckets may be configurable but we didn't want folks changing settings on accident and losing data.

對上面的描述解釋一下:
tsd.storage.salt.widthrowkey加多少個byte前綴(默認0(即不開啓),若是啓用的話 建議1)
tsd.storage.salt.buckets:分桶數(默認20,建議根據region servers數肯定)

  • 寫入的時候若是啓用了salt,則根據metric_uid+全部[tagK+tagV]uid組成的byte數組,計算hashcode值,對分桶數取模,得出salt

RowKey#prefixKeyWithSalt(注意:取的是關鍵代碼,去除了干擾信息)

public static void prefixKeyWithSalt(final byte[] row_key) {
    // tsd.storage.salt.width
    if (Const.SALT_WIDTH() > 0) {
      final int tags_start = Const.SALT_WIDTH() + TSDB.metrics_width() + 
          Const.TIMESTAMP_BYTES;
      
      // we want the metric and tags, not the timestamp
      final byte[] salt_base = 
          new byte[row_key.length - Const.SALT_WIDTH() - Const.TIMESTAMP_BYTES];
      System.arraycopy(row_key, Const.SALT_WIDTH(), salt_base, 0, TSDB.metrics_width());
      System.arraycopy(row_key, tags_start,salt_base, TSDB.metrics_width(), 
          row_key.length - tags_start);
      // 這裏經過對salt_buckets取模得出salt位的數值
      int modulo = Arrays.hashCode(salt_base) % Const.SALT_BUCKETS();// tsd.storage.salt.buckets
    
      final byte[] salt = getSaltBytes(modulo);
      // 填充salt位的byte
      System.arraycopy(salt, 0, row_key, 0, Const.SALT_WIDTH());
    }
}
  • 這個時候大多數人就會疑惑了,在rowkey前加了salt位,那麼查詢的時候怎麼搞?
    客戶端查詢OpenTSDB一條數據,OpenTSDB將這個請求拆成分桶數個查詢到HBase,而後返回桶數個結果集到OpenTSDB層作合併。對HBase併發請求相應的也會桶數倍的擴大。見TsdbQuery#findSpans()
if (Const.SALT_WIDTH() > 0) {
    final List<Scanner> scanners = new ArrayList<Scanner>(Const.SALT_BUCKETS());
    for (int i = 0; i < Const.SALT_BUCKETS(); i++) {
      // 構建出等於分桶數大小個scanner
      scanners.add(getScanner(i));
    }
    scan_start_time = DateTime.nanoTime();
    return new SaltScanner(tsdb, metric, scanners, spans, scanner_filters,
        delete, rollup_query, query_stats, query_index, null, 
        max_bytes, max_data_points).scan();
}

在每個scannerrowkey前面填充bucket index做爲salt位,這樣才能去hbasescan到完整的結果,見QueryUtil#getMetricScanner()

public static Scanner getMetricScanner(final TSDB tsdb, final int salt_bucket, 
      final byte[] metric, final int start, final int stop, 
      final byte[] table, final byte[] family) {
    
    if (Const.SALT_WIDTH() > 0) {
      final byte[] salt = RowKey.getSaltBytes(salt_bucket);
      // 這裏把salt_bucket填充到rowkey中
      System.arraycopy(salt, 0, start_row, 0, Const.SALT_WIDTH());
      System.arraycopy(salt, 0, end_row, 0, Const.SALT_WIDTH());
    }
    return scanner;
}

其餘配置(Configuration)

相關文檔:

  • opentsdb的配置:http://opentsdb.net/docs/build/html/user_guide/configuration.html
  • AsyncHBase client的配置:http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html

opentsdb使用的hbase clienthttp://opentsdb.github.io/asynchbase/

public TSDB(final HBaseClient client, final Config config) {
    this.config = config;
    if (client == null) {
      final org.hbase.async.Config async_config;
      if (config.configLocation() != null && !config.configLocation().isEmpty()) {
        try {
          // AsyncHBase client讀取和opentsdb同樣的文件
          // 因此 有一些須要設置AsyncHBase client的地方直接寫在opentsdb的配置文件就能生效
          async_config = new org.hbase.async.Config(config.configLocation());
        } catch (final IOException e) {
          throw new RuntimeException("Failed to read the config file: " +
              config.configLocation(), e);
        }
      } else {
        async_config = new org.hbase.async.Config();
      }
      async_config.overrideConfig("hbase.zookeeper.znode.parent",
          config.getString("tsd.storage.hbase.zk_basedir"));
      async_config.overrideConfig("hbase.zookeeper.quorum",
          config.getString("tsd.storage.hbase.zk_quorum"));
      this.client = new HBaseClient(async_config);
    } else {
      this.client = client;
    }
}

性能優化的一方面可能與參數調優有關,有些與啓動參數,操做系統設置等有關,有些參數就是寫在配置文件的(好比說最大鏈接數、超時時間等等)

這裏提一下前面沒有講到的與opentsdb相關的兩個配置。

  • tsd.query.skip_unresolved_tagvs:默認爲false,查詢的時候遇到不存在的tagv時候是否跳過,true則跳過,false則拋出異常,我的感受這個默認false極不友好。TagVFilter#resolveTags()#TagVErrback
/**
  * Allows the filter to avoid killing the entire query when we can't resolve
  * a tag value to a UID.
  */
class TagVErrback implements Callback<byte[], Exception> {
  @Override
  public byte[] call(final Exception e) throws Exception {
    if (config.getBoolean("tsd.query.skip_unresolved_tagvs")) {
      LOG.warn("Query tag value not found: " + e.getMessage());
      return null;
    } else {
      // 默認狀況下直接拋出異常
      throw e;
    }
  }
}
  • AsyncHBase Configuration中的hbase.rpc.timeout:How long, in milliseconds, to wait for a response to an RPC from a region server before failing the RPC with a RpcTimedOutException. This value can be overridden on a per-RPC basis. A value of 0 will not allow RPCs to timeout

http接口(HTTP API)

相關文檔:
http://opentsdb.net/docs/build/html/api_http/index.html

經常使用:

  • put:http://opentsdb.net/docs/build/html/api_http/put.html
  • query:http://opentsdb.net/docs/build/html/api_http/query/index.html
  • uid:http://opentsdb.net/docs/build/html/api_http/uid/index.html
  • stats:http://opentsdb.net/docs/build/html/api_http/stats/index.html

同時咱們注意到:OpenTSDB3.0相關的工做正在進行中(work-in-progress),詳情:http://opentsdb.net/docs/3x/build/html/index.html

opentsdb鏈接Kerberos認證的HBase(非重點,僅順手記錄於此)

相關文檔:
http://opentsdb.github.io/asynchbase/docs/build/html/authentication.html

http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html(搜kerberos關鍵字)

相關問題討論:
https://github.com/OpenTSDB/opentsdb/issues/491

參考帶有Kerberos認證hbase docker鏡像Dockerfile項目:
https://github.com/Knappek/docker-phoenix-secure
該項目中bootstrap-phoenix.shdocker-compose.yml以及config_files下的配置文件頗有參考價值

具體操做

  1. 根據實際狀況在/etc/opentsdb/opentsdb.conf配置 末尾添加:
hbase.security.auth.enable=true
hbase.security.authentication=kerberos
hbase.sasl.clientconfig=Client
hbase.kerberos.regionserver.principal=hbase/_HOST@EXAMPLE.COM
  1. 根據實際狀況新建hbase-client.jaas文件,文件內容基本以下樣子
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    useTicketCache=false
    keyTab="/etc/security/keytabs/hbase.keytab"
    principal="hbase/phoenix.docker.com";
};
  1. 修改/usr/share/opentsdb/etc/init.d/opentsdb文件,修改啓動參數
# start command的位置(約第78行處)加上 -Djava.security.auth.login.config=hbase-client.jaas文件路徑
# 注意:若是Zookeeper沒有加Kerberos認證,再加一個參數 -Dzookeeper.sasl.client=false
JVMARGS="-Djava.security.auth.login.config=/.../jaas.conf"

重啓opentsdb,若是成功,則能看到以下示例日誌:

13:31:55.045 INFO [ZooKeeperSaslClient.run] - Client will use GSSAPI as SASL mechanism.
13:31:55.062 INFO [Login.getRefreshTime] - TGT valid starting at: Fri Apr 03 13:31:54 CST 2020
13:31:55.062 INFO [Login.getRefreshTime] - TGT expires: Sat Apr 04 13:31:54 CST 2020

13:31:55.255 INFO [KerberosClientAuthProvider.run] - Client will use GSSAPI as SASL mechanism.
13:31:55.269 INFO [RegionClient.channelConnected] - Initialized security helper: org.hbase.async.SecureRpcHelper96@6471f1e for region client: RegionClient@63709091(chan=null, #pending_rpcs=2, #batched=0, #rpcs_inflight=0)
13:31:55.276 INFO [SecureRpcHelper96.handleResponse] - SASL client context established. Negotiated QoP: auth on for: RegionClient@63709091(chan=null, #pending_rpcs=2, #batched=0, #rpcs_inflight=0)

寫在後面

閱讀、探索的過程很累,遇到不太理解的地方又會很困惑,但柳暗花明又一村,凌絕頂一覽衆山小的喜悅卻難以言表。另外,整理的過程也挺煩人,既然花時間整理了,我儘可能讓感興趣的讀者能從中有一絲收穫。固然了,整理的過程也鍛鍊了我學習知識、解決問題的思路與能力。因爲本人能力之有限、理解之不透徹,文中若有錯誤的理解、不恰當的描述,衷心但願朋友提出一塊兒討論!

相關文章
相關標籤/搜索