啓動kafka服務java
./bin/kafka-server-start.sh config/server.properties
建立話題my-replicated-topiccentos
bin/kafka-topics.sh --create --zookeeper yun1:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic查看話題列表api
bin/kafka-topics.sh --list --zookeeper yun1:2181
生產者寫消息:centos7
bin/kafka-console-producer.sh --broker-list yun1:9092 --topic myboys消費者讀消息:code
bin/kafka-console-consumer.sh --zookeeper yun1:2181 --from-beginning --topic myboys查看某個話題的狀態信息:server
bin/kafka-topics.sh --describe --zookeeper yun1:2181 --topic myboys
生產者的Javademo實現:get
import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; public class ProducerDemo { public static void main(String[] args) throws Exception{ Properties props=new Properties(); props.put("zk.connect","192.168.0.190:2181,192.168.0.191:2181,192.168.0.192:2181"); props.put("metadata.broker.list","192.168.0.190:9092,192.168.0.191:9092,192.168.0.192:9092"); props.put("serializer.class", StringEncoder.class.getName()); ProducerConfig config=new ProducerConfig(props); Producer producer=new Producer<String,String>(config); for(int i=1;i<1000;i++){ try { Thread.sleep(500); }catch (Exception e){ e.printStackTrace(); } System.out.println(i); producer.send(new KeyedMessage<String,String>("myboys","hello "+i+" times")); } } }
問題記錄:kafka