(12)自定義數據流(實戰Docker事件推送的REST API)——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor 3快速上手 | Spring WebFlux快速上手 | 響應式流規範
本文 測試源碼 | 實戰源碼前端

2.2 自定義數據流

這一小節介紹如何經過定義相應的事件(onNextonErroronComplete) 建立一個 Flux 或 Mono。Reactor提供了generatecreatepushhandle等方法,全部這些方法都使用 sink(池)來生成數據流。java

sink,顧名思義,就是池子,能夠想象一下廚房水池的樣子。以下圖所示:node

sink

下面介紹到的方法都有一個sink提供給方法使用者,一般至少會暴露三個方法給咱們,nexterrorcomplete。next和error至關於兩個下水口,咱們不斷將自定義的數據放到next口,Reactor就會幫咱們串成一個Publisher數據流,直到有一個錯誤數據放到error口,或按了一下complete按鈕,數據流就會終止了。react

2.2.1 generate

generate是一種同步地,逐個地發出數據的方法。由於它提供的sink是一個SynchronousSink, 並且其next()方法在每次回調的時候最多隻能被調用一次。git

generate方法有三種簽名:github

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

    public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) 

    public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)

1)使用SynchronousSink生成數據流web

@Test
    public void testGenerate1() {
        final AtomicInteger count = new AtomicInteger(1);   // 1
        Flux.generate(sink -> {
            sink.next(count.get() + " : " + new Date());   // 2
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (count.getAndIncrement() >= 5) {
                sink.complete();     // 3
            }
        }).subscribe(System.out::println);  // 4
    }
  1. 用於計數;
  2. 向「池子」放自定義的數據;
  3. 告訴generate方法,自定義數據已發完;
  4. 觸發數據流。

輸出結果爲每1秒鐘打印一下時間,共打印5次。docker

2)增長一個伴隨狀態數據庫

對於上邊的例子來講,count用於記錄狀態,當值達到5以後就中止計數。因爲在lambda內部使用,所以必須是final類型的,且不能是原生類型(如int)或不可變類型(如Integer)。api

若是使用第二個方法簽名,上邊的例子能夠這樣改:

@Test
    public void testGenerate2() {
        Flux.generate(
                () -> 1,    // 1
                (count, sink) -> {      // 2
                    sink.next(count + " : " + new Date());
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (count >= 5) {
                        sink.complete();
                    }
                    return count + 1;   // 3
                }).subscribe(System.out::println);
    }
  1. 初始化狀態值;
  2. 第二個參數是BiFunction,輸入爲狀態和sink;
  3. 每次循環都要返回新的狀態值給下次使用。

3)完成後處理

第三個方法簽名除了狀態、sink外,還有一個Consumer,這個Consumer在數據流發完後執行。

Flux.generate(
                () -> 1,
                (count, sink) -> {
                    sink.next(count + " : " + new Date());
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (count >= 5) {
                        sink.complete();
                    }
                    return count + 1;
                }, System.out::println)     // 1
                .subscribe(System.out::println);
    }
  1. 最後將count值打印出來。

若是 state 使用了數據庫鏈接或者其餘須要進行清理的資源,這個 Consumer lambda 能夠用來在最後完成資源清理任務。

2.2.2 create

create是一個更高級的建立Flux的方法,其生成數據流的方式既能夠是同步的,也能夠是異步的,而且還能夠每次發出多個元素。

create用到了FluxSink,後者一樣提供 next,error 和 complete 等方法。 與generate不一樣的是,create不須要狀態值,另外一方面,它能夠在回調中觸發多個事件(即便事件是發生在將來的某個時間)。

create 經常使用的場景就是將現有的 API 轉爲響應式,好比監聽器的異步方法。

先編寫一個事件源:

public class MyEventSource {

        private List<MyEventListener> listeners;

        public MyEventSource() {
            this.listeners = new ArrayList<>();
        }

        public void register(MyEventListener listener) {    // 1
            listeners.add(listener);
        }

        public void newEvent(MyEvent event) {
            for (MyEventListener listener :
                    listeners) {
                listener.onNewEvent(event);     // 2
            }
        }

