【Android】Fresco圖片加載框架(二)————Producer

/**
* 本文能夠隨意轉載到任何網站或者App,
* BUT
* 轉載也要按「基本法」,
* 請註明原文出處和做者
*/

 

 

官方源碼地址html

 
fresco官方高大上介紹(1)(注意:前方有堵牆)
fresco官方高大上介紹 (2)(注意:前方有堵牆)
 
介紹:
上一篇大概介紹了fresco這個lib的總體結構和流程,這篇主要介紹 fresco中關鍵的一部分-- Producer
 
我的以爲, Producer基本是整個 ImagePipeline module的核心,串聯了整個圖片讀取的流程和各個細節(decode,resize等等)的處理,並且感受整個設計上頗有意思,讀完感受收益匪淺。
 
 
正文:
(分析主要是代碼,至關枯燥~~~sigh)
 
以一次網絡請求的例,進行分析,其餘類型的請求,例如從cache中讀取圖片等,都差很少。
 
SimpleDraweeView#setController後,圖片拉取的流程就開始了(詳細流程能夠參看上一篇的流程圖)。忽略掉大部分細節,流程會來到( PipelineDraweeControllerBuilder.java):
 
  @Override
  protected DataSource<CloseableReference<CloseableImage>> getDataSourceForRequest(
          ImageRequest imageRequest,
          Object callerContext,
          boolean bitmapCacheOnly) {
    if (bitmapCacheOnly) {
      return mImagePipeline.fetchImageFromBitmapCache(imageRequest, callerContext);
    } else {
      return mImagePipeline.fetchDecodedImage(imageRequest, callerContext);
    }
  }

 

 
這裏就是開始圖片拉取,轉入到 ImagePipeline核心流程。這裏就是調用的 ImagePipelinefetchDecodeImage方法,從名字看,意思就是「獲取一張decode(解碼)的圖片」,其代碼( ImagePipeline.java):
 
  /**
   * Submits a request for execution and returns a DataSource representing the pending decoded
   * image(s).
   * <p>The returned DataSource must be closed once the client has finished with it.
   * @param imageRequest the request to submit
   * @return a DataSource representing the pending decoded image(s)
   */
  public DataSource<CloseableReference<CloseableImage>> fetchDecodedImage(
          ImageRequest imageRequest,
          Object callerContext) {
    try {
      Producer<CloseableReference<CloseableImage>> producerSequence =
              mProducerSequenceFactory.getDecodedImageProducerSequence(imageRequest);
      return submitFetchRequest(
              producerSequence,
              imageRequest,
              ImageRequest.RequestLevel.FULL_FETCH,
              callerContext);
    } catch (Exception exception) {
      return DataSources.immediateFailedDataSource(exception);
    }
  }

 

 
這裏有兩個主要的函數: getDecodedImageProducerSequencesubmitFetchRequest
 
getDecodedImageProducerSequence的返回值就是一個 Producer<CloseableReference<CloseableImage>>。該值做爲參數,傳到第二個函數 submitFetchRequest中,先看 submitFetchRequest
 
  private <T> DataSource<CloseableReference<T>> submitFetchRequest(
          Producer<CloseableReference<T>> producerSequence,
          ImageRequest imageRequest,
          ImageRequest.RequestLevel lowestPermittedRequestLevelOnSubmit,
          Object callerContext) {
    try {
      ImageRequest.RequestLevel lowestPermittedRequestLevel =
              ImageRequest.RequestLevel.getMax(
                      imageRequest.getLowestPermittedRequestLevel(),
                      lowestPermittedRequestLevelOnSubmit);
      SettableProducerContext settableProducerContext = new SettableProducerContext(
              imageRequest,
              generateUniqueFutureId(),
              mRequestListener,
              callerContext,
              lowestPermittedRequestLevel,
/* isPrefetch */ false,
              imageRequest.getProgressiveRenderingEnabled() ||
                      !UriUtil.isNetworkUri(imageRequest.getSourceUri()),
              imageRequest.getPriority());
      return CloseableProducerToDataSourceAdapter.create(
              producerSequence,
              settableProducerContext,
              mRequestListener);
    } catch (Exception exception) {
      return DataSources.immediateFailedDataSource(exception);
    }
  }

 

 
