ClickHouse最佳實戰之分佈表寫入流程分析

雲妹導讀:node

前不久,京東智聯雲正式上線了基於Clickhouse的分析型雲數據庫JCHDB,一經推出便受到廣大用戶的極大關注。有興趣的小夥伴能夠回顧上一篇文章**《比MySQL快839倍!揭開分析型數據庫JCHDB的神祕面紗》**。數據庫

ClickHouse像ElasticSearch同樣具備數據分片(shard)的概念,這也是分佈式存儲的特色之一,即經過並行讀寫提升效率。ClickHouse依靠Distributed引擎實現了Distributed(分佈式)表機制,在全部分片(本地表)上創建視圖進行分佈式查詢,使用很方便。express

Distributed表引擎是**一種特殊的表引擎,自身不會存儲任何數據,而是經過讀取或寫入其餘遠端節點上的表進行數據處理的表引擎。**該表引擎須要依賴各個節點的本地表來建立,本地表的存在是Distributed表建立的依賴條件,建立語句以下:異步

CREATE TABLE {teble} ON CLUSTER {cluster}
AS {local_table}
ENGINE= Distributed({cluster}, {database}, {local_table},{policy})

這裏的policy通常可使用隨機(例如rand())或哈希(例如halfMD5hash(id))。tcp

再來看下ClickHouse集羣節點配置文件,相關參數以下:分佈式

<remote_servers>
    <logs>
        <shard>
            <weight>1</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <port>9000</port>
            </replica>
        </shard>
    </logs>
</remote_servers>

有了上面的基礎瞭解,就將進入主題了,本文主要是對Distributed表如何寫入及如何分發作一下分析,略過SQL的詞法解析、語法解析等步驟,從寫入流開始,其構造方法以下:函數

DistributedBlockOutputStream(const Context & context_, StorageDistributed &
storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_, bool
insert_sync_, UInt64 insert_timeout_);

若是insert_sync_爲true,表示是同步寫入,並配合insert_timeout_參數使用(insert_timeout_爲零表示沒有超時時間);若是insert_sync_爲false,表示寫入是異步。優化

1,同步寫入仍是異步寫入ui

同步寫入是指數據直寫入實際的表中,而異步寫入是指數據首先被寫入本地文件系統,而後發送到遠端節點。this

BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context &
context)
{
   ......

   /// Force sync insertion if it is remote() table function
   bool insert_sync = settings.insert_distributed_sync || owned_cluster;
   auto timeout = settings.insert_distributed_timeout;
   /// DistributedBlockOutputStream will not own cluster, but will own 
ConnectionPools of the cluster
   return std::make_shared(
       context, *this, createInsertToRemoteTableQuery(remote_database,
remote_table, getSampleBlockNonMaterialized()), cluster,
       nsert_sync, timeout);
}

是否執行同步寫入是由insert_sync決定的,最終是由是否配置insert_distributed_sync(默認爲false)和owned_cluster值的或關係決定的,通常在使用MergeTree之類的普通表引擎時,一般是異步寫入,但在使用表函數時(使用owned_cluster來判斷是不是表函數),一般會使用同步寫入。這也是在設計業務邏輯時須要注意的。

owned_cluster是何時賦值的呢?

StoragePtr TableFunctionRemoteexecuteImpl(const ASTPtr & astfunction, const Context & 
context, const stdstring & tablename) const
{ 
 ......
 StoragePtr res = remotetablefunction_ptr
     ? StorageDistributed::createWithOwnCluster(
       table_name,
       structureremotetable,
       remotetablefunction_ptr,
       cluster,
       context)
     : StorageDistributed::createWithOwnCluster(
       table_name,
       structureremotetable,
       remote_database,
       remote_table,
       cluster,
       context);
 ......
}  
StoragePtr StorageDistributed::createWithOwnCluster(
  const std::string & tablename, 
  const ColumnsDescription & columns_,
  ASTPtr & remotetablefunctionptr, 
  ClusterPtr & ownedcluster, 
  const Context & context_)
{ 
  auto res = create(String{}, tablename, columns, ConstraintsDescription{}, 
remotetablefunctionptr, String{}, context, ASTPtr(), String(), false);
  res->ownedcluster = ownedcluster_;
  return res;
}

能夠發如今建立remote表時會根據remote_table_function_ptr參數對最終的owned_cluster_賦值爲true。

2,異步寫入是如何實現的

瞭解了何時使用同步寫入何時異步寫入後,再繼續分析正式的寫入過程,同步寫入通常場景中涉及較少,這裏主要對異步寫入邏輯進行分析。outStream的write方法主邏輯以下:

DistributedBlockOutputStream::write()
                 ↓
            if insert_sync
             |         |
           true      false
             ↓         ↓
      writeSync()   writeAsync()

其實這個write方法是重寫了virtual void IBlockOutputStream::write(const Block & block),因此節點在接收到流並調用流的write方法就會進入該邏輯中。而且根據insert_sync來決定走同步寫仍是異步寫。

3,寫入本地節點仍是遠端節點

主要仍是對異步寫入進行分析,其實writeAsync()最終的實現方法是writeAsyncImpl(),大體邏輯圖以下:

writeAsyncImpl()
               ↓
 if shard_info.hasInternalReplication()
    |                          |
   true                       false
    ↓                          ↓
writeToLocal()             writeToLocal()
    ↓                          ↓
writeToShard()        for(every shard){writeToShard()}
    ↓                          ↓ 
   end                        end

其中getShardsInfo()方法就是獲取config.xml配置文件中獲取集羣節點信息,hasInternalReplication()就對應着配置文件中的internal_replication參數,若是爲true,就會進入最外層的if邏輯,不然就會進入else邏輯。

其中writeToLocal()方法是相同的,是指若是shard包含本地節點,優先選擇本地節點進行寫入;後半部分writeToShard()就是根據internal_replication參數的取值來決定是寫入其中一個遠端節點,仍是全部遠端節點都寫一次。

4,數據如何寫入本地節點

固然通常狀況Distributed表仍是基於ReplicatedMergeTree系列表進行建立,而不是基於表函數的,因此大多數場景仍是會先寫入本地再分發到遠端節點。那寫入Distributed表的數據是如何保證原子性落盤而不會在數據正在寫入的過程當中就把不完整的數據發送給遠端其餘節點呢?看下writeToShard()方法大體邏輯,以下:

writeToShard()
          ↓
for(every dir_names){
          |
          └──if first iteration
                 |       |
               false     true
                 ↓       ↓ 
                 |       ├──storage.requireDirectoryMonitor()
                 |       ├──CompressedWriteBuffer
                 |       ├──writeStringBinary()
                 |       ├──stream.writePrefix()
                 |       ├──stream.write(block)
                 |       ├──stream.writeSuffix()
                 ↘     ↙ 
             link(tmp_file, file) 
                    └──}

繼續具體再看下源碼的具體實現,以下:

void DistributedBlockOutputStream::writeToShard(const Block & block, const
std::vector<std::string> & dir_names) 
{
   /** tmp directory is used to ensure atomicity of transactions
     * and keep monitor thread out from reading incomplete data
     */
   std::string first_file_tmp_path{};

   auto first = true;

   /// write first file, hardlink the others
   for (const auto & dir_name : dir_names)
   {
       const auto & path = storage.getPath() + dir_name + '/';

       /// ensure shard subdirectory creation and notify storage
       if (Poco::File(path).createDirectory())
           storage.requireDirectoryMonitor(dir_name);

       const auto & file_name = toString(storage.file_names_increment.get()) +
".bin";
       const auto & block_file_path = path + file_name;

       /** on first iteration write block to a temporary directory for 
subsequent hardlinking to ensure
           * the inode is not freed until we're done */
       if (first)
       {
           first = false;

           const auto & tmp_path = path + "tmp/";
           Poco::File(tmp_path).createDirectory();
           const auto & block_file_tmp_path = tmp_path + file_name;

           first_file_tmp_path = block_file_tmp_path;

           WriteBufferFromFile out{block_file_tmp_path};
           CompressedWriteBuffer compress{out};
           NativeBlockOutputStream stream{compress, ClickHouseRevision::get(),
block.cloneEmpty()};

           writeVarUInt(UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER), out);
           context.getSettingsRef().serialize(out);
           writeStringBinary(query_string, out);

          stream.writePrefix();
          stream.write(block);
          stream.writeSuffix();
       }

       if (link(first_file_tmp_path.data(), block_file_path.data()))
           throwFromErrnoWithPath("Could not link " + block_file_path + " to "
+ first_file_tmp_path, block_file_path,
                  ErrorCodes::CANNOT_LINK);
   }
       ......
}

首先來了解下Distributed表在目錄中的存儲方式,默認位置都是/var/lib/clickhouse/data/{database}/{table}/在該目錄下會爲每一個shard生成不一樣的目錄,其中存放須要發送給該shard的數據文件,例如:

[root@ck test]# tree
.
├── 'default@ck2-0:9000,default@ck2-1:9000'
│   ├── 25.bin
│   └── tmp
│   └── 26.bin
└── 'default@ck3-0:9000,default@ck3-1:9000'
└── tmp