        public void eventStopped() {
            for (MyEventListener listener :
                    listeners) {
                listener.onEventStopped();      // 3
            }
        }

        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class MyEvent {   // 4
            private Date timeStemp;
            private String message;
        }
    }
  1. 註冊監聽器;
  2. 向監聽器發出新事件;
  3. 告訴監聽器事件源已中止;
  4. 事件類,使用了lombok註解。

準備一個監聽器接口,它能夠監聽上邊第2和3的兩種事件:(1)新的MyEvent到來;(2)事件源中止。以下:

public interface MyEventListener {
        void onNewEvent(MyEventSource.MyEvent event);
        void onEventStopped();
    }

下面的測試方法邏輯是:建立一個監聽器註冊到事件源,這個監聽器再收到事件回調的時候經過Flux.create的sink將一系列事件轉換成異步的事件流:

@Test
    public void testCreate() throws InterruptedException {
        MyEventSource eventSource = new MyEventSource();    // 1
        Flux.create(sink -> {
                    eventSource.register(new MyEventListener() {    // 2
                        @Override
                        public void onNewEvent(MyEventSource.MyEvent event) {
                            sink.next(event);       // 3
                        }

                        @Override
                        public void onEventStopped() {
                            sink.complete();        // 4
                        }
                    });
                }
        ).subscribe(System.out::println);       // 5

        for (int i = 0; i < 20; i++) {  // 6
            Random random = new Random();
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
            eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i));  
        }
        eventSource.eventStopped(); // 7
    }
  1. 事件源;
  2. 向事件源註冊用匿名內部類建立的監聽器;
  3. 監聽器在收到事件回調的時候經過sink將事件再發出;
  4. 監聽器再收到事件源中止的回調的時候經過sink發出完成信號;
  5. 觸發訂閱(這時候尚未任何事件產生);
  6. 循環產生20個事件,每一個間隔不超過1秒的隨機時間;
  7. 最後中止事件源。

運行一下這個測試方法,20個MyEvent陸續打印出來。

若是將上邊的create方法換成generate方法,則會報出異常:

java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method

證實generate並不支持異步的方式。

create方法還有一個變體方法push,適合生成事件流。與 create相似,push 也能夠是異步地, 而且可以使用以上各類回壓策略。因此上邊的例子能夠替換爲push方法。區別在於,push方法中,調用nextcompleteerror的必須是同一個線程。

除了nextcompleteerror方法外,FluxSink還有onRequest方法,這個方法能夠用來響應下游訂閱者的請求事件。從而不只能夠像上一個例子那樣,上游在數據就緒的時候將其推送到下游,同時下游也能夠從上游拉取已經就緒的數據。這是一種推送/拉取混合的模式。好比:

Flux<String> bridge = Flux.create(sink -> {
        myMessageProcessor.register(
          new MyMessageListener<String>() {

            public void onMessage(List<String> messages) {
              for(String s : messages) {
                sink.next(s);   // 1
              }
            }
        });
        sink.onRequest(n -> {   // 2
            List<String> messages = myMessageProcessor.request(n);  // 3
            for(String s : message) {
               sink.next(s); 
            }
        });
        ...
    }
  1. push方式,主動向下游發出數據;
  2. 在下游發出請求時被調用;
  3. 響應下游的請求,查詢是否有可用的message。

2.2.3 實戰Docker事件推送API

Docker提供了一個用來監聽事件的命令:docker events,運行這個命令後,會監聽docker daemon的事件並打印出來,執行是持續進行的,就像top或前邊介紹的mongostat命令同樣。Docker的java開發包的DockerClient也提供了相應的API,這個API是基於回調的,所以咱們就可使用Reactor的create方法,將這個基於回調的API轉換爲響應式流,流中的數據就是一個一個的docker事件。以下圖所示:

title

1)測試DockerClient

首先,咱們先啓動docker。

而後,咱們繼續用第一章的webflux-demomaven項目模塊,在pom.xml中添加Docker開發相關的依賴:

<!--docker client begin-->
        <dependency>
            <groupId>com.github.docker-java</groupId>
            <artifactId>docker-java</artifactId>
            <version>3.0.14</version>
        </dependency>
        <dependency>
            <groupId>javax.ws.rs</groupId>
            <artifactId>javax.ws.rs-api</artifactId>
            <version>2.1</version>
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.inject</groupId>
            <artifactId>jersey-hk2</artifactId>
            <version>2.26</version>
        </dependency>
        <!--docker client end-->

