<!-- Import the Spring Cloud BOM (release train Camden.SR6) so that Spring Cloud
     artifacts declared below can omit their own <version>. -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Camden.SR6</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<!-- Actual project dependencies. A <dependency> element must live inside
     <dependencies>; it is not valid directly under <project>. -->
<dependencies>
    <!-- Kafka binder for Spring Cloud Stream; version is taken from the BOM above. -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
</dependencies>
# Producer application (output-demo) configuration.
server:
  port: 8081
spring:
  application:
    name: output-demo
  cloud:
    stream:
      # instanceCount / instanceIndex are spring.cloud.stream.* properties,
      # so they belong under `stream`, not directly under `cloud`.
      instance-count: 1
      instance-index: 0
      kafka:
        binder:
          brokers: localhost:9092
          # NOTE(review): ZooKeeper's conventional port is 2181 - confirm 2182 matches your setup.
          zk-nodes: localhost:2182
          auto-add-partitions: true
          auto-create-topics: true
          min-partition-count: 1
      bindings:
        # `output` is the channel name exposed by the Source binding interface.
        output:
          destination: event-demo
          content-type: text/plain
          producer:
            partitionCount: 1
/**
 * Publishes text messages on the {@code output} channel of the {@link Source} binding.
 */
@EnableBinding(Source.class)
public class SendService {

    // Fully qualified so that no additional import is required by this snippet.
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(SendService.class.getName());

    @Autowired
    private Source source;

    /**
     * Wraps {@code msg} in a message and sends it on the output channel.
     *
     * <p>This is best-effort: failures are logged and never propagated to the caller.
     *
     * @param msg the payload to publish
     */
    public void sendMessage(String msg) {
        try {
            // send(...) reports success as a boolean; a false result must not go unnoticed.
            boolean accepted = source.output().send(MessageBuilder.withPayload(msg).build());
            if (!accepted) {
                LOG.warning("Output channel did not accept message: " + msg);
            }
        } catch (Exception e) {
            // Log with the full cause instead of printStackTrace(), keeping the no-throw contract.
            LOG.log(java.util.logging.Level.SEVERE, "Failed to send message: " + msg, e);
        }
    }
}

/**
 * REST entry point that forwards the {@code msg} path segment to {@link SendService}.
 */
@RestController
public class ProducerController {

    @Autowired
    private SendService service;

    /**
     * Handles {@code GET /send/{msg}} by handing {@code msg} to the send service.
     *
     * @param msg the text taken from the request path
     */
    @RequestMapping(value = "/send/{msg}", method = RequestMethod.GET)
    public void send(@PathVariable("msg") String msg) {
        service.sendMessage(msg);
    }
}
# Consumer application (input-demo) configuration.
spring:
  application:
    name: input-demo
  cloud:
    stream:
      # instanceCount / instanceIndex are spring.cloud.stream.* properties,
      # so they belong under `stream`, not directly under `cloud`.
      instance-count: 1
      instance-index: 0
      kafka:
        binder:
          brokers: localhost:9092
          # NOTE(review): ZooKeeper's conventional port is 2181 - confirm 2182 matches your setup.
          zk-nodes: localhost:2182
          auto-add-partitions: true
          auto-create-topics: true
          min-partition-count: 1
        bindings:
          input:
            consumer:
              # Kafka-binder-specific consumer option: it is only read from
              # spring.cloud.stream.kafka.bindings.<channel>.consumer.
              # false = offsets are committed via the Acknowledgment message header.
              autoCommitOffset: false
      bindings:
        # `input` is the channel name exposed by the Sink binding interface.
        input:
          destination: event-demo
          group: s1
          consumer:
            concurrency: 1
            partitioned: false
/**
 * Listener bound to the {@link Sink} input channel.
 */
@EnableBinding(Sink.class)
public class MsgSink {

    /**
     * Prints the payload of each incoming message. If the message carries an
     * {@link Acknowledgment} header, prints a notice and acknowledges it.
     *
     * @param message the message received on {@code Sink.INPUT}
     */
    @StreamListener(Sink.INPUT)
    public void process(Message<?> message) {
        System.out.println(message.getPayload());

        // NOTE(review): the header is presumably only populated when offsets are
        // committed manually - confirm against the binder configuration.
        final Acknowledgment ack =
                message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (ack == null) {
            return;
        }

        System.out.println("Acknowledgment provided");
        ack.acknowledge();
    }
}
先運行生產者，再運行消費者，然後發送測試消息：
# Call the producer's GET /send/{msg} endpoint on port 8081 with msg=hello1;
# --include also prints the HTTP response headers.
curl --include localhost:8081/send/hello1