Apache Calcite精簡入門與學習指導

1 Apache Calcite基本介紹

Apache Calcite是一個動態數據管理框架,它包含了許多典型數據庫管理系統的部分,但省略了一些關鍵功能:數據存儲、數據處理算法和元數據存儲。java

基於Apache Calcite,咱們能夠爲任何第三方存儲引擎開發SQL查詢引擎。git

  • 官網地址

https://calcite.apache.org/github

  • 項目地址

https://github.com/apache/calcite算法

2 Apache Calcite學習指導

要想了解Calcite,其實官方文檔確實不妨一看。儘管官方文檔會說起很是多你以前可能沒有接觸過的概念,但好在它文檔內容很少,這樣讓你對SQL執行中可能涉及的一些關鍵術語留下一個印象,那對之後深刻學習和使用Calcite仍是有幫助的,畢竟若是真的想用好Calcite,或者說只是使用Calcite,這些關鍵術語都是須要掌握和理解的。sql

不過,僅僅看官方文檔,那仍是遠遠不夠的。回過頭再來看Calcite的文檔時,你會發現,它徹底是寫給「高端玩家」的,它是對Calcite高度抽象總結的文檔,並非寫給初學者來進行學習的,以致於你想經過官方文檔來跑個QuickStart,那也是至關困難,我的以爲沒有必定的折騰能力或者對SQL執行沒有理解經驗的話,確實不太容易達成。所以,不能僅僅只看官方文檔,你還須要經過其它途徑獲取更多關於它的信息,關於初學者如何快速掌握Apache Calcite,如下是我我的的一些心得體會:數據庫

  • 1.先簡單用起來apache

    Calcite做爲一個數據管理框架,首先,你得把它用起來才能慢慢理解它究竟是幹嗎的。理論上,經過Calcite這個組件,你能夠以SQL的方式來訪問任何你想要訪問的數據源,好比你的文件(無論你是什麼格式)、Java對象、第三方存儲引擎(Redis、Kafka、Elasticsearch)等,因此我是用了「任何」來講明它的能力,這是它實實在在存在的能力。編程

    本文檔會手把手教你,怎麼樣經過Calcite以SQL的方式來訪問CSV文件、Java內部對象、Elasticsearch數據源。json

  • 2.生產使用與思考瀏覽器

    因此一旦你知道Calcite能夠經過SQL的方式來訪問任何的數據源以後,我知道有想法的同窗已經會考慮到:

    • (1)那假如在個人業務系統中,有各類各樣不一樣的存儲系統,是否是能夠經過Calcite來構建一個統一的數據查詢系統(一個查詢入口,同時查詢多種不一樣數據源)?

    使用者不須要感知數據存儲在哪裏,在他們看來,這就是一個只提供SQL查詢入口的查詢系統,它屏蔽了它所接入的各類存儲系統的差別;

    • (2)假如業務存在一個流行的數據存儲系統或者引擎,但它不支持SQL查詢,我是否是能夠借用Calcite來爲它開發一個SQL引擎?

    答案是確定的,Calcite是一個組件,本質上也是一個框架,它提供了各類擴展點來讓你實現這樣的功能。

    固然若是你想借用Calcite針對某個存儲系統開發一個好的SQL引擎,仍是須要至關大的努力的,好比VolcanoPlanner就須要好好理解下,比較惋惜的是,直到如今我也沒有精力去研究它,以致於我想爲Elasticsearch開發一個SQL引擎的想法都遲遲未能實現。

    所謂的「借用Calcite針對某個存儲系統開發一個好的SQL引擎」,其實在Calcite裏有一個專業的術語,叫作「數據源適配器」。

    Calcite自己也提供了多個存儲引擎的適配器,好比Druid、Elasticsearch、SparkSQL等等,固然開源的就並不必定得,前面之因此一直說起要從新寫一個Elasticsearch的適配器,是由於我以爲Calcite自己提供的ES適配器能力比較弱,相信用過的同窗都會有所體會。

  • 3.深度使用與思考

    實際上若是隻是想知道Calcite怎麼使用的,有哪些功能可使用的,咱們不妨站在巨人的肩膀上,看看業界的開源項目是怎麼使用它的。

    一個不錯的參考是Apache Druid,其SQL引擎正是基於Apache Calcite來開發構建的,所以對於Calcite更多高級功能的使用,咱們不妨去研究一下Apache Druid-SQL模塊的源碼,相信會有很是大的收穫。

  • 4.VolcanPlanner

    有時間和精力研究一下其在Calcite的實現,我的以爲會很是不錯。