最後編寫測試方法:

public class DockerEventTest {
    @Test
    public void dockerEventToFlux() throws InterruptedException {
        collectDockerEvents().subscribe(System.out::println);   // 5
        TimeUnit.MINUTES.sleep(1);  // 6
    }

    private Flux<Event> collectDockerEvents() {
        DockerClient docker = DockerClientBuilder.getInstance().build();    // 1
        return Flux.create((FluxSink<Event> sink) -> {
            EventsResultCallback callback = new EventsResultCallback() {    // 2
                @Override
                public void onNext(Event event) {   // 3
                    sink.next(event);
                }
            };
            docker.eventsCmd().exec(callback);  // 4
        });
    }
}
  1. 建立DockerClient,默認會鏈接tcp://localhost:2375,2375是docker默認的端口號,能夠經過指定的IP和端口鏈接docker daemon:DockerClientBuilder.getInstance("tcp://192.168.0.123:2375").build(),不過要注意docker daemon監聽接口和防火牆的配置。
  2. 自定義回調類。
  3. 當有docker事件產生時,會回調onNext,這時候經過FluxSinknext方法將Event對象發出。
  4. 開始對docker事件進行監聽。
  5. 經過訂閱的方式打印出來。
  6. 主線程會馬上返回,所以等待1分鐘。

OK,看一下效果。

爲了方便對比,咱們首先在終端運行docker events命令,而後在另外一個終端進行docker操做,好比本例:

docker run -it -m 200M --memort-swap=200M progrium/stress --vm 1 --vm-bytes 300M

progrium/stress是一個用於壓力測試的容器,經過-m 200M指定爲該容器的運行最多分配200M內存,而後在壓力測試的時候,經過--vm-bytes 300M使其運行時嘗試分配300M的內存,此時會出現內存不足(OOM)的錯誤並致使容器被殺死(single 9)。

title

如圖所示,上方是分別運行兩個命令的終端窗口,能夠看到docker events命令打印出了一系列事件,若是是第一個運行progrium/stress應該回先有一個pull鏡像的事件。下方是咱們的測試代碼的輸出,除了一些日誌以外,能夠看到這些事件也被輸出了。

2)REST API推送到前端

下面,咱們更進一步將Event事件經過REST API推送到瀏覽器端,看過第1.3.3節的話,對這一起應該是輕車熟路了。

(一)首先定義一個咱們本身的DockerEvent,這一步不是必須的哈,不過DockerClient返回的Event自己字段比較多,一般前端展現的話會轉換爲dvo,「戲要作足」嘛,哈哈。

DockerEvent.java

@Data
@Document(collection = "docker-event")
public class DockerEvent {
    @Indexed
    private String status;
    @Id
    private String id;
    private String from;
    private Node node;
    private EventType type;
    private String action;
    private String actorId;
    private Long time;
    private Long timeNano;
}

(二)而後就是DAO層了,建立一個DockerEventMongoRepository,增長三個@Tailable的查詢方法,分別用於查詢所有、按照狀態查詢和按類型+名稱查詢(好比查詢某某容器的事件):

DockerEventMongoRepository.java

public interface DockerEventMongoRepository extends ReactiveMongoRepository<DockerEvent, String> {
    @Tailable
    Flux<DockerEvent> findBy();

    @Tailable
    Flux<DockerEvent> findByStatus(String status);

    @Tailable
    Flux<DockerEvent> findByTypeAndFrom(String type, String from);
}

(三)定義一個CommandLineRunner,用於在應用啓動後即開始監聽docker事件:

DockerEventsCollector.java

@Slf4j
@Component
public class DockerEventsCollector implements CommandLineRunner {

    private DockerEventMongoRepository dockerEventMongoRepository;
    private MongoTemplate mongo;    // 1

    public DockerEventsCollector(DockerEventMongoRepository dockerEventMongoRepository, MongoTemplate mongo) {  // 1
        this.dockerEventMongoRepository = dockerEventMongoRepository;
        this.mongo= mongo;
    }

