[Original] Uncle's Troubleshooting Notes (22): Only One of Multiple Concurrent Hive insert overwrite table Statements Can Run

Hive 2.1

1. Problem

Recently we had a scenario that required writing data into multiple partitions of one table. To shorten the total execution time, we ran several SQL statements concurrently, each writing a different partition, with dynamic partitioning enabled:

set hive.exec.dynamic.partition=true;

insert overwrite table test_table partition(dt) select * from test_table_another where dt = 1;
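
For concreteness, the concurrent workload looked roughly like this (a sketch: the second session and its dt = 2 predicate are assumed, mirroring the statement above):

-- session 1
set hive.exec.dynamic.partition=true;
insert overwrite table test_table partition(dt) select * from test_table_another where dt = 1;

-- session 2, submitted at the same time, writing a different partition
set hive.exec.dynamic.partition=true;
insert overwrite table test_table partition(dt) select * from test_table_another where dt = 2;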

It turned out that only one SQL statement ran while the others hung.
A thread dump of the Hive Thrift Server showed the requests all blocked in DbTxnManager. The relevant Hive configuration:

hive.support.concurrency=true
hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

The corresponding defaults and doc comments for these settings:

org.apache.hadoop.hive.conf.HiveConf

    HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false,
        "Whether Hive supports concurrency control or not. \n" +
        "A ZooKeeper instance must be up and running when using zookeeper Hive lock manager "),

    HIVE_TXN_MANAGER("hive.txn.manager",
        "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager",
        "Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive\n" +
        "transactions, which also requires appropriate settings for hive.compactor.initiator.on,\n" +
        "hive.compactor.worker.threads, hive.support.concurrency (true), hive.enforce.bucketing\n" +
        "(true), and hive.exec.dynamic.partition.mode (nonstrict).\n" +
        "The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides\n" +
        "no transactions."),

2. Code Analysis

For the full details of how Hive executes SQL, see: http://www.javashuo.com/article/p-opeaomay-eo.html

 

Every SQL statement executed by Hive eventually reaches Driver.run, which calls runInternal. Let's go straight to the runInternal code:

org.apache.hadoop.hive.ql.Driver

  private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
      throws CommandNeedRetryException {
...
      if (requiresLock()) {
        // a checkpoint to see if the thread is interrupted or not before an expensive operation
        if (isInterrupted()) {
          ret = handleInterruption("at acquiring the lock.");
        } else {
          ret = acquireLocksAndOpenTxn(startTxnImplicitly);
        }
...

  private boolean requiresLock() {
    if (!checkConcurrency()) {
      return false;
    }
    // Lock operations themselves don't require the lock.
    if (isExplicitLockOperation()){
      return false;
    }
    if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_LOCK_MAPRED_ONLY)) {
      return true;
    }
    Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();
    taskQueue.addAll(plan.getRootTasks());
    while (taskQueue.peek() != null) {
      Task<? extends Serializable> tsk = taskQueue.remove();
      if (tsk.requireLock()) {
        return true;
      }
...

  private boolean checkConcurrency() {
    boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
    if (!supportConcurrency) {
      LOG.info("Concurrency mode is disabled, not creating a lock manager");
      return false;
    }
    return true;
  }

  private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
...
      txnMgr.acquireLocks(plan, ctx, userFromUGI);
...

runInternal calls requiresLock to decide whether the query needs a lock; requiresLock makes two checks:

  • checkConcurrency: a lock is needed only when hive.support.concurrency=true;
  • Task.requireLock: only certain tasks require a lock.

If a lock is needed, runInternal calls acquireLocksAndOpenTxn, which in turn calls HiveTxnManager.acquireLocks to acquire it.

 

1) First, which tasks require a lock:

org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer

  private void analyzeAlterTablePartMergeFiles(ASTNode ast,
      String tableName, HashMap<String, String> partSpec)
      throws SemanticException {
...
      DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), mergeDesc);
      ddlWork.setNeedLock(true);
...

As the code shows, DDL operations require a lock.
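
analyzeAlterTablePartMergeFiles is the analyzer behind ALTER TABLE ... CONCATENATE, so a statement like the following (reusing the example table; the partition value is illustrative) takes this path and gets needLock(true):

alter table test_table partition (dt=1) concatenate;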

2) Next, how the lock is acquired:

org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

  public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
    try {
      acquireLocksWithHeartbeatDelay(plan, ctx, username, 0);
...

  void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
    LockState ls = acquireLocks(plan, ctx, username, true);
...

  LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
...
      switch (output.getType()) {
        case DATABASE:
          compBuilder.setDbName(output.getDatabase().getName());
          break;

        case TABLE:
        case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
          t = output.getTable();
          compBuilder.setDbName(t.getDbName());
          compBuilder.setTableName(t.getTableName());
          break;

        case PARTITION:
          compBuilder.setPartitionName(output.getPartition().getName());
          t = output.getPartition().getTable();
          compBuilder.setDbName(t.getDbName());
          compBuilder.setTableName(t.getTableName());
          break;

        default:
          // This is a file or something we don't hold locks for.
          continue;
      }
...
    LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks);
    ctx.setHiveLocks(locks);
    return lockState;
  }

As the DUMMYPARTITION case shows, when dynamic partitioning is enabled the lock granularity is DbName+TableName, i.e., the entire table. Consequently only one of the concurrent SQL statements can acquire the lock; the others must wait.
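
A sketch of the contrast, based on the switch above and the example table: with a dynamic partition spec the output is a DUMMYPARTITION and the lock covers the whole table, while with a static partition spec it is a PARTITION and the lock also carries the partition name, so writes to different partitions no longer collide:

-- dynamic partition spec: DUMMYPARTITION output, lock = DbName+TableName (whole table)
insert overwrite table test_table partition(dt) select * from test_table_another where dt = 1;

-- static partition spec: PARTITION output, lock also includes PartitionName,
-- so a parallel write to partition(dt=2) is not blocked
insert overwrite table test_table partition(dt=1) select * from test_table_another where dt = 1;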

3. Summary

There are several ways to solve the problem:

  1. Disable dynamic partitioning: set hive.exec.dynamic.partition=false
  2. Disable concurrency: set hive.support.concurrency=false
  3. Disable transactions: set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager

Any one of the three will do. Option 1 is recommended, because the scenario above does not actually need dynamic partitioning: each statement writes one known partition and can use a static partition spec, as in the sketch above.