而後上面的create函數,就會一直到( AbstractProducerToDataSourceAdapter.java):
 
 
  protected AbstractProducerToDataSourceAdapter(
          Producer<T> producer,
          SettableProducerContext settableProducerContext,
          RequestListener requestListener) {
    mSettableProducerContext = settableProducerContext;
    mRequestListener = requestListener;
    mRequestListener.onRequestStart(
            settableProducerContext.getImageRequest(),
            mSettableProducerContext.getCallerContext(),
            mSettableProducerContext.getId(),
            mSettableProducerContext.isPrefetch());
    producer.produceResults(createConsumer(), settableProducerContext);
  }

 

 
到最後, submitFetchRequest會調用到 Producer#produceResults方法,而這個producer就是前面那個 getDecodedImageProducerSequence方法產生的,因此回頭看這個 最最關鍵的地方
 
getDecodedImageProducerSequenceProducerSequenceFactory.java的方法:
 
  /**
   * Returns a sequence that can be used for a request for a decoded image.
   *
   * @param imageRequest the request that will be submitted
   * @return the sequence that should be used to process the request
   */
  public Producer<CloseableReference<CloseableImage>> getDecodedImageProducerSequence(
          ImageRequest imageRequest) {
    Producer<CloseableReference<CloseableImage>> pipelineSequence = getBasicDecodedImageSequence(imageRequest);
    if (imageRequest.getPostprocessor() != null) {
      return getPostprocessorSequence(pipelineSequence);
    } else {
      return pipelineSequence;
    }
  }

 

 
從註釋看,方法的意思就是返回一個用於請求decoded圖片的sequence,而事實上,應該是返回一個Producer纔對啊,
那爲何是強調是sequence Producer,而不是,僅僅就是一個Producer?
 
帶着疑問繼續看:
 
 
private Producer<CloseableReference<CloseableImage>> getBasicDecodedImageSequence(
          ImageRequest imageRequest) {
    Preconditions.checkNotNull(imageRequest);

    Uri uri = imageRequest.getSourceUri();
    Preconditions.checkNotNull(uri, "Uri is null.");
    if (UriUtil.isNetworkUri(uri)) {
      return getNetworkFetchSequence();
    } else if (UriUtil.isLocalFileUri(uri)) {
      if (MediaUtils.isVideo(MediaUtils.extractMime(uri.getPath()))) {
        return getLocalVideoFileFetchSequence();
      } else {
        return getLocalImageFileFetchSequence();
      }
    } else if (UriUtil.isLocalContentUri(uri)) {
      return getLocalContentUriFetchSequence();
    } else if (UriUtil.isLocalAssetUri(uri)) {
      return getLocalAssetFetchSequence();
    } else if (UriUtil.isLocalResourceUri(uri)) {
      return getLocalResourceFetchSequence();
    } else if (UriUtil.isDataUri(uri)) {
      return getDataFetchSequence();
    } else {
      String uriString = uri.toString();
      if (uriString.length() > 30) {
        uriString = uriString.substring(0, 30) + "...";
      }
      throw new RuntimeException("Unsupported uri scheme! Uri is: " + uriString);
    }
  }

 

看代碼可知道,就是根據 ImageRequestUri,選擇一個sequence Producer,咱們這裏假設是網絡請求圖片,因此選擇的是圖中紅色方法 getNetworkFetchSequence它也是返回一個Producer (可粗略看)
 
  /**
   * swallow result if prefetch -> bitmap cache get ->
   * background thread hand-off -> multiplex -> bitmap cache -> decode -> multiplex ->
   * encoded cache -> disk cache -> (webp transcode) -> network fetch.
   */
  private synchronized Producer<CloseableReference<CloseableImage>> getNetworkFetchSequence() {
    if (mNetworkFetchSequence == null) {
      mNetworkFetchSequence = newBitmapCacheGetToDecodeSequence(getCommonNetworkFetchToEncodedMemorySequence());
    }
    return mNetworkFetchSequence;
  }

 

 
