[Big Data in Practice] Game Event Processing System (3): Message Middleware - Kafka

Preface

The previous article, [Big Data in Practice] Game Event Processing System (2): Event Processing - logstash, covered how the logs are processed; the resulting events are ultimately written to a Kafka cluster. This article therefore walks through setting up a simple Kafka cluster. By the end of it, the system should run the whole pipeline of log collection, processing, and output to Kafka, and Kafka's command-line tools can be used to verify that the messages arrive correctly.

Starting ZooKeeper

Start command:

bin/zookeeper-server-start.sh config/zookeeper.properties

The main parameters in the zookeeper.properties configuration file are:

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
  • dataDir: the directory where the in-memory database snapshots and the transaction log of database updates are stored.
  • clientPort: the port the ZooKeeper service listens on.
  • maxClientCnxns: the per-IP limit on the number of connections to ZooKeeper. Leaving it unset or setting it to 0 means no limit. Note that connections from Kafka brokers count towards this limit, so with maxClientCnxns = 1 you cannot have a Kafka server and a Kafka producer connected to ZooKeeper from the same machine at the same time.

Starting the Kafka Server

Start command:

bin/kafka-server-start.sh config/server.properties

Once this succeeds, a broker has been started. The server.properties file configures that broker; the main settings are:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
# Broker ID; each broker's ID must be unique.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# listeners=PLAINTEXT://:9092

# If not set, this defaults to the host name returned by java.net.InetAddress.getCanonicalHostName(), port 9092 and the PLAINTEXT protocol.
# Besides PLAINTEXT, the supported security protocols include SSL, SASL_PLAINTEXT and SASL_SSL.
listeners=PLAINTEXT://localhost:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The listener address advertised to producers and consumers. If this option is not configured,
# the address from the listeners setting above is advertised instead.
advertised.listeners=PLAINTEXT://localhost:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
## Mapping from listener names to security protocols
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL


# The number of threads that the server uses for receiving requests from the network and sending responses to the network
# Number of threads used for receiving requests from the network and sending responses back.
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
# Number of threads used for processing requests, which may involve disk I/O.
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
# Socket send buffer (SO_SNDBUF) size in bytes; the default is 100 KB.
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
# Socket receive buffer (SO_RCVBUF) size in bytes; the default is 100 KB.
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
# Maximum size of a single request, to protect against OutOfMemoryError; the default is 100 MB.
socket.request.max.bytes=104857600

############################# Log Basics #############################
# Basic settings for log (message) storage

# A comma separated list of directories under which to store log files
# Directories where Kafka stores the logs it receives (i.e. the message data, not the broker's own application logs).
# Multiple directories can be specified, separated by commas.
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# Default number of partitions per topic. More partitions allow greater consumption parallelism, but also mean more files spread across the brokers.
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
# Number of threads per data directory used for log recovery at startup and for flushing at shutdown.
# Defaults to 1; for data directories located on a RAID array, increasing this value is recommended.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# Settings for Kafka's internal topics, which are used for group and transaction management.
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
## Policy for flushing log data to disk.
## Tuning it is a trade-off between durability, latency and throughput:
##  1. Durability: without replication, unflushed data may be lost.
##  2. Latency: a very large flush interval means a lot of data has to be flushed at once, which can cause latency spikes.
##  3. Throughput: flushing is usually the most expensive operation, and a very small flush interval may lead to excessive disk seeks.

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
# Force a flush to disk after every 10000 messages.
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
# Force a flush to disk once 1000 milliseconds have passed since the last flush.
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
## Log retention policy:
## 1. segments older than a given age are deleted;
## 2. segments are deleted once the log exceeds a given size.
## A segment is deleted as soon as either condition is met.

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
# Keep log files for one week by default.
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
# Size-based retention: keep roughly the most recent 1 GB per partition; the overall limit for a topic is about partitions * log.retention.bytes. -1 means no size limit.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# Maximum size of a single log segment file; when this size is reached, a new segment file is created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# How often log segments are checked to decide whether they should be deleted according to the retention policy.
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
## ZooKeeper settings

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

## Connect to the ZooKeeper cluster: a comma-separated list of host:port pairs, one per ZooKeeper server.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
## Timeout, in milliseconds, for connecting to ZooKeeper.
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################
## Group coordinator settings


# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
## Delay before the initial consumer rebalance. 0 is convenient for development; in a production deployment the 3-second default is the better choice.
group.initial.rebalance.delay.ms=0

This configuration file already gives a rough idea of what Kafka can do. Quite a few of the options I do not fully understand myself yet; I will dig into them in a later article.

If you just want a quick experiment, the following few settings are enough:

  • broker.id=0
  • listeners=PLAINTEXT://127.0.0.1:9092
  • advertised.listeners=PLAINTEXT://127.0.0.1:9092
  • num.partitions=3 (to experiment with multiple partitions)
  • zookeeper.connect=localhost:2181 (connect to ZooKeeper)

Starting a Second Broker

Copy server.properties to server-1.properties and adjust the configuration, for example:

  • broker.id=1
  • listeners=PLAINTEXT://127.0.0.1:9093
  • advertised.listeners=PLAINTEXT://127.0.0.1:9093
  • log.dirs=/tmp/kafka-logs-1 (each broker on the same machine needs its own log directory)
  • num.partitions=3 (to experiment with multiple partitions)
  • zookeeper.connect=localhost:2181 (connect to ZooKeeper)

Run the start command:

bin/kafka-server-start.sh config/server-1.properties
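
To check that both brokers have actually joined the cluster, you can query the cluster metadata with the AdminClient from kafka-clients (the same library used by the consumer later in this article). This is only a minimal sketch; the package and class names are placeholders chosen for this experiment:

package admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

import java.util.Properties;

public class ClusterCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Any live broker can answer the metadata request.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
        AdminClient admin = AdminClient.create(props);

        // Both broker.id=0 and broker.id=1 should be listed once the second broker is running.
        for (Node node : admin.describeCluster().nodes().get()) {
            System.out.println("broker " + node.id() + " at " + node.host() + ":" + node.port());
        }
        admin.close();
    }
}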

Topic Management

Creating a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic game-score
  • This creates a new topic named game-score (a programmatic equivalent using AdminClient is sketched after this list).
  • replication-factor is the number of different brokers each partition is replicated to.
  • partitions is 2, meaning the topic has 2 partitions.
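
Topics can also be created programmatically with the AdminClient API from kafka-clients. The sketch below mirrors the command above, with 2 partitions and a replication factor of 2; the package and class names are placeholders:

package admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class GameScoreTopicCreator {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient admin = AdminClient.create(props);

        // Topic name, number of partitions and replication factor, as in the command-line example.
        NewTopic topic = new NewTopic("game-score", 2, (short) 2);
        // createTopics() is asynchronous; get() waits until the broker has acknowledged the creation.
        admin.createTopics(Collections.singleton(topic)).all().get();

        admin.close();
    }
}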

Listing topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

The output shows:

game-score

Describing a topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic game-score

This prints something like the following:

Topic:game-score    PartitionCount:2    ReplicationFactor:2    Configs:
    Topic: game-score    Partition: 0    Leader: 1    Replicas: 1,0    Isr: 1,0
    Topic: game-score    Partition: 1    Leader: 0    Replicas: 0,1    Isr: 0,1
  • Leader: the broker currently responsible for all reads and writes for the partition; every replica of a partition is a candidate to be elected leader. (The sketch after this list reads the same information through AdminClient.)
  • Replicas: the list of brokers holding a replica of the partition, regardless of whether they are the leader or even alive.
  • Isr: short for in-sync replicas, the subset of Replicas that are alive and fully caught up with the leader.
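
The same per-partition leader, replica and ISR information can also be read programmatically. A small AdminClient sketch (class and package names are placeholders):

package admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;

public class GameScoreTopicDescriber {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient admin = AdminClient.create(props);

        // Same information as --describe: leader, replicas and ISR for each partition.
        TopicDescription description =
                admin.describeTopics(Collections.singleton("game-score")).all().get().get("game-score");
        for (TopicPartitionInfo p : description.partitions()) {
            System.out.println("partition " + p.partition()
                    + " leader=" + p.leader().id()
                    + " replicas=" + p.replicas()
                    + " isr=" + p.isr());
        }
        admin.close();
    }
}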

Deleting a topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic game-score

The topic is not actually removed right away; it is only marked for deletion:

Topic game-score is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Changing the number of partitions of a topic

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic game-score --partitions 3
  • Observed in testing: --alter cannot be used to change --replication-factor, and the partition count can only ever be increased, never decreased.

Viewing per-partition message information for a topic

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group testgroup --topic test0 --zookeeper 127.0.0.1:2181

Note that ConsumerOffsetChecker has been removed in recent Kafka releases (1.0 and later); there, bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group> provides the same information.

Starting a Console Consumer

Start a console consumer to check whether messages are actually reaching the Kafka cluster:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic game-score --from-beginning

This command dumps the messages to the console as they arrive.

logstash Kafka Output Configuration

For logstash to send messages to the Kafka cluster, use the kafka plugin in logstash's output section.

The configuration looks like this:

output {
    kafka {
        # topic to write to
        topic_id => "game-score"
        # address of the Kafka brokers
        bootstrap_servers => "127.0.0.1:9092"
        # be sure to specify the output codec
        codec => "json"
    }
}

Once this is in place, start filebeat, logstash and Kafka, append a new line to the monitored log file, and the message should show up in the Kafka console consumer.

Here is the result, posted as a little encouragement to myself:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic game-score --from-beginning

{"bet_count":"1","room_id":"002","score_type":"balance","game_time":"14:26:37","desk_id":"512","game_date":"2015-11-02","game_id":"2015-11-02_14:26:37_ÐÂÊÖÇø_1_002_512","game":"PDK","beat":{"name":"admindeMacBook-Pro-2.local","version":"6.2.4","hostname":"admindeMacBook-Pro-2.local"},"tax":0,"time":"2015-11-02 14:26:54,355","tags":["beats_input_codec_plain_applied"],"offset":21444,"users":[{"username":"ly6","win":15}],"bet_name":"ÐÂÊÖÇø","prospector":{"type":"log"},"source":"/Users/admin/Documents/workspace/elk/filebeat-6.2.4-darwin-x86_64/hjd_IScoreService.log"}

Implementing a Simple Kafka Consumer

Add the following dependency to pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.1</version>
    </dependency>
</dependencies>

GameScoreConsumer.java is as follows:

package consumers;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class GameScoreConsumer {

    public static void main(String[] args) {
        // Consumer configuration: broker address, consumer group, automatic offset commits and deserializers.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "game-score-consumers");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // Subscribe to the topic that logstash writes to.
        consumer.subscribe(Collections.singletonList("game-score"));

        // Poll in a loop and print every record that arrives.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
        }
    }

}

Start it, append a new line to the monitored log file, and the consumer will receive the corresponding message.

Summary

At this point, the whole pipeline from log collection and processing through to the Kafka message middleware is working end to end. The [Big Data in Practice] Game Event Processing System series is oriented towards hands-on experiments, so it does not go very deep into the underlying theory; I may start a separate series for that later.
