DataJoin: the join logic code

/**
   * Perform the actual join recursively.
   * 
   * @param tags
   *          a list of input tags
   * @param values
   *          a list of value lists, each corresponding to one input source
   * @param pos
   *          indicating the next value list to be joined
   * @param partialList
   *          a list of values, each from one value list considered so far.
   * @param key
   * @param output
   * @throws IOException
   */
  private void joinAndCollect(Object[] tags, ResetableIterator[] values,
                              int pos, Object[] partialList, Object key,
                              OutputCollector output, Reporter reporter) throws IOException {

    if (values.length == pos) {
      // get a value from each source. Combine them
      TaggedMapOutput combined = combine(tags, partialList);
      collect(key, combined, output, reporter);
      return;
    }
    ResetableIterator nextValues = values[pos];
    nextValues.reset();
    while (nextValues.hasNext()) {
      Object v = nextValues.next();
      partialList[pos] = v;
      joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
    }
  }

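tags holds one tag per data source taking part in the join, so tags.length is the number of sources being joined. For example: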

Customer data:

Customer ID    Name    Phone Number
1              趙一    025-5455-566
2              錢二    025-4587-565
3              孫三    021-5845-5875

Customer orders:

Customer ID    Order ID    Price    Date
2              1           93       2008-01-08
3              2           43       2012-01-21
1              3           43       2012-05-12
2              4           32       2012-05-14

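Here there are two sources, so tags has length 2. partialList[] holds one matched record from each of the two sources for the current join key, for example:

partialList[0] is    2    錢二    025-4587-565

partialList[1] is    2    1    93    2008-01-08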
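The recursion in joinAndCollect amounts to a cross product over the per-source value lists for one key. The following is a minimal sketch of that idea, not the Hadoop code itself: the class CrossProductJoinSketch and its methods are hypothetical, plain lists stand in for ResetableIterator, and a result list stands in for OutputCollector. The customer name is romanized only to keep the source ASCII.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the recursion in joinAndCollect.
class CrossProductJoinSketch {

  // Enumerates every combination that takes exactly one record from each source.
  static void join(List<List<String>> values, int pos, String[] partial,
                   List<String[]> out) {
    if (pos == values.size()) {
      // One record per source has been chosen; emit a copy of the combination.
      out.add(partial.clone());
      return;
    }
    for (String v : values.get(pos)) {
      partial[pos] = v;
      join(values, pos + 1, partial, out);
    }
  }

  // Convenience wrapper that starts the recursion at the first source.
  static List<String[]> joinAll(List<List<String>> values) {
    List<String[]> out = new ArrayList<>();
    join(values, 0, new String[values.size()], out);
    return out;
  }

  public static void main(String[] args) {
    // Records grouped under join key 2: one customer record, two order records.
    List<List<String>> values = Arrays.asList(
        Arrays.asList("2,Qian Er,025-4587-565"),
        Arrays.asList("2,1,93,2008-01-08", "2,4,32,2012-05-14"));
    for (String[] combo : joinAll(values)) {
      System.out.println(Arrays.toString(combo));
    }
  }
}
```

For key 2 this yields two combinations, one per order, each pairing the single customer record with one order record. If any source has no records for a key, the loop at that position never runs and nothing is emitted.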

The method you need to implement yourself

/**
   * 
   * @param tags
   *          a list of source tags
   * @param values
   *          a value per source
   * @return combined value derived from values of the sources
   */
  protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);

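This is the method invoked by the line TaggedMapOutput combined = combine(tags, partialList); in joinAndCollect.

It is where the joined records are processed into a single output value.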
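One possible shape for a combine implementation is sketched below. It is a simplified stand-in rather than a real subclass: it returns a String instead of a TaggedMapOutput so that it runs without Hadoop on the classpath, and the class name CombineSketch and the tag names "customers" and "orders" are hypothetical. It also assumes comma-separated records whose first field is the join key, that a key present in only one source arrives with fewer than two tags, and that returning null is how a combination is dropped.

```java
// Hypothetical, Hadoop-free stand-in for a combine(Object[] tags, Object[] values) override.
class CombineSketch {

  // Merges one record per source into a single comma-separated line.
  // Returns null when fewer than two sources are present (inner-join behaviour, by assumption).
  static String combine(Object[] tags, Object[] values) {
    if (tags.length < 2) {
      return null;
    }
    StringBuilder joined = new StringBuilder();
    for (int i = 0; i < values.length; i++) {
      String record = values[i].toString();
      if (i > 0) {
        joined.append(',');
        // Drop the leading join key so it appears only once in the output.
        record = record.substring(record.indexOf(',') + 1);
      }
      joined.append(record);
    }
    return joined.toString();
  }

  public static void main(String[] args) {
    Object[] tags = {"customers", "orders"};
    Object[] partialList = {"2,Qian Er,025-4587-565", "2,1,93,2008-01-08"};
    System.out.println(combine(tags, partialList));
  }
}
```

In a real job the same logic would wrap the merged text in a TaggedMapOutput subclass before returning it; that wrapping is left out here to keep the sketch self-contained.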
