在 pom.xml 中引入 spring-kafka 依赖
<!-- Spring for Apache Kafka dependency: start -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>
<!-- Spring for Apache Kafka dependency: end -->
Spring 配置(XML)
<!-- Enable processing of injection annotations (e.g. @Autowired, @Resource) on registered beans. -->
<context:annotation-config />

<!-- Load kafka.properties from the classpath so ${...} placeholders can be resolved.
     ignore-unresolvable="true" leaves placeholders this file cannot resolve untouched
     instead of failing, so other placeholder configurers may handle them. -->
<context:property-placeholder location="classpath:kafka.properties"
                              ignore-unresolvable="true" />
<!-- Producer settings: a HashMap built from the entries below.
     The ${kafka.producer.*} placeholders are filled in from kafka.properties. -->
<bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${kafka.producer.servers}" />
            <!-- Disabled entry, kept for reference: <entry key="group.id" value="0" /> -->
            <entry key="retries" value="${kafka.producer.retries}" />
            <entry key="batch.size" value="${kafka.producer.batch.size}" />
            <entry key="linger.ms" value="${kafka.producer.linger}" />
            <entry key="buffer.memory" value="${kafka.producer.buffer.memory}" />
            <!-- Both record keys and record values are serialized as Strings. -->
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
        </map>
    </constructor-arg>
</bean>
<!-- ProducerFactory required by the KafkaTemplate; constructed from the producerProperties map. -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg ref="producerProperties" />
</bean>
<!-- KafkaTemplate used by application code to send messages; backed by producerFactory. -->
<bean id="kafkaTemplate"
      class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory" />
</bean>
kafka.properties 配置文件
# Comma-separated host:port list of brokers; injected as the producer's bootstrap.servers.
kafka.producer.servers=127.0.0.1:9091,127.0.0.1:9092
# Injected as the producer's "retries" setting.
kafka.producer.retries=1
# Injected as the producer's "batch.size" setting.
kafka.producer.batch.size=4096
# Injected as the producer's "linger.ms" setting (milliseconds, per the key name).
kafka.producer.linger=1
# Injected as the producer's "buffer.memory" setting.
# NOTE(review): 40960 is only ten times batch.size and far below Kafka's documented
# default of 33554432 - confirm this small buffer is intentional.
kafka.producer.buffer.memory=40960
使用方法(在 Spring 管理的 Bean 中注入 KafkaTemplate 并发送消息)
// Inject the bean whose id is "kafkaTemplate" by name. @Resource alone performs the
// injection, so the redundant @Autowired that was on the same field has been removed.
@Resource(name="kafkaTemplate")
private KafkaTemplate<String, String> kafkaTemplate;

// Place this call inside a method of the Spring-managed bean that declares the field above.
// Sends the String payload "你好" to the topic "testtopic" (no record key is supplied).
kafkaTemplate.send("testtopic", "你好");