kafka介紹

kakfa:

1.簡介

1.定義:
kafka是一個分佈式、支持分區(paritition)、多副本的,基於zookeeper協調的分佈式消息系統

2.能作什麼
能夠實時的處理大量數據,知足各類場景需求。hadoop批處理,spark/storm流式處理等

3.kafka的特性:
3.1 高吞吐、低延遲
    kafka每秒能夠處理幾十萬的消息,延遲最低只有幾毫秒
    每一個topic能夠分多個partition,consumer group對partition進行consume操做
3.2 可擴展性
    kafka集羣支持熱擴展
3.3 持久性、可靠性
    消息被持久化到本地磁盤,而且支持數據備份防止數據丟失
3.4 容錯性
    容許集羣中節點失敗(若是副本數n,容許n-1個節點失敗)
3.5 高併發
    支持數千個客戶端同時讀寫

4.使用場景
4.1 日誌收集
4.2 消息系統
4.3 流式處理    

5.流程
producers 往brokers裏面的指定topic寫消息
consumer  從brokers裏面拉取指定topic的消息進行業務處理
** kakfa.apache.org
** Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。

重要名詞:
Producer(生產者)
消息產生者,產生消息,將其發送到Broker;

Consumer(消費者)
使用數據的客戶端,從Broker獲得消息;

Broker(中間人,消息中轉站)
即kafka的Server;集羣(cluster)由一堆Broker組成

Zookeeper
管理鏈接信息,包括各個節點的IP,端口等;Producer和Consumer須要到Zookeeper請求Broker的信息,從而進行消息的收發;
一個新的Broker的啓動也須要到Zookeeper註冊; zookeeper也能夠配集羣。目的是防止某一臺掛了。
producer和consumer經過zookeeper去發現topic,而且經過zookeeper來協調生產和消費的過程。

Topic
Topic,是kafka對消息分類的依據;一條消息,必須有一個與之對應的Topic;
好比如今又兩個Topic,分別是Love和Hate,Producer向Love發送一個消息蘋果,而後向Hate發送一個消息土豆;那麼,
訂閱Love的Consumer就會收到消息蘋果,訂閱Hate的Consumer就會收到消息土豆;
每一個Consumer能夠同時訂閱多個Topic,也便是說,同時訂閱Love和Hate的Consumer能夠同時收到蘋果和土豆。

Message
Message就是咱們所說的消息,是KAfKA操做的對象,消息是按照Topic存儲的;
KAFKA中按照必定的期限保存着全部發布過的Message,無論這些Message是否被消費過。kafka默認存儲7天。

2.安裝

kafka_2.10-0.8.2.1
    ** 2.10是scala的版本,kafka是用Scala開發的,scale的安裝版本應該對應
    ** 0.8.2.1是kafka的版本

1、安裝jdk和zookeeper,並啓動
    $ sbin/zkServer.sh start    --啓動
    $ sbin/zkServer.sh status   --查看狀態,也可jps查看
    
2、安裝scala
    [tom@blue01 cdh]$ tar zxvf /opt/softwares/scala-2.10.4.tgz
    $ su - 
    # vi /etc/profile        
    #SCALA_HOME
    SCALA_HOME=/opt/modules/cdh/scala-2.10.4
    export PATH=$PATH:$SCALA_HOME/bin
    $ scala -version

3、安裝kafka
    $ tar zxvf /opt/softwares/kafka_2.10-0.8.2.1.tgz

4、修改配置文件
a)
server.properties:
# 消息中轉站,每一個broker id必需惟一
broker.id=0
port=9092
# 主機名,去掉註解
host.name=blue01.mydomain
# kafka存儲數據的目錄,並不是存放日誌的目錄,$ mkdir data
log.dirs=/opt/modules/cdh/kafka_2.10-0.8.2.1/data    
# 指定zookeeper服務器
zookeeper.connect=blue01.mydomain:2181

b)            
producer.properties:
# broker列表
metadata.broker.list=blue01.mydomain:9092

c)
consumer.properties:
zookeeper.connect=blue01.mydomain:2181

5、啓動kafka
bin/kafka-server-start.sh config/server.properties


kafka初次使用:
1、建立一個topic
# 副本因子一般是奇數,不大於集羣服務器臺數
# 分區數不能大於集羣服務器臺數
bin/kafka-topics.sh --create --zookeeper blue01.mydomain:2181 --replication-factor 1 --partitions 1 --topic testTopic

# 查看topic list
bin/kafka-topics.sh --list --zookeeper blue01.mydomain:2181

2、啓動生產者
bin/kafka-console-producer.sh --broker-list blue01.mydomain:9092 --topic testTopic

