WebFlux基礎之響應式編程

  上篇文章,咱們簡單的瞭解了WebFlux的一些基礎與背景,並經過示例來寫了一個demo。咱們知道WebFlux是響應式的web框架,其特色之一就是能夠經過函數式編程方式配置route。另外究竟什麼是響應式編程呢?這篇文章咱們就簡單探討一下html

1、Java8中的函數式編程

  百科中這樣定義函數式編程:java

  函數式編程是種編程方式,它將電腦運算視爲函數的計算。函數編程語言最重要的基礎是λ演算(lambda calculus),並且λ演算的函數能夠接受函數看成輸入(參數)和輸出(返回值)。那麼在Java8裏怎麼樣來實現它呢?web

示例一

在這裏我先本身寫一個例子編程

定義接口:app

package com.bdqn.lyrk.basic.java;

/**
 * 函數式接口
 *
 * @author chen.nie
 * @date 2018/7/18
 **/
@FunctionalInterface
public interface OperateNumberFunctions {

    void operate(Integer number);

    default void print() {
        
    }
}

     

  在定義的接口上添加@FunctionalInterface代表其是函數式接口,這個註解用於檢測函數式接口規範,定義函數式接口時該接口內必須有且只有一個抽象的方法。框架

定義類:less

package com.bdqn.lyrk.basic.java;

import java.util.Optional;
import java.util.function.Predicate;

/**
 * 定義函數式編程類
 */
public class NumberFunctions {

    private Integer number;

    private NumberFunctions() {
    }

    private static NumberFunctions numberFunctions = new NumberFunctions();

    public static NumberFunctions of(Integer number) {
        numberFunctions.number = number;
        return numberFunctions;
    }

    public NumberFunctions add(Integer number) {
        numberFunctions.number += number;
        return numberFunctions;
    }

    public NumberFunctions subtraction(Integer number) {
        numberFunctions.number -= number;
        return numberFunctions;
    }

    public Optional<NumberFunctions> filter(Predicate<Integer> predicate) {
        if (predicate.test(this.number)) return Optional.of(numberFunctions);
        return Optional.ofNullable(new NumberFunctions());

    }

    public void operate(OperateNumberFunctions functions) {
        functions.operate(this.number);
    }
}

 

  在這裏定義類進行簡單的運算與過濾條件。那麼在Main方法裏能夠這麼寫:異步

package com.bdqn.lyrk.basic.java;

public class Main {

    public static void main(String[] args) {
        NumberFunctions.of(10).add(30).subtraction(2).filter(number -> number>20).get().operate(System.out::println);
    }
}

  那麼輸出結果爲38編程語言

示例二

  在Java8裏有一個類叫Stream。Stream是數據流的意思,這個類略微有點像Reactor中Flux,它提供了相似於操做符的功能,咱們來看一個例子:ide

Main方法

package com.bdqn.lyrk.basic.java;

import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;

public class Main {

    public static void main(String[] args) {
        /*
            在這裏先將Stream裏的內容作乘2的操做
            而後在進行倒序排序
            緊接着過濾出是4的倍數的數字
            而後轉換成集合在打印
         */
        Stream.of(15, 26, 34, 455, 5, 6).map(number -> number * 2).sorted((num1, num2) -> num2 - num1).filter(integer -> integer % 4 == 0).collect(toList()).forEach(System.out::println);
    }
}

  運行獲得的結果:

68
52
12

關於::操做符

  該操做符是lambda表達式的更特殊寫法,使用此操做符能夠簡化函數式接口的實現,這個方法至少知足如下特定條件:

  1)方法返回值與函數式接口相同

  2)方法參數與函數式接口相同

  舉例說明

package java.util.function;

/**
 * Represents a supplier of results.
 *
 * <p>There is no requirement that a new or distinct result be returned each
 * time the supplier is invoked.
 *
 * <p>This is a <a href="package-summary.html">functional interface</a>
 * whose functional method is {@link #get()}.
 *
 * @param <T> the type of results supplied by this supplier
 *
 * @since 1.8
 */
@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

  java中Runnable接口:

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

  java中的Predicate接口:

package java.util.function;

import java.util.Objects;

/**
 * Represents a predicate (boolean-valued function) of one argument.
 *
 * <p>This is a <a href="package-summary.html">functional interface</a>
 * whose functional method is {@link #test(Object)}.
 *
 * @param <T> the type of the input to the predicate
 *
 * @since 1.8
 */
@FunctionalInterface
public interface Predicate<T> {

    /**
     * Evaluates this predicate on the given argument.
     *
     * @param t the input argument
     * @return {@code true} if the input argument matches the predicate,
     * otherwise {@code false}
     */
    boolean test(T t);