本文檔會手把手教你,怎麼樣經過Calcite以SQL的方式來訪問CSV文件、Java內部對象、Elasticsearch數據源。

對於Calcite更多的實現細節,仍是本身想辦法根據實際應用場景,去思考一下它的各個模塊功能,好比想了解某一個功能原理,就去看其源碼結構和細節,我相信這自己對我的能力的提高都是極其有幫助的。

3 經過Apache Calcite接入不一樣數據源

先構建一個maven項目,而後引入Calcite的依賴:

<dependency>
  <groupId>org.apache.calcite</groupId>
  <artifactId>calcite-core</artifactId>
  <version>1.20.0</version>
</dependency>
<dependency>
  <groupId>org.apache.calcite</groupId>
  <artifactId>calcite-example-csv</artifactId>
  <version>1.20.0</version>
</dependency>
<dependency>
  <groupId>org.apache.calcite</groupId>
  <artifactId>calcite-elasticsearch</artifactId>
  <version>1.20.0</version>
</dependency>

3.1 接入CSV數據源

先準備一個CSV文件:

EMPNO:long,NAME:string,DEPTNO:int,GENDER:string,CITY:string,EMPID:int,AGE:int,SLACKER:boolean,MANAGER:boolean,JOINEDAT:date
100,"Fred",10,,,30,25,true,false,"1996-08-03"
110,"Eric",20,"M","San Francisco",3,80,,false,"2001-01-01"
110,"John",40,"M","Vancouver",2,,false,true,"2002-05-03"
120,"Wilma",20,"F",,1,5,,true,"2005-09-07"
130,"Alice",40,"F","Vancouver",2,,false,true,"2007-01-01"

Calcite會把每一個csv文件映射成一個SQL表。csv文件表頭指定該列數據類型,根據必定規則映射到對應的SQL類型。如沒有指定,則統一映射成VARCHAR。

文件命名爲depts.csv,Caclite會構建表名爲文件名的table,即depts.

而後編寫下面的代碼經過Calcite以SQL方式訪問數據:

// Author: xpleaf
public class CsvDemo {

    public static void main(String[] args) throws Exception {
        // 0.獲取csv文件的路徑,注意獲取到文件所在上層路徑就能夠了
        String path = Objects.requireNonNull(CsvDemo.class.getClassLoader().getResource("csv").getPath());

        // 1.構建CsvSchema對象,在Calcite中,不一樣數據源對應不一樣Schema,好比CsvSchema、DruidSchema、ElasticsearchSchema等
        CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE);

        // 2.構建Connection
        // 2.1 設置鏈接參數
        Properties info = new Properties();
        // 不區分sql大小寫
        info.setProperty("caseSensitive", "false");
        // 2.2 獲取標準的JDBC Connection
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        // 2.3 獲取Calcite封裝的Connection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        // 3.構建RootSchema,在Calcite中,RootSchema是全部數據源schema的parent,多個不一樣數據源schema能夠掛在同一個RootSchema下
        // 以實現查詢不一樣數據源的目的
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // 4.將不一樣數據源schema掛載到RootSchema,這裏添加CsvSchema
        rootSchema.add("csv", csvSchema);

        // 5.執行SQL查詢,經過SQL方式訪問csv文件
        String sql = "select * from csv.depts";
        Statement statement = calciteConnection.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);

        // 6.遍歷打印查詢結果集
        System.out.println(ResultSetUtil.resultString(resultSet));
    }

}

執行代碼,其輸出結果以下:

100, Fred, 10, , , 30, 25, true, false, 1996-08-03
110, Eric, 20, M, San Francisco, 3, 80, null, false, 2001-01-01
110, John, 40, M, Vancouver, 2, null, false, true, 2002-05-03
120, Wilma, 20, F, , 1, 5, null, true, 2005-09-07
130, Alice, 40, F, Vancouver, 2, null, false, true, 2007-01-01

