聊聊reactive streams的processors

本文主要研究一下reactive streams的processorsjava

processors分類

processors既是Publisher也是Subscriber。在project reactor中processor有諸多實現,他們的分類大體以下:react

  • direct(DirectProcessor以及UnicastProcessor)
  • synchronous(EmitterProcessor及ReplayProcessor)
  • asynchronous(TopicProcessor及WorkQueueProcessor)

direct

DirectProcessor

它不支持backpressure特性,若是publisher發佈了N個數據,若是其中一個subscriber請求數<N,則拋出IllegalStateException.緩存

@Test
    public void testDirectProcessor(){
        DirectProcessor<Integer> directProcessor = DirectProcessor.create();
        Flux<Integer> flux = directProcessor
                .filter(e -> e % 2 == 0)
                .map(e -> e +1);
        flux.subscribe(new Subscriber<Integer>() {
            private Subscription s;
            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
//                s.request(2);
            }

            @Override
            public void onNext(Integer integer) {
                LOGGER.info("subscribe:{}",integer);
            }

            @Override
            public void onError(Throwable t) {
                LOGGER.error(t.getMessage(),t);
            }

            @Override
            public void onComplete() {

            }
        });

        IntStream.range(1,20)
                .forEach(e -> {
                    directProcessor.onNext(e);
                });

        directProcessor.onComplete();
        directProcessor.blockLast();
    }

輸出以下數據結構

16:00:11.201 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:00:11.216 [main] ERROR com.example.demo.ProcessorTest - Can't deliver value due to lack of requests
reactor.core.Exceptions$OverflowException: Can't deliver value due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:304)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:106)
    at com.example.demo.ProcessorTest.lambda$testDirectProcessor$5(ProcessorTest.java:82)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
    at com.example.demo.ProcessorTest.testDirectProcessor(ProcessorTest.java:81)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

UnicastProcessor

  • 支持backpressure特性,可是代價是至多隻能有一個subscriber。默認是無界的,若是發佈數據以後,subscriber還沒來得及request,則它會把數據緩存下來。
  • 若是設置了一個有界的queue,當buffer滿並且subscriber沒有發送足夠多的request的時候,processor會拒絕推送數據。在這種場景下,processor內置了一個callback,每當一個element被rejected的時候會觸發.
@Test
    public void testUnicastProcessor() throws InterruptedException {
        UnicastProcessor<Integer> unicastProcessor = UnicastProcessor.create(Queues.<Integer>get(8).get());
        Flux<Integer> flux = unicastProcessor
                .map(e -> e)
                .doOnError(e -> {
                    LOGGER.error(e.getMessage(),e);
                });

        IntStream.rangeClosed(1,12)
                .forEach(e -> {
                    LOGGER.info("emit:{}",e);
                    unicastProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        LOGGER.info("begin to sleep 7 seconds");
        TimeUnit.SECONDS.sleep(7);
        //UnicastProcessor allows only a single Subscriber
        flux.subscribe(e -> {
            LOGGER.info("flux subscriber:{}",e);
        });

        unicastProcessor.onComplete();
        TimeUnit.SECONDS.sleep(10);
//        unicastProcessor.blockLast(); //blockLast也是一個subscriber
    }

輸出實例多線程

16:31:04.970 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:31:04.977 [main] INFO com.example.demo.ProcessorTest - emit:1
16:31:05.990 [main] INFO com.example.demo.ProcessorTest - emit:2
16:31:06.991 [main] INFO com.example.demo.ProcessorTest - emit:3
16:31:07.994 [main] INFO com.example.demo.ProcessorTest - emit:4
16:31:08.998 [main] INFO com.example.demo.ProcessorTest - emit:5
16:31:10.002 [main] INFO com.example.demo.ProcessorTest - emit:6
16:31:11.007 [main] INFO com.example.demo.ProcessorTest - emit:7
16:31:12.010 [main] INFO com.example.demo.ProcessorTest - emit:8
16:31:13.014 [main] INFO com.example.demo.ProcessorTest - emit:9
16:31:14.029 [main] INFO com.example.demo.ProcessorTest - emit:10
16:31:14.030 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 10
16:31:15.034 [main] INFO com.example.demo.ProcessorTest - emit:11
16:31:15.034 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 11
16:31:16.038 [main] INFO com.example.demo.ProcessorTest - emit:12
16:31:16.038 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 12
16:31:17.043 [main] INFO com.example.demo.ProcessorTest - begin to sleep 7 seconds
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:1
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:2
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:3
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:4
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:5
16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:6
16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:7
16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:8
16:31:24.058 [main] ERROR com.example.demo.ProcessorTest - The receiver is overrun by more signals than expected (bounded queue...)
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:202)
    at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:330)
    at com.example.demo.ProcessorTest.lambda$testUnicastProcessor$8(ProcessorTest.java:108)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)