    /**
     * Returns a composed predicate that represents a short-circuiting logical
     * AND of this predicate and another.  When evaluating the composed
     * predicate, if this predicate is {@code false}, then the {@code other}
     * predicate is not evaluated.
     *
     * <p>Any exceptions thrown during evaluation of either predicate are relayed
     * to the caller; if evaluation of this predicate throws an exception, the
     * {@code other} predicate will not be evaluated.
     *
     * @param other a predicate that will be logically-ANDed with this
     *              predicate
     * @return a composed predicate that represents the short-circuiting logical
     * AND of this predicate and the {@code other} predicate
     * @throws NullPointerException if other is null
     */
    default Predicate<T> and(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) && other.test(t);
    }

    /**
     * Returns a predicate that represents the logical negation of this
     * predicate.
     *
     * @return a predicate that represents the logical negation of this
     * predicate
     */
    default Predicate<T> negate() {
        return (t) -> !test(t);
    }

    /**
     * Returns a composed predicate that represents a short-circuiting logical
     * OR of this predicate and another.  When evaluating the composed
     * predicate, if this predicate is {@code true}, then the {@code other}
     * predicate is not evaluated.
     *
     * <p>Any exceptions thrown during evaluation of either predicate are relayed
     * to the caller; if evaluation of this predicate throws an exception, the
     * {@code other} predicate will not be evaluated.
     *
     * @param other a predicate that will be logically-ORed with this
     *              predicate
     * @return a composed predicate that represents the short-circuiting logical
     * OR of this predicate and the {@code other} predicate
     * @throws NullPointerException if other is null
     */
    default Predicate<T> or(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) || other.test(t);
    }

    /**
     * Returns a predicate that tests if two arguments are equal according
     * to {@link Objects#equals(Object, Object)}.
     *
     * @param <T> the type of arguments to the predicate
     * @param targetRef the object reference with which to compare for equality,
     *               which may be {@code null}
     * @return a predicate that tests if two arguments are equal according
     * to {@link Objects#equals(Object, Object)}
     */
    static <T> Predicate<T> isEqual(Object targetRef) {
        return (null == targetRef)
                ? Objects::isNull
                : object -> targetRef.equals(object);
    }
}

那麼上述的接口分別可使用以下寫法,注意實現該接口的方法特色

package com.bdqn.lyrk.basic.java;

import java.util.function.Predicate;
import java.util.function.Supplier;

public class Main {
    private static int i;

    public static void main(String[] args) {

        /*
            建立對象的方式
         */
        Supplier<Object> supplier = Object::new;

        /*
            調用方法的方式(無參數)
         */
        Runnable runnable = Main::add;

        /*
            調用方法的方式(有參數)
         */
        Predicate<String> predicate = Main::filter;
    }

    public static void add() {
        i++;
        System.out.println("test" + i);
    }

    public static boolean filter(String test) {
        return test != null;
    }
}

咱們能夠看到使用函數式編程藉助於lambda表達式,使得代碼更簡潔清爽 

 

2、Java中的響應式編程

  關於響應式編程,百度百科是這麼定義的:

  簡稱RP(Reactive Programming)

  響應式編程是一種面向數據流和變化傳播的編程範式。這意味着能夠在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值經過數據流進行傳播。
  在這裏有兩個關鍵詞: 數據流與變化傳播。下面咱們來經過代碼來演示下響應式編程是怎麼回事

 Java8及之前版本

  最典型的示例就是,JDK提供的觀察者模式類Observer與Observalbe:

package com.hzgj.lyrk.demo;

import java.util.Observable;

public class ObserverDemo extends Observable {

    public static void main(String[] args) {
        ObserverDemo observable = new ObserverDemo();
        observable.addObserver((o, arg) -> {
            System.out.println("發生變化");
        });
        observable.addObserver((o, arg) -> {
            System.out.println("收到被觀察者通知,準備改變");
        });
        observable.setChanged();
        observable.notifyObservers();
    }
}

 

  在上述代碼示例中觀察者並無及時執行,而是在接受到被觀察者發送信號的時候纔有了「響應」。其中setChanged()與notifyObservers方法就對應響應式編程中定義的關鍵詞--變化與傳播。還有一個典型的示例就是Swing中的事件機制,有興趣的朋友能夠下去查閱相關資料,在這裏就再也不進行闡述。

 Java9及其後版本

  從java9開始,Observer與Observable已經被標記爲過期的類了,取而代之的是Flow類。Flow纔是真正意義上的響應式編程類,由於觀察者Observer與Observable雖然可以響應,可是在數據流的體現並非特別突出。Flow這個類,咱們能夠先看一下:

  

public final class Flow {

    private Flow() {} // uninstantiable

    /**
     * A producer of items (and related control messages) received by
     * Subscribers.  Each current {@link Subscriber} receives the same
     * items (via method {@code onNext}) in the same order, unless
     * drops or errors are encountered. If a Publisher encounters an
     * error that does not allow items to be issued to a Subscriber,
     * that Subscriber receives {@code onError}, and then receives no
     * further messages.  Otherwise, when it is known that no further
     * messages will be issued to it, a subscriber receives {@code
     * onComplete}.  Publishers ensure that Subscriber method
     * invocations for each subscription are strictly ordered in <a
     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
     * order.
     *
     * <p>Publishers may vary in policy about whether drops (failures
     * to issue an item because of resource limitations) are treated
     * as unrecoverable errors.  Publishers may also vary about
     * whether Subscribers receive items that were produced or
     * available before they subscribed.
     *
     * @param <T> the published item type
     */
    @FunctionalInterface
    public static interface Publisher<T> {
        /**
         * Adds the given Subscriber if possible.  If already
         * subscribed, or the attempt to subscribe fails due to policy
         * violations or errors, the Subscriber's {@code onError}
         * method is invoked with an {@link IllegalStateException}.
         * Otherwise, the Subscriber's {@code onSubscribe} method is
         * invoked with a new {@link Subscription}.  Subscribers may
         * enable receiving items by invoking the {@code request}
         * method of this Subscription, and may unsubscribe by
         * invoking its {@code cancel} method.
         *
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
         */
        public void subscribe(Subscriber<? super T> subscriber);
    }

