TopK算法及實現

1. 問題描述
  在大規模數據處理中,常遇到的一類問題是,在海量數據中找出出現頻率最高的前K個數,或者從海量數據中找出最大的前K個數,這類問題一般稱爲「top K」問題,如:在搜索引擎中,統計搜索最熱門的10個查詢詞;在歌曲庫中統計下載率最高的前10首歌等等。

2. 當前解決方案 c++

  針對top k類問題,一般比較好的方案是【分治+trie樹/hash+小頂堆】,即先將數據集按照hash方法分解成多個小數據集,而後使用trie樹或者hash統計每一個小數據集中的query詞頻,以後用小頂堆統計出每一個數據集中出頻率最高的前K個數,最後在全部top K中求出最終的top K。
  實際上,最優的解決方案應該是最符合實際設計需求的方案,在實際應用中,可能有足夠大的內存,那麼直接將數據扔到內存中一次性處理便可,也可能機器有多個核,這樣能夠採用多線程處理整個數據集。

本文針對不一樣的應用場景,介紹了適合相應應用場景的解決方案。
3. 解決方案
3.1 單機+單核+足夠大內存
  設每一個查詢詞平均佔8Byte,則10億個查詢詞所需的內存大約是10^9*8=8G內存。若是你有這麼大的內存,直接在內存中對查詢詞進行排序,順序遍歷找出10個出現頻率最大的10個便可。這種方法簡單快速,更加實用。固然,也能夠先用HashMap求出每一個詞出現的頻率,而後求出出現頻率最大的10個詞。

3.2 單機+多核+足夠大內存
  這時能夠直接在內存中實用hash方法將數據劃分紅n個partition,每一個partition交給一個線程處理,線程的處理邏輯是同3.1節相似,最後一個線程將結果歸併。
該方法存在一個瓶頸會明顯影響效率,即數據傾斜,每一個線程的處理速度可能不一樣,快的線程須要等待慢的線程,最終的處理速度取決於慢的線程。解決方法是,將數據劃分紅c*n個partition(c>1),每一個線程處理完當前partition後主動取下一個partition繼續處理,直到全部數據處理完畢,最後由一個線程進行歸併。

3.3 單機+單核+受限內存
  這種狀況下,須要將原數據文件切割成一個一個小文件,如,採用hash(x)%M,將原文件中的數據切割成M小文件,若是小文件仍大於內存大小,繼續採用hash的方法對數據文件進行切割,直到每一個小文件小於內存大小,這樣,每一個文件可放到內存中處理。採用3.1節的方法依次處理每一個小文件。

3.4 多機+受限內存
  這種狀況下,爲了合理利用多臺機器的資源,可將數據分發到多臺機器上,每臺機器採用3.3節中的策略解決本地的數據。可採用hash+socket方法進行數據分發。
從實際應用的角度考慮,3.1~3.4節的方案並不可行,由於在大規模數據處理環境下,做業效率並非首要考慮的問題,算法的擴展性和容錯性纔是首要考慮的。算法應該具備良好的擴展性,以便數據量進一步加大(隨着業務的發展,數據量加大是必然的)時,在不修改算法框架的前提下,可達到近似的線性比;算法應該具備容錯性,即當前某個文件處理失敗後,能自動將其交給另一個線程繼續處理,而不是從頭開始處理。
  Top k問題很適合採用MapReduce框架解決,用戶只需編寫一個map函數和兩個reduce 函數,而後提交到Hadoop(採用mapchain和reducechain)上便可解決該問題。對於map函數,採用hash算法,將hash值相同的數據交給同一個reduce task;對於第一個reduce函數,採用HashMap統計出每一個詞出現的頻率,對於第二個reduce 函數,統計全部reduce task輸出數據中的top k便可。

  附上小根堆實現的c/c++代碼: 算法

/*
 *交換函數
*/
template<typename T>
inline void Swap( T &a, T &b)
{
	T c;
	c = a;
	a = b;
	b = c;
}
/*

 */
template<typename T>
void HeapAdjustSmall(T *array, int i, int nLength)
{ 
	int nChild; 
	T   tTemp; 
	for (tTemp = array[i]; 2 * i + 1 < nLength; i = nChild) { 
		nChild = 2 * i + 1;  
		if (nChild < nLength - 1 && array[nChild + 1] < array[nChild]) 
			++nChild; 
		if (tTemp > array[nChild]) { 
				array[i]= array[nChild]; 
		}
		else { 
			break; 
		} 
			array[nChild]= tTemp; 
	} 
}
/*
 *@ µ÷ÕûÐòÁеÄÇ°°ë²¿·ÖÔªËØ,µ÷ÕûÍêÖ®ºóµÚÒ»¸öÔªËØÊÇÐòÁеÄ×îСµÄÔªËØ
 */
template<typename T>
void HeapCreate(T *array, int length) {
	 
	for (int i = length / 2 - 1; i >= 0; --i) 
	{ 
		HeapAdjustSmall(array, i, length); 
	} 
} 
/*
*/
template<typename T>
void HeapSort(T *array, int length) { 
	for (int i = length - 1; i > 0; --i) { 
		Swap(array[0], array[i]); 
		HeapAdjustSmall(array, 0, i); 
	} 
}

template<typename T>
void HeapUpdate(T *array, int nLength, T *value) { 

	if ( *value > array[0] ) {
		array[0] = *value;
		HeapAdjustSmall(array, 0, nLength);
	}else
		return;
}

template <typename T>
inline bool GetTopK(T *pData, const se_uint32_t num, se_uint32_t k){
	
	num_t size = 0;
	HeapCreate(pData, k);
	for ( int i = k; i < num; ++i ) {
		HeapUpdate( pData, k, (pData + i) );
	}
	HeapSort(pData, k);
	return true;
}
#endif
相關文章
相關標籤/搜索