能夠發現每一個shard對應的目錄名是{darabse}@{hostname}:{tcpPort}的格式,若是多個副本會用,分隔。而且每一個shard目錄中還有個tmp目錄,這個目錄的設計在writeToShard()方法中作了解釋,是爲了不數據文件在沒寫完就被髮送到遠端。

數據文件在本地寫入的過程當中會先寫入tmp路徑中,寫完後經過硬連接link到shard目錄,保證只要在shard目錄中出現的數據文件都是完整寫入的數據文件。

數據文件的命名是經過全局遞增的數字加.bin命名,是爲了在後續分發到遠端節點保持順序性。

5,數據如何分發到各個節點

細心的你可能已經發如今writeToShard()方法中有個requireDirectoryMonitor(),這個方法就是將shard目錄註冊監聽,並經過專用類StorageDistributedDirectoryMonitor來實現數據文件的分發,根據不一樣配置能夠實現逐一分發或批量分發。而且包含對壞文件的容錯處理。

分析到這,可能還有人會以爲雲裏霧裏,以爲整個流程串不起來,其實這樣寫是爲了先不影響Distributed表寫入的主流程,明白了這個再附加上sharding_key拆分和權重拆分就很好理解了。

上面提到過writeAsync()的最終實現方法是writeAsyncImpl,這個說法是沒問題的,可是中間還有段關鍵邏輯,以下:

writeAsync()
                           ↓
if storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1
               |                       |
             true                     false
              ↓                        ↓
      writeAsyncImpl(block)      writeSplitAsync(block)
                                        ↓
                                   splitBlock(block)
                                        ↓
                        writeAsyncImpl(splitted_blocks,shard_idx)

getShardingKeyExpr()方法就是去獲取sharding_key生成的表達式指針,該表達式是在建立表時就生成的,以下:

sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context,
getColumns().getAllPhysical(), false);

那sharding_key和sharding_key_expr是什麼關係呢?以下:

const ExpressionActionsPtr & getShardingKeyExpr() const { return 
sharding_key_expr; }

因此說sharding_key_expr最終主要就是由sharding_key決定的。

通常狀況下getShardingKeyExpr()方法都爲true,若是再知足shard數量大於1,就會對block進行拆分,由splitBlock()方法主要邏輯就是建立selector並使用selector進行切割,大體邏輯以下:

splitBlock()
                  ↓
           createSelector(block)
                  ↓
for(every shard){column->scatter(num_shards, selector);}

對於如何建立selector以及selector中都作了什麼事兒,來具體看下源碼截取,以下:

IColumn::Selector DistributedBlockOutputStream::createSelector(const Block &
source_block)
{
    Block current_block_with_sharding_key_expr = source_block;
    storage.getShardingKeyExpr()- 
>execute(current_block_with_sharding_key_expr);

    const auto & key_column =
current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName
());
    const auto & slot_to_shard = cluster->getSlotToShard();
    ......
   throw Exception{"Sharding key expression does not evaluate to an integer 
type", ErrorCodes::TYPE_MISMATCH};
}

看splitBlock()方法,ClickHouse是利用createSelector()方法構造selector來進行後續的處理。在createSelector()方法中最重要的就是key_column和slot_to_shard。

key_column是經過sharding_key間接得到的,是爲了根據主鍵列進行切割;slot_to_shard是shard插槽,這裏就是爲了處理權重,在後續向插槽中插入數據時就會結合config.xml中的weight進行按比例處理。

細節比較複雜這裏不作太細緻的分析,有興趣能夠自行看下(如template IColumn::Selector createBlockSelector())。

到此,對於Distributed表的寫入流程的關鍵點就大體分析完了。篇幅有限有些細節沒有作過多說明,有興趣的能夠自行再瞭解下。

經過對Distributed表寫入流程的分析,瞭解了該類型表的實際工做原理,因此在實際應用中有幾個點還須要關注一下:

  1. Distributed表在寫入時會在本地節點生成臨時數據,會產生寫放大,因此會對CPU及內存形成一些額外消耗,建議儘可能少使用Distributed表進行寫操做;
  2. Distributed表寫的臨時block會把原始block根據sharding_key和weight進行再次拆分,會產生更多的block分發到遠端節點,也增長了merge的負擔;
  3. Distributed表若是是基於表函數建立的,通常是同步寫,須要注意。

瞭解原理才能更好的使用,遇到問題才能更好的優化。

點擊【閱讀原文】便可前往京東智聯雲控制檯開通試用JCHDB。

相關文章
相關標籤/搜索