Kafka: a Distributed Messaging System
1. Message middleware: producers and consumers. The basic model is a producer, a consumer, and a data stream (the messages).
Kafka architecture:
producer: the producer, which publishes messages
consumer: the consumer, which reads messages
broker: fault-tolerant storage for messages (a Kafka server node)
topic: a category or label that messages are published under
consumer group: within a group, each partition is consumed by at most one consumer, so useful parallelism is capped by the partition count (number of consumers <= number of partitions)
Sequential disk reads and writes avoid seek time and improve throughput.
Zero-copy: data is not copied between kernel space and user space; Kafka relies on the sendfile mechanism.
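On the JVM this shows up as FileChannel.transferTo, which maps to sendfile on Linux and gives Kafka its zero-copy path when serving log segments. A minimal sketch of the mechanism; the file path, host, and port below are placeholders:
    import java.io.FileInputStream
    import java.net.InetSocketAddress
    import java.nio.channels.SocketChannel

    object ZeroCopyDemo {
      def main(args: Array[String]): Unit = {
        // placeholder file and destination, for illustration only
        val fileChannel = new FileInputStream("/tmp/00000000000000000000.log").getChannel
        val socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))
        // transferTo hands the copy to the kernel (sendfile on Linux):
        // the bytes never enter user space, which is the zero-copy path described above
        var position = 0L
        val size = fileChannel.size()
        while (position < size) {
          position += fileChannel.transferTo(position, size - position, socket)
        }
        fileChannel.close()
        socket.close()
      }
    }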
/opt/bigdata/kafka_2.11-1.0.0/kafka-log2s/logkafka-0/00000000000000000000.index (the numbers stored in the index file are the messages' relative offsets within the corresponding log file)
The offset index is a sparse index. First find the log segment that covers the target offset, compute offset - (first offset of the log file - 1) to get the relative index, then look that up in the index file. If the exact entry is not present (the index is sparse), take the nearest preceding entry and scan forward from that position in the log file.
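A minimal sketch of that lookup logic, assuming the .index entries are already loaded in memory as (relative offset, byte position) pairs; the types and names here are hypothetical, not Kafka internals:
    object SparseIndexLookup {
      // hypothetical in-memory form of a .index file: (relative offset, byte position in the .log file)
      final case class IndexEntry(relativeOffset: Long, position: Long)

      // Return the byte position in the .log file to start scanning from for a target absolute offset.
      // baseOffset is the first offset of the segment, taken from the file name (e.g. 00000000000000000000).
      def startPosition(entries: IndexedSeq[IndexEntry], baseOffset: Long, targetOffset: Long): Long = {
        val relative = targetOffset - baseOffset          // relative offset within the segment
        // sparse index: take the last entry at or before the target, then scan forward in the .log file
        val atOrBefore = entries.filter(_.relativeOffset <= relative)
        if (atOrBefore.isEmpty) 0L else atOrBefore.last.position
      }

      def main(args: Array[String]): Unit = {
        val entries = Vector(IndexEntry(0, 0L), IndexEntry(4, 512L), IndexEntry(9, 1024L))  // made-up entries
        println(startPosition(entries, baseOffset = 0L, targetOffset = 6L))  // prints 512: scan forward from there
      }
    }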
acks (producer acknowledgement setting):
0: do not wait for any acknowledgement from the broker (fire-and-forget, no blocking)
1: the partition leader has written the message
-1 (all): the partition leader and its (in-sync) followers have all written the message
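For reference, a minimal producer sketch that sets acks explicitly; the broker address and topic reuse values from this document and should be adjusted:
    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    object AcksDemo {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata:9092")  // placeholder broker list
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // "0" = no ack, "1" = leader only, "all" (same as -1) = leader + in-sync replicas
        props.put(ProducerConfig.ACKS_CONFIG, "all")
        val producer = new KafkaProducer[String, String](props)
        producer.send(new ProducerRecord[String, String]("logkafka", "hello with acks=all"))
        producer.close()
      }
    }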
Start ZooKeeper:
Start Kafka: kafkaStart.sh
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server0.properties &
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server1.properties &
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server2.properties &
Create a topic:
kafka-topics.sh --create --zookeeper bigdata:2181,bigdata:2182,bigdata:2183 --replication-factor 1 --partitions 1 --topic logkafka
--partitions can be increased to raise consumer parallelism
List / describe topics:
kafka-topics.sh --list --zookeeper bigdata:2181
kafka-topics.sh --describe --zookeeper bigdata:2181 --topic test (specify a topic; otherwise details for all topics are shown)
Send messages:
kafka-console-producer.sh --broker-list localhost:9092 --topic logkafka
Receive messages:
kafka-console-consumer.sh --zookeeper bigdata:2181 --topic logkafka --from-beginning (--from-beginning: consume from the beginning of the log)
Stop Kafka: kafkaStop.sh
$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server0.properties &
$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server1.properties &
$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server2.properties &
Two ways for Spark Streaming to connect to Kafka. Roughly: the Receiver approach connects to the Kafka queue through ZooKeeper, while the Direct approach connects to the Kafka brokers directly and pulls the data itself.
Receiver:
1. The partitions of a Kafka topic have no relationship to the partitions of Spark RDDs. Increasing the per-topic partition count in KafkaUtils.createStream() only increases the number of threads reading topic partitions inside a single Receiver; it does not increase Spark's parallelism when processing the data.
2. Multiple Kafka input DStreams can be created, with different consumer groups and topics, to receive data in parallel through multiple Receivers.
3. If the write-ahead log is enabled on a fault-tolerant file system such as HDFS, every received batch is also copied into the WAL. The storage level set in KafkaUtils.createStream() should therefore be StorageLevel.MEMORY_AND_DISK_SER. (A sketch of this approach follows below.)
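A minimal sketch of the receiver-based approach, assuming the spark-streaming-kafka-0-8 artifact is on the classpath; host names and the checkpoint path are placeholders:
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ReceiverWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ReceiverWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("/tmp/receiver-checkpoint")   // required once the WAL is enabled
        // topic -> number of threads inside the receiver (not Spark processing parallelism)
        val topicMap = Map("logkafka" -> 1)
        val lines = KafkaUtils.createStream(
          ssc, "bigdata:2181,bigdata:2182,bigdata:2183", "receiver-group", topicMap,
          StorageLevel.MEMORY_AND_DISK_SER)          // pair with the WAL, as noted above
        lines.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }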
Direct:
1. Simplified parallel reads: to read multiple partitions there is no need to create multiple input DStreams and union them. Spark creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel, so Kafka partitions and RDD partitions map one-to-one.
2. High performance: with the receiver-based approach, guaranteeing zero data loss requires enabling the WAL. That is inefficient, because the data is effectively copied twice: Kafka already replicates it for reliability, and it is copied again into the WAL. The direct approach does not use a Receiver and needs no WAL; as long as the data is retained and replicated in Kafka, it can be recovered from Kafka's replicas.
3. Exactly-once semantics: the receiver-based approach uses Kafka's high-level API and stores consumed offsets in ZooKeeper, which is the traditional way to consume Kafka. Combined with the WAL it can guarantee zero data loss, but it cannot guarantee that each record is processed exactly once; a record may be processed twice, because Spark and ZooKeeper can fall out of sync. The direct approach uses Kafka's simple API, and Spark Streaming itself tracks the consumed offsets and saves them in its checkpoint, so it stays consistent with itself and can guarantee exactly-once consumption. Because the offsets live only in the checkpoint, if you later want to consume with Kafka's high-level API you must update the offsets in ZooKeeper manually. (A sketch of this approach follows below.)
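A minimal sketch of the direct approach, again assuming the spark-streaming-kafka-0-8 artifact; the broker list and checkpoint path are placeholders:
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object DirectWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("DirectWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("/tmp/direct-checkpoint")  // offsets are tracked by Spark and saved here
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "bigdata:9092,bigdata:9093,bigdata:9094")
        val topics = Set("logkafka")
        // one RDD partition per Kafka partition; no Receiver, no WAL
        val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        lines.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }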
2. API operations
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
Scala Producer:
package com.kafka

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object producer {
  def main(args: Array[String]): Unit = {
    // command-line arguments
    if (args.length < 4) {
      System.err.println("Usage: producer <metadataBrokerList> <topics> <messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topics, messagesPerSec, wordsPerMessage) = args

    // producer configuration (broker list and serializers)
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // Kafka producer
    val producer = new KafkaProducer[String, String](props)

    // send 10 batches of random-number messages, pausing briefly between batches
    for (_ <- 1 to 10) {
      (1 to messagesPerSec.toInt).foreach { _ =>
        val msg = (1 to wordsPerMessage.toInt).map(_ => scala.util.Random.nextInt(10).toString).mkString(" ")
        producer.send(new ProducerRecord[String, String](topics, null, msg))
      }
      Thread.sleep(100)
    }
    producer.close()
  }
}
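For completeness, a minimal consumer sketch built on the same kafka 1.0.0 client; the broker list, group id, and topic are placeholders reusing names from above:
    package com.kafka

    import java.util.{Collections, Properties}

    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

    object consumer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata:9092,bigdata:9093,bigdata:9094")  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "logkafka-group")                                    // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")  // same effect as --from-beginning

        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(Collections.singletonList("logkafka"))
        while (true) {
          // poll(timeoutMs) is the pre-2.0 signature matching the kafka 1.0.0 dependency above
          val records = consumer.poll(1000)
          records.asScala.foreach(r => println(s"offset=${r.offset()} value=${r.value()}"))
        }
      }
    }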
3. Flume integration:
Conf 1: exec-mem-avro.conf
# Name the components on this agent
a1.sources = exec-source
a1.channels = memory-channel
a1.sinks = avro-sink
# configure for sources
a1.sources.exec-source.type = exec
a1.sources.exec-source.command = tail -F /opt/datas/log-collect-system/log_server.log
# configure for channels
a1.channels.memory-channel.type = memory
a1.channels.memory-channel.capacity = 1000
a1.channels.memory-channel.transactionCapacity = 100
# configure for sinks
a1.sinks.avro-sink.type = avro
a1.sinks.avro-sink.hostname = localhost
a1.sinks.avro-sink.port = 44444
# configure
a1.sinks.avro-sink.channel = memory-channel
a1.sources.exec-source.channels = memory-channel
Kafka conf:exec-memory-kafka.cnf
# Name the components on this agent
a1.sources = avro-source
a1.channels = memory-channel
a1.sinks = kafka-sink
# configure for sources
a1.sources.avro-source.type = avro
a1.sources.avro-source.bind = localhost
a1.sources.avro-source.port = 44444
# configure for channels
a1.channels.memory-channel.type = memory
a1.channels.memory-channel.capacity = 1000
a1.channels.memory-channel.transactionCapacity = 100
# configure for sinks
a1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.kafka-sink.bootstrap.servers = bigdata:9092,bigdata:9093,bigdata:9094
a1.sinks.kafka-sink.brokerList = bigdata:9092,bigdata:9093,bigdata:9094
a1.sinks.kafka-sink.topic = logkafka
# configure
a1.sinks.kafka-sink.channel = memory-channel
a1.sources.avro-source.channels = memory-channel