Apache Calcite是一個動態數據管理框架,它包含了許多典型數據庫管理系統的部分,但省略了一些關鍵功能:數據存儲、數據處理算法和元數據存儲。java
基於Apache Calcite,咱們能夠爲任何第三方存儲引擎開發SQL查詢引擎。git
https://calcite.apache.org/github
https://github.com/apache/calcite算法
要想了解Calcite,其實官方文檔確實不妨一看。儘管官方文檔會說起很是多你以前可能沒有接觸過的概念,但好在它文檔內容很少,這樣讓你對SQL執行中可能涉及的一些關鍵術語留下一個印象,那對之後深刻學習和使用Calcite仍是有幫助的,畢竟若是真的想用好Calcite,或者說只是使用Calcite,這些關鍵術語都是須要掌握和理解的。sql
不過,僅僅看官方文檔,那仍是遠遠不夠的。回過頭再來看Calcite的文檔時,你會發現,它徹底是寫給「高端玩家」的,它是對Calcite高度抽象總結的文檔,並非寫給初學者來進行學習的,以致於你想經過官方文檔來跑個QuickStart,那也是至關困難,我的以爲沒有必定的折騰能力或者對SQL執行沒有理解經驗的話,確實不太容易達成。所以,不能僅僅只看官方文檔,你還須要經過其它途徑獲取更多關於它的信息,關於初學者如何快速掌握Apache Calcite,如下是我我的的一些心得體會:數據庫
1.先簡單用起來apache
Calcite做爲一個數據管理框架,首先,你得把它用起來才能慢慢理解它究竟是幹嗎的。理論上,經過Calcite這個組件,你能夠以SQL的方式來訪問任何你想要訪問的數據源,好比你的文件(無論你是什麼格式)、Java對象、第三方存儲引擎(Redis、Kafka、Elasticsearch)等,因此我是用了「任何」來講明它的能力,這是它實實在在存在的能力。編程
本文檔會手把手教你,怎麼樣經過Calcite以SQL的方式來訪問CSV文件、Java內部對象、Elasticsearch數據源。json
2.生產使用與思考瀏覽器
因此一旦你知道Calcite能夠經過SQL的方式來訪問任何的數據源以後,我知道有想法的同窗已經會考慮到:
使用者不須要感知數據存儲在哪裏,在他們看來,這就是一個只提供SQL查詢入口的查詢系統,它屏蔽了它所接入的各類存儲系統的差別;
答案是確定的,Calcite是一個組件,本質上也是一個框架,它提供了各類擴展點來讓你實現這樣的功能。
固然若是你想借用Calcite針對某個存儲系統開發一個好的SQL引擎,仍是須要至關大的努力的,好比VolcanoPlanner就須要好好理解下,比較惋惜的是,直到如今我也沒有精力去研究它,以致於我想爲Elasticsearch開發一個SQL引擎的想法都遲遲未能實現。
所謂的「借用Calcite針對某個存儲系統開發一個好的SQL引擎」,其實在Calcite裏有一個專業的術語,叫作「數據源適配器」。
Calcite自己也提供了多個存儲引擎的適配器,好比Druid、Elasticsearch、SparkSQL等等,固然開源的就並不必定得,前面之因此一直說起要從新寫一個Elasticsearch的適配器,是由於我以爲Calcite自己提供的ES適配器能力比較弱,相信用過的同窗都會有所體會。
3.深度使用與思考
實際上若是隻是想知道Calcite怎麼使用的,有哪些功能可使用的,咱們不妨站在巨人的肩膀上,看看業界的開源項目是怎麼使用它的。
一個不錯的參考是Apache Druid,其SQL引擎正是基於Apache Calcite來開發構建的,所以對於Calcite更多高級功能的使用,咱們不妨去研究一下Apache Druid-SQL模塊的源碼,相信會有很是大的收穫。
4.VolcanPlanner
有時間和精力研究一下其在Calcite的實現,我的以爲會很是不錯。
本文檔會手把手教你,怎麼樣經過Calcite以SQL的方式來訪問CSV文件、Java內部對象、Elasticsearch數據源。
對於Calcite更多的實現細節,仍是本身想辦法根據實際應用場景,去思考一下它的各個模塊功能,好比想了解某一個功能原理,就去看其源碼結構和細節,我相信這自己對我的能力的提高都是極其有幫助的。
先構建一個maven項目,而後引入Calcite的依賴:
<dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> <version>1.20.0</version> </dependency> <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-example-csv</artifactId> <version>1.20.0</version> </dependency> <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-elasticsearch</artifactId> <version>1.20.0</version> </dependency>
先準備一個CSV文件:
EMPNO:long,NAME:string,DEPTNO:int,GENDER:string,CITY:string,EMPID:int,AGE:int,SLACKER:boolean,MANAGER:boolean,JOINEDAT:date 100,"Fred",10,,,30,25,true,false,"1996-08-03" 110,"Eric",20,"M","San Francisco",3,80,,false,"2001-01-01" 110,"John",40,"M","Vancouver",2,,false,true,"2002-05-03" 120,"Wilma",20,"F",,1,5,,true,"2005-09-07" 130,"Alice",40,"F","Vancouver",2,,false,true,"2007-01-01"
Calcite會把每一個csv文件映射成一個SQL表。csv文件表頭指定該列數據類型,根據必定規則映射到對應的SQL類型。如沒有指定,則統一映射成VARCHAR。
文件命名爲depts.csv,Caclite會構建表名爲文件名的table,即depts.
而後編寫下面的代碼經過Calcite以SQL方式訪問數據:
// Author: xpleaf public class CsvDemo { public static void main(String[] args) throws Exception { // 0.獲取csv文件的路徑,注意獲取到文件所在上層路徑就能夠了 String path = Objects.requireNonNull(CsvDemo.class.getClassLoader().getResource("csv").getPath()); // 1.構建CsvSchema對象,在Calcite中,不一樣數據源對應不一樣Schema,好比CsvSchema、DruidSchema、ElasticsearchSchema等 CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE); // 2.構建Connection // 2.1 設置鏈接參數 Properties info = new Properties(); // 不區分sql大小寫 info.setProperty("caseSensitive", "false"); // 2.2 獲取標準的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 獲取Calcite封裝的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.構建RootSchema,在Calcite中,RootSchema是全部數據源schema的parent,多個不一樣數據源schema能夠掛在同一個RootSchema下 // 以實現查詢不一樣數據源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.將不一樣數據源schema掛載到RootSchema,這裏添加CsvSchema rootSchema.add("csv", csvSchema); // 5.執行SQL查詢,經過SQL方式訪問csv文件 String sql = "select * from csv.depts"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍歷打印查詢結果集 System.out.println(ResultSetUtil.resultString(resultSet)); } }
執行代碼,其輸出結果以下:
100, Fred, 10, , , 30, 25, true, false, 1996-08-03 110, Eric, 20, M, San Francisco, 3, 80, null, false, 2001-01-01 110, John, 40, M, Vancouver, 2, null, false, true, 2002-05-03 120, Wilma, 20, F, , 1, 5, null, true, 2005-09-07 130, Alice, 40, F, Vancouver, 2, null, false, true, 2007-01-01
思考:
csv是官方文檔有說起的一個例子,在總體上若是須要對Calcite源碼的使用有一個認識(尤爲是如何開發適配器),能夠基於這個demo,對照文檔說起的各個概念、類,經過分析源碼來進行理解,好比:
- 1.Schema是怎麼構建的,在Calcite的位置和具體做用是什麼;
- 2.Table是怎麼構建的,在Calcite的位置和具體做用是什麼;
- 3.在執行查詢時是如何作SQL Parse、Validate、Optimize和執行的;
你均可以經過這個demo來一探究竟,固然,雖然我這裏短短几句話帶過,實際上若是你想研究這個過程,可能須要花費你較多時間,我建議不急着步子一下跨得太大,慢慢來,不急的。
另外,其實經過官方文檔的介紹,對於怎麼去開發一個Caclite的數據源適配器,應該也是有必定的體會的,其實若是隻是實現一個簡單的適配器(不考慮太多的SQL優化規則),那這個難度仍是不大的。
我經過這個例子,包括後面的幾個例子,其實都是想告訴你,如何快速使用Calcite(也就是至關給你寫了一個QuickStart),從而對Calcite總體使用有一個認識,若是你想更深度使用Calcite,建議:
有用過SparkSQL的同窗會知道,在SparkSQL中,可使用編程的方式來將對象實例轉換爲DataFrame,進而註冊Table,以經過SQL來訪問這些對象實例:
public class _01SparkRDD2DataFrame { public static void main(String[] args) { Logger.getLogger("org.apache.spark").setLevel(Level.OFF); SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName(_01SparkRDD2DataFrame.class.getSimpleName()) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(new Class[]{Person.class}); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(jsc); List<Person> persons = Arrays.asList( new Person(1, "name1", 25, 179), new Person(2, "name2", 22, 176), new Person(3, "name3", 27, 178), new Person(1, "name4", 24, 175) ); DataFrame df = sqlContext.createDataFrame(persons, Person.class); // 構造方法有多個,使用personsRDD的方法也是能夠的 // where age > 23 and height > 176 df.select(new Column("id"), new Column("name"), new Column("age"), new Column("height")) .where(new Column("age").gt(23).and(new Column("height").lt(179))) .show(); df.registerTempTable("person"); sqlContext.sql("select * from person where age > 23 and height < 179").show(); jsc.close(); } }
以上代碼例子來自xpleaf的文章《Spark SQL筆記整理(二):DataFrame編程模型與操做案例》
注意這裏給出的案例仍是Spark 1.x的用法,Spark 2.x以及以後的版本則可能不推薦這種用法了,具體請參考Spark的官方文檔。
那麼對應到Calcite,它也提供了相似的方式來經過SQL訪問對象實例數據。
爲了進行演示,咱們先構建Object對象類:
public class HrSchema { public final Employee[] emps = { new Employee(100, 10, "Bill", 10000, 1000), new Employee(200, 20, "Eric", 8000, 500), new Employee(150, 10, "Sebastian", 7000, null), new Employee(110, 10, "Theodore", 11500, 250), }; @Override public String toString() { return "HrSchema"; } public static class Employee { public int empid; public int deptno; public String name; public float salary; public Integer commission; public Employee(int empid, int deptno, String name, float salary, Integer commission) { this.empid = empid; this.deptno = deptno; this.name = name; this.salary = salary; this.commission = commission; } @Override public String toString() { return "Employee [empid: " + empid + ", deptno: " + deptno + ", name: " + name + "]"; } @Override public boolean equals(Object obj) { return obj == this || obj instanceof Employee && empid == ((Employee) obj).empid; } } }
Calcite會將HrSchema的emps映射爲一張表。
編寫Calcite代碼以下:
public class ObjectDemo { public static void main(String[] args) throws Exception { // 1.構建CsvSchema對象,在Calcite中,不一樣數據源對應不一樣Schema,好比CsvSchema、DruidSchema、ElasticsearchSchema等 ReflectiveSchema reflectiveSchema = new ReflectiveSchema(new HrSchema()); // 2.構建Connection // 2.1 設置鏈接參數 Properties info = new Properties(); // 不區分sql大小寫 info.setProperty("caseSensitive", "false"); // 2.2 獲取標準的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 獲取Calcite封裝的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.構建RootSchema,在Calcite中,RootSchema是全部數據源schema的parent,多個不一樣數據源schema能夠掛在同一個RootSchema下 // 以實現查詢不一樣數據源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.將不一樣數據源schema掛載到RootSchema,這裏添加ReflectiveSchema rootSchema.add("hr", reflectiveSchema); // 5.執行SQL查詢,經過SQL方式訪問object對象實例 String sql = "select * from hr.emps"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍歷打印查詢結果集 System.out.println(ResultSetUtil.resultString(resultSet)); } }
執行代碼,其輸出結果以下:
100, 10, Bill, 10000.0, 1000 200, 20, Eric, 8000.0, 500 150, 10, Sebastian, 7000.0, null 110, 10, Theodore, 11500.0, 250
通常在使用Calcite構建統一查詢系統時,Object對象表會被用於構建數據表的元數據信息表(即表有哪些字段、字段的類型、用於構建數據表的元數據信息)等,詳情能夠參考Apache Druid-SQL源碼。
不用有壓力,若是你以前徹底沒有接觸過Elasticsearch,也不用擔憂學習成本的問題,你就徹底能夠把它簡單理解爲一個數據庫就行了,不用想那麼複雜,而且,它開箱即用,沒有任何部署成本。
下載:
https://www.elastic.co/cn/downloads/elasticsearch
根據對應的操做系統下載相應的版本就能夠。
下載完成後,解壓,進入bin目錄,執行elasticsearch.bat
或elasticsearch
(取決於你的操做系統)就能夠啓動Elasticsearch,在瀏覽器上面訪問localhost:9200
,返回以下信息:
{ "name": "yeyonghaodeMacBook-Pro.local", "cluster_name": "elasticsearch", "cluster_uuid": "6sMhfd0fSgSnqk7M_CTmug", "version": { "number": "7.11.1", "build_flavor": "default", "build_type": "tar", "build_hash": "ff17057114c2199c9c1bbecc727003a907c0db7a", "build_date": "2021-02-15T13:44:09.394032Z", "build_snapshot": false, "lucene_version": "8.7.0", "minimum_wire_compatibility_version": "6.8.0", "minimum_index_compatibility_version": "6.0.0-beta1" }, "tagline": "You Know, for Search" }
則說明服務已經部署成功。
接下來咱們經過postman來建立index(表)和寫入數據到ES:
PUT http://localhost:9200/teachers/_doc/1 { "name":"xpleaf", "age":26, "rate":0.86, "percent":0.95, "join_time":1551058601000 }
數據寫入成功後,經過postman來查詢數據:
GET http://localhost:9200/teachers/_search { "took": 115, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1.0, "hits": [ { "_index": "teachers", "_type": "_doc", "_id": "1", "_score": 1.0, "_source": { "name": "xpleaf", "age": 26, "rate": 0.86, "percent": 0.95, "join_time": 1551058601000 } } ] } }
固然你可能會說,ES自己也提供了SQL的能力,但實際上它是屬於x-pack組件的一部分,是商用的,所以使用需謹慎,而且我我的以爲,它提供的SQL能力比較弱。
固然Calcite的Elasticsearch適配器其實也寫得通常。
有了前面的準備以後,咱們編寫以下Calcite代碼:
public class ElasticsearchDemo { public static void main(String[] args) throws Exception { // 1.構建ElasticsearchSchema對象,在Calcite中,不一樣數據源對應不一樣Schema,好比CsvSchema、DruidSchema、ElasticsearchSchema等 RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build(); ElasticsearchSchema elasticsearchSchema = new ElasticsearchSchema(restClient, new ObjectMapper(), "teachers"); // 2.構建Connection // 2.1 設置鏈接參數 Properties info = new Properties(); // 不區分sql大小寫 info.setProperty("caseSensitive", "false"); // 2.2 獲取標準的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 獲取Calcite封裝的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.構建RootSchema,在Calcite中,RootSchema是全部數據源schema的parent,多個不一樣數據源schema能夠掛在同一個RootSchema下 // 以實現查詢不一樣數據源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.將不一樣數據源schema掛載到RootSchema,這裏添加ElasticsearchSchema rootSchema.add("es", elasticsearchSchema); // 5.執行SQL查詢,經過SQL方式訪問object對象實例 String sql = "select * from es.teachers"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍歷打印查詢結果集 System.out.println(ResultSetUtil.resultString(resultSet)); } }
執行代碼,其輸出結果以下:
{name=xpleaf, age=26, rate=0.86, percent=0.95, join_time=1551058601000}
經過前面的基本介紹和QuickStart,相信你對Apache Calcite已經有了最基本的瞭解,固然若是想要在生產環境真正使用Calcite,使用它來定製化構建咱們的統一查詢系統,僅僅瞭解這些確定是遠遠不夠的,確實是路漫漫其修遠兮,不過不急,不要緊的,後面有機會我將會介紹更多Calcite的高級用法。
其實不少高級用法都是經過研讀Apache Druid-SQL的源碼得知的,因此我會一直強調,若是較多時間和精力,不妨閱讀它的源碼。
public class ResultSetUtil { public static String resultString(ResultSet resultSet) throws SQLException { return resultString(resultSet, false); } public static String resultString(ResultSet resultSet, boolean printHeader) throws SQLException { List<List<Object>> resultList = resultList(resultSet, printHeader); return resultString(resultList); } public static List<List<Object>> resultList(ResultSet resultSet) throws SQLException { return resultList(resultSet, false); } public static String resultString(List<List<Object>> resultList) throws SQLException { StringBuilder builder = new StringBuilder(); resultList.forEach(row -> { String rowStr = row.stream() .map(columnValue -> columnValue + ", ") .collect(Collectors.joining()); rowStr = rowStr.substring(0, rowStr.lastIndexOf(", ")) + "\n"; builder.append(rowStr); }); return builder.toString(); } public static List<List<Object>> resultList(ResultSet resultSet, boolean printHeader) throws SQLException { ArrayList<List<Object>> results = new ArrayList<>(); final ResultSetMetaData metaData = resultSet.getMetaData(); final int columnCount = metaData.getColumnCount(); if (printHeader) { ArrayList<Object> header = new ArrayList<>(); for (int i = 1; i <= columnCount; i++) { header.add(metaData.getColumnName(i)); } results.add(header); } while (resultSet.next()) { ArrayList<Object> row = new ArrayList<>(); for (int i = 1; i <= columnCount; i++) { row.add(resultSet.getObject(i)); } results.add(row); } return results; } }