https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)java
1、正則表達式
Kafka's mirroring feature makes it possible to maintain a replica of an existing kafka cluster. The following diagram shows how to use the MirrorMaker tool to mirror a source kafka cluster into a target(mirror) kafka cluster.The tool uses a kafka comsumer to consume messages from the source cluster,and re-publishes those messages to the local(target)cluster using an embedded kafka producer.shell
2、apache
1.whitelist and blacklist 黑白名單異步
mirror-maker接受精確指定同步的topic的黑白名單。使用java的標準的正則表達式,爲了方便,逗號(,)被編譯成java正則的|async
2.producer timeoutcode
爲了支持高吞吐量,你最好會用異步的內置producer,並將內置的producer設置爲阻塞模式(queue.enqueueTimeout.ms=-1) 這樣能夠保證數據不會丟失。不然,異步producer的默認queue.enqueueTimeout.ms=0,若是producer內部的隊列滿了,數據會被丟棄,並拋出QueueFullException異常。而對於阻塞模式的producer,若是內部隊列滿了就會一直等到,從而有效的節制了內置的comsumer的消費速凍。你能夠打開producer的trace logging,隨時看到內部隊列剩餘量。server
3.Producer的重試次數retries隊列
3、ip
Source Kafka Cluster A:==> 配置zk [root@hftest0001 conf]# pwd /opt/zookeeper-3.4.6/conf [root@hftest0001 conf]# ll total 16 -rw------- 1 root root 535 Jan 28 13:27 configuration.xsl -rw------- 1 root root 2161 Jan 28 13:27 log4j.properties -rw------- 1 root root 1043 Jan 28 13:31 zoo.cfg -rw------- 1 root root 922 Jan 28 13:27 zoo_sample.cfg [root@hftest0001 conf]# cat zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/zookeeper-3.4.6/data/ clientPort=2181 [root@hftest0001 config]# pwd /opt/kafka_2.11-0.8.2.2/config B:==> 配置kafka [root@hftest0001 config]# cat server.properties broker.id=201 port=9092 ... ... zookeeper.connect=${source_zk_ip}:2181 ... C:==> 啓動source kafka [root@hftest0001 kafka_2.11-0.8.2.2]# ./bin/kafka-server-start.sh config/server.properties & ... ... D:==> 建立topic [root@hftest0001 kafka_2.11-0.8.2.2]# ./bin/kafka-topics.sh --zookeeper 10.224.246.201:2181 --replication-factor 1 --partitions 1 --topic r1-p1-1 --create E:==> 查看topic [root@hftest0001 kafka_2.11-0.8.2.2]# ./bin/kafka-topics.sh --zookeeper ${source_zk_ip}:2181 --list r1-p1-1
Target Kafka Cluster 一樣的操做,step A,B,C,D,E F:==>配置MirrorMaker [root@hftest0004 config]# pwd /opt/kafka_2.11-0.8.2.2/config [root@hftest0004 config]# cat consumer.properties zookeeper.connect=${source_zk_ip}:2181 zookeeper.connection.timeout.ms=6000 group.id=hftest-mirror-maker [root@hftest0004 config]# cat producer.properties metadata.broker.list=${target_broker_ip}:9092 producer.type=async compression.codec=none serializer.class=kafka.serializer.DefaultEncoder ############################# Async Producer ############################# queue.enqueue.timeout.ms=-1 [root@hftest0004 kafka_2.11-0.8.2.2]# ./bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties ----num.producers 2 --whitelist r1-p1-1
校驗: 啓動一個producer向source kafka cluster push數據 啓動一個consumer從source kafka cluster pull數據 啓動一個consumer從target kafka cluster pull數據 ==> 看是否能pull到數據