這二者都具備高可擴展性和高可靠性,本文將說明如何集成Apache Ignite和Apache Kafka消息系統,以得到一個健壯的數據處理管道。目前,這二者的集成有兩個開箱即用的解決方案,一個是KafkaStreamer,一個是IgniteSinkConnector,其中IgniteSinkConnector是基於Apache Kafka最近發佈的新特性-Kafka鏈接器。html
從Ignite 1.3版本開始,經過他的KafkaStreamer,支持從Kafka中獲取數據,而後將其注入Ignite用於基於內存的數據處理。KafkaStreamer是IgniteDataStreamer的一個實現,他會使用Kafka的消費者從Kafka代理中拉取數據,而後將數據高效地注入Ignite緩存。 要使用它,首先,須要將KafkaStreamer依賴加入pom.xml文件:java
<dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-kafka</artifactId> <version>${ignite.version}</version> </dependency>
假設已經有一個緩存,鍵和值都是String類型,經過一個簡單的方式就能實現數據的流化處理:shell
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>(); try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) { // 覆蓋已有數據 stmr.allowOverwrite(true); kafkaStreamer.setIgnite(ignite); kafkaStreamer.setStreamer(stmr); // 設置主題 kafkaStreamer.setTopic(someKafkaTopic); // 設置處理Kafka流進程的線程數 kafkaStreamer.setThreads(4); // 設置Kafka消費者的配置 kafkaStreamer.setConsumerConfig(kafkaConsumerConfig); // 設置解碼器 kafkaStreamer.setKeyDecoder(strDecoder); kafkaStreamer.setValueDecoder(strDecoder); kafkaStreamer.start(); }finally { kafkaStreamer.stop(); }
至於如何配置消費者,能夠參考Kafka文檔apache
Apache Ignite從即將到來的1.6版本開始,將能夠經過另外一種方式實現數據處理的集成。他基於Kafka鏈接器,這是一個從Apache Kafka 0.9版本中引入的一個新特性,它在Apache Kafka和其餘的數據系統間實現了可擴展和可靠的的流化數據處理。這樣的集成能夠從Kafka中得到持續且安全的流化數據,而後注入Ignite用於內存中的大規模數據集的計算和事務處理。 bootstrap
鏈接器位於optional/ignite-kafka
,它和它的依賴須要加入一個Kafka運行實例的類路徑中。 同時,還須要像下面這樣配置鏈接器:緩存
# connector name=my-ignite-connector connector.class=IgniteSinkConnector tasks.max=2 topics=someTopic1,someTopic2 # cache cacheName=myCache cacheAllowOverwrite=true igniteCfg=/some-path/ignite.xml
其中igniteCfg要設置爲Ignite緩存配置文件的路徑,cacheName設置爲*/some-path/ignite.xml中設置的緩存的名字,someTopic1,someTopic2是數據的來源,cacheAllowOverwrite設置爲true*,它會覆蓋緩存中的已有數據。 另外一個重要的配置是Kafka鏈接器屬性,能夠看以下的示例:安全
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter key.converter.schemas.enable=false value.converter.schemas.enable=false internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.storage.StringConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000
這時能夠啓動鏈接器,好比,在一個獨立運行模式中能夠這樣:線程
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
能夠經過一個簡單的方式,作一下這個流程的基本驗證。 啓動Zookeeper和Kafka服務:翻譯
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
由於這個示例只能處理字符串數據,因此只是經過kafka-console-producer注入key1和val1(這麼作只是爲了舉例--正常的話須要使用一個Kafka生產者或者數據源頭的鏈接器來注入數據):代理
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=, key1,val1
而後,按照前述的方法啓動鏈接器。
完成!數據所有注入Ignite緩存,已經能夠進行內存內的處理,或者經過SQL查詢進行分析(能夠參照Apache Ignite文檔)。
本文翻譯整理自Roman Shtykh的博客。