有幸使用Kafka Mirror Maker來同步兩個地方的數據,如下的是我的筆記記錄(未完)。java
使用JMX監控Kafka的敏感數據須要開啓JMX端口,能夠經過設置環境變量來實現。我本身的作法是在kafka-server-start.sh腳本文件中添加以下配置正則表達式
... if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G" # 增長JMX_PORT配置 export JMX_PORT="9999" ...
# 消費目標集羣 bootstrap.servers=<kafka1host_from>:<port>,<kafka2host_from>:<port>,<kafka3host_from>:<port> # 消費組的ID group.id=mm-test-consumer-group # 選取鏡像數據的起始?即鏡像MirrorMaker啓動後的數據,參數latest,仍是鏡像以前的數據,參數earliest auto.offset.reset=earliest # 消費者心跳數據,默認3000,因爲是遠程鏡像,此處設爲30秒 heartbeat.interval.ms=30000 # 消費鏈接超時值,默認10000,因爲遠程鏡像,此處設爲100秒 session.timeout.ms=100000 # 更改分區策略,默認是range,雖然有必定優點但會致使不公平現象,特別是鏡像大量的主題和分區的時候 partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor # 單個poll()執行的最大record數,默認是500 max.poll.records=20000 # 讀數據時tcp接收緩衝區大小,默認是65536(64KiB) receive.buffer.bytes=4194304 # 設置每一個分區總的大小,默認是1048576 max.partition.fetch.bytes=10485760
# 生產者目的集羣 bootstrap.servers=<kafka1host_to>:<port>,<kafka2host_to>:<port>,<kafka3host_to>:<port> # 開啓壓縮 compression.type=snappy
nohup bin/kafka-mirror-maker --consumer.config config/consumer.properties --num.streams 2 --producer.config config/producer.properties --whitelist '*.test|*.temp' >/var/log/mirrormaker/mirrormaker.log 2>&1 &
說明:apache
- --num.streams: 一個流就是一個消費者,全部消費者公用一個生產者。
- --whitelist: 代表須要同步的白名單,可使用'|'來鏈接多個topic,還可使用java風格的正則表達式,與此對應的就是黑名單。
- 對於日誌記錄文件建議放到/var/log/文件夾中,並使用logrotate進行週期管理,達到日誌可查,又避免日誌文件填滿整個磁盤。
- Mirror Maker通常部署與目的集羣,即一個可靠的生產者(向目的集羣輸入數據)更爲重要,一個沒法鏈接到集羣的消費者要比一個沒法鏈接到集羣的生產者要安全得多。
enable.auto.commit=false
max.in.flight.requests.per.connection=1 retries=Integer.MAX_VALUE max.block.ms=Long.MAX_VALUE
增長參數bootstrap
--abort.on.send.failure
- Mirror maker will only send one request to a broker at any given point.
- If any exception is caught in mirror maker thread, mirror maker will try to commit the acked offsets then exit immediately.
- For RetriableException in producer, producer will retry indefinitely. If retry did not work, eventually the entire mirror maker will block on producer buffer full.
- For None-retriable exception, if --abort.on.send.fail is specified, stops the mirror maker. Otherwise producer callback will record the message that was not successfully sent but let the mirror maker move on. In this case, that message will be lost in target cluster.
As the last point stated if there is any error occurred your mirror maker process will be killed. So users are recommend to use a watchdog process like supervisord to restart the killed mirrormaker process.安全
當安裝有多個java版本時,有時調試時須要在多個java版本間進行切換,可使用命令update-alternatives來實現,具體爲session
# update-alternatives --config java ... # update-alternatives --config javac
而後根據列出的java版本選擇你所須要的。app
進行kafka mirror maker操做後,目的集羣不能消費數據(除了kafka自身的kafka-console-consumertcp