1、前言java
前段時間用Hive JDBC出現了阻塞問題,客戶端一直處於等待狀態,爲了解決該問題,花了幾天研究了Hive源碼,因而準備寫成系列博文,與你們分享其中的心得。有不當之處,請你們更正。web
2、背景算法
咱們生活在一個數據的時代,咱們在經意與不經意間留下了數據。帶着手機出門,不經意間咱們留下了計步數據和活動軌跡數據,咱們網購商品,不經意間留下我的偏好數據,發個朋友圈,留下了各類圖片和文字數據……咱們每一個人都是數據貢獻者。編程
數據的暴增給數據的存儲、分析和查詢帶來了不少難點,因而谷歌提出了不少解決方案,並發表了三篇意義重大的論文Google File System、Google MapReduce、Google Bigtable,基於這三篇論文,便產生了Hadoop和HBase。Hadoop是基於GFS和MapReduce理論的開源實現,解決大文件的存儲與分析問題,HBase是基於Bigtable理論的開源實現,解決海量數據的實時檢索。服務器
MapReduce對於Hadoop而言,既是算法模型,也是框架。要使用MapReduce必須先把業務轉化爲適合該算法模型,並基於該框架編程。對於框架,筆者這樣認爲,框架提升了開發效率的同時,也限定了人的思惟範圍和編程範圍。房子是一個框架結構,住在房子裏的人,活動範圍只限於活動範圍之間,開發框架亦是如此,Struts2是MVC框架,開發人員只能按照它所限定的模式開發,這未嘗不是一種禁錮呢?做爲一個軟件架構者,瞭解框架實現的細節,才能走出禁錮。這也是筆者堅持看各類框架源碼的緣由之一。多線程
編寫出好的MapReduce程序自己不是垂手可得的事,全部就有了Hive,它能把Sql指令,轉化爲MapReduce任務,讓開發人員用Sql的方式去使用Hadoop的運算能力。咱們在使用這些工具的同時,也不得不讚嘆這些傑出科學家奇思妙想。架構
3、分享重點併發
HiveServer2服務啓動過程負載均衡
4、源碼框架
一、開一段程序,入口確定是main方法
public static void main(String[] args) { //設置加載配置 HiveConf.setLoadHiveServer2Config(true); try { ServerOptionsProcessor oproc = new ServerOptionsProcessor( "hiveserver2"); ServerOptionsProcessorResponse oprocResponse = oproc.parse(args); String initLog4jMessage = LogUtils.initHiveLog4j(); LOG.debug(initLog4jMessage); HiveStringUtils .startupShutdownMessage(HiveServer2.class, args, LOG); LOG.debug(oproc.getDebugMessage().toString()); //這裏即是要啓動服務了 oprocResponse.getServerOptionsExecutor().execute(); } catch (LogInitializationException e) { LOG.error("Error initializing log: " + e.getMessage(), e); System.exit(-1); } }
二、ServerOptionsExecutor接口有三個實現類
DeregisterOptionExecutor:將HiveServer2實例從zookeeper中移除
HelpOptionExecutor:打印help參數
ServerOptionsExecutor:啓動HiveServer2服務
這裏很顯然,咱們應該看ServerOptionsExecutor類
static class StartOptionExecutor implements ServerOptionsExecutor { @Override public void execute() { try { //啓動服務 startHiveServer2(); } catch (Throwable t) { LOG.fatal("Error starting HiveServer2", t); System.exit(-1); } } }
三、startHiveServer2方法,重要的地方給出了必要的註釋
private static void startHiveServer2() throws Throwable { long attempts = 0, maxAttempts = 1; while (true) { LOG.info("Starting HiveServer2"); HiveConf hiveConf = new HiveConf(); maxAttempts = hiveConf .getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS); HiveServer2 server = null; try { server = new HiveServer2(); //初始化 server.init(hiveConf); //啓動 server.start(); //省略了一些代碼 break; } catch (Throwable throwable) { throwable.printStackTrace(); //出現異常就中止服務 if (server != null) { try { server.stop(); } catch (Throwable t) { LOG.info( "Exception caught when calling stop of HiveServer2 before retrying start", t); } finally { server = null; } } //若是拋出異常,並嘗試啓動超過了配置的最大嘗試次數,拋出錯誤,啓動失敗 if (++attempts >= maxAttempts) { throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable); } else { //60秒後再次嘗試啓動 LOG.warn("Error starting HiveServer2 on attempt " + attempts + ", will retry in 60 seconds", throwable); try { Thread.sleep(60L * 1000L); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } }
四、接下來咱們看看server的初始化作了哪些事情
@Override public synchronized void init(HiveConf hiveConf) { //實例化clientService實例,該實例用於把用戶請求轉化並傳遞給Driver cliService = new CLIService(this); addService(cliService); if (isHTTPTransportMode(hiveConf)) { thriftCLIService = new ThriftHttpCLIService(cliService); } else {//默認狀況是Thrift二進制服務 thriftCLIService = new ThriftBinaryCLIService(cliService); } //添加進服務列表 addService(thriftCLIService); super.init(hiveConf); // 省略獲取配置信息…… // 啓動web UI,改web UI用於查看正在運行的Hive任務,默認端口10002 try { //省略大量獲取的配置的代碼 。。。。。。 webServer = builder.build(); //添加查詢運行任務數據的servlet webServer.addServlet("query_page", "/query_page", QueryProfileServlet.class); } } } catch (IOException ie) { throw new ServiceException(ie); } // Add a shutdown hook for catching SIGTERM & SIGINT final HiveServer2 hiveServer2 = this; //添加鉤子 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { hiveServer2.stop(); } }); }
初始化主要是獲取了一些配置參數,而且告訴程序,要啓動這些服務CLIService、thriftCLIService 、webServer
五、下面是重頭戲,啓動服務了,緊接着第3步中server.start()看下來,start方法
@Override public synchronized void start() { super.start();//調用父類start方法 if (webServer != null) { try { webServer.start();//啓動web服務 LOG.info("Web UI has started on port " + webServer.getPort()); } catch (Exception e) { LOG.error("Error starting Web UI: ", e); throw new ServiceException(e); } } }
start方法,作了兩件事情,就是調用父類start方法,啓動web服務
六、下面我跟進去看看,父類CompositeService中定義的start方法
@Override public synchronized void start() { int i = 0; try { for (int n = serviceList.size(); i < n; i++) { Service service = serviceList.get(i); service.start(); } super.start(); } catch (Throwable e) { stop(i); throw new ServiceException("Failed to Start " + getName(), e); } }
這裏僅僅是把第4步中添加的服務列表中的服務進行了啓動,下面咱們咱們說服務中的重點,HiveServer2怎麼提供遠程服務,請看ThriftBinaryCLIService
七、ThriftBinaryCLIService中的run方法
@Override public void run() { try { // 定義處理請求的線程池 String threadPoolName = "HiveServer2-Handler-Pool"; ExecutorService executorService = new ThreadPoolExecutor( minWorkerThreads, maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryWithGarbageCleanup(threadPoolName)); //省略定義傳輸協議、配置參數代碼 ........ int maxMessageSize = hiveConf .getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE); int requestTimeout = (int) hiveConf.getTimeVar( HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS); int beBackoffSlotLength = (int) hiveConf .getTimeVar( HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS); TThreadPoolServer.Args sargs = new TThreadPoolServer.Args( serverSocket) .processorFactory(processorFactory) .transportFactory(transportFactory) .protocolFactory(new TBinaryProtocol.Factory()) .inputProtocolFactory( new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize)) .requestTimeout(requestTimeout) .requestTimeoutUnit(TimeUnit.SECONDS) .beBackoffSlotLength(beBackoffSlotLength) .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS) .executorService(executorService); // TCP Server server = new TThreadPoolServer(sargs); server.setServerEventHandler(serverEventHandler); server.serve(); String msg = "Started " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); } catch (Throwable t) { LOG.fatal("Error starting HiveServer2: could not start " + ThriftBinaryCLIService.class.getSimpleName(), t); System.exit(-1); } }
說明:這裏的ExecutorService線程池的定義中用了SynchronousQueue隊列,該隊列的能夠認爲是長度爲1的阻塞隊列,當線程池滿,而且沒有空閒線程,便會阻塞。TThreadPoolServer的特色是,客戶端只要不從服務器上斷開鏈接,就會一直佔據服務器的一個線程,因此出現了本文中開頭出現的阻塞問題,解決辦法,若是服務器內存容許,能夠適當加大線程池長度,或者增長hive節點,在配合負載均衡。
八、特別說明
Thrift是RPC界的利器,Facebook的傑做,能夠輕鬆實現誇語言的服務調用,支持的語言有C++, Java, Go,Python, PHP, Haskell, C#, JavaScript, Node.js等等
(1)、模型接口
服務的調用接口以及接口參數model、返回值model
(2)、Tprotocol協議定義
將數據(model)編碼 、解碼 。
( 3)、Ttramsport傳輸層定義
編碼後的數據傳輸(簡單socket、http)
(5)、Tserver服務類型
服務的Tserver類型,實現了幾種rpc調用(單線程、多線程、非阻塞IO)
5、後記
本文中的描述了HiveServer2的啓動過程當中,啓動的服務,那麼客戶端是如何與服務端交互的呢?爲何經過JDBC的方式就能夠去Hive上執行任務了呢?Hive JDBC除了實現JDBC標準接口外,還作了哪些事情呢?敬請期待《Hive源碼剖析之Hive JDBC》
快樂源於分享。
此博客乃做者原創, 轉載請註明出處