    @Override
    public void run(String... args) {

        mongo.dropCollection(DockerEvent.class);    // 2
        mongo.createCollection(DockerEvent.class, CollectionOptions.empty().maxDocuments(200).size(100000).capped()); // 2

        dockerEventMongoRepository.saveAll(collect()).subscribe();  // 6
    }

    private Flux<DockerEvent> collect() {   // 3
        DockerClient docker = DockerClientBuilder.getInstance().build();

        return Flux.create((FluxSink<Event> sink) -> {
            EventsResultCallback callback = new EventsResultCallback() {
                @Override
                public void onNext(Event event) {
                    sink.next(event);
                }
            };
            docker.eventsCmd().exec(callback);
        })
                .map(this::trans)   // 4
                .doOnNext(e -> log.info(e.toString())); // 5
    }

    private DockerEvent trans(Event event) {    // 4
        DockerEvent dockerEvent = new DockerEvent();
        dockerEvent.setAction(event.getAction());
        dockerEvent.setActorId(Objects.requireNonNull(event.getActor()).getId());
        dockerEvent.setFrom(event.getFrom() == null ? null : event.getFrom().replace("//", "_"));
        dockerEvent.setId(UUID.randomUUID().toString());
        dockerEvent.setNode(event.getNode());
        dockerEvent.setStatus(event.getStatus());
        dockerEvent.setTime(event.getTime());
        dockerEvent.setTimeNano(event.getTimeNano());
        dockerEvent.setType(event.getType());
        return dockerEvent;
    }
}
  1. 這裏使用的是MongoTemplate,Spring 4.3 以後,若是有構造方法,Spring會自動注入,不須要@Autowired註解了。
  2. 每次啓動應用針對DockerEvent建立「capped」的collection,方便測試,若是提早手動建立好的話能夠不加這兩句。若是在//1處使用的是響應式的ReactiveMongoTemplate,由於是異步的,因此要用then()thenMany()將後續的全部操做鏈接起來,如mongo.dropCollection(...).then(mongo.createCollection(...)).thenMany(dockerEventMongoRepository.saveAll(collect())),保證能前後依次執行。
  3. 監聽docker事件的方法。
  4. 將返回的Event轉換爲咱們定義的DockerEvent,其中DockerEvent.from字段是事件主體名稱,好比容器名,可能有/,所以進行一個字符替換,不然在URL中會有問題。
  5. 打印個日誌(可選)。
  6. 將收集的DockerEvent保存到MongoDB,用subscribe()觸發執行。

(四)Service層沒有啥邏輯,咱們直接寫Controller:

DockerEventController.java

@Slf4j
@RestController
@RequestMapping(value = "/docker/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)    // 1
public class DockerEventController {
    private DockerEventMongoRepository dockerEventMongoRepository;

    public DockerEventController(DockerEventMongoRepository dockerEventMongoRepository) {
        this.dockerEventMongoRepository = dockerEventMongoRepository;
    }

    @GetMapping
    public Flux<DockerEvent> dockerEventStream() {  // 2
        return dockerEventMongoRepository.findBy();
    }

    @GetMapping("/{type}/{from}")
    public Flux<DockerEvent> dockerEventStream(@PathVariable("type") String type, @PathVariable("from") String from) {    // 3
        return dockerEventMongoRepository.findByTypeAndFrom(type, from);
    }

    @GetMapping("/{status}")
    public Flux<DockerEvent> dockerEventStream(@PathVariable String status) {   // 4
        return dockerEventMongoRepository.findByStatus(status);
    }
}

OK了,啓動試一下:

title

能夠看到,右側的瀏覽器的小圖標一直在旋轉,表示持續接收推送中,當在終端中進行docker操做的時候,所產生的事件就馬上出如今瀏覽器中了。若是請求/docker/events/oom將只推送OOM事件,若是請求/docker/events/container/progrium_stress將只推送來自容器progrium/stress的事件。

再次提醒,當capped 的 Collection中一條數據都沒有的時候,@Tailable的API也會馬上返回,因此須要等到數據庫中有至少一條數據以後(好比先執行如下pull),再在瀏覽器中請求docker/eventsAPI。

相關文章
相關標籤/搜索