思考:

csv是官方文檔有說起的一個例子,在總體上若是須要對Calcite源碼的使用有一個認識(尤爲是如何開發適配器),能夠基於這個demo,對照文檔說起的各個概念、類,經過分析源碼來進行理解,好比:

  • 1.Schema是怎麼構建的,在Calcite的位置和具體做用是什麼;
  • 2.Table是怎麼構建的,在Calcite的位置和具體做用是什麼;
  • 3.在執行查詢時是如何作SQL Parse、Validate、Optimize和執行的;

你均可以經過這個demo來一探究竟,固然,雖然我這裏短短几句話帶過,實際上若是你想研究這個過程,可能須要花費你較多時間,我建議不急着步子一下跨得太大,慢慢來,不急的。

另外,其實經過官方文檔的介紹,對於怎麼去開發一個Caclite的數據源適配器,應該也是有必定的體會的,其實若是隻是實現一個簡單的適配器(不考慮太多的SQL優化規則),那這個難度仍是不大的。

我經過這個例子,包括後面的幾個例子,其實都是想告訴你,如何快速使用Calcite(也就是至關給你寫了一個QuickStart),從而對Calcite總體使用有一個認識,若是你想更深度使用Calcite,建議:

  • 1.不妨看看Calcite源碼裏面的UT,裏面提供了很好的參考案例;
  • 2.但方式1可能會比較零散,你能夠研讀一下Apache Druid-SQL模塊的源碼,看一下總體上它是怎麼使用的,它裏面的許多高級使用技巧和方法仍是十分有借鑑意義的,不妨看看。

3.2 接入Object對象數據源

3.2.1 SparkSQL接入Object對象

有用過SparkSQL的同窗會知道,在SparkSQL中,可使用編程的方式來將對象實例轉換爲DataFrame,進而註冊Table,以經過SQL來訪問這些對象實例:

public class _01SparkRDD2DataFrame {
    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName(_01SparkRDD2DataFrame.class.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class[]{Person.class});
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);
        List<Person> persons = Arrays.asList(
                new Person(1, "name1", 25, 179),
                new Person(2, "name2", 22, 176),
                new Person(3, "name3", 27, 178),
                new Person(1, "name4", 24, 175)
        );

        DataFrame df = sqlContext.createDataFrame(persons, Person.class);   // 構造方法有多個,使用personsRDD的方法也是能夠的

        // where age > 23 and height > 176
        df.select(new Column("id"),
                  new Column("name"),
                  new Column("age"),
                  new Column("height"))
                .where(new Column("age").gt(23).and(new Column("height").lt(179)))
                .show();

        df.registerTempTable("person");

        sqlContext.sql("select * from person where age > 23 and height < 179").show();

        jsc.close();

    }
}

以上代碼例子來自xpleaf的文章《Spark SQL筆記整理(二):DataFrame編程模型與操做案例》

注意這裏給出的案例仍是Spark 1.x的用法,Spark 2.x以及以後的版本則可能不推薦這種用法了,具體請參考Spark的官方文檔。

3.2.2 Calcite接入Object對象

那麼對應到Calcite,它也提供了相似的方式來經過SQL訪問對象實例數據。

爲了進行演示,咱們先構建Object對象類:

public class HrSchema {
    public final Employee[] emps = {
            new Employee(100, 10, "Bill", 10000, 1000),
            new Employee(200, 20, "Eric", 8000, 500),
            new Employee(150, 10, "Sebastian", 7000, null),
            new Employee(110, 10, "Theodore", 11500, 250),
    };

    @Override
    public String toString() {
        return "HrSchema";
    }

    public static class Employee {
        public int empid;
        public int deptno;
        public String name;
        public float salary;
        public Integer commission;

        public Employee(int empid, int deptno, String name, float salary,
                        Integer commission) {
            this.empid = empid;
            this.deptno = deptno;
            this.name = name;
            this.salary = salary;
            this.commission = commission;
        }

        @Override
        public String toString() {
            return "Employee [empid: " + empid + ", deptno: " + deptno
                    + ", name: " + name + "]";
        }