先看紅色代碼, getCommonNetworkFetchToEncodedMemorySequence它也是返回一個Producer (可粗略看)
 
  /**
   * multiplex -> encoded cache -> disk cache -> (webp transcode) -> network fetch.
   */
  private synchronized Producer<EncodedImage> getCommonNetworkFetchToEncodedMemorySequence() {
    if (mCommonNetworkFetchToEncodedMemorySequence == null) {
      Producer<EncodedImage> inputProducer = newEncodedCacheMultiplexToTranscodeSequence(
                      mProducerFactory.newNetworkFetchProducer(mNetworkFetcher));
      mCommonNetworkFetchToEncodedMemorySequence =
              ProducerFactory.newAddImageTransformMetaDataProducer(inputProducer);

      if (mResizeAndRotateEnabledForNetwork && !mDownsampleEnabled) {
        mCommonNetworkFetchToEncodedMemorySequence =
                mProducerFactory.newResizeAndRotateProducer(
                        mCommonNetworkFetchToEncodedMemorySequence);
      }
    }
    return mCommonNetworkFetchToEncodedMemorySequence;
  }

 

 
再看, newEncodedCacheMultiplexToTranscodeSequence它也是返回一個Producer (可粗略看)
 
 
  /**
   * encoded cache multiplex -> encoded cache -> (disk cache) -> (webp transcode)
   * @param inputProducer producer providing the input to the transcode
   * @return encoded cache multiplex to webp transcode sequence
   */
  private Producer<EncodedImage> newEncodedCacheMultiplexToTranscodeSequence(
          Producer<EncodedImage> inputProducer) {
    if (Build.VERSION.SDK_INT < Build.VERSION_CODES.JELLY_BEAN_MR2) {
      inputProducer = mProducerFactory.newWebpTranscodeProducer(inputProducer);
    }
    inputProducer = mProducerFactory.newDiskCacheProducer(inputProducer);
    EncodedMemoryCacheProducer encodedMemoryCacheProducer =
            mProducerFactory.newEncodedMemoryCacheProducer(inputProducer);
    return mProducerFactory.newEncodedCacheKeyMultiplexProducer(encodedMemoryCacheProducer);
  }

 

 
選其中一個紅色函數看,newDiskCacheProducer, 它也是返回一個Producer(可粗略看)
 
  public DiskCacheProducer newDiskCacheProducer(
          Producer<EncodedImage> inputProducer) {
    return new DiskCacheProducer(
            mDefaultBufferedDiskCache,
            mSmallImageBufferedDiskCache,
            mCacheKeyFactory,
            inputProducer);
  }

 

 
好了,到此爲止(列出的函數足夠多了,暈~~~)
 
其實上面,一連串幾個函數,若是有細心的留意,它們有一個特色,就是, 每個函數)都以 (上一個函數)產生的Producer做爲參數進行傳遞
 
 
這樣的設計,是否是有似曾相識的感受,看下面的代碼應該就可以瞭解得更深:
 
FileInputStream fileInputStream = new FileInputStream("/test.txt");
 
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream);
 
BufferedReader bufferedReader = new BufferedReader(inputSteamReader);
 
很熟悉了吧,能夠把sequence Producer就當作是上面這樣的一個邏輯~~~
 