synchronous

EmitterProcessor

  • 可以支持多個subscriber,同時還對每一個subscriber支持backpressure。它也能夠訂閱publisher,而後把數據同步重放。
  • 它有一個bufferSize參數,用來在發佈數據以後尚未訂閱者期間的數據,onNext會一直阻塞直到數據被消費;當第一個訂閱者訂閱以後,它會接收到buffer裏頭的數據,然後續的訂閱者就只能消費到自他們訂閱那個時候起發佈的數據。
  • 當全部的subscriber都取消訂閱以後,該processor會清空buffer,並中止接收新的訂閱。
@Test
    public void testEmitterProcessor() throws InterruptedException {
        int bufferSize = 3; //小於8的會被重置爲8
        FluxProcessor<Integer, Integer> processor = EmitterProcessor.create(bufferSize);
        Flux<Integer> flux1 = processor.map(e -> e);
        Flux<Integer> flux2 = processor.map(e -> e*10);

        IntStream.rangeClosed(1,8).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e); //若是發佈的未消費數據超過bufferSize,則會阻塞在這裏
        });

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });

        IntStream.rangeClosed(9,10).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });

        //這個是後面添加的訂閱,只能消費以後發佈的數據
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        processor.onNext(11);
        processor.onNext(12);

        processor.onComplete();
        processor.blockLast();
    }

輸出實例併發

17:27:01.008 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
17:27:01.044 [main] INFO com.example.demo.ProcessorTest - emit:1
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:2
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:3
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:4
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:5
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:6
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:7
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:8
17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:6
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:8
17:27:01.088 [main] INFO com.example.demo.ProcessorTest - emit:9
17:27:01.088 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:9
17:27:02.091 [main] INFO com.example.demo.ProcessorTest - emit:10
17:27:02.092 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:11
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:110
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:12
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:120

ReplayProcessor

能夠緩存經過sink產生的數據或者訂閱publisher的數據,而後重放給後來的訂閱者。有以下四種配置app

  • cacheLast
只緩存最後一個數據
  • create(int)
緩存最後N個數據
  • createTimeout(Duration)
對每一個數據打上時間戳標籤,只緩存age在指定ttl內的數據
  • createSizeOrTimeout(int,Duration)
對每一個數據打上時間戳標籤,只緩存age在指定ttl內的N個數據

實例異步

@Test
    public void testReplayProcessor() throws InterruptedException {
        ReplayProcessor<Integer> replayProcessor = ReplayProcessor.create(3);
        Flux<Integer> flux1 = replayProcessor
                .map(e -> e);
        Flux<Integer> flux2 = replayProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });


        IntStream.rangeClosed(1,5)
                .forEach(e -> {
                    replayProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });

        LOGGER.info("finish publish data");
        TimeUnit.SECONDS.sleep(3);

        LOGGER.info("begin to subscribe flux2");
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        replayProcessor.onComplete();
        replayProcessor.blockLast();
    }

輸出以下async

15:13:39.415 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:13:39.438 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
15:13:40.445 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
15:13:41.449 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
15:13:42.454 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
15:13:43.459 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
15:13:44.463 [main] INFO com.example.demo.ProcessorTest - finish publish data
15:13:47.466 [main] INFO com.example.demo.ProcessorTest - begin to subscribe flux2
15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:3
15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:4
15:13:47.468 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:5

asynchronous

TopicProcessor

  • TopicProcessor是一個異步的processor,當shared設置爲true的時候,支持對多個publisher的併發重放。若是訂閱的publisher是一個併發的stream或者是須要併發調用Topicrocessor的onNext,onCompleete,onError方法,則必須強制開啓share。關閉share則是遵循reactive streams規範的processor,不容許併發調用。
  • TopicProcessor也支持把消息廣播(fan-out)到多個subscriber,它給每一個subscriber綁定一個線程。可以支持的subscriber的最大個數由線程池executor限制。
  • TopicProcessor使用了RingBuffer數據結構來推送數據,每一個subscriber線程都在RingBuffer記錄其消費的位置
  • TopicProcessor也支持autoCancel選項,默認爲true,也就是當全部subscriber都取消訂閱的時候,publisher也會被自動cannel
