本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor 3快速上手 | Spring WebFlux快速上手 | 響應式流規範
本文 測試源碼 | 實戰源碼前端
這一小節介紹如何經過定義相應的事件(onNext
、onError
和onComplete
) 建立一個 Flux 或 Mono。Reactor提供了generate
、create
、push
和handle
等方法,全部這些方法都使用 sink(池)來生成數據流。java
sink,顧名思義,就是池子,能夠想象一下廚房水池的樣子。以下圖所示:node
下面介紹到的方法都有一個sink提供給方法使用者,一般至少會暴露三個方法給咱們,next
、error
和complete
。next和error至關於兩個下水口,咱們不斷將自定義的數據放到next口,Reactor就會幫咱們串成一個Publisher數據流,直到有一個錯誤數據放到error口,或按了一下complete
按鈕,數據流就會終止了。react
generate
是一種同步地,逐個地發出數據的方法。由於它提供的sink是一個SynchronousSink
, 並且其next()
方法在每次回調的時候最多隻能被調用一次。git
generate
方法有三種簽名:github
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
1)使用SynchronousSink生成數據流web
@Test public void testGenerate1() { final AtomicInteger count = new AtomicInteger(1); // 1 Flux.generate(sink -> { sink.next(count.get() + " : " + new Date()); // 2 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (count.getAndIncrement() >= 5) { sink.complete(); // 3 } }).subscribe(System.out::println); // 4 }
generate
方法,自定義數據已發完;輸出結果爲每1秒鐘打印一下時間,共打印5次。docker
2)增長一個伴隨狀態數據庫
對於上邊的例子來講,count
用於記錄狀態,當值達到5以後就中止計數。因爲在lambda內部使用,所以必須是final類型的,且不能是原生類型(如int
)或不可變類型(如Integer
)。api
若是使用第二個方法簽名,上邊的例子能夠這樣改:
@Test public void testGenerate2() { Flux.generate( () -> 1, // 1 (count, sink) -> { // 2 sink.next(count + " : " + new Date()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (count >= 5) { sink.complete(); } return count + 1; // 3 }).subscribe(System.out::println); }
BiFunction
,輸入爲狀態和sink;3)完成後處理
第三個方法簽名除了狀態、sink外,還有一個Consumer
,這個Consumer
在數據流發完後執行。
Flux.generate( () -> 1, (count, sink) -> { sink.next(count + " : " + new Date()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (count >= 5) { sink.complete(); } return count + 1; }, System.out::println) // 1 .subscribe(System.out::println); }
若是 state 使用了數據庫鏈接或者其餘須要進行清理的資源,這個 Consumer lambda 能夠用來在最後完成資源清理任務。
create
是一個更高級的建立Flux的方法,其生成數據流的方式既能夠是同步的,也能夠是異步的,而且還能夠每次發出多個元素。
create
用到了FluxSink
,後者一樣提供 next,error 和 complete 等方法。 與generate不一樣的是,create不須要狀態值,另外一方面,它能夠在回調中觸發多個事件(即便事件是發生在將來的某個時間)。
create 經常使用的場景就是將現有的 API 轉爲響應式,好比監聽器的異步方法。
先編寫一個事件源:
public class MyEventSource { private List<MyEventListener> listeners; public MyEventSource() { this.listeners = new ArrayList<>(); } public void register(MyEventListener listener) { // 1 listeners.add(listener); } public void newEvent(MyEvent event) { for (MyEventListener listener : listeners) { listener.onNewEvent(event); // 2 } } public void eventStopped() { for (MyEventListener listener : listeners) { listener.onEventStopped(); // 3 } } @Data @NoArgsConstructor @AllArgsConstructor public static class MyEvent { // 4 private Date timeStemp; private String message; } }
準備一個監聽器接口,它能夠監聽上邊第2和3的兩種事件:(1)新的MyEvent
到來;(2)事件源中止。以下:
public interface MyEventListener { void onNewEvent(MyEventSource.MyEvent event); void onEventStopped(); }
下面的測試方法邏輯是:建立一個監聽器註冊到事件源,這個監聽器再收到事件回調的時候經過Flux.create
的sink將一系列事件轉換成異步的事件流:
@Test public void testCreate() throws InterruptedException { MyEventSource eventSource = new MyEventSource(); // 1 Flux.create(sink -> { eventSource.register(new MyEventListener() { // 2 @Override public void onNewEvent(MyEventSource.MyEvent event) { sink.next(event); // 3 } @Override public void onEventStopped() { sink.complete(); // 4 } }); } ).subscribe(System.out::println); // 5 for (int i = 0; i < 20; i++) { // 6 Random random = new Random(); TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)); eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i)); } eventSource.eventStopped(); // 7 }
運行一下這個測試方法,20個MyEvent
陸續打印出來。
若是將上邊的create
方法換成generate
方法,則會報出異常:
java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method
證實generate
並不支持異步的方式。
create
方法還有一個變體方法push
,適合生成事件流。與 create相似,
push 也能夠是異步地, 而且可以使用以上各類回壓策略。因此上邊的例子能夠替換爲push
方法。區別在於,push
方法中,調用next
、complete
或error
的必須是同一個線程。
除了next
、complete
或error
方法外,FluxSink
還有onRequest
方法,這個方法能夠用來響應下游訂閱者的請求事件。從而不只能夠像上一個例子那樣,上游在數據就緒的時候將其推送到下游,同時下游也能夠從上游拉取已經就緒的數據。這是一種推送/拉取混合的模式。好比:
Flux<String> bridge = Flux.create(sink -> { myMessageProcessor.register( new MyMessageListener<String>() { public void onMessage(List<String> messages) { for(String s : messages) { sink.next(s); // 1 } } }); sink.onRequest(n -> { // 2 List<String> messages = myMessageProcessor.request(n); // 3 for(String s : message) { sink.next(s); } }); ... }
Docker提供了一個用來監聽事件的命令:docker events
,運行這個命令後,會監聽docker daemon的事件並打印出來,執行是持續進行的,就像top
或前邊介紹的mongostat
命令同樣。Docker的java開發包的DockerClient
也提供了相應的API,這個API是基於回調的,所以咱們就可使用Reactor的create
方法,將這個基於回調的API轉換爲響應式流,流中的數據就是一個一個的docker事件。以下圖所示:
1)測試DockerClient
首先,咱們先啓動docker。
而後,咱們繼續用第一章的webflux-demo
maven項目模塊,在pom.xml
中添加Docker開發相關的依賴:
<!--docker client begin--> <dependency> <groupId>com.github.docker-java</groupId> <artifactId>docker-java</artifactId> <version>3.0.14</version> </dependency> <dependency> <groupId>javax.ws.rs</groupId> <artifactId>javax.ws.rs-api</artifactId> <version>2.1</version> </dependency> <dependency> <groupId>org.glassfish.jersey.inject</groupId> <artifactId>jersey-hk2</artifactId> <version>2.26</version> </dependency> <!--docker client end-->
最後編寫測試方法:
public class DockerEventTest { @Test public void dockerEventToFlux() throws InterruptedException { collectDockerEvents().subscribe(System.out::println); // 5 TimeUnit.MINUTES.sleep(1); // 6 } private Flux<Event> collectDockerEvents() { DockerClient docker = DockerClientBuilder.getInstance().build(); // 1 return Flux.create((FluxSink<Event> sink) -> { EventsResultCallback callback = new EventsResultCallback() { // 2 @Override public void onNext(Event event) { // 3 sink.next(event); } }; docker.eventsCmd().exec(callback); // 4 }); } }
tcp://localhost:2375
,2375是docker默認的端口號,能夠經過指定的IP和端口鏈接docker daemon:DockerClientBuilder.getInstance("tcp://192.168.0.123:2375").build()
,不過要注意docker daemon監聽接口和防火牆的配置。onNext
,這時候經過FluxSink
的next
方法將Event
對象發出。OK,看一下效果。
爲了方便對比,咱們首先在終端運行docker events
命令,而後在另外一個終端進行docker操做,好比本例:
docker run -it -m 200M --memort-swap=200M progrium/stress --vm 1 --vm-bytes 300M
progrium/stress
是一個用於壓力測試的容器,經過-m 200M
指定爲該容器的運行最多分配200M內存,而後在壓力測試的時候,經過--vm-bytes 300M
使其運行時嘗試分配300M的內存,此時會出現內存不足(OOM)的錯誤並致使容器被殺死(single 9)。
如圖所示,上方是分別運行兩個命令的終端窗口,能夠看到docker events
命令打印出了一系列事件,若是是第一個運行progrium/stress
應該回先有一個pull鏡像的事件。下方是咱們的測試代碼的輸出,除了一些日誌以外,能夠看到這些事件也被輸出了。
2)REST API推送到前端
下面,咱們更進一步將Event事件經過REST API推送到瀏覽器端,看過第1.3.3節的話,對這一起應該是輕車熟路了。
(一)首先定義一個咱們本身的DockerEvent
,這一步不是必須的哈,不過DockerClient
返回的Event
自己字段比較多,一般前端展現的話會轉換爲dvo,「戲要作足」嘛,哈哈。
DockerEvent.java
@Data @Document(collection = "docker-event") public class DockerEvent { @Indexed private String status; @Id private String id; private String from; private Node node; private EventType type; private String action; private String actorId; private Long time; private Long timeNano; }
(二)而後就是DAO層了,建立一個DockerEventMongoRepository
,增長三個@Tailable
的查詢方法,分別用於查詢所有、按照狀態查詢和按類型+名稱查詢(好比查詢某某容器的事件):
DockerEventMongoRepository.java
public interface DockerEventMongoRepository extends ReactiveMongoRepository<DockerEvent, String> { @Tailable Flux<DockerEvent> findBy(); @Tailable Flux<DockerEvent> findByStatus(String status); @Tailable Flux<DockerEvent> findByTypeAndFrom(String type, String from); }
(三)定義一個CommandLineRunner
,用於在應用啓動後即開始監聽docker事件:
DockerEventsCollector.java
@Slf4j @Component public class DockerEventsCollector implements CommandLineRunner { private DockerEventMongoRepository dockerEventMongoRepository; private MongoTemplate mongo; // 1 public DockerEventsCollector(DockerEventMongoRepository dockerEventMongoRepository, MongoTemplate mongo) { // 1 this.dockerEventMongoRepository = dockerEventMongoRepository; this.mongo= mongo; } @Override public void run(String... args) { mongo.dropCollection(DockerEvent.class); // 2 mongo.createCollection(DockerEvent.class, CollectionOptions.empty().maxDocuments(200).size(100000).capped()); // 2 dockerEventMongoRepository.saveAll(collect()).subscribe(); // 6 } private Flux<DockerEvent> collect() { // 3 DockerClient docker = DockerClientBuilder.getInstance().build(); return Flux.create((FluxSink<Event> sink) -> { EventsResultCallback callback = new EventsResultCallback() { @Override public void onNext(Event event) { sink.next(event); } }; docker.eventsCmd().exec(callback); }) .map(this::trans) // 4 .doOnNext(e -> log.info(e.toString())); // 5 } private DockerEvent trans(Event event) { // 4 DockerEvent dockerEvent = new DockerEvent(); dockerEvent.setAction(event.getAction()); dockerEvent.setActorId(Objects.requireNonNull(event.getActor()).getId()); dockerEvent.setFrom(event.getFrom() == null ? null : event.getFrom().replace("//", "_")); dockerEvent.setId(UUID.randomUUID().toString()); dockerEvent.setNode(event.getNode()); dockerEvent.setStatus(event.getStatus()); dockerEvent.setTime(event.getTime()); dockerEvent.setTimeNano(event.getTimeNano()); dockerEvent.setType(event.getType()); return dockerEvent; } }
MongoTemplate
,Spring 4.3 以後,若是有構造方法,Spring會自動注入,不須要@Autowired
註解了。DockerEvent
建立「capped」的collection,方便測試,若是提早手動建立好的話能夠不加這兩句。若是在//1處使用的是響應式的ReactiveMongoTemplate
,由於是異步的,因此要用then()
或thenMany()
將後續的全部操做鏈接起來,如mongo.dropCollection(...).then(mongo.createCollection(...)).thenMany(dockerEventMongoRepository.saveAll(collect()))
,保證能前後依次執行。Event
轉換爲咱們定義的DockerEvent
,其中DockerEvent.from
字段是事件主體名稱,好比容器名,可能有/
,所以進行一個字符替換,不然在URL中會有問題。DockerEvent
保存到MongoDB,用subscribe()
觸發執行。(四)Service層沒有啥邏輯,咱們直接寫Controller:
DockerEventController.java
@Slf4j @RestController @RequestMapping(value = "/docker/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) // 1 public class DockerEventController { private DockerEventMongoRepository dockerEventMongoRepository; public DockerEventController(DockerEventMongoRepository dockerEventMongoRepository) { this.dockerEventMongoRepository = dockerEventMongoRepository; } @GetMapping public Flux<DockerEvent> dockerEventStream() { // 2 return dockerEventMongoRepository.findBy(); } @GetMapping("/{type}/{from}") public Flux<DockerEvent> dockerEventStream(@PathVariable("type") String type, @PathVariable("from") String from) { // 3 return dockerEventMongoRepository.findByTypeAndFrom(type, from); } @GetMapping("/{status}") public Flux<DockerEvent> dockerEventStream(@PathVariable String status) { // 4 return dockerEventMongoRepository.findByStatus(status); } }
OK了,啓動試一下:
能夠看到,右側的瀏覽器的小圖標一直在旋轉,表示持續接收推送中,當在終端中進行docker操做的時候,所產生的事件就馬上出如今瀏覽器中了。若是請求/docker/events/oom
將只推送OOM事件,若是請求/docker/events/container/progrium_stress
將只推送來自容器progrium/stress的事件。
再次提醒,當capped 的 Collection中一條數據都沒有的時候,
@Tailable
的API也會馬上返回,因此須要等到數據庫中有至少一條數據以後(好比先執行如下pull),再在瀏覽器中請求docker/events
API。