Activity stream data is now ubiquitous in industries such as the Internet, finance, and government. The conventional way to handle this data is to write each kind of activity to files as logs and then periodically run statistical analysis over those files. That approach no longer fits scenarios with increasingly strict real-time requirements, and it also adds complexity to the overall system. To solve these problems, the distributed open-source messaging system Kafka has been adopted by many companies of different kinds as a data pipeline and messaging system for many types of data.
Kafka is a distributed, publish/subscribe-based messaging system. It provides message persistence, supports message partitioning and distributed consumption, guarantees ordered message delivery within each partition, supports online horizontal scaling and high throughput, and serves both offline and real-time data processing.
SequoiaDB supports massive-scale distributed data storage with both vertical and horizontal partitioning. These capabilities make it convenient to store Kafka messages in SequoiaDB for subsequent data analysis and data applications in business systems. This article explains how to consume messages from Kafka with SequoiaDB and store them in the database.
SequoiaDB is a distributed, non-relational document database for storing and accessing massive amounts of non-relational data. Its core is designed around distribution, high availability, high performance, and dynamic data types. It retains many of the strengths of relational databases, such as indexes, dynamic queries, and updates, while handling dynamic and flexible data types more naturally through document-based records. PostgreSQL supports standard SQL, and the SequoiaDB SSQL suite extends PostgreSQL so that standard SQL statements can be used to access SequoiaDB and perform all kinds of operations on it. Once Kafka messages are stored in SequoiaDB, SequoiaDB SSQL can be used for online, real-time analysis of the message data and for data applications.
操做系統:windows 7json
JDK:1.7.0_80 64位,下載地址爲:http://www.oracle.com/technetwork/java/javase/downloads/java-archive-downloads-javase7-521261.html#jdk-7u80-oth-JPRbootstrap
eclipse:4.5.2windows
SequoiaDB:1.12.5或以上版本session
Kakfa:0.10.0.0,下載地址爲:http://211.162.127.20/files/5115000001D9C0FE/www-us.apache.org/dist/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz多線程
This project demonstrates the end-to-end process of integrating Kafka with SequoiaDB by consuming data from Kafka and writing it into SequoiaDB.
Create the project as shown below:
Figure 3-1-1
Start ZooKeeper before Kafka, then start Kafka with the following script:
./kafka-server-start.sh ../config/server.properties &
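For completeness, ZooKeeper itself can be started beforehand with the script bundled in the same Kafka distribution (the relative path is assumed to match the layout above):

./zookeeper-server-start.sh ../config/zookeeper.properties &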
Create the Kafka topic with the following script:
./kafka-topics.sh --zookeeper localhost:2181 --create --topic kafkaSdb --partitions 1 --replication-factor 1
The result is shown below:
Figure 3-2-1
Verify the Kafka topic with the following script:
./kafka-topics.sh --zookeeper localhost:2181 --list
The result is shown below:
Figure 3-2-2
A Kafka deployment divides into producers and consumers: producers generate the message data, and consumers read the message data stored in Kafka. Since this project focuses on writing Kafka messages into SequoiaDB, the producer side is provided as demonstration code only. The producer and consumer parameters each live in their own configuration file.
Producer configuration file (kafka-producer.properties):
bootstrap.servers=192.168.1.35:9092
retries=0
linger.ms=1
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
partitioner.class=com.sequoiadb.kafka.DefaultPartitioner
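The custom partitioner class com.sequoiadb.kafka.DefaultPartitioner referenced above is not shown in the original. A minimal sketch of such a class against Kafka 0.10's Partitioner interface might look as follows; the key-hash strategy here is an assumption, not the author's actual logic:

package com.sequoiadb.kafka;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no partitioner-specific configuration in this sketch
    }

    // Spreads records across partitions by hashing the key,
    // falling back to partition 0 for records without a key.
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0;
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }
}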
Consumer configuration file (kafka-consumer.properties):
bootstrap.servers=192.168.1.35:9092
enable.auto.commit=false
auto.offset.reset=earliest
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Automatic offset commit is disabled because the consumer thread commits offsets manually with commitSync() after each successful write to SequoiaDB.
Configuration file mapping Kafka topics, SequoiaDB collections, and message partitions (config.json):
[{
    topicName: 'kafkaSdb',
    sdbCLName: 'kafkaSdb',
    partitionNum: 1,
    topicGroupName: 'kafkaSdb-consumer-group',
    pollTimeout: 5000
}]
This project keeps the Kafka-related settings, such as the topic name, the number of partitions per topic, and the target SequoiaDB collection, in a configuration file, wraps them in a Java object, and reads them through a utility class.
The Java entity class for the configuration is as follows:
package com.sequoiadb.kafka.bean;

public class KafkaConsumerConfig {
    private String topicName;
    private String sdbCLName;
    private int partitionNum = 1;
    private String topicGroupName;
    private long pollTimeout = Long.MAX_VALUE;

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    public String getSdbCLName() {
        return sdbCLName;
    }

    public void setSdbCLName(String sdbCLName) {
        this.sdbCLName = sdbCLName;
    }

    public int getPartitionNum() {
        return partitionNum;
    }

    public void setPartitionNum(int partitionNum) {
        this.partitionNum = partitionNum;
    }

    public String getTopicGroupName() {
        return topicGroupName;
    }

    public void setTopicGroupName(String topicGroupName) {
        this.topicGroupName = topicGroupName;
    }

    public long getPollTimeout() {
        return pollTimeout;
    }

    public void setPollTimeout(long pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    public String toString() {
        return "[topicName=" + this.topicName + ",sdbCLName=" + this.sdbCLName
                + ",partitionNum=" + this.partitionNum + ",topicGroupName="
                + this.topicGroupName + ",pollTimeout=" + this.pollTimeout + "]";
    }
}
The utility class for reading properties is as follows:
package com.sequoiadb.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtils {
    private static Properties prop = null;

    static {
        InputStream in = PropertiesUtils.class.getClassLoader().getResourceAsStream("config.properties");
        prop = new Properties();
        try {
            prop.load(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static String getProperties(String key) {
        return (String) prop.get(key);
    }

    public static void main(String[] argc) {
        System.out.println(PropertiesUtils.getProperties("scm.url"));
    }
}
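The Configuration class used by the producer and consumer code below (Configuration.getConfiguration()) is not included in the original. A minimal sketch, assuming it parses config.json from the classpath with the json-lib and commons-io dependencies the project already uses:

package com.sequoiadb.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;

import com.sequoiadb.kafka.bean.KafkaConsumerConfig;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

public class Configuration {
    // Reads config.json from the classpath and maps each element
    // of the JSON array to a KafkaConsumerConfig bean.
    public static List<KafkaConsumerConfig> getConfiguration() {
        List<KafkaConsumerConfig> list = new ArrayList<KafkaConsumerConfig>();
        try {
            InputStream in = Configuration.class.getClassLoader().getResourceAsStream("config.json");
            JSONArray array = JSONArray.fromObject(IOUtils.toString(in));
            for (int i = 0; i < array.size(); i++) {
                JSONObject json = array.getJSONObject(i);
                KafkaConsumerConfig config = new KafkaConsumerConfig();
                config.setTopicName(json.getString("topicName"));
                config.setSdbCLName(json.getString("sdbCLName"));
                config.setPartitionNum(json.getInt("partitionNum"));
                config.setTopicGroupName(json.getString("topicGroupName"));
                config.setPollTimeout(json.getLong("pollTimeout"));
                list.add(config);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }
}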
The producer logic is shown below:
package com.sequoiadb.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sequoiadb.utils.Configuration;

public class PartitionTest {
    private static Logger log = LoggerFactory.getLogger(PartitionTest.class);
    private static String location = "kafka-producer.properties"; // location of the producer config file

    public static void main(String[] args) {
        Properties props = new Properties();
        String json = null;
        try {
            props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
            // load a sample message body from the classpath
            InputStream in = Configuration.class.getClassLoader().getResourceAsStream("oracle.json");
            json = IOUtils.toString(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 1000; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("oracle", json);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        log.error("the producer has a error:" + e.getMessage());
                    }
                }
            });
        }
        try {
            Thread.sleep(1000);
            producer.close();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}
The consumer logic uses one thread per topic to consume messages. The main entry point is as follows:
package com.sequoiadb.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sequoiadb.kafka.bean.KafkaConsumerConfig;
import com.sequoiadb.utils.Configuration;

public class KafkaSdb {
    private static Logger log = LoggerFactory.getLogger(KafkaSdb.class);
    private static ExecutorService executor;

    public static void main(String[] args) {
        // load the Kafka topic configuration
        List<KafkaConsumerConfig> topicSdbList = Configuration.getConfiguration();
        if (topicSdbList != null && topicSdbList.size() > 0) {
            // one consumer thread per configured topic
            executor = Executors.newFixedThreadPool(topicSdbList.size());
            final List<ConsumerThread> consumerList = new ArrayList<ConsumerThread>();
            for (int i = 0; i < topicSdbList.size(); i++) {
                KafkaConsumerConfig consumerConfig = topicSdbList.get(i);
                ConsumerThread consumer = new ConsumerThread(consumerConfig);
                consumerList.add(consumer);
                executor.submit(consumer);
            }
            // shut the consumers down gracefully on JVM exit
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    for (ConsumerThread consumer : consumerList) {
                        consumer.shutdown();
                    }
                    executor.shutdown();
                    try {
                        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        } else {
            log.error("The topic list is empty, please check the topic configuration!");
        }
    }
}
The thread class performs the actual message consumption and writes the message data into SequoiaDB:
package com.sequoiadb.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sequoiadb.base.CollectionSpace;
import com.sequoiadb.base.DBCollection;
import com.sequoiadb.base.Sequoiadb;
import com.sequoiadb.exception.BaseException;
import com.sequoiadb.kafka.bean.KafkaConsumerConfig;
import com.sequoiadb.utils.ConnectionPool;
import com.sequoiadb.utils.Constants;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

public class ConsumerThread implements Runnable {
    private static Logger log = LoggerFactory.getLogger(ConsumerThread.class);
    private String location = "kafka-consumer.properties"; // location of the consumer config file
    private Sequoiadb sdb = null;
    private CollectionSpace cs = null;
    private DBCollection cl = null;
    private KafkaConsumer<String, String> consumer = null;
    private KafkaConsumerConfig consumerConfig;

    public ConsumerThread(KafkaConsumerConfig consumerConfig) {
        if (null == sdb) {
            sdb = ConnectionPool.getInstance().getConnection();
        }
        if (sdb.isCollectionSpaceExist(Constants.CS_NAME)) {
            cs = sdb.getCollectionSpace(Constants.CS_NAME);
        } else {
            throw new BaseException("Collection space " + Constants.CS_NAME + " does not exist!");
        }
        if (null == cs) {
            throw new BaseException("Collection space must not be null!");
        } else {
            this.consumerConfig = consumerConfig;
            this.cl = cs.getCollection(this.consumerConfig.getSdbCLName());
        }
        Properties props = new Properties();
        try {
            props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
        } catch (IOException e) {
            e.printStackTrace();
        }
        props.put("group.id", this.consumerConfig.getTopicGroupName());
        consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        log.info("Consumer thread for topic " + this.consumerConfig.getTopicName() + " started!");
        try {
            // subscribe to the configured topic
            consumer.subscribe(Arrays.asList(this.consumerConfig.getTopicName()));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(this.consumerConfig.getPollTimeout());
                List<BSONObject> list = new ArrayList<BSONObject>();
                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    JSONObject valueJson = JSONObject.fromObject(value);
                    if (valueJson.containsKey("data")) {
                        // each message carries a "data" array; every element becomes one BSON record
                        JSONArray dataJsonArray = valueJson.getJSONArray("data");
                        for (int i = 0; i < dataJsonArray.size(); i++) {
                            BSONObject httpBson = new BasicBSONObject();
                            JSONObject dataJson = dataJsonArray.getJSONObject(i);
                            Iterator iter = dataJson.keys();
                            while (iter.hasNext()) {
                                String key = (String) iter.next();
                                httpBson.put(key, dataJson.getString(key));
                            }
                            list.add(httpBson);
                        }
                    } else {
                        log.error("The message does not contain a data node!");
                    }
                }
                if (list.size() > 0) {
                    try {
                        // bulk insert, continuing past duplicate-key errors
                        this.cl.bulkInsert(list, DBCollection.FLG_INSERT_CONTONDUP);
                        log.info("Inserted messages of topic " + this.consumerConfig.getTopicName()
                                + " into SequoiaDB, record count: " + list.size());
                    } catch (BaseException e) {
                        e.printStackTrace();
                    }
                }
                // commit offsets manually after the batch has been processed
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // raised by shutdown() to break out of poll()
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}
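For reference, the consumer above expects each Kafka message value to be a JSON document with a "data" array. A hypothetical message that would pass this parsing looks like:

{ "data": [ { "id": "1", "name": "Tom" }, { "id": "2", "name": "Jerry" } ] }

The ConnectionPool and Constants helpers used by ConsumerThread are not shown in the original either. A minimal single-connection sketch follows; the "sdb.url" property key and the collection space name are assumptions, and a real pool would reuse connections rather than open one per call:

package com.sequoiadb.utils;

import com.sequoiadb.base.Sequoiadb;

public class ConnectionPool {
    private static final ConnectionPool INSTANCE = new ConnectionPool();

    private ConnectionPool() {
    }

    public static ConnectionPool getInstance() {
        return INSTANCE;
    }

    // Opens a new connection per call; "sdb.url" (e.g. "192.168.1.35:11810")
    // is an assumed entry in config.properties.
    public Sequoiadb getConnection() {
        return new Sequoiadb(PropertiesUtils.getProperties("sdb.url"), "", "");
    }
}

package com.sequoiadb.utils;

// Collection space name referenced by ConsumerThread; the value is an assumption.
public class Constants {
    public static final String CS_NAME = "kafkaSdb";
}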
As the integration above shows, the difficult parts of writing Kafka messages into SequoiaDB are configuring the topic partitions in Kafka, consuming the messages of each topic partition across multiple threads, and handling the cases where message consumption fails.