1、關於Kafka的一些概念和理解html
Kafka是一個分佈式的數據流平臺,它基於獨特日誌文件形式,提供了高性能消息系統功能。也能夠用於大數據流管道。 java
Kafka維護了按目錄劃分的消息訂閱源,稱之爲 Topic。 node
稱發佈消息到Topic的工程爲生產者。 apache
稱訂閱Topic和處理髮布的消息的訂閱源的工程爲消費者。 bootstrap
Kafka以一個或者多個服務器組成的集羣的形式運行,每一個服務器被稱爲broker。 api
Kafka客戶端和服務器端經過TCP協議鏈接,並提供了Java客戶端,許多其餘語言的客戶端也有。 服務器
對於每一個Topic,Kafka集羣維護了分區的日誌文件(分區一、分區二、分區3),每一個分區(partition)是順序的、不可改變的、一直不停地日後面追加的消息隊列,稱之爲提交日誌(commit log),每一個在其中的消息都有一個稱之爲offset的序列號,來惟一的標識在分區裏的每條消息。 app
Kafka集羣保存了全部發布的消息,無論他們是否被消費,保存時間期限是能夠配置的。Kafka對於性能表現對於數據的數量是恆定的,因此它處理大數據量沒有任何問題。 socket
消息系統一般有兩個模型:排隊模式和廣播模式,排隊模式是許多消費者同時去服務器爭奪數據,可是一條數據只分發給一個消費者,廣播模式是消息廣播給全部消費者,每一個消費者均可以拿到消息。Kafka經過consumer group統一律括了這兩種模式。 分佈式
消費者們都給本身定了一個group name(id) 的標籤,每條發佈到topic的消息都會發給每一個訂閱的consumer group裏面的一個且僅一個成員。consumers能夠分佈在不一樣的進程或者服務器上。
message、partition和consumer的關係
一、message按必定hash邏輯分發到topic的某個partition;
二、一個consumer能夠鏈接多個partition;
三、全部partition都會有consumer線程去鏈接,這個consumer的分配是自動的,沒法指定某個consumer鏈接哪個partition;
四、consumer鏈接的partitions是固定的,不會中途自動變動,好比consumer1鏈接的是partition1和partition3,consumer2鏈接的是partition2,這個分配中途不會本身變化。
五、consumer若是多於partition數,則多餘的那部分consumer會連不到partition而空閒。
Kafka服務器經常使用腳本命令
啓動kafka:
bin/kafka-server-start.sh config/server.properties &
中止kafka:
bin/kafka-server-stop.sh
一、Topic操做
建立topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic TEST2
刪除topic:
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicname
查看全部topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看某個topic詳情:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name
修改topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic TEST2 --partitions 2
二、消費消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
三、生產消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
按ctrl+c結束(^C)
consumer_group
一、查看有哪些consumer groups
./kafka-consumer-groups.sh --bootstrap-server 172.16.1.170:9092,172.16.1.171:9092,172.16.172:9092 --list --new-consumer
二、查看指定consumer groups的消費狀況(能夠看到topic的offset)
./kafka-consumer-groups.sh --bootstrap-server 172.16.1.170:9092,172.16.1.171:9092,172.16.172:9092 --describe --group PushConsumer_qAbA7b --new-consumer
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER ztest-group, ZTEST2, 6, 4987, 4987, 0, consumer-7_/172.19.15.113 ztest-group, ZTEST2, 0, 4876, 4936, 60, consumer-1_/172.19.15.113 ztest-group, ZTEST2, 3, 5008, 5062, 54, consumer-4_/172.19.15.113 ztest-group, ZTEST2, 4, 4963, 4992, 29, consumer-5_/172.19.15.113 ztest-group, ZTEST2, 1, 4900, 4949, 49, consumer-2_/172.19.15.113 ztest-group, ZTEST2, 2, 5046, 5046, 0, consumer-3_/172.19.15.113 ztest-group, ZTEST2, 7, 5051, 5051, 0, consumer-8_/172.19.15.113 ztest-group, ZTEST2, 5, 5010, 5010, 0, consumer-6_/172.19.15.113
參考官方文檔以下:
With the ConsumerGroupCommand tool, we can list, delete, or describe consumer groups. For example, to list all consumer groups across all topics:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list test-consumer-group
To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group test-consumer-group GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER test-consumer-group test-foo 0 1 3 2 test-consumer-group_postamac.local-1456198719410-29ccd54f-0
When you're using the new consumer API where the broker handles coordination of partition handling and rebalance, you can manage the groups with the "--new-consumer" flags:
> bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server broker1:9092 --list
Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic would look like this:
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
Note, however, after 0.9.0, the kafka.tools.ConsumerOffsetChecker tool is deprecated and you should use the kafka.admin.ConsumerGroupCommand (or the bin/kafka-consumer-groups.sh script) to manage consumer groups, including consumers created with the new consumer API.
查看topic的最大和最小offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell
官方文檔:
一、官方網站:http://kafka.apache.org/documentation
二、官方WIKI:https://cwiki.apache.org/confluence/display/KAFKA/Index
三、issues狀況(JIRA):https://issues.apache.org/jira/browse/KAFKA
Kafka集羣配置
kafka集羣配置很是簡單,在不一樣服務器上的kafka server只要鏈接同一個zookeeper就能夠組成集羣。
在server.properties配置 zookeeper.connect=172.16.1.6:2181,172.16.1.7:2181,172.16.1.8:2181
實例配置以下(kafka 0.9版本),供參考:
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 ############################# Socket Server Settings ############################# listeners=PLAINTEXT://:9092 # The port the socket server listens on port=9092 # Hostname the broker will bind to. If not set, the server will bind to all interfaces host.name=172.16.1.170 # Hostname the broker will advertise to producers and consumers. If not set, it uses the # value for "host.name" if configured. Otherwise, it will use the value returned from # java.net.InetAddress.getCanonicalHostName(). advertised.host.name=172.16.1.170 # The port to publish to ZooKeeper for clients to use. If this is not set, # it will publish the same port that the broker binds to. advertised.port=9092 # The number of threads handling network requests num.network.threads=3 # The number of threads doing disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. # add by zollty num.partitions=3 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 # use 2 factors add by zollty default.replication.factor=2 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=172.16.1.6:2181,172.16.1.7:2181,172.16.1.8:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 ############################################# delete.topic.enable=true
Kafka 服務器生產配置
num.network.threads=3-8
queued.max.requests=500-16
fetch.purgatory.purge.interval.requests=1000-100
producer.purgatory.purge.interval.requests=1000-100
num.replica.fetchers=1-4
default.replication.factor=1-3
replication.factor=1-3
controlled.shutdown.enable=true
另外:
From a security perspective, we recommend you use the latest released version of JDK 1.8 as older freely available versions have disclosed security vulnerabilities. LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector. If you decide to use the G1 collector (the current default) and you are still on JDK 1.7, make sure you are on u51 or newer. LinkedIn tried out u21 in testing, but they had a number of problems with the GC implementation in that version. LinkedIn's tuning looks like this:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80