各個業務數據「彙總到hive, 通過ETL處理後, 導出到數據庫「是大數據產品的典型業務流程。這其中,sqoop(離線)和kafka(實時)幾乎是數據總線的標配了。java
可是有些業務也有不標準的,好比hive數據導入到ES. hive數據導入到ES, 官方組件是elasticsearch-hadoop. 其用法在前面的博客中已有介紹。 那麼其實現原理是怎樣的呢? 或者說, es-hadoop這傢伙究竟是怎麼把hive表的數據弄到es中去的? 爲了弄清楚這個問題, 咱們首先須要有一個本地的源碼環境。git
s1: 下載elasticsearch-hadoop源碼。github
git clone https://github.com/elastic/elasticsearch-hadoop.git
s2: 編譯源碼。直接編譯master便可。redis
gradlew distZip
s3: 編譯成功後,導入到intellij。 這裏注意導入build.gradle文件,就像maven項目導入pom文件同樣。spring
s4: 在intellij中編譯一次項目。sql
s5: 在本地啓動一個es, 默認的端口便可。數據庫
s6: 運行測試用例AbstractHiveSaveTest.testBasicSave()
。 直接運行是會報錯的, 須要略微修改一下代碼,添加一個類的屬性:apache
@Cla***ule public static ExternalResource hive = HiveSuite.hive;
若是是在windows環境下,須要新建packageorg.apache.hadoop.io.nativeio
, 而後在該package下創建NativeIO.java
類。 修改代碼以下:windows
// old public static boolean access(String path, Acce***ight desiredAccess) throws IOException { return access0(path, desiredAccess.acce***ight()); } // new public static boolean access(String path, Acce***ight desiredAccess) throws IOException { return true; }
這樣就運行起來了一個本地的hive到es的代碼。能夠debug,瞭解詳細流程了。api
在elasticsearch-hadoop這個比較龐大的項目中,修改代碼也比較麻煩,所以能夠單獨創建一個項目hive-shgy, 而後改造這個測試類, 跑通testBasicSave()
。
因爲對gradle不熟悉, 仍是創建maven項目, 項目的依賴以下:
<repositories> <repository> <id>spring-libs</id> <url>http://repo.spring.io/libs-milestone/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-1.2-api</artifactId> <version>2.6.2</version> <scope>test</scope> </dependency> <dependency> <!-- 橋接:告訴Slf4j使用Log4j2 --> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>2.6.2</version> <scope>test</scope> </dependency> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.6</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-cli</artifactId> <version>1.2.1</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.2.0</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>6.3.0</version> <scope>test</scope> </dependency> </dependencies>
這裏用到了log4j2, 因此日誌類放在前面。
接下來遷移測試代碼。遷移的原則是 若無必要,不新增類。 若是隻用到了類的一個方法,那麼只遷移一個方法。
這裏的測試代碼遷移,其實就是圍繞HiveEmbeddedServer2
來構建的。我的感受這裏比較巧妙的是,經過HiveEmbeddedServer2
啓動了一個嵌入式的hive實例。可以執行hive sql, 並且是在一個jvm中,對於研究hive的實現原理來講,太酷了。
基礎的環境搭建好後,就能夠研究elasticsearch-hadoop的源碼了, 先看源碼的結構:
elasticsearch-hadoop/hive/src/main/java/org/elasticsearch/hadoop/hive$ tree . . ├── EsHiveInputFormat.java ├── EsHiveOutputFormat.java ├── EsSerDe.java ├── EsStorageHandler.java ├── HiveBytesArrayWritable.java ├── HiveBytesConverter.java ├── HiveConstants.java ├── HiveFieldExtractor.java ├── HiveType.java ├── HiveUtils.java ├── HiveValueReader.java ├── HiveValueWriter.java ├── HiveWritableValueWriter.java └── package-info.java 0 directories, 14 files
這裏簡要描述一下elasticsearch-hadoop將hive數據同步到es的原理, Hive開放了StorageHandler的接口。經過StoreageHandler, 可使用SQL將數據寫入到es,同時也可使用SQL讀取ES中的數據。 因此, 整個es-hive, 其入口類爲EsStorageHandler, 這就是整個功能的框架。 瞭解了EsStorageHandler後,接下來很重要的一個類就是EsSerDe, 是序列化反序列化的功能組件。它是一個橋樑,經過它實現ES數據類型和Hive數據類型的轉換。 核心類就是這兩個了。
瞭解了代碼的原理及結構,就能夠本身仿照實現hive數據同步到mongo, hive數據同步到redis 等其餘的功能了。 這樣作的好處是業務無關, 一次開發,屢次使用。方便管理維護。
最後總結一下,本文沒有直接給出答案, 而是記錄了尋找答案的過程。 經過這個過程,學會將hive數據同步到其餘NoSQL中,這個實踐比理解源碼更重要。