apache ignite系列(三):數據處理(數據加載,數據並置,數據查詢)

​ 使用ignite的一個常見思路就是將現有的關係型數據庫中的數據導入到ignite中,而後直接使用ignite中的數據,至關於將ignite做爲一個緩存服務,固然ignite的功能遠不止於此,下面以將ignite集成進java服務的方式進行演示ignite的數據存儲和查詢相關的功能。因爲我的習慣,示例演示沒有使用測試代碼,而是使用rest接口演示。java

​ 在講數據加載以前,ignite中存儲的幾種模式(LOCAL, REPLICATED, PARTITIONED):node

LOCAL:本地模式,數據都存儲在本地,無數據再平衡,相似常見的存儲服務;git

PARTITIONED:分區模式,數據分散到集羣中的各個節點,分區模式適合存儲數量龐大的數據github

如圖所示是設置了Backup備份數的,默認備份數是0,若是分區模式下不設置備份數的話則會存在丟失數據的風險。算法

REPLICATED:複製模式,有數據再平衡過程,主節點(Primary)數據與分區模式的一致,只是複製模式默認備份了除主節點數據外的其他數據。複製模式適合存儲數據量小,增加不快的數據。spring

分區模式和複製模式各有優勢和缺點,具體選擇要根據實際場景的特色去權衡:sql

模式 優勢 缺點
分區模式(PARTITIONED) 能存儲海量數據,頻繁更新對其影響不大 查詢緩存涉及到數據移動,對查詢性能有影響
複製模式(REPLICATED) 適合存儲數據量不大的數據,數據查詢性能穩定 頻繁更新對其影響較大

1,數據加載

這裏使用mybatis查詢MYSQL裏的數據而後存入ignite,完整代碼能夠參考:數據庫

https://github.com/cording/ignite-exampleapache

爲了演示,須要先在MYSQL中生成樣本數據,相關sql腳本爲ignite-example\src\main\resources\import.sql,執行該SQL腳本便可完成表的建立和測試數據的初始化。api

在配置文件中定義緩存

<bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="student"/>
                    <property name="cacheMode" value="REPLICATED"/>
                    <property name="backups" value="1"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="copyOnRead" value="false"/>
                    <property name="dataRegionName" value="Default_Region"/>
                    <property name="indexedTypes">
                        <list>
                            <value>java.lang.Long</value>
                            <value>org.cord.ignite.data.domain.Student</value>
                        </list>
                    </property>
                </bean>

添加相關依賴

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>${ignite.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>${ignite.version}</version>
</dependency>
<!-- 使用索引的話須要用到ignite-indexing這個模塊 -->
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>${ignite.version}</version>
</dependency>

​ 通常將數據導入ignite集羣的方式是使用cache.put(...)方法,可是當有大量的數據須要導入的時候,put的效率已經沒法知足了,針對大量數據導入可使用ignite的流處理器:

DataLoader.java

......
    /**導入數據到ignite*/
    public void loadData(){
        //查詢student集合
        List<Student> result = studentDao.findAllStudents();
        //分佈式id生成器
        IgniteAtomicSequence sequence = ignite.atomicSequence("studentPk", 0, true);
        //根據緩存名獲取流處理器,並往流處理器中添加數據
        try(IgniteDataStreamer<Long, Student> streamer = ignite.dataStreamer(CacheKeyConstant.STUDENT)) {
            result.stream().forEach(r -> streamer.addData(sequence.incrementAndGet(), r));
            //將流裏面的剩餘數據壓進ignite
            streamer.flush();
        }
    }
......

導入數據以後,能夠在監控程序中看到數據存儲狀況:

​ 流之因此能提升加載數據的速度,是由於流本質就是一個批處理。ignite是經過一致性哈希保證一致性的,每往集羣中存入一條cache記錄,ignite會先根據一致性哈希算法計算出這條cache映射到哪一個節點,而後會將這條記錄存儲在該節點。而在流處理器中,流處理器會將映射到相同節點的數據批量存儲到對應節點,這樣會顯著提高數據加載的效率。

2,數據查詢

​ 最直接的查詢緩存方式是使用cache.get(...)方法,這種方式只能應對簡單的key-value緩存,若是是設置了索引類型(indexedTypes),則緩存就會變成SQL table,這個時候就須要使用SQL方式查詢,當使用SQL方式查詢的時候,通常會有各類查詢條件,這些查詢條件對應的字段均須要預先設置索引。ignite裏面有兩種索引,一種是普通索引,一種是組合索引,要用到@QuerySqlField註解。而查詢用到的api主要是SqlFieldsQuerySqlQuery,前者是域查詢,也就是查詢部分字段結果集,然後者是普通查詢。

​ 因此,若是想使用SQL查詢,就須要在加載數據以前在緩存定義中設置索引類型(indexedTypes),並對查詢中可能用到的字段在對應實體類中相關屬性添加註解,有必要的狀況下還要設置索引。當定義緩存的時候設置了索引類型,則緩存再也不是普通的KV形式的緩存了,而是具備數據庫表的特性,這時候ignite就變成了分佈式的內存數據庫了,其sql相關功能是基於h2的sql引擎實現的。

1) 設置緩存索引類型

  • JAVA代碼定義緩存時設置索引類型

