Hive源碼剖析之HiveServer2服務啓動過程

1、前言java

    前段時間用Hive JDBC出現了阻塞問題,客戶端一直處於等待狀態,爲了解決該問題,花了幾天研究了Hive源碼,因而準備寫成系列博文,與你們分享其中的心得。有不當之處,請你們更正。web

2、背景算法

    咱們生活在一個數據的時代,咱們在經意與不經意間留下了數據。帶着手機出門,不經意間咱們留下了計步數據和活動軌跡數據,咱們網購商品,不經意間留下我的偏好數據,發個朋友圈,留下了各類圖片和文字數據……咱們每一個人都是數據貢獻者。編程

    數據的暴增給數據的存儲、分析和查詢帶來了不少難點,因而谷歌提出了不少解決方案,並發表了三篇意義重大的論文Google File System、Google MapReduce、Google Bigtable,基於這三篇論文,便產生了Hadoop和HBase。Hadoop是基於GFS和MapReduce理論的開源實現,解決大文件的存儲與分析問題,HBase是基於Bigtable理論的開源實現,解決海量數據的實時檢索。服務器

    MapReduce對於Hadoop而言,既是算法模型,也是框架。要使用MapReduce必須先把業務轉化爲適合該算法模型,並基於該框架編程。對於框架,筆者這樣認爲,框架提升了開發效率的同時,也限定了人的思惟範圍和編程範圍。房子是一個框架結構,住在房子裏的人,活動範圍只限於活動範圍之間,開發框架亦是如此,Struts2是MVC框架,開發人員只能按照它所限定的模式開發,這未嘗不是一種禁錮呢?做爲一個軟件架構者,瞭解框架實現的細節,才能走出禁錮。這也是筆者堅持看各類框架源碼的緣由之一。多線程

    編寫出好的MapReduce程序自己不是垂手可得的事,全部就有了Hive,它能把Sql指令,轉化爲MapReduce任務,讓開發人員用Sql的方式去使用Hadoop的運算能力。咱們在使用這些工具的同時,也不得不讚嘆這些傑出科學家奇思妙想。架構

3、分享重點併發

    HiveServer2服務啓動過程負載均衡

4、源碼框架

    一、開一段程序,入口確定是main方法

public static void main(String[] args) {
    //設置加載配置
	HiveConf.setLoadHiveServer2Config(true);
	try {
		ServerOptionsProcessor oproc = new ServerOptionsProcessor(
				"hiveserver2");
		ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
		String initLog4jMessage = LogUtils.initHiveLog4j();
		LOG.debug(initLog4jMessage);
		HiveStringUtils
				.startupShutdownMessage(HiveServer2.class, args, LOG);
		LOG.debug(oproc.getDebugMessage().toString());
        //這裏即是要啓動服務了
		oprocResponse.getServerOptionsExecutor().execute();
	} catch (LogInitializationException e) {
		LOG.error("Error initializing log: " + e.getMessage(), e);
		System.exit(-1);
	}
}

    二、ServerOptionsExecutor接口有三個實現類

    DeregisterOptionExecutor:將HiveServer2實例從zookeeper中移除

    HelpOptionExecutor:打印help參數

    ServerOptionsExecutor:啓動HiveServer2服務

    這裏很顯然,咱們應該看ServerOptionsExecutor類

static class StartOptionExecutor implements ServerOptionsExecutor {
	@Override
	public void execute() {
		try {
            //啓動服務
			startHiveServer2();
		} catch (Throwable t) {
			LOG.fatal("Error starting HiveServer2", t);
			System.exit(-1);
		}
	}
}

    三、startHiveServer2方法,重要的地方給出了必要的註釋

