Apache Ignite(五):Ignite和Kafka集成-實現高可擴展和高可靠的數據處理

這二者都具備高可擴展性和高可靠性,本文將說明如何集成Apache Ignite和Apache Kafka消息系統,以得到一個健壯的數據處理管道。目前,這二者的集成有兩個開箱即用的解決方案,一個是KafkaStreamer,一個是IgniteSinkConnector,其中IgniteSinkConnector是基於Apache Kafka最近發佈的新特性-Kafka鏈接器html

1.經過KafkaStreamer注入數據

從Ignite 1.3版本開始,經過他的KafkaStreamer,支持從Kafka中獲取數據,而後將其注入Ignite用於基於內存的數據處理。KafkaStreamer是IgniteDataStreamer的一個實現,他會使用Kafka的消費者從Kafka代理中拉取數據,而後將數據高效地注入Ignite緩存。 要使用它,首先,須要將KafkaStreamer依賴加入pom.xml文件:java

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-kafka</artifactId>
    <version>${ignite.version}</version>
</dependency>

假設已經有一個緩存,鍵和值都是String類型,經過一個簡單的方式就能實現數據的流化處理:shell

KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {

    // 覆蓋已有數據
    stmr.allowOverwrite(true);

    kafkaStreamer.setIgnite(ignite);

    kafkaStreamer.setStreamer(stmr);

    // 設置主題
    kafkaStreamer.setTopic(someKafkaTopic);

    // 設置處理Kafka流進程的線程數
    kafkaStreamer.setThreads(4);

    // 設置Kafka消費者的配置
    kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);

    // 設置解碼器
    kafkaStreamer.setKeyDecoder(strDecoder);

    kafkaStreamer.setValueDecoder(strDecoder);
    kafkaStreamer.start();
}finally {
    kafkaStreamer.stop();
}

至於如何配置消費者,能夠參考Kafka文檔apache

2.經過IgniteSinkConnector注入數據

Apache Ignite從即將到來的1.6版本開始,將能夠經過另外一種方式實現數據處理的集成。他基於Kafka鏈接器,這是一個從Apache Kafka 0.9版本中引入的一個新特性,它在Apache Kafka和其餘的數據系統間實現了可擴展和可靠的的流化數據處理。這樣的集成能夠從Kafka中得到持續且安全的流化數據,而後注入Ignite用於內存中的大規模數據集的計算和事務處理。 bootstrap

鏈接器位於optional/ignite-kafka,它和它的依賴須要加入一個Kafka運行實例的類路徑中。 同時,還須要像下面這樣配置鏈接器:緩存

# connector
name=my-ignite-connector
connector.class=IgniteSinkConnector
tasks.max=2
topics=someTopic1,someTopic2
# cache
cacheName=myCache
cacheAllowOverwrite=true
igniteCfg=/some-path/ignite.xml

其中igniteCfg要設置爲Ignite緩存配置文件的路徑,cacheName設置爲*/some-path/ignite.xml中設置的緩存的名字,someTopic1,someTopic2是數據的來源,cacheAllowOverwrite設置爲true*,它會覆蓋緩存中的已有數據。 另外一個重要的配置是Kafka鏈接器屬性,能夠看以下的示例:安全

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

這時能夠啓動鏈接器,好比,在一個獨立運行模式中能夠這樣:線程

bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties

3.流程驗證

能夠經過一個簡單的方式,作一下這個流程的基本驗證。 啓動Zookeeper和Kafka服務:翻譯

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

由於這個示例只能處理字符串數據,因此只是經過kafka-console-producer注入key1val1(這麼作只是爲了舉例--正常的話須要使用一個Kafka生產者或者數據源頭的鏈接器來注入數據):代理

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=, key1,val1

而後,按照前述的方法啓動鏈接器。

完成!數據所有注入Ignite緩存,已經能夠進行內存內的處理,或者經過SQL查詢進行分析(能夠參照Apache Ignite文檔)。

本文翻譯整理自Roman Shtykh的博客

相關文章
相關標籤/搜索