    /**
     * A receiver of messages.  The methods in this interface are
     * invoked in strict sequential order for each {@link
     * Subscription}.
     *
     * @param <T> the subscribed item type
     */
    public static interface Subscriber<T> {
        /**
         * Method invoked prior to invoking any other Subscriber
         * methods for the given Subscription. If this method throws
         * an exception, resulting behavior is not guaranteed, but may
         * cause the Subscription not to be established or to be cancelled.
         *
         * <p>Typically, implementations of this method invoke {@code
         * subscription.request} to enable receiving items.
         *
         * @param subscription a new subscription
         */
        public void onSubscribe(Subscription subscription);

        /**
         * Method invoked with a Subscription's next item.  If this
         * method throws an exception, resulting behavior is not
         * guaranteed, but may cause the Subscription to be cancelled.
         *
         * @param item the item
         */
        public void onNext(T item);

        /**
         * Method invoked upon an unrecoverable error encountered by a
         * Publisher or Subscription, after which no other Subscriber
         * methods are invoked by the Subscription.  If this method
         * itself throws an exception, resulting behavior is
         * undefined.
         *
         * @param throwable the exception
         */
        public void onError(Throwable throwable);

        /**
         * Method invoked when it is known that no additional
         * Subscriber method invocations will occur for a Subscription
         * that is not already terminated by error, after which no
         * other Subscriber methods are invoked by the Subscription.
         * If this method throws an exception, resulting behavior is
         * undefined.
         */
        public void onComplete();
    }

    /**
     * Message control linking a {@link Publisher} and {@link
     * Subscriber}.  Subscribers receive items only when requested,
     * and may cancel at any time. The methods in this interface are
     * intended to be invoked only by their Subscribers; usages in
     * other contexts have undefined effects.
     */
    public static interface Subscription {
        /**
         * Adds the given number {@code n} of items to the current
         * unfulfilled demand for this subscription.  If {@code n} is
         * less than or equal to zero, the Subscriber will receive an
         * {@code onError} signal with an {@link
         * IllegalArgumentException} argument.  Otherwise, the
         * Subscriber will receive up to {@code n} additional {@code
         * onNext} invocations (or fewer if terminated).
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        public void request(long n);

        /**
         * Causes the Subscriber to (eventually) stop receiving
         * messages.  Implementation is best-effort -- additional
         * messages may be received after invoking this method.
         * A cancelled subscription need not ever receive an
         * {@code onComplete} or {@code onError} signal.
         */
        public void cancel();
    }

    /**
     * A component that acts as both a Subscriber and Publisher.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

    static final int DEFAULT_BUFFER_SIZE = 256;

    /**
     * Returns a default value for Publisher or Subscriber buffering,
     * that may be used in the absence of other constraints.
     *
     * @implNote
     * The current value returned is 256.
     *
     * @return the buffer size value
     */
    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }

}

  Flow這個類裏定義最基本的Publisher與Subscribe,該模式就是發佈訂閱模式。咱們來看一下代碼示例:

package com.hzgj.lyrk.demo;

import java.util.concurrent.Flow;

public class Main {

    public static void main(String[] args) {
        Flow.Publisher<String> publisher = subscriber -> {
            subscriber.onNext("1"); // 1
            subscriber.onNext("2");
            subscriber.onError(new RuntimeException("出錯")); // 2
            //  subscriber.onComplete();
        };
        publisher.subscribe(new Flow.Subscriber<>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.cancel();
            }

            @Override
            public void onNext(String item) {
                System.out.println(item);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("出錯了");
            }

            @Override
            public void onComplete() {
                System.out.println("publish complete");
            }
        });
    }
}

   代碼1 是一種數據流的體現,在Publisher中每次調用onNext的時候,在中都會在Subscribe的onNext方法進行消費

   代碼2 一樣是發送錯誤信號,等待訂閱者進行消費

   運行結果:

1
2
出錯了

  在上述代碼中咱們能夠發現:Publisher在沒有被訂閱的時候,是不會觸發任何行爲的。每次調用Publisher的onNext方法的時候都像是在發信號,訂閱者收到信號時執行相關內容,這就是典型的響應式編程的案例。不過java9提供的這個功能對異步的支持不太好,也不夠強大。所以纔會出現Reactor與RxJava等響應式框架

相關文章
相關標籤/搜索