這裏以long爲主鍵,String爲實體類做爲示例:

使用CacheConfiguration.setIndexedTypes(Long.class, String.class)便可設置索引

  • XML配置中設置索引類型

一樣也是設置indexedTypes屬性便可

<bean class="org.apache.ignite.configuration.CacheConfiguration">
......
    <property name="indexedTypes">
        <list>
            <value>java.lang.Long</value>
            <value>org.cord.ignite.data.domain.Student</value>
        </list>
    </property>
......
</bean>

2) 註解@QuerySqlField的三種用法

  • 啓用實體類屬性爲查詢域
@QuerySqlField
    private String test;

加上該註解後,test字段才能夠在sql語句中訪問,這種形式不對該屬性列建立索引。

  • 啓用查詢域併爲該列設置普通索引
@QuerySqlField(index = true)
    private String test;
  • 啓用查詢域並設置組合索引
@QuerySqlField(orderedGroups = {@QuerySqlField.Group(
            name = "student", order = 0)})
    private String name;

    @QuerySqlField(orderedGroups = {@QuerySqlField.Group(
            name = "student", order = 1)})
    private String email;

其中name屬性指定了組合索引的名字,order表示該字段在組合索引中的順序。

該組合索引與普通數據庫相似,一樣遵循最左原則,即組合索引是否會用到會受到最左原則的限制。

3) 使用 SqlFieldsQuery進行域查詢

SQL語法中有兩個預約義字段_key_val

_key : 表示緩存中的全部鍵

_val: 表示緩存中的全部值對象

List<List<?>> res = cache.query(new SqlFieldsQuery("select _VAL,name from \"Test\".student")).getAll();
System.out.format("The name is %s.\n", res.get(0).get(0));

4) 使用SqlQuery進行普通查詢

NormalController.class

