// Minimal event round-trip: build a Reactor, register a consumer on a selector,
// then notify that key with one event.
// Assumes `$` and `RING_BUFFER` are statically imported elsewhere (not shown in this snippet).
Environment env = new Environment();

// Reactor configured with the environment above and the RING_BUFFER dispatcher.
Reactor reactor = Reactors.reactor()
    .env(env)
    .dispatcher(RING_BUFFER)
    .get();

// Consumer registered for the "topic" selector: prints "Hello " followed by the event's data.
reactor.on($("topic"), (Event<String> ev) -> {
  System.out.println("Hello " + ev.getData());
});

// Notify the key "topic" with an event wrapping the string "John Doe".
reactor.notify("topic", Event.wrap("John Doe"));
// Stream pipeline: upper-case each string, filter with an anonymous Predicate, log what remains.
// `str` is only declared here; it is assumed to be assigned elsewhere before use.
Stream<String> str;

str.map(String::toUpperCase)
    .filter(new Predicate<String>() {
      @Override
      public boolean test(String s) {
        // … (predicate logic elided in the original snippet; supply the real condition here)
      }
    })
    .consume(s -> log.info("consumed string {}", s));
// Promise usage: attach success / failure / completion callbacks, then await the value.
// `p` is only declared here; it is assumed to be assigned elsewhere before use.
// Assumes `SECONDS` is the statically imported java.util.concurrent.TimeUnit constant.
Promise<String> p;

// The lambda parameters are deliberately NOT named `s`: a lambda parameter may not
// redeclare the local variable `s`, which is already in scope inside its own initializer.
String s = p
    .onSuccess(value -> log.info("consumed string {}", value))
    .onFailure(t -> log.error(t.getMessage(), t))
    .onComplete(t -> log.info("complete"))
    .await(5, SECONDS); // presumably waits up to 5 seconds for the value — confirm against the Promise API

// Derive an upper-cased view of the same promise and log its value.
p.map(String::toUpperCase).consume(upper -> log.info("UC: {}", upper));
// Processor usage, two forms: a single prepare/commit operation and a batch call.
// `proc` is only declared here and `data` is defined outside this snippet.
Processor<Buffer> proc;

// Single operation: obtain it, append `data` to its Buffer and flip it, then commit.
Operation<Buffer> op = proc.prepare();
op.get().append(data).flip();
op.commit();

// Batch form: the same append-and-flip mutation supplied as a lambda.
// NOTE(review): 512 is presumably the batch size — confirm against the Processor API.
proc.batch(512, buff -> buff.append(data).flip());
@Configuration @EnableReactor ReactorConfiguration { @Bean Reactor input(Environment env) { Reactors.reactor().env(env) .dispatcher(RING_BUFFER).get(); } @Bean Reactor output(Environment env) { Reactors.reactor().env(env) .dispatcher(RING_BUFFER).get(); } |
// Snippet: a @Component-annotated SimpleHandler holding an @Autowired Reactor field and an
// onTestTopic(String) method annotated with @Selector.
// NOTE(review): this snippet is cut off right after the method's opening brace — the method
// body and the closing braces are not present here. The `class` keyword and the method's
// return type also appear to be missing, and the selector string is delimited with 「…」
// instead of ASCII double quotes. Restore these from the original before compiling.
@Component SimpleHandler { @Autowired Reactor reactor; @Selector(「test.topic」) onTestTopic(String s) { |
/**
 * Registers a consumer on 'greetings' that replies twice, then triggers it once with
 * notify and once with send. `reactor` is defined outside this snippet.
 */
@CompileStatic
def welcome() {
  // Consumer for 'greetings': issues two replies; the first interpolates the received
  // string, which requires a double-quoted (GString) literal.
  reactor.on('greetings') { String s ->
    reply "hello $s"
    reply "how are you?"
  }

  // Notify 'greetings' with 'Jon' (no closure supplied).
  reactor.notify 'greetings', 'Jon'

  // Send 'Stephane' to 'greetings' with a closure that prints its argument and then
  // calls cancel(). NOTE(review): presumably the closure receives the replies — confirm.
  reactor.send('greetings', 'Stephane') {
    println it
    cancel()
  }
}