/**
 * Perform the actual join recursively.
 *
 * @param tags
 *          a list of input tags
 * @param values
 *          a list of value lists, each corresponding to one input source
 * @param pos
 *          indicating the next value list to be joined
 * @param partialList
 *          a list of values, each from one value list considered so far.
 * @param key
 * @param output
 * @throws IOException
 */
private void joinAndCollect(Object[] tags, ResetableIterator[] values,
    int pos, Object[] partialList, Object key,
    OutputCollector output, Reporter reporter) throws IOException {
  if (values.length == pos) {
    // get a value from each source. Combine them
    TaggedMapOutput combined = combine(tags, partialList);
    collect(key, combined, output, reporter);
    return;
  }
  ResetableIterator nextValues = values[pos];
  nextValues.reset();
  while (nextValues.hasNext()) {
    Object v = nextValues.next();
    partialList[pos] = v;
    joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
  }
}
tags holds one tag per data source taking part in the join, so its length is the number of sources. For example:
Customer data:
Customer ID   Name   PhoneNumber
1             趙一   025-5455-566
2             錢二   025-4587-565
3             孫三   021-5845-5875
Customer orders:
Customer ID   Order ID   Price   Date
2             1          93      2008-01-08
3             2          43      2012-01-21
1             3          43      2012-05-12
2             4          32      2012-05-14
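Here tags has 2 entries, and partialList[] holds the records matched by the join, one from each of the 2 data sources. For customer ID 2, for example:
partialList[0] = 2 錢二 025-4587-565
partialList[1] = 2 1 93 2008-01-08
Customer 2 also has order 4, so the recursion produces a second combination in which partialList[1] is that order's record.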
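To trace the recursion on the sample data without a Hadoop cluster, here is a minimal standalone sketch. It is not the Hadoop code itself: plain lists stand in for ResetableIterator and OutputCollector, the names JoinTrace, join and joinKey2 are hypothetical, and the customer name is romanized only to keep the source ASCII.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JoinTrace {
    // Same recursion shape as joinAndCollect: pick one value from each
    // source in turn, and emit once every source has contributed a value.
    static void join(List<List<String>> values, int pos, String[] partialList,
                     List<String> out) {
        if (pos == values.size()) {
            // One value per source is in partialList: record the combination.
            out.add(String.join(" | ", partialList));
            return;
        }
        for (String v : values.get(pos)) {
            partialList[pos] = v;
            join(values, pos + 1, partialList, out);
        }
    }

    // Runs the join for the records that share customer ID 2.
    static List<String> joinKey2() {
        List<String> customers = Arrays.asList("2 Qian Er 025-4587-565");
        List<String> orders = Arrays.asList("2 1 93 2008-01-08", "2 4 32 2012-05-14");
        List<String> out = new ArrayList<>();
        join(Arrays.asList(customers, orders), 0, new String[2], out);
        return out;
    }

    public static void main(String[] args) {
        for (String row : joinKey2()) {
            System.out.println(row);
        }
    }
}
```

With one customer record and two order records under the same key, the sketch emits two combinations, one per order. This is the cross product that joinAndCollect builds for each key.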
The method you have to implement yourself:
/**
 *
 * @param tags
 *          a list of source tags
 * @param values
 *          a value per source
 * @return combined value derived from values of the sources
 */
protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
This is the method invoked by TaggedMapOutput combined = combine(tags, partialList); in joinAndCollect.
It is where the joined records are processed.
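One possible shape for a combine implementation is sketched below. It is only an illustration under stated assumptions: Tagged is a hypothetical stand-in for a TaggedMapOutput subclass, records are space-separated strings whose first field is the join key, and a key present in only one source is dropped (inner-join behaviour). A real job would work with TaggedMapOutput and Writable values instead.

```java
class CombineSketch {
    // Hypothetical stand-in for a TaggedMapOutput subclass:
    // a source tag plus one record from that source.
    static final class Tagged {
        final String tag;
        final String data;

        Tagged(String tag, String data) {
            this.tag = tag;
            this.data = data;
        }
    }

    // Mirrors combine(Object[] tags, Object[] values):
    // values[i] is the record picked from source i for the current key.
    static Tagged combine(Object[] tags, Object[] values) {
        if (tags.length < 2) {
            // Assumption: the key appears in only one source, so emit nothing.
            return null;
        }
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            String record = ((Tagged) values[i]).data;
            // Split off the leading join key from the rest of the record.
            String[] parts = record.split(" ", 2);
            if (i == 0) {
                joined.append(parts[0]); // keep the join key once
            }
            if (parts.length > 1) {
                joined.append(' ').append(parts[1]);
            }
        }
        // Assumption: the combined record carries the first source's tag.
        return new Tagged((String) tags[0], joined.toString());
    }
}
```

Called with the two partialList entries for customer ID 2 and order 1, this returns a record whose data is "2 Qian Er 025-4587-565 1 93 2008-01-08" (name romanized to keep the source ASCII).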