@RequestMapping("/sqlQuery")
    public @ResponseBody
    String sqlQuery(HttpServletRequest request, HttpServletResponse response) {
        IgniteCache<Long, Student> tempCache = ignite.cache(CacheKeyConstant.STUDENT);

        /**普通查詢*/
        String sql_query = "name = ? and email = ?";
        SqlQuery<Long, Student> cSqlQuery = new SqlQuery<>(Student.class, sql_query);
        cSqlQuery.setReplicatedOnly(true).setArgs("student_44", "student_44gmail.com");

        List<Cache.Entry<Long, Student>> tempResult = tempCache.query(cSqlQuery).getAll();

        if (CollectionUtils.isEmpty(tempResult)) {
            return "result is Empty!";
        }
        Student student = tempResult.stream().map(t -> t.getValue()).findFirst().get();
        System.out.format("the beginning of student[student_44] is %s\n", student.getDob());

        /**聚合函數查詢*/
        /**[count]*/
        String sql_count = "select count(1) from student";
        SqlFieldsQuery countQuery = new SqlFieldsQuery(sql_count);
        countQuery.setReplicatedOnly(true);
        List<List<?>> countList =  tempCache.query(countQuery).getAll();

        long count = 0;
        if(!CollectionUtils.isEmpty(countList)) {
            count = (Long)countList.get(0).get(0);
        }
        System.out.format("count of cache[student] is %s\n", count);

        /**[sum]*/
        String sql_sum = "select sum(studId) from student";
        SqlFieldsQuery sumQuery = new SqlFieldsQuery(sql_sum);
        sumQuery.setReplicatedOnly(true);
        List<List<?>> sumList = tempCache.query(sumQuery).getAll();
        long sum = 0;
        if(!CollectionUtils.isEmpty(sumList)) {
            sum = (Long)sumList.get(0).get(0);
        }
        System.out.format("sum of cache[student.id] is %s\n", sum);

        return "all executed!";
    }

運行結果以下:

the beginning of student[student_44] is Thu Sep 28 00:00:00 GMT+08:00 2017
count of cache[student] is 500
sum of cache[student.id] is 125250

3,數據並置與關聯查詢

​ 數據的並置主要是針對採用分區模式存儲的數據,所謂的數據並置,就是提供一種約束,將相關連的數據存儲在相同的網格節點上,這樣在數據查詢或者分佈式計算的時候就不須要數據移動了,這樣會提高總體的性能。

​ 如下以X,Y,Z三個cache的並置做爲示例,完整代碼請查看示例工程ignite-example

其中X,Y,Z爲三個分區模式的cache,Y與X並置,也就是說,Y的數據在存儲的時候,會根據其XId屬性,將數據存儲在對應的X所在的節點上,同理,Z與Y並置,也就是將Z的數據存儲在其YId屬性對應的Y所在的節點上。以此造成一種約束,使得數據的分配能夠人爲控制。

要使用數據並置,就不得不提到一個API了,也就是AffinityKey,當一個cache與另一個cache並置的時候,其cache的Key就得是AffinityKey類型了。

首先進行數據初始化:

CollocatedController.java

private String init(){
        if(init.get()){
            return "already execute init.";
        }
        //定義三個緩存
        CacheConfiguration<Long, X>  xcf = new CacheConfiguration<Long, X>("X")
                .setCacheMode(CacheMode.PARTITIONED)
                .setIndexedTypes(Long.class, X.class);
        CacheConfiguration<AffinityKey<Long>, Y>  ycf = new CacheConfiguration<AffinityKey<Long>, Y>("Y")
                .setCacheMode(CacheMode.PARTITIONED)
                .setIndexedTypes(Affinity.class, Y.class);
        CacheConfiguration<AffinityKey<Long>, Z>  zcf = new CacheConfiguration<AffinityKey<Long>, Z>("Z")
                .setCacheMode(CacheMode.PARTITIONED)
                .setIndexedTypes(Affinity.class, Z.class);

        ignite.destroyCache("X");
        ignite.destroyCache("Y");
        ignite.destroyCache("Z");
        ignite.getOrCreateCache(xcf);
        ignite.getOrCreateCache(ycf);
        ignite.getOrCreateCache(zcf);

        IgniteCache<Long, X> xc = ignite.cache("X");
        IgniteCache<AffinityKey<Long>, Y> yc = ignite.cache("Y");
        IgniteCache<AffinityKey<Long>, Z> zc = ignite.cache("Z");
        //加載數據
        Y y;
        Z z;
        for (long i = 0; i < 100; i++) {
            xc.put(i, new X(i, String.valueOf(i)));
            y = new Y(i, String.valueOf(i), i);
            yc.put(y.key(), y);
            z = new Z(i, String.valueOf(i), i);
            zc.put(z.key(), z);
        }

        init.set(true);
        return "all executed.";
    }