那麼這樣作的做用是什麼?咱們選一個簡單Producer來分析,就以 DiskCacheProducer爲例 (留意代碼綠色註釋的描述)
 
 public DiskCacheProducer(
          BufferedDiskCache defaultBufferedDiskCache,
          BufferedDiskCache smallImageBufferedDiskCache,
          CacheKeyFactory cacheKeyFactory,
          Producer<EncodedImage> inputProducer) {
    mDefaultBufferedDiskCache = defaultBufferedDiskCache;
    mSmallImageBufferedDiskCache = smallImageBufferedDiskCache;
    mCacheKeyFactory = cacheKeyFactory;
    mInputProducer = inputProducer;
  }

  public void produceResults(
          final Consumer<EncodedImage> consumer,
          final ProducerContext producerContext) {
    ImageRequest imageRequest = producerContext.getImageRequest();
//若是diskcache disabled的話,那麼直接執行maybeStartInputProducer
    if (!imageRequest.isDiskCacheEnabled()) {
      maybeStartInputProducer(consumer, consumer, producerContext);
      return;
    }

    final ProducerListener listener = producerContext.getListener();
    final String requestId = producerContext.getId();
    listener.onProducerStart(requestId, PRODUCER_NAME);

    final CacheKey cacheKey = mCacheKeyFactory.getEncodedCacheKey(imageRequest);
    final BufferedDiskCache cache =
            imageRequest.getImageType() == ImageRequest.ImageType.SMALL
                    ? mSmallImageBufferedDiskCache
                    : mDefaultBufferedDiskCache;
    Continuation<EncodedImage, Void> continuation = new Continuation<EncodedImage, Void>() {
      //回調
      @Override
      public Void then(Task<EncodedImage> task)
              throws Exception {
        //根據task是canceled,fault等狀態決定如何執行
        if (task.isCancelled() ||
                (task.isFaulted() && task.getError() instanceof CancellationException)) {
          listener.onProducerFinishWithCancellation(requestId, PRODUCER_NAME, null);
          consumer.onCancellation();
        } else if (task.isFaulted()) {
          listener.onProducerFinishWithFailure(requestId, PRODUCER_NAME, task.getError(), null);
          //出錯了,就調用maybeStartInputProducer
 maybeStartInputProducer(
                  consumer,
                  new DiskCacheConsumer(consumer, cache, cacheKey),
                  producerContext);
        } else {
          EncodedImage cachedReference = task.getResult();
          if (cachedReference != null) {
            listener.onProducerFinishWithSuccess(
                    requestId,
                    PRODUCER_NAME,
                    getExtraMap(listener, requestId, true));
            consumer.onProgressUpdate(1);
            consumer.onNewResult(cachedReference, true);
            cachedReference.close();
          } else {
            //沒有結果,就調用maybeStartInputProducer
            listener.onProducerFinishWithSuccess(
                    requestId,
                    PRODUCER_NAME,
                    getExtraMap(listener, requestId, false));
            maybeStartInputProducer(
                    consumer,
                    new DiskCacheConsumer(consumer, cache, cacheKey),
                    producerContext);
          }
        }
        return null;
      }
    };

    AtomicBoolean isCancelled = new AtomicBoolean(false);
    final Task<EncodedImage> diskCacheLookupTask =
            cache.get(cacheKey, isCancelled);
//執行task,task其實就是從緩存中取結果,執行後,前面的continuation就會被回調
    diskCacheLookupTask.continueWith(continuation);
    subscribeTaskForRequestCancellation(isCancelled, producerContext);
  }

  //調用mInputProducer的produceResults 
  private void maybeStartInputProducer(
          Consumer<EncodedImage> consumerOfDiskCacheProducer,
          Consumer<EncodedImage> consumerOfInputProducer,
          ProducerContext producerContext) {
    if (producerContext.getLowestPermittedRequestLevel().getValue() >=
            ImageRequest.RequestLevel.DISK_CACHE.getValue()) {
      consumerOfDiskCacheProducer.onNewResult(null, true);
      return;
    }

    mInputProducer.produceResults(consumerOfInputProducer, producerContext);
  }

 

 
從前面知道,當開始拉取圖片的時候, Producer #produceResult開始執行,註釋標出了關鍵的步驟,從這些步驟能夠看出,其實 DiskCacheProducer拉取圖片時,作的任務大概就是: 先看Diskcache中是否有緩存的圖片,若是有,就直接返回緩存,若是沒有,就用inputProducer來處理
 
而後, inputProducer處理完結果會怎樣呢?它處理的結果會在 consumer中接收到,上面的例子代碼對應的就是 DiskCacheConsumer (留意綠色註釋)
 
  /**
   * Consumer that consumes results from next producer in the sequence.
   *
   * <p>The consumer puts the last result received into disk cache, and passes all results (success
   * or failure) down to the next consumer.
   */
  private class DiskCacheConsumer extends DelegatingConsumer<EncodedImage, EncodedImage> {

    private final BufferedDiskCache mCache;
    private final CacheKey mCacheKey;

    private DiskCacheConsumer(
            final Consumer<EncodedImage> consumer,
            final BufferedDiskCache cache,
            final CacheKey cacheKey) {
      super(consumer);
      mCache = cache;
      mCacheKey = cacheKey;
    }
    //inputProducer的結果會從這裏返回,即newResult
    @Override
    public void onNewResultImpl(EncodedImage newResult, boolean isLast) {
      //返回的結果加入cache中
      if (newResult != null && isLast) {
        mCache.put(mCacheKey, newResult);
      }
      //回調上一層procducer傳進來的consumer
      getConsumer().onNewResult(newResult, isLast);
    }
  }

 

 
