寫生成者和消費者java
producer 生產者數據庫
public class ProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("zk.connect", "weekend01:2181,weekend02:2181,weekend03:2181"); props.put("metadata.broker.list","weekend01:9092,weekend02:9092,weekend03:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); // 發送業務消息 // 讀取文件 讀取內存數據庫 讀 socket 端口 for (int i = 1; i <= 100; i++) { Thread.sleep(500); producer.send(new KeyedMessage<String, String>("wordcount", "i said i love you baby for" + i + "times,will you have a nice day with me tomorrow")); } } }
consumersocket
public class ConsumerDemo { private static final String topic = "mysons"; private static final Integer threads = 1; public static void main(String[] args) { Properties props = new Properties(); props.put("zookeeper.connect", "weekend01:2181,weekend02:2181,weekend03:2181"); props.put("group.id", "1111"); props.put("auto.offset.reset", "smallest"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); topicCountMap.put("mygirls", 1); topicCountMap.put("myboys", 1); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("mygirls"); for(final KafkaStream<byte[], byte[]> kafkaStream : streams){ new Thread(new Runnable() { @Override public void run() { for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){ String msg = new String(mm.message()); System.out.println(msg); } } }).start(); } } }