        @Override
        public boolean equals(Object obj) {
            return obj == this
                    || obj instanceof Employee
                    && empid == ((Employee) obj).empid;
        }
    }
}

Calcite會將HrSchema的emps映射爲一張表。

編寫Calcite代碼以下:

public class ObjectDemo {

    public static void main(String[] args) throws Exception {
        // 1.構建CsvSchema對象,在Calcite中,不一樣數據源對應不一樣Schema,好比CsvSchema、DruidSchema、ElasticsearchSchema等
        ReflectiveSchema reflectiveSchema = new ReflectiveSchema(new HrSchema());

        // 2.構建Connection
        // 2.1 設置鏈接參數
        Properties info = new Properties();
        // 不區分sql大小寫
        info.setProperty("caseSensitive", "false");
        // 2.2 獲取標準的JDBC Connection
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        // 2.3 獲取Calcite封裝的Connection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        // 3.構建RootSchema,在Calcite中,RootSchema是全部數據源schema的parent,多個不一樣數據源schema能夠掛在同一個RootSchema下
        // 以實現查詢不一樣數據源的目的
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // 4.將不一樣數據源schema掛載到RootSchema,這裏添加ReflectiveSchema
        rootSchema.add("hr", reflectiveSchema);

        // 5.執行SQL查詢,經過SQL方式訪問object對象實例
        String sql = "select * from hr.emps";
        Statement statement = calciteConnection.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);

        // 6.遍歷打印查詢結果集
        System.out.println(ResultSetUtil.resultString(resultSet));
    }

}

執行代碼,其輸出結果以下:

100, 10, Bill, 10000.0, 1000
200, 20, Eric, 8000.0, 500
150, 10, Sebastian, 7000.0, null
110, 10, Theodore, 11500.0, 250

通常在使用Calcite構建統一查詢系統時,Object對象表會被用於構建數據表的元數據信息表(即表有哪些字段、字段的類型、用於構建數據表的元數據信息)等,詳情能夠參考Apache Druid-SQL源碼。

3.3 接入Elasticsearch數據源

3.3.1 Elasticsearch極快速入門

不用有壓力,若是你以前徹底沒有接觸過Elasticsearch,也不用擔憂學習成本的問題,你就徹底能夠把它簡單理解爲一個數據庫就行了,不用想那麼複雜,而且,它開箱即用,沒有任何部署成本。

下載:

https://www.elastic.co/cn/downloads/elasticsearch

根據對應的操做系統下載相應的版本就能夠。

下載完成後,解壓,進入bin目錄,執行elasticsearch.batelasticsearch(取決於你的操做系統)就能夠啓動Elasticsearch,在瀏覽器上面訪問localhost:9200,返回以下信息:

{
  "name": "yeyonghaodeMacBook-Pro.local",
  "cluster_name": "elasticsearch",
  "cluster_uuid": "6sMhfd0fSgSnqk7M_CTmug",
  "version": {
    "number": "7.11.1",
    "build_flavor": "default",
    "build_type": "tar",
    "build_hash": "ff17057114c2199c9c1bbecc727003a907c0db7a",
    "build_date": "2021-02-15T13:44:09.394032Z",
    "build_snapshot": false,
    "lucene_version": "8.7.0",
    "minimum_wire_compatibility_version": "6.8.0",
    "minimum_index_compatibility_version": "6.0.0-beta1"
  },
  "tagline": "You Know, for Search"
}

則說明服務已經部署成功。

接下來咱們經過postman來建立index(表)和寫入數據到ES:

PUT http://localhost:9200/teachers/_doc/1
{
    "name":"xpleaf",
    "age":26,
    "rate":0.86,
    "percent":0.95,
    "join_time":1551058601000
}

數據寫入成功後,經過postman來查詢數據:

GET http://localhost:9200/teachers/_search
{
    "took": 115,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "teachers",
                "_type": "_doc",
                "_id": "1",
                "_score": 1.0,
                "_source": {
                    "name": "xpleaf",
                    "age": 26,
                    "rate": 0.86,
                    "percent": 0.95,
                    "join_time": 1551058601000
                }
            }
        ]
    }
}

