[Original] Big Data Fundamentals: Kafka (1) Introduction, Installation, and Usage


 

http://kafka.apache.org

 

I. Introduction

Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

Kafka is often used to build real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, exceptionally fast, and runs in production at thousands of companies. It was written in Scala at LinkedIn.

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Key capabilities: publish/subscribe, message storage, and real-time processing.

First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

Kafka runs as a cluster on one or more servers; the categories under which Kafka stores message records are called topics, and each record contains a key, a value, and a timestamp.

 

1 Limitations of Traditional MQ

Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. Each of these two models has a strength and a weakness. The strength of queuing is that it allows you to divide up the processing of data over multiple consumer instances, which lets you scale your processing. Unfortunately, queues aren't multi-subscriber—once one process reads the data it's gone. Publish-subscribe allows you to broadcast data to multiple processes, but has no way of scaling processing since every message goes to every subscriber.

Traditional messaging has two models: queuing and publish-subscribe.

In the queue model, a pool of consumers reads from a server and each message is handled by exactly one of them.

In the publish-subscribe model, every message is broadcast to all consumers.

Each model has strengths and weaknesses. The strength of the queue model is that it lets you divide message processing across multiple consumers, so processing can scale; the weakness is that once a consumer reads a message, it is gone. The strength of publish-subscribe is that it lets you broadcast a message to multiple consumers; the weakness is that processing cannot be scaled out, since every message goes to every subscriber.

As we will see below, Kafka combines the strengths of both models.

A traditional queue retains records in-order on the server, and if multiple consumers consume from the queue then the server hands out records in the order they are stored. However, although the server hands out records in order, the records are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the records is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of "exclusive consumer" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing.

A traditional queue keeps messages in order on the server, and if multiple consumers consume from the queue, the server hands messages out in that order. Even so, because messages are delivered to consumers asynchronously, they may well be processed out of order; this means message ordering is effectively lost under parallel consumption. The usual workaround is to allow only a single consumer to read from the queue, which preserves ordering at the cost of parallelism.

 

2 Role & API

 

Kafka has five core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
  • The AdminClient API allows managing and inspecting topics, brokers, and other Kafka objects.

The most commonly used are the Producer API (to send messages) and the Consumer API (to consume messages); Kafka also provides the Streams API, Connector API, and AdminClient API.

In Kafka the communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. This protocol is versioned and maintains backwards compatibility with older versions. We provide a Java client for Kafka, but clients are available in many languages.
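As a minimal sketch of the Java producer client (assuming a broker on localhost:9092 and a topic named test, as created in the installation section below, with kafka-clients 2.0 on the classpath):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // broker to bootstrap from
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, flushing any pending records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "key1", "hello kafka"));
        }
    }
}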

 

3 Topic & Partition

A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.

A topic is a category of message records; every message must be sent to a specific topic, and a topic can be consumed by multiple consumers.

For each topic, the Kafka cluster maintains a partitioned log that looks like this:

Each topic has one or more partitions, and each partition corresponds to a log file.

 

Each partition is an ordered, immutable sequence of records that is continually appended to—a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

Each partition is an ordered sequence of messages that is continually appended to a log file; each message in a partition has a sequential id called the offset, which uniquely identifies that message within the partition.

The Kafka cluster durably persists all published records—whether or not they have been consumed—using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so storing data for a long time is not a problem.

The Kafka cluster persists all messages, whether or not they have been consumed, subject to a configurable retention period. If retention is set to two days, a message can be consumed for two days after it is published, after which it is discarded to free space. Kafka's performance is effectively constant with respect to data size, so storing data for a long time is not a problem.
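On the broker this corresponds to a retention setting in server.properties; a sketch for the two-day example (the shipped default is 168 hours, i.e. 7 days):

# server.properties: discard log segments older than two days
log.retention.hours=48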

 

In fact, the only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but, in fact, since the position is controlled by the consumer it can consume records in any order it likes. For example a consumer can reset to an older offset to reprocess data from the past or skip ahead to the most recent record and start consuming from "now".

The only metadata kept on the consumer side is the consumer's current offset in each partition. The offset is controlled by the consumer: normally a consumer advances its offset as it reads, but it can also rewind the offset to reprocess earlier messages or jump ahead to skip some.
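A hedged sketch of offset control with the Java consumer, assuming the test topic has a partition 0: assign() pins the consumer to a partition and seekToBeginning() rewinds it so that past records are redelivered (seek(tp, offset) would jump to an arbitrary position instead):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplayConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "false"); // we manage the position ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test", 0);
            consumer.assign(Collections.singletonList(tp));           // manual assignment, no group needed
            consumer.seekToBeginning(Collections.singletonList(tp));  // rewind and reprocess from offset 0
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> r : records)
                System.out.printf("offset=%d value=%s%n", r.offset(), r.value());
        }
    }
}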

The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism—more on that in a bit.

A topic can have multiple partitions, which has two benefits: the topic's data can grow beyond the capacity of a single machine, and it enables parallelism (for both producing and consuming).

 

The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.

Partitions are distributed across the servers of the Kafka cluster, with each server handling requests for one or more partitions; each partition is replicated across a configurable number of servers for fault tolerance.

Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.

Each partition has one server acting as its leader and zero or more servers acting as followers, the number being determined by the replication factor. The leader handles all read and write requests for the partition while the followers passively replicate its data; if the leader fails, one of the followers automatically becomes the new leader. Each server acts as the leader for some of its partitions and as a follower for others, so load is balanced across the cluster.

 

4 Producer & Consumer

Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). 

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

A producer sends messages to a topic and decides which partition each message is assigned to. This can be simple round-robin to balance load, or partitioning by key, in which case messages with the same key always go to the same partition.
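To make the key-to-partition mapping concrete, here is a simplified illustration of the rule the default partitioner applies (the real client hashes the serialized key bytes with murmur2; plain hashCode is used here only for brevity):

public class KeyPartitioning {
    // Illustration only: equal keys always map to the same partition, which is
    // what preserves per-key ordering. Kafka's DefaultPartitioner actually
    // computes murmur2(keyBytes) % numPartitions.
    static int partitionForKey(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions; // mask sign bit for a valid index
    }

    public static void main(String[] args) {
        System.out.println(partitionForKey("user-42", 3));
        System.out.println(partitionForKey("user-42", 3)); // same key, same partition every time
    }
}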

Consumers are organized into consumer groups; each message in a topic is delivered to exactly one consumer within each subscribing group.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all consumers are in the same group, messages are automatically load-balanced among them.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

If all consumers are in different groups, each message is broadcast to every consumer.
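In the Java client, group membership is just a config property; a sketch (the group name groupA is arbitrary): run two copies with the same group.id and they split the partitions between them, while a copy started with a different group.id receives its own full copy of the stream.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "groupA"); // consumers sharing this id load-balance the partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records)
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            r.partition(), r.offset(), r.value());
            }
        }
    }
}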

 

A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.

The way consumption is implemented in Kafka is by dividing up the partitions in the log over the consumer instances so that each instance is the exclusive consumer of a "fair share" of partitions at any point in time. This process of maintaining membership in the group is handled by the Kafka protocol dynamically. If new instances join the group they will take over some partitions from other members of the group; if an instance dies, its partitions will be distributed to the remaining instances.

Message consumption in Kafka is implemented by dividing the partitions evenly among the consumer instances in a group.

Kafka only provides a total order over records within a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over records this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.

Kafka only guarantees message order within a partition; combined with the ability to partition messages by key, this satisfies the ordering needs of most applications. If you need strict total ordering over a topic, the only option is a topic with a single partition plus a consumer group with a single consumer.
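Such a single-partition topic can be created with the AdminClient API mentioned earlier; a minimal sketch (topic name and replication factor here are arbitrary):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateOrderedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition gives a total order over the topic,
            // at the cost of one consumer per consumer group
            NewTopic topic = new NewTopic("ordered-topic", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}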

 

5 Guarantees

At a high-level Kafka gives the following guarantees:

  • Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a record M1 is sent by the same producer as a record M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
  • A consumer instance sees records in the order they are stored in the log.
  • For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any records committed to the log.

Messages sent by a producer to a given topic partition are appended in the order they are sent, i.e. send order determines storage order; a consumer sees messages in the order they are stored, i.e. storage order determines consumption order; and with a replication factor of N, up to N-1 server failures can be tolerated without losing committed messages.

 

II. Installation and Usage

tar -xzf kafka_2.11-2.0.0.tgz
cd kafka_2.11-2.0.0

1 Single-Node Startup

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties

2 Command-Line Clients

2.1 Creating and Inspecting Topics

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
     Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

2.2 Producing and Consuming Messages

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2.3 Modifying and Viewing Broker Configs

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
From Kafka version 1.1 onwards, some of the broker configs can be updated without restarting the broker. See the Dynamic Update Mode column in Broker Configs for the update mode of each broker config.
  • read-only: Requires a broker restart for update
  • per-broker: May be updated dynamically for each broker
  • cluster-wide: May be updated dynamically as a cluster-wide default. May also be updated as a per-broker value for testing.

3 Cluster Startup

3.1 Configuration

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=$zookeeper_server1:port1

The broker.id property is the unique and permanent name of each node in the cluster. 
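So a second broker on the same machine would follow the same pattern with its own id, port, and log directory, for example:

# second broker: unique id, port, and log directory (values illustrative)
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=$zookeeper_server1:port1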

Other important configs:

auto.create.topics.enable: whether a topic is created automatically when a message is sent to a topic that does not exist

delete.topic.enable: whether topics are allowed to be deleted

num.partitions: default number of partitions for automatically created topics

default.replication.factor: default replication factor for automatically created topics

broker.id.generation.enable: whether broker ids are generated automatically

host.name: hostname the broker binds to (deprecated in favor of listeners)

log.dirs: directories where data is stored; defaults to /tmp and must be changed

log.cleanup.policy: cleanup policy for old log segments (delete or compact)

log.retention.bytes: maximum size a partition may grow to before old segments are deleted

log.retention.ms: how long messages are retained before deletion

log.roll.ms: maximum time before a new log segment is rolled out

log.segment.bytes: maximum size of a single log segment file

replica.lag.time.max.ms: how long a follower may fail to fetch or lag behind the leader before it is removed from the ISR

log.message.timestamp.type: whether a message's timestamp is CreateTime or LogAppendTime

num.network.threads: number of threads handling network requests

num.io.threads: number of threads performing disk I/O

compression.type: compression type for messages (e.g. gzip, snappy, lz4, or producer to keep the producer's codec)
