隨着Last.fm服務的發展,用戶數目從數千增加到數百萬,這時,存儲、處理和管理這些用戶數據漸漸變成一項挑戰。幸運的是,當你們認識到Hadoop技術能解決衆多問題以後,Hadoop的性能迅速穩定下來,並被你們積極地運用。2006年初,Last.fm開始使用Hadoop,幾個月以後便投入實際應用。Last.fm使用Hadoop的理由概括以下。 html
(1)分佈式文件系統爲它所存儲的數據(例如,網誌,用戶收聽音樂的數據)提供冗餘備份服務而不增長額外的費用。 java
(2)能夠方便地經過增添便宜、普通的硬件來知足可擴展性需求。 服務器
(3)當時Last.fm財力有限,Hadoop是免費的。 網絡
(4)開源代碼和活躍的社區團體意味着Last.fm可以自由地修改Hadoop,從而增添一些自定義特性和補丁。 app
(5)Hadoop提供了一個彈性的容易掌握的框架來進行分佈式計算。 框架
如今,Hadoop已經成爲Last.fm基礎平臺的關鍵組件,目前包括2個Hadoop集羣,涉及50臺計算機、300個內核和100 TB的 硬盤 空間。在這些集羣上,運行着數百種執行各類操做的平常做業,例如日誌文件分析、A/B測試評測、即時處理和圖表生成。本節的例子將側重於介紹產生圖表的處理過程,由於這是Last.fm對Hadoop的第一個應用,它展現出Hadoop在處理大數據集時比其餘方法具備更強的功能性和靈活性。
一般狀況下,Last.fm有兩種收聽信息。 分佈式
用戶播放本身的音樂(例如,在PC機或其餘設備上聽MP3文件),這種信息經過Last.fm的官方客戶端應用或一種第三方應用 (有上百種)發送到Last.fm。 ide
用戶收聽Last.fm某個網絡電臺的節目,並在本地計算機上經過流技術緩衝一首歌。Last.fm播放器或站點能被用來訪問這些流數據,而後它能給用戶提供一些額外的功能,好比容許用戶對她收聽的音頻進行喜好、跳過或禁止等操做。 函數
在處理接收到的數據時,咱們對它們進行分類:一類是用戶提交的收聽的音樂數據從如今開始,第一類數據稱爲「scrobble」(收藏數據);另外一類是用戶收聽的Last.fm的電臺數據(從如今開始,第二類數據稱爲「radio listen」(電臺收聽數據)。爲了不Last.fm的推薦系統出現信息反饋循環的問題,對數據源的區分是很是重要的,而Last.fm的推薦系統只使用scrobble數據。
數據命名及意義: oop
scrobble -- 收藏數據
radio listen--電臺收聽數據
Track Statistics程序
音樂收聽信息被髮送到Last.fm時,會經歷驗證和轉換階段,最終結果是一系列由空格分隔的文本文件,包含的信息有用戶ID(userId)、音樂(磁道)ID(trackId)、這首音樂被收藏的次數(Scrobble)、這首音樂在電臺中收聽的次數(Radio)以及被選擇跳過的次數(Skip)。表16-1包含一些採樣的收聽數據,後面介紹的例子將用到這些數據,它是Track Statistics程序的輸入(真實數據達GB數量級,而且具備更多的屬性字段,爲了方便介紹,這裏省略了其餘的字段)。
命名 | 含義 | 命名 | 含義 |
userId | 用戶Id | trackId | 音樂Id |
scrobbles | 收藏次數 | skip | 跳過次數 |
radio | 收聽次數 | |
|
計算不一樣的聽衆數--Unique Listeners做業模塊用於計算每一個音頻的不一樣收聽用戶數
UniqueListenerMaper:程序處理用空格分隔的原始收聽數據,而後對每一個track ID(音頻ID)產生相應的user ID(用戶ID):
public void map(LongWritable position, Text rawLine, OutputCollector IntWritable> output, Reporter reporter) throws IOException { String[] parts = (rawLine.toString()).split(" "); int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]); int radioListens = Integer.parseInt(parts[TrackStatisticsProgram.COL_RADIO]); // if track somehow is marked with zero plays - ignore if (scrobbles <= 0 && radioListens <= 0) { return; } // if we get to here then user has listened to track, // so output user id against track id IntWritable trackId = new IntWritable(Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID])); IntWritable userId = new IntWritable(Integer.parseInt(parts[TrackStatisticsProgram.COL_USERID])); output.collect(trackId, userId); }代碼釋義:按空格「 」讀取內容, 排除無效數據(既沒有被收藏,又沒有被收聽的數據),按照 音樂Id 和 用戶Id 合併成key-value
UniqueListenersReducer:接收到每一個track ID對應的user ID數據列表以後,把這個列表放入Set類型對象以消除重複的用戶ID數據。而後輸出每一個track ID對應的這個集合的大小(不一樣用戶數)。可是若是某個鍵對應的值太多,在set對象中存儲全部的reduce值可能會有內存溢出的危險。實際上尚未出現過這個問題,可是爲了不這一問題,咱們能夠引入一個額外的MapReduce處理步驟來刪除重複數據或使用輔助排序的方法(詳細內容請參考第241頁的「輔助排序」小節)。
public void reduce(IntWritable trackId, Iterator values,OutputCollector output, Reporter reporter) throws IOException { Set userIds = new HashSet(); // add all userIds to the set, duplicates automatically removed (set contract) while (values.hasNext()) { IntWritable userId = values.next(); userIds.add(Integer.valueOf(userId.get())); } // output trackId -> number of unique listeners per track output.collect(trackId, new IntWritable(userIds.size())); }
代碼釋義:經過map處理後的key-value,按照相同的key,對value進行合併操做。
表16-2是這一做業模塊的樣本輸入數據。map輸出結果如表16-3所示,reduce輸出結果如表16-4所示。
表16-2. 做業的輸入
表16-3. map輸出
表16-4. reduce輸出
統計音頻使用總數
Sum做業相對簡單,它只爲每一個音軌累計咱們感興趣的數據。
SumMapper 輸入數據仍然是原始文本文件,可是這一階段對輸入數據的處理徹底不一樣。指望的輸出結果是針對每一個音軌的一系列累計值(不一樣用戶、播放次數、收藏次數、電臺收聽次數和跳過次數)。爲了方便處理,咱們使用一個由Hadoop Record I/O類產生的TrackStats中間對象,它實現了WritableComparable方法(所以可被用做輸出)來保存這些數據。mapper建立一個TrackStats對象,根據文件中的每一行數據對它進行值的設定,可是「不一樣的用戶數」(unique listener count)這一項沒有填寫(這項數據由merge做業模塊填寫)。
Map過程
public void map(LongWritable position, Text rawLine, OutputCollector output, Reporter reporter) throws IOException { String[] parts = (rawLine.toString()).split(" "); int trackId = Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID]); int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]); int radio = Integer.parseInt(parts[TrackStatisticsProgram.COL_RADIO]); int skip = Integer.parseInt(parts[TrackStatisticsProgram.COL_SKIP]); // set number of listeners to 0 (this is calculated later) // and other values as provided in text file TrackStats trackstat = new TrackStats(0, scrobbles + radio, scrobbles, radio, skip); output.collect(new IntWritable(trackId), trackstat); }釋義:對於指望輸出結果多維度(好比此例子中須要輸出: 不一樣用戶、播放次數、收藏次數、電臺收聽次數和跳過次數 ),採用虛擬對象來實現(VO作value時,須要實現 WritableComparable或Writable )
new TrackStats(0, scrobbles + radio, scrobbles, radio, skip)根據業務建立實例:0,喜好(經過scrobbles+radio),收藏,試聽過,跳過。
建立完實例後輸出:(trackId,trackstat)
Reduce過程
SumReducer 在這一過程,reducer執行和mapper類似的函數——對每一個音頻使用總數狀況進行統計,而後返回一個總的統計數據:
public void reduce(IntWritable trackId, Iterator values,OutputCollector output, Reporter reporter) throws IOException { TrackStats sum = new TrackStats(); // holds the totals for this track while (values.hasNext()) { TrackStats trackStats = (TrackStats) values.next(); sum.setListeners(sum.getListeners() + trackStats.getListeners()); sum.setPlays(sum.getPlays() + trackStats.getPlays()); sum.setSkips(sum.getSkips() + trackStats.getSkips()); sum.setScrobbles(sum.getScrobbles() + trackStats.getScrobbles()); sum.setRadioPlays(sum.getRadioPlays() + trackStats.getRadioPlays()); } output.collect(trackId, sum); }代碼釋義:建立一個能夠接收全部統計數據的VO,合併輸出。
表16-5是這個部分做業的輸入數據(和Unique Listener做業模塊的輸入同樣)。map的輸出結果如表16-6所示,reduce的輸出結果如表16-7所示。
表16-5. 做業輸入
表16-6. map輸出
表16-7. reduce 輸出
合併結果
最後一個做業模塊須要合併前面兩個做業模塊產生的輸出數據:每一個音頻對應的不一樣用戶數和每一個音頻的使用統計信息。爲了可以合併這兩種不一樣的輸入數據,咱們採用了兩個不一樣的mapper(對每一種輸入定義一個)。兩個中間做業模塊被配置以後能夠把他們的輸出結果寫入路徑不一樣的文件,MultipleInputs類用於指定mapper和文件的對應關係。下面的代碼展現了做業的JobConf對象是如何設置來完成這一過程的:
MultipleInputs.addInputPath(conf, sumInputDir, SequenceFileInputFormat.class, IdentityMapper.class); MultipleInputs.addInputPath(conf, listenersInputDir, SequenceFileInputFormat.class, MergeListenersMapper.class);
雖然單用一個mapper也能處理不一樣的輸入,可是示範解決方案更方便,更巧妙。
MergeListenersMapper 這個mapper用來處理UniqueListenerJob輸出的每一個音軌的不一樣用戶數據。它採用和SumMapper類似的方法建立TrackStats對象,但此次它只填寫每一個音軌的不一樣用戶數信息,無論其餘字段:
public void map(IntWritable trackId, IntWritable uniqueListenerCount,OutputCollector output, Reporter reporter) throws IOException { TrackStats trackStats = new TrackStats(); trackStats.setListeners(uniqueListenerCount.get()); output.collect(trackId, trackStats); }
代碼釋義:剛纔知道爲何在建立虛擬對象TrackStats第一個參數置零的緣由了吧,這個參數是爲了第一步統計用戶喜歡總數而預留的。
表16-8是mapper的一些輸入數據;表16-9是對應的輸出結果。
表16-8. MergeListenersMapper的輸入
表16-9. MergeListenersMapper的輸出
IdentityMapper IdentityMapper被配置用來處理SumJob輸出的TrackStats對象,由於不要求對數據進行其餘處理,因此它直接輸出輸入數據(見表16-10)。
表16-10. IdentityMapper的輸入和輸出
▲
SumReducer 前面兩個mapper產生同一類型的數據:每一個音軌對應一個TrackStats對象,只是數據賦值不一樣。最後的reduce階段可以重用前面描述的SumReducer來爲每一個音軌建立一個新的TrackStats對象,它綜合前面兩個TrackStats對象的值,而後輸出結果(見表16-11)。
表16-11. SumReducer的最終輸出
▲
最終輸出文件被收集後複製到服務器端,在這裏一個Web服務程序使Last.fm網站能獲得並展現這些數據。如圖16-3所示,這個網頁展現了一個音頻的使用統計信息:接聽者總數和播放總次數。
▲圖16-3. TrackStats結果
總結
Hadoop已經成爲Last.fm基礎框架的一個重要部件,它用於產生和處理各類各樣的數據集,如網頁日誌信息和用戶收聽數據。爲了讓你們可以掌握主要的概念,這裏講述的例子已經被大大地簡化;在實際應用中輸入數據具備更復雜的結構而且數據處理的代碼也更加繁瑣。雖然Hadoop自己已經足夠成熟能夠支持實際應用,但它仍在被你們積極地開發,而且每週Hadoop社區都會爲它增長新的特性並提高它的性能。Last.fm很高興是這個社區的一分子,咱們是代碼和新想法的貢獻者,同時也是對大量開源技術進行利用的終端用戶。
(做者:Adrian Woodhead和Marc de Palol)