private static void startHiveServer2() throws Throwable {
	long attempts = 0, maxAttempts = 1;
	while (true) {
		LOG.info("Starting HiveServer2");
		HiveConf hiveConf = new HiveConf();
		maxAttempts = hiveConf
				.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
		HiveServer2 server = null;
		try {
			server = new HiveServer2();
            //初始化
			server.init(hiveConf);
            //啓動
			server.start();
           //省略了一些代碼
			break;
		} catch (Throwable throwable) {
			throwable.printStackTrace();
            //出現異常就中止服務
			if (server != null) {
				try {
					server.stop();
				} catch (Throwable t) {
					LOG.info(
							"Exception caught when calling stop of HiveServer2 before retrying start",
							t);
				} finally {
					server = null;
				}
			}
            //若是拋出異常,並嘗試啓動超過了配置的最大嘗試次數,拋出錯誤,啓動失敗
			if (++attempts >= maxAttempts) {
				throw new Error("Max start attempts " + maxAttempts
						+ " exhausted", throwable);
			} else {
                //60秒後再次嘗試啓動
				LOG.warn("Error starting HiveServer2 on attempt "
						+ attempts + ", will retry in 60 seconds",
						throwable);
				try {
					Thread.sleep(60L * 1000L);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}
}

    四、接下來咱們看看server的初始化作了哪些事情

@Override
public synchronized void init(HiveConf hiveConf) {
    //實例化clientService實例,該實例用於把用戶請求轉化並傳遞給Driver
	cliService = new CLIService(this);
	addService(cliService);
	if (isHTTPTransportMode(hiveConf)) {
		thriftCLIService = new ThriftHttpCLIService(cliService);
	} else {//默認狀況是Thrift二進制服務
		thriftCLIService = new ThriftBinaryCLIService(cliService);
	}
    //添加進服務列表
	addService(thriftCLIService);
	super.init(hiveConf);
	// 省略獲取配置信息……
	
	// 啓動web UI,改web UI用於查看正在運行的Hive任務,默認端口10002
	try {
		        //省略大量獲取的配置的代碼
                。。。。。。
				webServer = builder.build();
                //添加查詢運行任務數據的servlet
				webServer.addServlet("query_page", "/query_page",
						QueryProfileServlet.class);
			}
		}
	} catch (IOException ie) {
		throw new ServiceException(ie);
	}
	// Add a shutdown hook for catching SIGTERM & SIGINT
	final HiveServer2 hiveServer2 = this;
    //添加鉤子
	Runtime.getRuntime().addShutdownHook(new Thread() {
		@Override
		public void run() {
			hiveServer2.stop();
		}
	});
}

    初始化主要是獲取了一些配置參數,而且告訴程序,要啓動這些服務CLIService、thriftCLIService 、webServer

    五、下面是重頭戲,啓動服務了,緊接着第3步中server.start()看下來,start方法

@Override
public synchronized void start() {
	super.start();//調用父類start方法
	if (webServer != null) {
		try {
			webServer.start();//啓動web服務
			LOG.info("Web UI has started on port " + webServer.getPort());
		} catch (Exception e) {
			LOG.error("Error starting Web UI: ", e);
			throw new ServiceException(e);
		}
	}
}

    start方法,作了兩件事情,就是調用父類start方法,啓動web服務

    六、下面我跟進去看看,父類CompositeService中定義的start方法

@Override
public synchronized void start() {
	int i = 0;
	try {
		for (int n = serviceList.size(); i < n; i++) {
			Service service = serviceList.get(i);
			service.start();
		}
		super.start();
	} catch (Throwable e) {
		stop(i);
		throw new ServiceException("Failed to Start " + getName(), e);
	}
}

這裏僅僅是把第4步中添加的服務列表中的服務進行了啓動,下面咱們咱們說服務中的重點,HiveServer2怎麼提供遠程服務,請看ThriftBinaryCLIService

    七、ThriftBinaryCLIService中的run方法

@Override
public void run() {
	try {
		// 定義處理請求的線程池
		String threadPoolName = "HiveServer2-Handler-Pool";
		ExecutorService executorService = new ThreadPoolExecutor(
				minWorkerThreads, maxWorkerThreads, workerKeepAliveTime,
				TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
				new ThreadFactoryWithGarbageCleanup(threadPoolName));


		//省略定義傳輸協議、配置參數代碼
         ........
		int maxMessageSize = hiveConf
				.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
		int requestTimeout = (int) hiveConf.getTimeVar(
				HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT,
				TimeUnit.SECONDS);
		int beBackoffSlotLength = (int) hiveConf
				.getTimeVar(
						HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH,
						TimeUnit.MILLISECONDS);
		TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(
				serverSocket)
				.processorFactory(processorFactory)
				.transportFactory(transportFactory)
				.protocolFactory(new TBinaryProtocol.Factory())
				.inputProtocolFactory(
						new TBinaryProtocol.Factory(true, true,
								maxMessageSize, maxMessageSize))
				.requestTimeout(requestTimeout)
				.requestTimeoutUnit(TimeUnit.SECONDS)
				.beBackoffSlotLength(beBackoffSlotLength)
				.beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
				.executorService(executorService);

		// TCP Server
		server = new TThreadPoolServer(sargs);
		server.setServerEventHandler(serverEventHandler);
		server.serve();
		String msg = "Started "
				+ ThriftBinaryCLIService.class.getSimpleName()
				+ " on port " + portNum + " with " + minWorkerThreads
				+ "..." + maxWorkerThreads + " worker threads";
		LOG.info(msg);
	} catch (Throwable t) {
		LOG.fatal("Error starting HiveServer2: could not start "
				+ ThriftBinaryCLIService.class.getSimpleName(), t);
		System.exit(-1);
	}
}

    說明:這裏的ExecutorService線程池的定義中用了SynchronousQueue隊列,該隊列的能夠認爲是長度爲1的阻塞隊列,當線程池滿,而且沒有空閒線程,便會阻塞。TThreadPoolServer的特色是,客戶端只要不從服務器上斷開鏈接,就會一直佔據服務器的一個線程,因此出現了本文中開頭出現的阻塞問題,解決辦法,若是服務器內存容許,能夠適當加大線程池長度,或者增長hive節點,在配合負載均衡。

    八、特別說明

    Thrift是RPC界的利器,Facebook的傑做,能夠輕鬆實現誇語言的服務調用,支持的語言有C++, Java, Go,Python, PHP, Haskell, C#,  JavaScript, Node.js等等

   (1)、模型接口
       服務的調用接口以及接口參數model、返回值model 
   (2)、Tprotocol協議定義 
         將數據(model)編碼 、解碼 。 
  ( 3)、Ttramsport傳輸層定義
        編碼後的數據傳輸(簡單socket、http) 
   (5)、Tserver服務類型
        服務的Tserver類型,實現了幾種rpc調用(單線程、多線程、非阻塞IO)

5、後記

    本文中的描述了HiveServer2的啓動過程當中,啓動的服務,那麼客戶端是如何與服務端交互的呢?爲何經過JDBC的方式就能夠去Hive上執行任務了呢?Hive JDBC除了實現JDBC標準接口外,還作了哪些事情呢?敬請期待《Hive源碼剖析之Hive JDBC》

快樂源於分享。

此博客乃做者原創, 轉載請註明出處

相關文章
相關標籤/搜索