kafka 消費者和生產者測試類

pom.xml:java

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>apache

<groupId>bj.zm</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>api

<name>kafka</name>
<url>http://maven.apache.org</url>maven

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>ide

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
</dependency>
</dependencies>
</project>ui

 

KafkaConsumer.javathis

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;url

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;code

 

public class KafkaConsumer extends Thread{xml

private String topic;

public kafkaConsumer(String topic){
super();
this.topic = topic;
}


@Override
public void run() {
ConsumerConnector consumer = createConsumer();
System.out.println("消費者對象:"+consumer);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("消費者數據 "+topic +":"+ message);
}
}

private ConsumerConnector createConsumer() {
Properties properties = new Properties();
// properties.put("zookeeper.connect", "10.202.27.5:2181,10.202.27.6:2181,10.202.27.7:2181/kafka/st");
properties.put("zookeeper.connect", "10.202.36.28:2182,10.202.36.30:2182,10.202.36.29:2182/kafka/st");
properties.put("group.id", "sfst");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}


public static void main(String[] args) {
//new kafkaConsumer("EXP_IMG_TO_WQS").start();
//new kafkaConsumer("EXP_IMG_TO_SSS").start();
//new kafkaConsumer("EXP_IMG_TYPE1").start();
new kafkaConsumer("EXP_IMAGE_TOPIC").start();

}
}

KafkaProducer.java

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

 


public class kafkaProducer extends Thread{

private String topic;

public kafkaProducer(String topic){
super();
this.topic = topic;
}


@Override
public void run() {
Producer producer = createProducer();
int i=0;
while(true){
// producer.send(new KeyedMessage<Integer, String>(topic, "{\"logId\":\"1\",\"waybillId\":\"10376683317\",\"waybillNo\":\"785000213695\",\"sourceZoneCode\":\"755A\",\"destZoneCode\":\"755\",\"oneselfPickupFlg\":\"1\",\"consignorCompName\":\"科技公司\",\"consignorAddr\":\"軟件產業基地\",\"consignorPhone\":\"10086\",\"consignorContName\":\"王珂\",\"consignorMobile\":\"8888888\",\"addresseeCompName\":\"寄件公司\",\"addresseeAddr\":\"收件地址\",\"addresseePhone\":\"11111111\",\"addresseeContName\":\"收件聯繫人\",\"addresseeMobile\":\"55555\",\"meterageWeightQty\":\"10\",\"realWeightQty\":\"10\",\"quantity\":\"1\",\"freeParcelFlg\":\"0\",\"innerParcelFlg\":\"0\",\"versionNo\":\"1\",\"inputTypeCode\":\"2\",\"modifiedEmpCode\":\""+i+"\"}"));
producer.send(new KeyedMessage<Integer, String>(topic, "{\"isImageDto\":\"1\",\"waybillId\":\"10376683317\",\"billCode\":\"785000213695\",\"sourceZoneCode\":\"755A\",\"destZoneCode\":\"755\",\"oneselfPickupFlg\":\"1\",\"consignorCompName\":\"科技公司\",\"consignorAddr\":\"軟件產業基地\",\"consignorPhone\":\"10086\",\"consignorContName\":\"王珂\",\"consignorMobile\":\"8888888\",\"addresseeCompName\":\"寄件公司\",\"addresseeAddr\":\"收件地址\",\"addresseePhone\":\"11111111\",\"addresseeContName\":\"收件聯繫人\",\"addresseeMobile\":\"55555\",\"meterageWeightQty\":\"10\",\"realWeightQty\":\"10\",\"quantity\":\"1\",\"freeParcelFlg\":\"0\",\"innerParcelFlg\":\"0\",\"waybillType\":\"1\",\"inputTypeCode\":\"2\",\"modifiedEmpCode\":\""+i+"\"}"));
System.out.println("==============: " + i);
try {
TimeUnit.SECONDS.sleep(1);
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private Producer createProducer() { Properties properties = new Properties();// properties.put("zookeeper.connect", "10.202.27.5:2181,10.202.27.6:2181,10.202.27.7:2181");// properties.put("metadata.broker.list", "10.202.27.5:9093,10.202.27.6:9093,10.202.27.7:9093"); properties.put("zookeeper.connect", "10.202.36.28:2182,10.202.36.30:2182,10.202.36.29:2182"); properties.put("metadata.broker.list", "10.202.36.29:9093,10.202.36.28:9093,10.202.36.30:9093"); properties.put("serializer.class", StringEncoder.class.getName()); return new Producer<Integer, String>(new ProducerConfig(properties)); } public static void main(String[] args) {// new kafkaProducer("EXP_WB_TOPIC").start(); new kafkaProducer("EXP_IMAGE_TOPIC").start();// new kafkaProducer("EXP_IMG_TO_SSS").start(); } }

相關文章
相關標籤/搜索