3、啓動消費者,再開一個窗口
bin/kafka-console-consumer.sh --zookeeper blue01.mydomain:2181 --topic testTopic --from-beginning

** 生產者:flume、程序
** 消費者:spark、storm、impala

3.kafka與flume整合

生產者flume和kafka整合:    

** flume-collector
                channel1[c3]    -->  HDFS[k3]
    source[r3]
                channel2[c4]    -->     kafka[k4]
            
1、flume-apache.conf    --不須要修改
    ** 監控apache web應用的日誌文件
    
2、flume-hive.conf      --不須要修改
    ** 監控hive日誌文件
    $ sbin/start-dfs.sh ;sbin/start-yarn.sh ;mr-jobhistory-daemon.sh start historyserver

3、修改flume-collector.conf   ------------

# 一個源經過兩個管道同時向兩個目標下沉
agent3.sources = r3
agent3.channels = c3 c4
agent3.sinks = k3 k4
# 優化參數,複製
agent3.source.r3.selector.type = replicating

# define sources
agent3.sources.r3.type = avro
agent3.sources.r3.bind = 192.168.122.128
agent3.sources.r3.port = 4545

# define channels --> c3
agent3.channels.c3.type = memory
agent3.channels.c3.capacity = 1000
agent3.channels.c3.transactionCapacity = 100

# define channels --> c4
agent3.channels.c4.type = memory
agent3.channels.c4.capacity = 1000
agent3.channels.c4.transactionCapacity = 100

# define sinks : k4
agent3.sinks.k4.type = org.apache.flume.sink.kafka.KafkaSink
agent3.sinks.k4.brokerList = blue01.mydomain:9092
agent3.sinks.k4.topic = testTopic

# define sinks : k3
#啓用設置多級目錄,這裏按年///時 2級目錄,每一個小時生成一個文件夾
agent3.sinks.k3.type = hdfs
agent3.sinks.k3.hdfs.path=hdfs://192.168.122.128:8020/flume3/%Y%m%d/%H
agent3.sinks.k3.hdfs.filePrefix = accesslog
#啓用按時間生成文件夾
agent3.sinks.k3.hdfs.round=true
#設置round單位:小時  
agent3.sinks.k3.hdfs.roundValue=1
agent3.sinks.k3.hdfs.roundUnit=hour
#使用本地時間戳  
agent3.sinks.k3.hdfs.useLocalTimeStamp=true

agent3.sinks.k3.hdfs.batchSize=1000
agent3.sinks.k3.hdfs.fileType=DataStream
agent3.sinks.k3.hdfs.writeFormat=Text

#設置解決文件過多太小問題
#每600秒生成一個文件
agent3.sinks.k3.hdfs.rollInterval=600
#當達到128000000bytes時,建立新文件 127*1024*1024
#實際環境中若是按照128M回顧文件,那麼這裏設置通常設置成127M
agent3.sinks.k3.hdfs.rollSize=128000000
#設置文件的生成不和events數相關
agent3.sinks.k3.hdfs.rollCount=0
#設置成1,不然當有副本複製時就從新生成文件,上面三條則沒有效果
agent3.sinks.k3.hdfs.minBlockReplicas=1

# bind the sources and sinks to the channels
agent3.sources.r3.channels = c3 c4
agent3.sinks.k3.channel = c3
agent3.sinks.k4.channel = c4

------------------

測試:(接下來的操做要打開一堆窗口)
1、刷新網頁
    $ su -
    # service httpd start
    $ su - tom
    $ tail -F /var/log/httpd/access_log
    $ bin/flume-ng agent --conf conf/ --name agent1 --conf-file conf/flume-apache.conf
    
2、啓動CDH Hadoop,啓動hive
    $ tail -F /opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log
    > show databases;
    $ bin/flume-ng agent --conf conf/ --name agent2 --conf-file conf/flume-hive.conf
    
3、啓動agent3:
    $ bin/flume-ng agent --conf conf/ --name agent3 --conf-file conf/flume-collector.conf
    進入CDH Hadoop,監控日誌變化,注意:路徑要修改(監控.temp文件效果會明顯點)
    $ bin/hdfs dfs -tail -f /flume3/20161226/11/accesslog.1482724356118.tmp
    
4、啓動zookeeper
    $ sbin/zkServer.sh start
   啓動kafka
    $ bin/kafka-server-start.sh config/server.properties
   啓動kafka--消費者:
    $ bin/kafka-console-consumer.sh --zookeeper blue01.mydomain:2181 --topic testTopic --from-beginning

PS:
** 查看以前執行過的命令
$ history | grep flume
相關文章
相關標籤/搜索