@Test
    public void testTopicProcessor() throws InterruptedException {
        TopicProcessor<Integer> topicProcessor = TopicProcessor.<Integer>builder()
                .share(true)
//                .executor(Executors.newSingleThreadExecutor())
                .build();
        Flux<Integer> flux1 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux2 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux3 = topicProcessor
                .map(e -> e);

        AtomicInteger count = new AtomicInteger(0);
        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
            count.incrementAndGet();
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.rangeClosed(1,100)
                .parallel()
                .forEach(e -> {
//                    LOGGER.info("emit:{}",e);
                    topicProcessor.onNext(e);
                });

        topicProcessor.onComplete();
        topicProcessor.blockLast();

        TimeUnit.SECONDS.sleep(10);
        System.out.println(count.get());
    }

注意兩個地方:ide

  • share
share背後設置的是EventLoopProcessor的multiproducers屬性
reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/EventLoopProcessor.java
EventLoopProcessor(
            int bufferSize,
            @Nullable ThreadFactory threadFactory,
            @Nullable ExecutorService executor,
            ExecutorService requestExecutor,
            boolean autoCancel,
            boolean multiproducers,
            Supplier<Slot<IN>> factory,
            WaitStrategy strategy) {

        if (!Queues.isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
        }

        if (bufferSize < 1){
            throw new IllegalArgumentException("bufferSize must be strictly positive, " +
                    "was: "+bufferSize);
        }

        this.autoCancel = autoCancel;

        contextClassLoader = new EventLoopContext(multiproducers);

        this.name = defaultName(threadFactory, getClass());

        this.requestTaskExecutor = Objects.requireNonNull(requestExecutor, "requestTaskExecutor");

        if (executor == null) {
            this.executor = Executors.newCachedThreadPool(threadFactory);
        }
        else {
            this.executor = executor;
        }

        if (multiproducers) {
            this.ringBuffer = RingBuffer.createMultiProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
        else {
            this.ringBuffer = RingBuffer.createSingleProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
    }
若是share爲true,則建立的是createMultiProducer.
具體的表象就是若是有多線程調用processor的onNext方法,而沒有開啓share的話,會有併發問題,即數據會丟失.好比上面的代碼,若是註釋掉share(true),則最後count的大小就不必定是100,而開啓share爲true就能保證最後count的大小是100

若是設置executor(Executors.newSingleThreadExecutor()),則flux1,flux2,flux3的訂閱者則是順序執行,而不是併發的.

WorkQueueProcessor

  • WorkQueueProcessor也是一個異步的processor,當shared設置爲true的時候,支持對多個publisher的併發重放。
  • WorkQueueProcessor使用了RingBuffer數據結構來推送數據。
  • WorkQueueProcessor不是每來一個subscriber就給其建立一個線程,所以比TopicProcessor的伸縮性更好一點。可以支持的subscriber的最大個數由線程池executor限制。可是值得注意的是最好不要給WorkQueueProcessor添加過多的subscriber,這樣會增長processor的鎖競爭。最好使用ThreadPoolExecutor或者ForkJoinPool,processor能夠檢測他們的容量而後再訂閱者過多的時候拋出異常。
  • WorkQueueProcessor不遵循reactive streams的規範,所以比TopicProcessor所消耗的資源更少。做爲trade-off,全部subscriber的request會累加在一塊兒,而後WorkQueueProcessor每次只給一個subscriber重放數據,相比於TopicProcessorde fan-out廣播模式,它相似於round-robin模式,可是公平的round-robin模式是不被保證的。
@Test
    public void testWorkQueueProcessor(){
        WorkQueueProcessor<Integer> workQueueProcessor = WorkQueueProcessor.create();
        Flux<Integer> flux1 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux2 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux3 = workQueueProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.range(1,20)
                .forEach(e -> {
                    workQueueProcessor.onNext(e);
                });

        workQueueProcessor.onComplete();
        workQueueProcessor.blockLast();
    }

輸出實例

21:56:58.203 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:56:58.214 [main] DEBUG reactor.core.publisher.UnsafeSupport - Starting UnsafeSupport init in Java 1.8
21:56:58.215 [main] DEBUG reactor.core.publisher.UnsafeSupport - Unsafe is available
21:56:58.228 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
21:56:58.228 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:3
21:56:58.228 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:2
21:56:58.229 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
21:56:58.229 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:5
21:56:58.229 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:6
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:8
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:9
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:11
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:12
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:13
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:14
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:15
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:17
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:16
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:19
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:18
能夠看到WorkQueueProcessor的subscriber就相似kafka的同屬於一個group的consumer,各自消費的消息總和就是publisher發佈的總消息,不像TopicProcessor那種廣播式的消息傳遞.

doc

相關文章
相關標籤/搜索