而cache並置以後怎麼校驗並置是否成功呢?這就要用到Affinity.classmapKeyToNode()方法了,其做用是根據給定的key,找到存儲該key的節點信息,具體使用方法以下:

@RequestMapping("/verify")
public @ResponseBody
String verifyCollocate(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if(!init.get()){
        init();
    }

    Affinity<Long> affinityX = ignite.affinity("X");
    Affinity<Long> affinityY = ignite.affinity("Y");
    Affinity<Long> affinityZ = ignite.affinity("Z");

    for (long i = 0; i < 100; i++) {
        ClusterNode nodeX = affinityX.mapKeyToNode(i);
        ClusterNode nodeY = affinityY.mapKeyToNode(i);
        ClusterNode nodeZ = affinityZ.mapKeyToNode(i);

        if(nodeX.id() != nodeY.id() || nodeY.id() != nodeZ.id() || nodeX.id() != nodeZ.id()){
            throw new Exception("cache collocated is error!");
        }
    }
    System.out.println("cache collocated is right!");

    return "all executed.";
}

執行/verify以後,無異常拋出,在監控程序中查看一下存儲狀況:

會發現三個cache的數據分佈徹底一致,這與驗證程序的結果(無異常拋出)保持一致,說明cache並置成功。

當數據並置成功後,就可使用關聯查詢了,能夠類比數據庫中的多表聯查:

@RequestMapping("/query")
public @ResponseBody
String query(HttpServletRequest request, HttpServletResponse response){
    if(!init.get()){
        init();
    }
    IgniteCache<Long, X> xc = ignite.cache("X");
    IgniteCache<AffinityKey<Long>, Y> yc = ignite.cache("Y");
    IgniteCache<AffinityKey<Long>, Z> zc = ignite.cache("Z");

    String sql1 = "from Y,\"X\".X " +
            "where Y.XId = X.id " +
            "and Y.info = ?";
    String sql2 = "from Z,\"Y\".Y " +
            "where Z.YId = Y.id " +
            "and Z.info = ?";
    String sql3 = "from Z,\"Y\".Y,\"X\".X " +
            "where Z.YId = Y.id and Y.XId = X.id " +
            "and Z.info = ?";

    int i = IntStream.range(1, 100).skip((int)(100*Math.random())).findFirst().getAsInt();

    System.out.println("query X and Y:");
    System.out.println(yc.query(new SqlQuery<AffinityKey<Long>, Y>(Y.class, sql1).setArgs(i)).getAll());
    System.out.println("**************************************************************************************");

    System.out.println("query Y and Z:");
    System.out.println(zc.query(new SqlQuery<AffinityKey<Long>, Z>(Z.class, sql2).setArgs(i)).getAll());
    System.out.println("**************************************************************************************");

    System.out.println("query X and Y and Z:");
    System.out.println(zc.query(new SqlQuery<AffinityKey<Long>, Z>(Z.class, sql3).setArgs(i)).getAll());
    System.out.println("**************************************************************************************");

    return "all executed.";
}

執行結果以下:

query X and Y:
[Entry [key=AffinityKey [key=83, affKey=83], val=org.cord.ignite.example.collocated.Y@605e8969]]
**************************************************************************************
query Y and Z:
[Entry [key=AffinityKey [key=83, affKey=83], val=org.cord.ignite.example.collocated.Z@562dbd4]]
**************************************************************************************
query X and Y and Z:
[Entry [key=AffinityKey [key=83, affKey=83], val=org.cord.ignite.example.collocated.Z@7ff851ce]]
**************************************************************************************

若是是沒有並置的緩存,在關聯查詢的時候就須要啓用非並置的分佈式關聯:SqlQuery.setDistributedJoins(true)

數據並置還可使用註解@AffinityKeyMapped註解,其用法與使用AffinityKey .class相似,完整示例可參看AffinityMappedController.class

至此,ignite的數據處理相關內容結束。

相關文章
相關標籤/搜索