是什麼?java
1) Apache Kafka 是一個消息隊列(生產者消費者模式)node
2) Apache Kafka 目標:構建企業中統一的、高通量、低延時的消息平臺。mysql
3) 大多的是消息隊列(消息中間件)都是基於JMS標準實現的,Apache Kafka 相似於JMS的實現。git
有什麼用?github
1) 做爲緩衝,來異構、解耦系統。 redis
消息持久化(Kafka 基於文件系統來存儲和緩存消息)。算法
高吞吐量(Kafka 將數據寫到磁盤,可是在底層採用了零拷貝技術,因此速度比較快)。spring
高擴展性(Kafka 依賴ZooKeeper來對集羣進行協調管理,同時在機器擴展時無需將整個集羣停機)。sql
多客戶端支持(Kafka 核心模塊用Scala 語言開發,但提供多種開發語言接入,包括Java,Python等)。shell
安全機制(支持代理與ZooKeeper 鏈接身份驗證,客戶端讀、寫權限認證)。
數據備份(Kafka 能夠爲每一個主題指定副本數,對數據進行持久化備份)。
輕量級(Kafka 的實例是無狀態的,同時集羣自己幾乎不須要生產者和消費者的狀態信息)。
消息壓縮(Kafka 支持Gzip, Snappy 、LZ4 這3 種壓縮方式,把多條消息壓縮成MessageSet)。
解耦,異構
Kafka Cluster:由多個服務器組成。每一個服務器單獨的名字broker(掮客)。
Kafka Producer:生產者、負責生產數據。
Kafka consumer:消費者、負責消費數據。
Kafka Topic: 主題,一類消息的名稱。存儲數據時將一類數據存放在某個topci下,消費數據也是消費同樣。
ZooKeeper:Kafka的元數據都是存放在zookeeper中。
建立一個topic(主題):
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic order
啓動一個生產者,用來生產數據 :
bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic order
啓動給一個消費者,消費數據:
bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic order
JavaAPI操做Kafka所須要的依賴:
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
生產者相關操做:
//建立Properties配置參數對象,並設置參數
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("acks", "all");
//建立一個KafkaProducer,Kafka生產者對象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 1000; i++) {
// 發送數據 ,須要一個producerRecord對象,最少參數 String topic, V value
kafkaProducer.send(new ProducerRecord<String, String>("order", "訂單信息!"+i));
Thread.sleep(100);
}
}
消費者相關操做:
// 一、建立配置參數對象,並鏈接集羣
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//二、建立Kafka的消費者對象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//三、訂閱一個主題,訂閱主題需傳入List格式
kafkaConsumer.subscribe(Arrays.asList("order"));
//四、使用死循環不停拉取數據
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println("消費的數據爲:" + record.value());
}
}
topic相關操做:
因爲主題的元數據信息是註冊在 ZooKeeper 相應節點之中,因此對主題的操做實質是對ZooKeeper中記錄主題元數據信息相關路徑的操做。Kafka將對ZooKeeper的相關操做封裝成一 個ZkUtils 類,井封裝了一個AdrninUtils類調用ZkClient類的相關方法以實現對 Kafka 元數據 的操做,包括對主題、代理、消費者等相關元數據的操做。對主題操做的相關API調用較簡單,相應操做都是經過調用AdminUtils類的方法來完成的。
建立topic(通常經常使用方法一):
方法一:
//參數:zookeeper的地址,session超時時間,鏈接超時時間,是否啓用zookeeper安全機制
zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
方法二:
//參數:zkUtils,topic名稱,partition數量,副本數量,參數,機架感知模式
AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
刪除topic:
//參數:zkUtils,topic名稱,partition數量,副本數量,參數,機架感知模式
AdminUtils.deleteTopic(zkUtils, topicName);
判斷是否存在:
AdminUtils.topicExists(zkUtils, topicName);
分區:當數據量很是大的時候,一個服務器存放不了,就將數據分紅兩個或者多個部分,存放在多臺服務器上。每一個服務器上的數據,叫作一個分區。
副本:當數據只保存一份時,有丟失風險。爲了更好的容錯和容災,將數據拷貝幾份,保存到其餘機器上。
設置分區和副本的方法:
控制檯上:--replication-factor 1 --partitions 3
API代碼:AdminUtils.createTopic(zkUtils, topicName, 3, 1, new Properties(),
AdminUtils.createTopic$default$6());
生產者消息不丟失機制:
發送消息的同步和異步模式:
同步模式:生產者重試3次,若是尚未響應,就報錯。生產者等待10S,若是broker沒有給出ack響應,就認爲失敗。
異步模式:先將數據保存在生產者端的buffer中。Buffer大小是2萬條。發送一批數據的大小是500條。知足數據閾值或者數量閾值其中的一個條件就能夠發送數據。
消息確認的三個狀態:
0狀態:生產者只負責發送數據,無論Kafka的broker是否接收到數據;
1狀態:某個partition的leader收到數據給出響應;
-1狀態:某個partition的全部副本都收到數據後給出響應
Borker端消息不丟失機制:
消費者端消息不丟失:
若是有一個外部存儲可以記錄每一個consumer消費partition的offset值。就不會形成數據丟失,只會有重複消費的可能。而在Kafka0.8之後,offset值能夠存放到Kafka內置的topic中。
Kafka做爲消息中間件,只負責消息的臨時存儲,並非永久存儲,須要刪除過時的數據;
若是一個partition中有10T數據,是如何存放的?是存放在一個文件中,仍是存放在多個文件中?
Kafka時採用存儲到多個文件中的方式。由於若是將全部數據都存放在一個文件中,須要刪除過時數據的時候,就比較麻煩。由於文件有日期屬性,刪除過時數據,只須要根據文件的日期屬性刪除就好。
Kafka的數據是存儲在/export/data/kafka(能夠本身設置)目錄下,存儲時是將數據劃分爲一個個的segment段,在segment段中有兩個核心的文件,一個是log,一個是index。當log文件等於1G時,新的會寫入到下一個segment中。
在Kafka中進行消息查詢時,首先會查找segment中的index索引文件,index索引文件是以起始來命名的,根據查詢索引文件能很快的定位到具體文件。
當根據index索引文件定位到須要查詢的具體文件時,就會去查找log文件,在該文件中按順序查找到目標文件
kafka在數據生產的時候,有一個數據分發策略。默認的狀況使用DefaultPartitioner.class類。若是用戶制定了partition,生產就不會調用DefaultPartitioner.partition()方法。
當用戶指定key,就會使用hash算法來肯定發往那個patition。若是key一直不變,同一個key算出來的hash值是個固定值。若是是固定值,這種hash取模就沒有意義。
例:Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
還能夠指定將數據發往哪一個partition。當ProducerRecord 的構造參數中有partition的時候,就能夠發送到對應partition上。
例:public ProducerRecord(String topic, Integer partition, K key, V value)
若是既沒有指定partition,也沒有key的狀況下,那就使用輪詢的方式發送數據。
一個partition只能被一個組中的成員消費。因此若是消費組中有多於partition數量的消費者,那麼必定會有消費者沒法消費數據。若是消費組中的消費組小於partition,那麼消費的數據就不完整,會形成錯誤。
概述:Spring對kafka作了支持,以便簡化咱們的開發工做,官網:https://spring.io/projects/spring-kafka
依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
第一步:編寫application-kafka-producer.xml文件
第二步:編寫java代碼 TestSpringKafkaProducer
//類上方添加以下注釋 @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(value = {"classpath:application-kafka-producer.xml"}) //注入一個KafkaTemplate對象 @Autowired private KafkaTemplate kafkaTemplate; //使用注入的對象發送數據到Kafka(發送的數據能夠是對象,會自動進行json轉換) kafkaTemplate.sendDefault(order);
第一步:編寫application-kafka-consumer.xml文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <bean id="consumerConfig" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="node01:9092,node02:9092"/> <entry key="group.id" value="my-group-spring-spring-3"/> <entry key="client.id" value="my-test-client-spring-3"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <!--反序列化器,這裏要注意設置的是字符串的反序列化--> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> </map> </constructor-arg> </bean> <!-- 定義消費者的工廠 --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg ref="consumerConfig"/> </bean> <!--定義消息監聽器,用於接收消息--> <bean id="myMessageListener" class="cn.itcast.kafka.MyMessageListener"/> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <!--設置消費的topic,這裏能夠指定多個topic--> <constructor-arg value="my-kafka-topic" type="java.lang.String[]"/> <property name="messageListener" ref="myMessageListener"/> </bean> <!--建立Listener容器--> <bean class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="start"> <constructor-arg index="0" ref="consumerFactory"/> <constructor-arg index="1" ref="containerProperties"/> </bean> </beans>
第二步:建立一個類,注入上述配置文件便可接收
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(value = {"classpath:application-kafka-consumer.xml"}) public class TestSpringKafkaConsumer { @Test public void testConsumer() { } }
Kafka Manager 由 yahoo 公司開發,該工具能夠方便查看集羣 主題分佈狀況,同時支持對 多個集羣的管理、分區平衡以及建立主題等操做。
Kafka和Kafka-manager的詳細安裝和部署,請詳看其安裝部署文件。
啓動Kafka-manager:
cd /export/servers/kafka-manager-1.3.3.17/bin ./kafka-manager -Dconfig.file=../conf/application.conf
官網:http://storm.apache.org/ 源碼:https://github.com/apache/storm
Storm是一個開源免費的分佈式實時計算系統,Storm能夠輕鬆的處理無界的數據流。
Storm只負責數據的計算,不負責數據的存儲。
2013年先後,阿里巴巴基於storm框架,使用java語言開發了相似的流式計算框架佳做,Jstorm。2016年年末阿里巴巴將源碼貢獻給了Apache storm,兩個項目開始合併,新的項目名字叫作storm2.x。阿里巴巴團隊專一flink開發。
架構說明:
在集羣架構中,用戶提交到任務到storm,交由nimbus處理。
nimbus經過zookeeper進行查找supervisor的狀況,而後選擇supervisor進行執行任務。
supervisor會啓動一個woker進程,在worker進程中啓動線程進行執行具體的業務邏輯。
Spout:Spout繼承BaseRichSpout,其中有三個方法:open(用來進行初始化),nextTuple(storm框架會不斷調用該方法進行執行,向下遊發送數據),declareOutputFields(定義向下遊發送的數據的名稱,定義的名稱和發送的數據的順序要一致)。
Bolt:Bolt繼承BaseRichBolt,其中有三個方法:prepare(初始化操做,只會執行一次),execute(storm框架會不斷調用該方法進行執行,處理業務邏輯,向下遊發送數據),declareOutputFields(定義向下遊發送的數據的名稱,定義的名稱和發送的數據的順序要一致)。
Tuple:一次消息傳遞的基本單元。原本應該是一個key-value的map,可是因爲各個組件間傳遞的tuple的字段名稱已經事先定義好,因此tuple中只要按序填入各個value就好了,因此就是一個value list.
Stream:源源不斷傳遞的tuple就組成了stream。
Topology:Storm中運行的一個實時應用程序,由於各個組件間的消息流動造成邏輯上的一個拓撲結構。
建立Topology的步驟:
第一步,建立一個TopologyBuilder;
第二步,向TopologyBuilder中設置Spout和Bolt,而且進行鏈接;
第三步,topologyBuilder.createTopology()獲得Topology對象;
第四步,提交到Storm進行運行(本地模式:localCluster.submitTopology(),集羣模式:
StormSubmitter.submitTopology());
由於頻繁切換本地和集羣模式太麻煩了,當進行Topology優化後,本地和集羣就能夠相同代碼,當啓動時,能夠根據main方法中args作出判斷,若是args沒有參數,說明是本地模式,若是有參數,說明是集羣模式。
優化以後,本地模式和集羣模式可使用相同的代碼,不要進行切換,並且在集羣模式下,能夠對Topology進行自定義名稱。
因爲worker和supervisor分離,當worker宕機,但supervisor能運行時,supervisor會嘗試從新啓動worker
若是supervisor也宕機了,那nimbus會從新分配其它的supervisor進行執行
若是隻有supervisor宕機,但其下面的worker沒有宕機時,那worker會正常工做,不會有影響
若是supervisor及其下面的worker宕機,首先會將分配給該機器的任務暫停,而且nimbus會從新分配機器來執行該節點上的任務
因此須要藉助外部的監控手段來保障supervisor的高可用
當nimbus宕機時,不會影響任務的執行,將影響的是,任務的提交,不能再向集羣提交任何任務
因此須要配置nimbus的高可用:能夠配置、啓動多個nimbus來保障;也能夠須要藉助外部的監控手段來保障nimbus的高可用
有些時候咱們須要將Storm計算完的數據持久化到數據庫,因此須要在Storm中整合JDBC進行持久化。
官方文檔:https://github.com/apache/storm/blob/master/docs/storm-jdbc.md
引入storm-jdbc和mysql驅動依賴
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.34</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-jdbc</artifactId> <version>1.1.1</version> </dependency>
使用JdbcInsertBolt進行整合的步驟
public static IRichBolt build() { //第一步:定義數據庫鏈接信息,包括數據庫驅動和數據庫地址,用戶名,密碼等 Map<String, Object> hikariConfigMap = new HashMap<String, Object>(); hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource"); hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost:3306/storm"); hikariConfigMap.put("dataSource.user", "root"); hikariConfigMap.put("dataSource.password", "root"); ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap); // 第二步:定義表名,以及定義字段的映射,這裏指定的是tupe中的字段名稱,用於獲取數據 String tableName = "tb_wordcount"; List<Column> columnSchema = Lists.newArrayList( new Column("word", Types.VARCHAR), new Column("count", Types.INTEGER)); // 第三步:定義jdbc的映射器 JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema); //第四步: 定義插入數據的Bolt,而且指定了插入的sql語句 JdbcInsertBolt wordCountBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper) .withInsertQuery("INSERT INTO `tb_wordcount` VALUES (NULL, ?, ?, NOW())") .withQueryTimeoutSecs(30); //第五步:將結果進行返回 return wordCountBolt; }
//整合到TopologyBuilder中 topologyBuilder.setBolt("JdbcBolt",JdbcBoltBuilder.build()).shuffleGrouping("WordBolt");
Storm和Redis整合是很是經常使用的場景,Storm也支持了Redis的支持。
官方文檔:https://github.com/apache/storm/blob/master/docs/storm-redis.md
引入依賴
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>1.1.1</version> </dependency>
建立WordCountStoreMapper
public class WordCountStoreMapper implements RedisStoreMapper { private RedisDataTypeDescription redisDataTypeDescription; public WordCountStoreMapper() { // 定義Redis中的數據類型 this.redisDataTypeDescription = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING); } @Override public RedisDataTypeDescription getDataTypeDescription() { return this.redisDataTypeDescription; } @Override public String getKeyFromTuple(ITuple iTuple) { // 生成redis中的key String word = iTuple.getStringByField("word"); return "wordCount:" + word; } @Override public String getValueFromTuple(ITuple iTuple) { // 存儲到redis中的值 Integer count = iTuple.getIntegerByField("count"); return String.valueOf(count); } }
整合到Topology中
//建立Redis的鏈接參數 JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost("node01").setPort(6379).build(); //根據Redis的鏈接參數和Redis的Mapper類,建立Redis的RedisStoreBolt對象,並將該對象整合到Topology中 topologyBuilder.setBolt("RedistBolt", new RedisStoreBolt(poolConfig, new WordCountStoreMapper())).localOrShuffleGrouping("WordCountBolt");
Storm與Kafka的整合也是很是常見的,經常用於數據讀取,因此咱們更多的是要關注Spout。
官方文檔:https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md
引入依賴
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>1.1.1</version> </dependency>
整合KafkaSpout到Topology
//已知存在kafka的一個topic,該topic有3個分區,2個副本 //第一步,定義TopologyBuilder對象,用於構建拓撲 TopologyBuilder topologyBuilder = new TopologyBuilder(); //第二步,設置spout和bolt,建立kafka的Spout,並鏈接到kafka的模板topic上,並設置消費者id KafkaSpoutConfig.Builder<String, String> kafkaSpoutBuilder = KafkaSpoutConfig.builder("node01:9092", "kafka-storm-topic"); kafkaSpoutBuilder.setGroupId("kafka-storm-topic-consumer-groupid"); //設置消費者組id // 這裏設置Spout的並行度爲3,緣由是建立topic時,指定的partition爲3 topologyBuilder.setSpout("kafka_spout", new KafkaSpout<>(kafkaSpoutBuilder.build()),3); //設置一個bolt,並指定從kafka中獲取數據的spout爲這個bolt的上游 topologyBuilder.setBolt("SplitSentenceBolt", new SplitSentenceBolt()).localOrShuffleGrouping("kafka_spout");
當上遊爲Kafka的Spout時,下游Bolt獲取值的方法
經過KafkaSpout向下遊發送的Tupe的方式是這樣的:collect.emti();
因此,咱們須要經過value獲取值:String sentence = input.getStringByField("value");
在Storm中,當Spout、Bolt是多個時,那上游的Spout或Bolt不知道將數據傳入下游的哪一個Bolt時,就須要流分組
在Storm中,提供了8種流分組方式,其中3中常用
隨機分組(Shuffle grouping)
隨機分發tuple到Bolt的任務,保證每一個任務得到相等數量的tuple。 跨服務器通訊,浪費網絡資源,儘可能不用
本地或隨機分組(Local or shuffle grouping)(重點掌握,經常使用)
優先將數據發送到本地的Task,節約網絡通訊的資源
部分關鍵字分組(Partial Key grouping)
字段分組(Fields Grouping )(瞭解,僅用於WordCount案例)
在不一樣的worker間進行數據傳輸時,會產生通訊(相同主機的進程通訊,不一樣主機需經過網絡通訊)
Disruptor是一個Queue。Disruptor是實現了「隊列」的功能,並且是一個有界隊列。而隊列的應用場景天然就是「生產者-消費者」模型。
Disruptor的核心有3個:第一,維護生產者序號;第二,消費者序號;第三,數組的長度。
緣由一:Disruptor 沒有使用鎖機制;
緣由二:Disruptor 有一個相似中間數組的組件,生產者往這個組件中放入數據,消費者取出數據
Storm的消息不丟失機制核心是acker。當Spout或Bolt處理完數據後,會標記狀態到acker中,而後經過異或算法,計算出是否有失敗的狀況。結果爲0就表示執行成功,結果不爲0就表示失敗。
當結果爲0時,acker會通知Spout執行成功,從而調用Spout的ack()方法,此時能夠在ack()方法中執行操做。
當結果不爲0時,acker會通知Spout失敗,從而調用它的fail()方法,此時能夠在該方法中從新執行。
Spout實現:在Spout中需實現ack和fail方法,而且在Spout中發送消息時須要發送消息id(惟一,實現以下)
// 生成消息id,而且把數據存放到messages的map中,若是執行失敗能夠在fail方法中調用messages中的數據從新執行 String msgId = UUID.randomUUID().toString(); messages.put(msgId, sentence); //向下遊輸出 this.collector.emit(new Values(sentence),msgId);
Bolt中的實現:若是執行成功就調用ack方法,若是執行失敗就調用fail方法
public void execute(Tuple input) { // 經過Tuple的getValueByField獲取上游傳遞的數據,其中"sentence"是定義的字段名稱 String sentence = input.getStringByField("sentence"); // 將獲取的數據進行處理 // 向下遊輸出數據,需加入錨點 for (String word : words) { // 注意這裏,須要將原始數據input傳入,在這裏稱之爲錨點,意思是將新的數據和原有數據進行關聯 this.collector.emit(input,new Values(word)); } // 若是處理成功,執行ack方法,若是處理失敗,執行fail方法,並將傳入的input傳入該方法中 this.collector.ack(input); this.collector.fail(input); }
從上看出,若是執行失敗需手動調用方法,很不方便,因此咱們能夠繼承BaseBasicBolt類,這樣當執行失敗時就不須要手動調用方法了,只須要拋出FailedException便可。請注意:就是繼承BaseBasicBolt類,在Spout中仍是需重寫ack和fail方法,這是當執行成功或失敗後會調用的方法。
好處一:若是執行失敗,不須要手動調用fail和ack方法,只須要使用try...catch包裹執行代碼,而後拋出FailedException異常便可。
好處二:向下遊輸出數據時,不須要加入錨點,只須要將目標數據加入。
好處三:能夠在Topology驅動類中設置acker(消息不丟失機制)是否啓動,設置方法以下
Config.setNumAckers(conf, ackerParal);
首先在Spout中需將發送到下游的數據進行記錄(能夠記錄到一個Map中)。若是該數據執行成功,能夠將Map中的該數據刪除(this.msgData.remove(msgId);)。若是該數據執行失敗,能夠在fail方法中從新執行該數據,就是將Map中對應的數據取出,而後從新發送到下游執行。
若是執行失敗,最好設置從新執行的限制條件:需限制失敗的重試次數,若是重試次數超出,需將該數據記錄下來(能夠保存到數據庫等本地文件中),同時刪除內存中的數據(Map集合中的該數據);在進行重試時,須要有停頓,不能失敗後立刻執行(有多是網絡問題),須要有計數,不能無限執行。
爲了解決信息過載和用戶無明確需求的問題,找到用戶感興趣的物品,纔有了個性化推薦系統。
推薦系統普遍存在於各種網站中,做爲一個應用爲用戶提供個性化的推薦。它須要一些用戶的歷史數據,通常由三個部分組成:基礎數據、推薦算法系統、前臺展現。
基礎數據包括不少維度,包括用戶的訪問、瀏覽、下單、收藏,用戶的歷史訂單信息,評價信息等不少信息;
推薦算法系統主要是根據不一樣的推薦訴求由多個算法組成的推薦模型;
前臺展現主要是對客戶端系統進行響應,返回相關的推薦信息以供展現。
迄今爲止,在個性化推薦系統中,協同過濾技術是應用最成功的技術。目前國內外有許多大型網站應用這項技術爲用戶更加智能(個性化、千人千面)的推薦內容。
核心思想:協同過濾通常是在海量的用戶中發掘出一小部分和你品位比較相似的,在協同過濾中,這些用戶成爲鄰居,而後根據他們喜歡的其餘東西組織成一個排序的目彔做爲推薦給你。
問題:如何肯定一個用戶是丌是和你有類似的品位?如何將鄰居們的喜愛組織成一個排序的目彔?
在社交項目中,如微信、QQ,顯然選擇基於用戶推薦比較好,由於推薦每每都是和人相關的。
如:在QQ登陸後,會有提示,好友的好友多是你認識的,推薦給你添加好友。
在電商項目中,用戶的數量遠大於商品數量,因此基於商品的推薦的複雜度要低,並且也比較合理。
其實,在實際的推薦系統中,每每不單是使用一種推薦,而是會多種推薦混合使用。
因此,選擇基於用戶仍是基於商品的推薦,和應用場景有很大的關係。
不管是基於用戶仍是基於商品的推薦,都是須要找到類似的用戶或者商品,才能作推薦,因此,類似度算法就變得很是重要了。
常見的類似度算法有:
歐幾里德距離算法(Euclidean Distance)
皮爾遜類似度算法(Pearson Correlation Coefficient)
基於夾角餘弦類似度算法(Consine Similarity)
基於Tanimoto係數類似度(Tanimoto Coefficient)
經過類似度計算,能夠計算出鄰居,問題來了,咱們若是選取出幾個鄰居做爲參考,進行推薦呢?
一般有2種方式:
固定數量的鄰居:K-neighborhoods
基於類似度門檻的鄰居:Threshold-based neighborhoods
Mahout使用了Taste來提升協同過濾算法的實現,它是一個基於Java實現的可擴展的,高效的推薦引擎。Taste既實現了最基本的基於用戶的和基於內容的推薦算法,同時也提供了擴展接口,使用戶能夠方便的定義和實現本身的推薦算法。同時,Taste不只僅只適用於Java應用程序,它能夠做爲內部服務器的一個組件以HTTP和Web Service的形式向外界提供推薦的邏輯。Taste的設計使它能知足企業對推薦引擎在性能、靈活性和可擴展性等方面的要求。
2008年成爲Lucene的子項目,Lucene做爲搜索引擎項目,存在不少文本數據分析和挖掘的需求(例如文本重複判斷,文本自動分類等等),致使Lucene項目中部分開發者轉向機器學習領域研究算法,最終這些機器學習算法造成最初的Mahout
吸取開源協同過濾算法項目Taste
2010年成爲Apache頂級項目
DataModel 是用戶喜愛信息的抽象接口,它的具體實現支持從任意類型的數據源抽取用戶喜愛信息。Taste 默認提供 JDBCDataModel 和 FileDataModel,分別支持從數據庫和文件中讀取用戶的喜愛信息。
DataModel接口的部分實現: org.apache.mahout.cf.taste.impl.model.GenericDataModel org.apache.mahout.cf.taste.impl.model.GenericBooleanPrefDataModel org.apache.mahout.cf.taste.impl.model.PlusAnonymousUserDataModel org.apache.mahout.cf.taste.impl.model.file.FileDataModel org.apache.mahout.cf.taste.impl.model.hbase.HBaseDataModel
UserSimilarity 和 ItemSimilarity 。UserSimilarity 用於定義兩個用戶間的類似度,它是基於協同過濾的推薦引擎的核心部分,能夠用來計算用戶的「鄰居」,這裏咱們將與當前用戶口味類似的用戶稱爲他的鄰居。ItemSimilarity 相似的,計算Item之間的類似度。
UserSimilarity 和 ItemSimilarity 類似度實現有如下幾種: CityBlockSimilarity :基於Manhattan距離類似度 EuclideanDistanceSimilarity :基於歐幾里德距離計算類似度 LogLikelihoodSimilarity :基於對數似然比的類似度 PearsonCorrelationSimilarity :基於皮爾遜相關係數計算類似度 SpearmanCorrelationSimilarity :基於皮爾斯曼相關係數類似度 TanimotoCoefficientSimilarity :基於谷本系數計算類似度 UncenteredCosineSimilarity :計算 Cosine 類似度
UserNeighborhood 用於基於用戶類似度的推薦方法中,推薦的內容是基於找到與當前用戶喜愛類似的鄰居用戶的方式產生的。UserNeighborhood 定義了肯定鄰居用戶的方法,具體實現通常是基於 UserSimilarity 計算獲得的。
UserNeighborhood 主要實現有兩種: NearestNUserNeighborhood:對每一個用戶取固定數量N個最近鄰居 ThresholdUserNeighborhood:對每一個用戶基於必定的限制,取落在類似度限制之內的全部用戶爲鄰居
Recommender 是推薦引擎的抽象接口,Taste 中的核心組件。程序中,爲它提供一個 DataModel,它能夠計算出對不一樣用戶的推薦內容。實際應用中,主要使用它的實現類 GenericUserBasedRecommender 或者 GenericItemBasedRecommender,分別實現基於用戶類似度的推薦引擎或者基於內容的推薦引擎。
Recommender分爲如下幾種實現: GenericUserBasedRecommender:基於用戶的推薦引擎 GenericBooleanPrefUserBasedRecommender:基於用戶的無偏好值推薦引擎 GenericItemBasedRecommender:基於物品的推薦引擎 GenericBooleanPrefItemBasedRecommender:基於物品的無偏好值推薦引擎
RecommenderEvaluator有如下幾種實現: AverageAbsoluteDifferenceRecommenderEvaluator :計算平均差值 RMSRecommenderEvaluator :計算均方根差
@Test public void testBaseUser() throws Exception { String fileName = "user_item.data"; File file = FileUtils.toFile(TestMahout.class.getClassLoader().getResource(fileName)); // 第一步,定義數據模型 DataModel dataModel = new FileDataModel(file); // 第二步,定義相識度,這裏使用的歐幾里得 UserSimilarity userSimilarity = new EuclideanDistanceSimilarity(dataModel); // 第三步,定義最近鄰域,這裏使用的是固定數量的鄰居 UserNeighborhood userNeighborhood = new NearestNUserNeighborhood(10, userSimilarity, dataModel); long[] longs = userNeighborhood.getUserNeighborhood(1); for (long aLong : longs) { System.out.println(aLong); } // 第四步,定義推薦器,這裏使用的是基於用戶的推薦 Recommender recommender = new GenericUserBasedRecommender(dataModel, userNeighborhood, userSimilarity); LongPrimitiveIterator userIDs = dataModel.getUserIDs(); while (userIDs.hasNext()) { Long userId = userIDs.next(); List<RecommendedItem> recommendedItemList = recommender.recommend(userId, 4); StringBuffer sb = new StringBuffer(); for (RecommendedItem item : recommendedItemList) { sb.append(item.getItemID() + "|"+item.getValue()+","); } System.out.println(userId + "-->" + sb); } }
@Test public void testBaseItem() throws Exception { String fileName = "user_item.data"; File file = FileUtils.toFile(TestMahout.class.getClassLoader().getResource(fileName)); // 第一步,定義數據模型 DataModel dataModel = new FileDataModel(file); // 第二步,定義相識度,這裏使用的歐幾里得 ItemSimilarity itemSimilarity = new EuclideanDistanceSimilarity(dataModel); // 第三步,定義推薦器,這裏使用的是基於用戶的推薦 Recommender recommender = new GenericItemBasedRecommender(dataModel, itemSimilarity); LongPrimitiveIterator userIDs = dataModel.getUserIDs(); while (userIDs.hasNext()) { Long userId = userIDs.next(); List<RecommendedItem> recommendedItemList = recommender.recommend(userId, 2); StringBuffer sb = new StringBuffer(); for (RecommendedItem item : recommendedItemList) { sb.append(item.getItemID() + "|"+item.getValue()+","); } System.out.println(userId + "-->" + sb); } }
@Test public void testBaseItem2() throws Exception { String fileName = "user_item.data"; File file = FileUtils.toFile(TestMahout.class.getClassLoader().getResource(fileName)); // 第一步,定義數據模型 DataModel dataModel = new FileDataModel(file); // 第二步,定義相識度,這裏使用的歐幾里得 ItemSimilarity itemSimilarity = new EuclideanDistanceSimilarity(dataModel); // 第三步,定義推薦器,這裏使用的是基於用戶的推薦 GenericItemBasedRecommender recommender = new GenericItemBasedRecommender(dataModel, itemSimilarity); LongPrimitiveIterator userIDs = dataModel.getUserIDs(); //用戶1,正在瀏覽103商品,進行推薦 Long userId = 1L; Long itemId = 103L; List<RecommendedItem> recommendedItemList = recommender.recommendedBecause(userId,itemId,2); StringBuffer sb = new StringBuffer(); for (RecommendedItem item : recommendedItemList) { sb.append(item.getItemID() + "|"+item.getValue()+","); } System.out.println(userId + "-->" + sb); }
提交到hadoop運行,須要有2步操做:
第一步,將須要計算的數據上傳到hdfs
第二步,經過hadoop執行mahout-examples-0.13.0-job.jar中的RecommenderJob類
第三步,在輸出結果文件中查看結果
hadoop jar mahout-examples-0.13.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob --input /user_item.data --output /cc -s SIMILARITY_EUCLIDEAN_DISTANCE
參數說明:
--input(path)(-i): 存儲用戶偏好數據的目錄,該目錄下能夠包含一個或多個存儲用戶偏好數據的文本文件; --output(path)(-o): 結算結果的輸出目錄 --numRecommendations (integer): 爲每一個用戶推薦的item數量,默認爲10 --usersFile (path): 指定一個包含了一個或多個存儲userID的文件路徑,僅爲該路徑下全部文件包含的userID作推薦計算 (該選項可選) --itemsFile (path): 指定一個包含了一個或多個存儲itemID的文件路徑,僅爲該路徑下全部文件包含的itemID作推薦計算 (該選項可選) --filterFile (path): 指定一個路徑,該路徑下的文件包含了[userID,itemID]值對,userID和itemID用逗號分隔。計算結果將不會爲user推薦[userID,itemID]值對中包含的item (該選項可選) --booleanData (boolean): 若是輸入數據不包含偏好數值,則將該參數設置爲true,默認爲false --maxPrefsPerUser (integer): 在最後計算推薦結果的階段,針對每個user使用的偏好數據的最大數量,默認爲10 --minPrefsPerUser (integer): 在類似度計算中,忽略全部偏好數據量少於該值的用戶,默認爲1 --maxSimilaritiesPerItem (integer): 針對每一個item的類似度最大值,默認爲100 --maxPrefsPerUserInItemSimilarity (integer): 在item類似度計算階段,針對每一個用戶考慮的偏好數據最大數量,默認爲1000 --similarityClassname (classname)(-s): 向量類似度計算類 ([SIMILARITY_COOCCURRENCE, SIMILARITY_LOGLIKELIHOOD, SIMILARITY_TANIMOTO_COEFFICIEN T, SIMILARITY_CITY_BLOCK, SIMILARITY_COSINE, SIMILARITY_PEARSON_CORRELATION , SIMILARITY_EUCLIDEAN_DISTANCE] ) outputPathForSimilarityMatrix:SimilarityMatrix輸出目錄 --randomSeed:隨機種子 –sequencefileOutput:序列文件輸出路徑 --tempDir (path): 存儲臨時文件的目錄,默認爲當前用戶的home目錄下的temp目錄 --startPhase --endPhase --threshold (double): 忽略類似度低於該閥值的item對
補充網站:http://www.javashuo.com/article/p-gzuteolz-mz.html
HBase是一個使用Java語言實現的,構建於Hadoop分佈式文件系統(HDFS)上的分佈式數據庫
。
Hbase是參考谷歌的BigTable的論文開發實現的,Hadoop 生態系統引入了Bigtable的大部分功能。
海量存儲:Hbase單表能夠有百億行,百萬列,相對計較傳統關係型數據庫而言,存儲能力很是強悍。
列式存儲:建立表時,無需指定具體的列,根據數據的插入動態插入;能夠針對列進行權限控制和讀取。
多版本:能夠爲數據添加版本信息,如用戶信息的logo變動歷史。
稀疏性:爲空的列不佔用實際存儲空間。
高擴展、高可用性:底層基於HDFS,高可用和擴展性獲得的了保障。
表(table):用於存儲管理數據,具備稀疏的、面向列的特色。HBase中的每一張表,就是所謂的大表(Bigtable)。
行鍵(RowKey):相似於MySQL中的主鍵,HBase根據行鍵來快速檢索數據,一個行鍵對應一條記錄。與MySQL主鍵不一樣的是,HBase的行鍵是自然固有的,每一行數據都存在行鍵。
列族(簇)(ColumnFamily):是列的集合。列族在表定義時須要指定,而列在插入數據時動態指定。列中的數據都是以二進制形式存在,沒有數據類型。在物理存儲結構上,每一個表中的每一個列族單獨以一個文件存儲(參見圖1.2)。一個表能夠有多個列族。
時間戳(TimeStamp):是列的一個屬性,是一個64位整數。由行鍵和列肯定的單元格,能夠存儲多個數據,每一個數據含有時間戳屬性,數據具備版本特性。可根據版本(VERSIONS)或時間戳來指定查詢歷史版本數據,若是都不指定,則默認返回最新版本的數據。
全局架構:
有此能夠看出,Hbase須要依賴於ZooKeeper和HDFS。
Zookeeper
保證任什麼時候候,集羣中只有一個running master,避免單點問題;
存貯全部Region的尋址入口,包括-ROOT-表地址、HMaster地址;
實時監控Region Server的狀態,將Region server的上線和下線信息,實時通知給Master;
存儲Hbase的schema,包括有哪些table,每一個table有哪些column family。
Master
能夠啓動多個HMaster,經過Zookeeper的Master Election機制保證總有一個Master運行。
RegionServer
HBase中最核心的模塊,主要負責響應用戶I/O請求,向HDFS文件系統中讀寫數據。
維護Master分配給它的region,處理對這些region的IO請求;
負責切分在運行過程當中變得過大的region。
HDFS
負責存儲數據。
經過hbase shell命令進行命令行模式進行操做。
#指定表名,列族名 create 'user' , 'user_info', 'login_info' #建立表 -表名 -列族1名 -列族2名 list #查看全部的表 describe 'user' #查看該表的具體信息
put 'user', '1001', 'user_info:name','張三' put 'user', '1001', 'user_info:address', '上海' put 'user', '1001', 'login_info:user_name', 'zhangsan' put 'user', '1001', 'login_info:password', '123456' put 'user', '1002', 'user_info:name','李四' put 'user', '1002', 'user_info:address', '北京' put 'user', '1002', 'login_info:user_name', 'lisi' put 'user', '1002', 'login_info:password', '123456'
Hbase只支持2種查詢數據,單行查詢,全表查詢。
get 'user', '1001' #查詢所有數據 scan 'user' #查詢一條數據 scan 'user', {LIMIT => 1}
#刪除一行中的一列數據 delete 'user','1002', 'user_info:name' #刪除一行數據 deleteall 'user','1002' #清空表 truncate 'user'
#修改用1001的密碼爲888888,直接put覆蓋便可 put 'user', '1001', 'login_info:password', '888888' #刪除列族 alter 'user' , {NAME=>'user_info', METHOD => 'delete'} #增長列族 alter 'user', 'user_info', {NAME => 'user_info_2' , VERSIONS => 5}
#刪除表以前先要禁用表,再刪除 disable 'user' drop 'user'