3.3.2 Calcite接入Elasticsearch數據源

固然你可能會說,ES自己也提供了SQL的能力,但實際上它是屬於x-pack組件的一部分,是商用的,所以使用需謹慎,而且我我的以爲,它提供的SQL能力比較弱。

固然Calcite的Elasticsearch適配器其實也寫得通常。

有了前面的準備以後,咱們編寫以下Calcite代碼:

public class ElasticsearchDemo {

    public static void main(String[] args) throws Exception {
        // 1.構建ElasticsearchSchema對象,在Calcite中,不一樣數據源對應不一樣Schema,好比CsvSchema、DruidSchema、ElasticsearchSchema等
        RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
        ElasticsearchSchema elasticsearchSchema = new ElasticsearchSchema(restClient, new ObjectMapper(), "teachers");

        // 2.構建Connection
        // 2.1 設置鏈接參數
        Properties info = new Properties();
        // 不區分sql大小寫
        info.setProperty("caseSensitive", "false");
        // 2.2 獲取標準的JDBC Connection
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        // 2.3 獲取Calcite封裝的Connection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        // 3.構建RootSchema,在Calcite中,RootSchema是全部數據源schema的parent,多個不一樣數據源schema能夠掛在同一個RootSchema下
        // 以實現查詢不一樣數據源的目的
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // 4.將不一樣數據源schema掛載到RootSchema,這裏添加ElasticsearchSchema
        rootSchema.add("es", elasticsearchSchema);

        // 5.執行SQL查詢,經過SQL方式訪問object對象實例
        String sql = "select * from es.teachers";
        Statement statement = calciteConnection.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);

        // 6.遍歷打印查詢結果集
        System.out.println(ResultSetUtil.resultString(resultSet));
    }

}

執行代碼,其輸出結果以下:

{name=xpleaf, age=26, rate=0.86, percent=0.95, join_time=1551058601000}

4 The Next

經過前面的基本介紹和QuickStart,相信你對Apache Calcite已經有了最基本的瞭解,固然若是想要在生產環境真正使用Calcite,使用它來定製化構建咱們的統一查詢系統,僅僅瞭解這些確定是遠遠不夠的,確實是路漫漫其修遠兮,不過不急,不要緊的,後面有機會我將會介紹更多Calcite的高級用法。

其實不少高級用法都是經過研讀Apache Druid-SQL的源碼得知的,因此我會一直強調,若是較多時間和精力,不妨閱讀它的源碼。

附錄1:ResultSetUtil

public class ResultSetUtil {

    public static String resultString(ResultSet resultSet) throws SQLException {
        return resultString(resultSet, false);
    }

    public static String resultString(ResultSet resultSet, boolean printHeader) throws SQLException {
        List<List<Object>> resultList = resultList(resultSet, printHeader);
        return resultString(resultList);
    }

    public static List<List<Object>> resultList(ResultSet resultSet) throws SQLException {
        return resultList(resultSet, false);
    }

    public static String resultString(List<List<Object>> resultList) throws SQLException {
        StringBuilder builder = new StringBuilder();
        resultList.forEach(row -> {
            String rowStr = row.stream()
                    .map(columnValue -> columnValue + ", ")
                    .collect(Collectors.joining());
            rowStr = rowStr.substring(0, rowStr.lastIndexOf(", ")) + "\n";
            builder.append(rowStr);
        });
        return builder.toString();
    }

    public static List<List<Object>> resultList(ResultSet resultSet, boolean printHeader) throws SQLException {
        ArrayList<List<Object>> results = new ArrayList<>();
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        if (printHeader) {
            ArrayList<Object> header = new ArrayList<>();
            for (int i = 1; i <= columnCount; i++) {
                header.add(metaData.getColumnName(i));
            }
            results.add(header);
        }
        while (resultSet.next()) {
            ArrayList<Object> row = new ArrayList<>();
            for (int i = 1; i <= columnCount; i++) {
                row.add(resultSet.getObject(i));
            }
            results.add(row);
        }
        return results;
    }

}

附錄2:演示Demo源碼地址

https://github.com/xpleaf/calcite-tutorial

相關文章
相關標籤/搜索