Hive 2.1
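Recently I had a scenario where data had to be written to several partitions of one table. To shorten the run time I ran multiple SQL statements concurrently, each writing a different partition, with dynamic partitioning enabled:
set hive.exec.dynamic.partition=true;
insert overwrite table test_table partition(dt) select * from test_table_another where dt = 1;
It turned out that only one SQL statement actually ran; all the others hung.
The thread stacks of the Hive Thrift server showed that the requests were all stuck in DbTxnManager. The key Hive settings were: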
hive.support.concurrency=true
hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
The default values and comments for these settings:
org.apache.hadoop.hive.conf.HiveConf
    HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false,
        "Whether Hive supports concurrency control or not. \n" +
        "A ZooKeeper instance must be up and running when using zookeeper Hive lock manager "),
    HIVE_TXN_MANAGER("hive.txn.manager",
        "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager",
        "Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive\n" +
        "transactions, which also requires appropriate settings for hive.compactor.initiator.on,\n" +
        "hive.compactor.worker.threads, hive.support.concurrency (true), hive.enforce.bucketing\n" +
        "(true), and hive.exec.dynamic.partition.mode (nonstrict).\n" +
        "The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides\n" +
        "no transactions."),
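For the full details of how Hive executes a SQL statement, see: http://www.javashuo.com/article/p-opeaomay-eo.html
Every SQL statement executed in Hive ends up in Driver.run, and run calls runInternal, so let's go straight to the runInternal code: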
org.apache.hadoop.hive.ql.Driver
  private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
      throws CommandNeedRetryException {
    ...
      if (requiresLock()) {
        // a checkpoint to see if the thread is interrupted or not before an expensive operation
        if (isInterrupted()) {
          ret = handleInterruption("at acquiring the lock.");
        } else {
          ret = acquireLocksAndOpenTxn(startTxnImplicitly);
        }
    ...

  private boolean requiresLock() {
    if (!checkConcurrency()) {
      return false;
    }
    // Lock operations themselves don't require the lock.
    if (isExplicitLockOperation()){
      return false;
    }
    if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_LOCK_MAPRED_ONLY)) {
      return true;
    }
    Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();
    taskQueue.addAll(plan.getRootTasks());
    while (taskQueue.peek() != null) {
      Task<? extends Serializable> tsk = taskQueue.remove();
      if (tsk.requireLock()) {
        return true;
      }
    ...

  private boolean checkConcurrency() {
    boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
    if (!supportConcurrency) {
      LOG.info("Concurrency mode is disabled, not creating a lock manager");
      return false;
    }
    return true;
  }

  private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
    ...
      txnMgr.acquireLocks(plan, ctx, userFromUGI);
    ...
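runInternal calls requiresLock to decide whether a lock is needed. Going by the code above, requiresLock makes two main checks: whether concurrency support is on (checkConcurrency, i.e. hive.support.concurrency=true), and, when ConfVars.HIVE_LOCK_MAPRED_ONLY is set, whether any task in the plan reports requireLock().
If a lock is needed, runInternal calls acquireLocksAndOpenTxn, which in turn calls HiveTxnManager.acquireLocks to obtain the lock.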
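The branch order in requiresLock is easier to follow with the Hive types stripped away. The following is a minimal sketch, not Hive code: the class name RequiresLockSketch and the flattened boolean parameters are made up for illustration, the task graph is reduced to a flat list of flags, and the final `return false` is an assumption about the part the excerpt elides.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone model of the decision made by Driver.requiresLock.
class RequiresLockSketch {

    /**
     * @param supportConcurrency    value of hive.support.concurrency
     * @param explicitLockOperation true for explicit lock operations
     * @param lockMapredOnly        value of ConfVars.HIVE_LOCK_MAPRED_ONLY
     * @param taskRequiresLock      one flag per task, standing in for Task.requireLock()
     */
    static boolean requiresLock(boolean supportConcurrency,
                                boolean explicitLockOperation,
                                boolean lockMapredOnly,
                                List<Boolean> taskRequiresLock) {
        if (!supportConcurrency) {
            return false; // concurrency disabled: never lock
        }
        if (explicitLockOperation) {
            return false; // lock operations themselves do not need the lock
        }
        if (!lockMapredOnly) {
            return true;  // lock every query, whatever its tasks are
        }
        for (boolean needsLock : taskRequiresLock) {
            if (needsLock) {
                return true; // at least one task asks for a lock
            }
        }
        return false; // assumption: the elided tail of the real method ends this way
    }

    public static void main(String[] args) {
        // Concurrency on, HIVE_LOCK_MAPRED_ONLY off: locking is always required.
        System.out.println(requiresLock(true, false, false, Collections.<Boolean>emptyList()));
        // Concurrency off: no locking, even if a task would ask for it.
        System.out.println(requiresLock(false, false, false, Arrays.asList(true)));
    }
}
```

With the settings from the scenario above (hive.support.concurrency=true), the first check passes, so every insert statement goes on to acquire locks unless the later branches rule it out.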
1) First, which tasks need a lock:
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer
  private void analyzeAlterTablePartMergeFiles(ASTNode ast, String tableName,
      HashMap<String, String> partSpec) throws SemanticException {
    ...
      DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), mergeDesc);
      ddlWork.setNeedLock(true);
    ...
So DDL operations such as this one need a lock.
2) Next, how the lock is acquired:
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
  public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
    try {
      acquireLocksWithHeartbeatDelay(plan, ctx, username, 0);
    ...

  void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay)
      throws LockException {
    LockState ls = acquireLocks(plan, ctx, username, true);
    ...

  LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking)
      throws LockException {
    ...
      switch (output.getType()) {
        case DATABASE:
          compBuilder.setDbName(output.getDatabase().getName());
          break;
        case TABLE:
        case DUMMYPARTITION: // in case of dynamic partitioning lock the table
          t = output.getTable();
          compBuilder.setDbName(t.getDbName());
          compBuilder.setTableName(t.getTableName());
          break;
        case PARTITION:
          compBuilder.setPartitionName(output.getPartition().getName());
          t = output.getPartition().getTable();
          compBuilder.setDbName(t.getDbName());
          compBuilder.setTableName(t.getTableName());
          break;
        default:
          // This is a file or something we don't hold locks for.
          continue;
      }
    ...
    LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks);
    ctx.setHiveLocks(locks);
    return lockState;
  }
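So when a statement writes through dynamic partitioning (the DUMMYPARTITION case), the lock granularity is DbName + TableName. As a result only one of the concurrent SQL statements can get the lock, and the others have to wait.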
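To make the granularity difference concrete, here is a small sketch that mimics the switch above and then checks whether two write locks collide. It is a simplified model rather than Hive's lock manager: LockGranularitySketch, LockKey and conflicts are hypothetical names, every lock is treated as exclusive, a coarser lock is assumed to cover all finer ones under it, and the database name "default" is made up.

```java
// Hypothetical model of how the output type decides what gets locked.
class LockGranularitySketch {

    enum OutputType { DATABASE, TABLE, DUMMYPARTITION, PARTITION }

    /** Simplified lock component; a null field means that level is not set. */
    static final class LockKey {
        final String db;
        final String table;
        final String partition;

        LockKey(String db, String table, String partition) {
            this.db = db;
            this.table = table;
            this.partition = partition;
        }
    }

    /** Same branching as the switch in DbTxnManager.acquireLocks quoted above. */
    static LockKey lockKeyFor(OutputType type, String db, String table, String partition) {
        switch (type) {
            case DATABASE:
                return new LockKey(db, null, null);
            case TABLE:
            case DUMMYPARTITION: // dynamic partitioning: lock the whole table
                return new LockKey(db, table, null);
            case PARTITION:
                return new LockKey(db, table, partition);
            default:
                throw new IllegalArgumentException("unsupported type: " + type);
        }
    }

    /** Assumption: all locks are exclusive and a coarser lock covers finer ones. */
    static boolean conflicts(LockKey a, LockKey b) {
        if (!a.db.equals(b.db)) {
            return false;
        }
        if (a.table == null || b.table == null) {
            return true; // a database-level lock covers every table in it
        }
        if (!a.table.equals(b.table)) {
            return false;
        }
        if (a.partition == null || b.partition == null) {
            return true; // a table-level lock covers every partition in it
        }
        return a.partition.equals(b.partition);
    }

    public static void main(String[] args) {
        LockKey dyn1 = lockKeyFor(OutputType.DUMMYPARTITION, "default", "test_table", null);
        LockKey dyn2 = lockKeyFor(OutputType.DUMMYPARTITION, "default", "test_table", null);
        LockKey p1 = lockKeyFor(OutputType.PARTITION, "default", "test_table", "dt=1");
        LockKey p2 = lockKeyFor(OutputType.PARTITION, "default", "test_table", "dt=2");

        System.out.println("dynamic vs dynamic: " + conflicts(dyn1, dyn2)); // true
        System.out.println("dt=1 vs dt=2: " + conflicts(p1, p2));           // false
    }
}
```

In this model two dynamic-partition writes to test_table always collide, because both keys stop at the table level, while writes that name distinct partitions do not.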
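There are several ways to solve the problem:
Any one of the three will do. I recommend the first, because the scenario above does not actually need dynamic partitioning.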