到這裏,應該就大概明白這個sequence Producer的做用了,所謂sequence Producer,其實就是 一層層的Producer不斷的嵌套鏈接起來,完成同一個任務,而每個Producer都相互獨立,完成各自任務;同時,Producer間產生的結果,也會相互傳遞,互爲表裏。能夠稱其爲「Producer鏈」,可是「Producer鏈」自己被抽象成一個Producer,那麼對於上層來看,這樣一個複雜的處理邏輯就被隱藏起來了,變得更加容易理解。
 
sequence Producer功能極其強大,不一樣的Producer的組合,產生了不少不一樣的效果,對於代碼的擴展性,可複用性和靈活性都有很大好處。
 
例如, MultiplexProducer
 
註釋:
/**
* Producer for combining multiple identical requests into a single request.
*
* <p>Requests using the same key will be combined into a single request. This request is only
* cancelled when all underlying requests are cancelled, and returns values to all underlying
* consumers. If the request has already return one or more results but has not finished, then
* any requests with the same key will have the most recent result returned to them immediately.
*
* @param <K> type of the key
* @param <T> type of the closeable reference result that is returned to this producer
*/
@ThreadSafe
public abstract class MultiplexProducer<K, T extends Closeable> implements Producer<T>
 

 

理解爲,多路複用Producer(什麼鬼東西?),其實就是將相同的任務合併爲一個,例如相同url的重複請求,如何作到的,關鍵代碼(主要看註釋):
 
  @Override
  public void produceResults(Consumer<T> consumer, ProducerContext context) {
    K key = getKey(context);
    Multiplexer multiplexer;
    boolean createdNewMultiplexer;
// We do want to limit scope of this lock to guard only accesses to mMultiplexers map.
// However what we would like to do here is to atomically lookup mMultiplexers, add new
// consumer to consumers set associated with the map's entry and call consumer's callback with
// last intermediate result. We should not do all of those things under this lock.
    do {
      createdNewMultiplexer = false;
      synchronized (this) {
        //根據key得到多路複用器,當緩存沒有的時候,才create一個,否則直接忽略
        multiplexer = getExistingMultiplexer(key);
        if (multiplexer == null) {
          multiplexer = createAndPutNewMultiplexer(key);
          createdNewMultiplexer = true;
        }
      }
// addNewConsumer may call consumer's onNewResult method immediately. For this reason
// we release "this" lock. If multiplexer is removed from mMultiplexers in the meantime,
// which is not very probable, then addNewConsumer will fail and we will be able to retry.
    } while (!multiplexer.addNewConsumer(consumer, context));
    //若是前面沒有建立,也就是存在緩存的多路複用器,那麼就不會調用startInputProducerIfHasAttachedConsumers,而後inputProducer就不起做用了,這樣,就起到合併請求的做用

    if (createdNewMultiplexer) {
      multiplexer.startInputProducerIfHasAttachedConsumers();
    }
  }

 

 
不一樣的功能Producer還有不少,例如對圖片進行resize和rotate的 ResizeAndRotateProducer, 異步執行任務的 ThreadHandoffProducer等等,如此靈活的實現,得益於sequence Factory這種設計。
 
總結
 
ImagePipeline的核心Producer,經過sequence的形式,很好的串聯了整個圖片網絡讀取,緩存,bitmap處理等流程,經過優秀的設計,保證的代碼 高擴展性高可複用性高靈活性
 
這種設計恰好對應就是「 pipeline」這個詞的含義,就如現代的pipelined CPU同樣,把對指令的處理,拆分紅多個stage,同時輸入輸出相互依賴和協做,共同完成一個任務。
 
~~~文卒~~~
相關文章
相關標籤/搜索