本章的模式有一個共同點:不會改變原來的記錄。這種模式是找到一個數據的子集,或者更小,例如取前十條,或者很大,例如結果去重。這種過濾器模式跟前面章節的不一樣是,從更小的粒度認識數據,例如特殊用戶生成的記錄,或文本中用得最多的前10個動詞。簡單的說,過濾器容許你更清楚的看清數據,像在顯微鏡下同樣。也能夠認爲是搜索的一種形式。若是你對找出全部有着特殊信息的記錄感興趣,你就能夠過濾出不匹配搜索條件的記錄。java
抽樣,一種通用的過濾程序,是指取出數據的一個樣本,好比,取某字段值最高的幾個,或隨機取幾個記錄。取樣能用來獲得更小的,仍具備表明性的子數據集,在其上面進行分析,就沒必要處理龐大的數據集。不少機器學習算法在大數據集上運行低效,因此須要建模使其可運行在更小的子數據集。web
子樣本也能夠用於開發目的。簡單的抽取前一千條記錄不是好的抽樣方法,由於取的數據類似並不能表明整個數據集。一個均勻的抽樣可以提供更好的數據的視圖,並容許你的程序或分析能在現實數據上完成,甚至取樣很是小。正則表達式
本章列出了四種這樣的模式:filtering, Bloom filtering, top ten, and distinct。找出數據一部分的方法有不少,每種模式都有細微的差異,甚至都在作相同的事情。算法
咱們將會看到MapReduce中幾個簡明的使用。Filtering, Bloom filtering,和 simple random能夠只使用map端完成,不用reducer。sql
最基本的模式,filtering,爲其它模式充當了一種抽象形式。Filtering基於某種條件評估每一條記錄,決定它的去留。數據庫
過濾出咱們不感興趣的數據並保存起來。apache
考慮用一個評估方法f,處理記錄,若是返回true,保留,若是返回false,丟掉。api
你的數據集很大,你想更專一於數據的一部分並可能在上面作隨後的分析。這個本身可能對數據集有表明意義或者就像在大海中撈針。無論用怎樣的方式,你須要使用MapReduce並行處理數據,並找到須要的數據,過程可能有點費勁。緩存
例如,你可能只對含有Hadoop的記錄感興趣,指的是那些文本中有hadoop詞,或有hadoop標籤的記錄。Filtering能找到跟hadoop相關的記錄並保存,丟掉其它記錄。服務器
像hadoop同樣的大數據處理系統,一般是關於把全部數據拿到一個本地機器。Filtering就是這種把數據子集拉出來併發送到分析程序的方法。Filtering也用於放大匹配條件的你好奇的數據。對子數據集的探索可能會致使代價更昂貴,而且基於更小數據集上行爲的分析更復雜。
Filtering有着普遍的應用。僅須要作的是用指定的規則解析出特定的「records」並決定是否保存。
過濾器模式多是這本書裏最簡單的。圖3-1展現告終構圖。
map(key, record):
if we want to keep record then
emit key,value
Figure 3-1. The structure of the filter pattern
Filtering是MapReduce中惟一不須要reduce的。由於它不處理聚合運算。每一條記錄單獨處理評估去留。
Mapper對每條輸入記錄應用評估方法。輸出鍵值對跟輸入同樣,由於記錄不會改變。若是評估方法返回true就輸出。
Job的輸出是經過了選擇條件的子記錄集。若是格式不變,在大數據集上跑的job也能在過濾後的數據集上跑。
Closer view of data
準備個特殊的記錄有共性或有趣的子數據集,作進一步檢查。例如,馬里蘭的電話局可能只關心國際通話中馬里蘭的去電。
Tracking a thread of events
抽取一系列連續事件做爲大數據集的案例研究。例如,你能夠經過分析apache web服務器的日誌瞭解特殊用戶怎樣與你的網站交互。一個特殊用戶的行爲可能遍及於其它行爲中,因此很難找出發生了什麼。靠根據用戶的ip過濾,就可以獲得特殊用戶行爲更好的視圖。
Distributed grep
Grep,一個使用正則表達式找出感興趣的文本行的強大的工具,很容易並行應用正則表達式匹配每一行並輸出。
Data cleansing
數據有時是髒的,或者很難理解,未完成,和錯誤的格式。數據可能丟失了字段,date類型的字段不能格式化成date,或二進制數據的隨即字節可能存在。Filtering能檢驗每條記錄是否結構良好,並去除任何垃圾數據。
Simple random samping
若是你想獲得數據集的一個簡單隨機採樣,可使用filtering,用一個評估方法隨即返回true或false。在簡單隨即採樣中,數據集中的每條記錄都有相同的機率被選中。能夠根據源數據的數量算出百分比,獲得要返回的記錄的數量。例如,數據集有一萬億,你想獲得一百萬數據,那麼評估方法應該每一百萬次返回一次true,由於一百萬個一百萬是一萬億。
Removing low scoring data
若是你能根據排序給數據評分,你能夠根據不知足某一臨界條件來過濾數據。若是以前已經知道有些數據對分析也沒意義,能夠給這些記錄評比較低的分。這跟隨後講到的top ten模式有相同的目的,除了數據量。
Sql:過濾器模式跟select語句中使用where條件是等同的。記錄保持不變,一些被簡單的過濾,例如:
SELECT * FROM table WHERE value < 3;
Pig:filter是關鍵詞
b = FILTER a BY value < 3;
這種模式基本上同MapReduce等效率,由於job只有map。下面是map-only job高效的緣由:
·因爲沒有reducer,就少了數據在map和reduce之間傳輸數據的階段。全部的map任務處理本地數據,而後放回本地磁盤。
·因爲沒有reducer,排序階段和reduce階段時沒有的。一般不會佔用太多時間,但聚沙成塔。
須要注意的一件事情是:輸出文件的數量和大小。由於job只有mapper,獲得的輸出文件都是以part-m-爲前綴的。你會發現若是過濾掉不少數據,輸出文件的數據量會不多,這會在一段時間後NameNode的可擴展性上帶來問題。
若是你顧慮這些小文件,但容忍job運行騷微長一點,可使用identity reducer收集結果集。這樣,mapper會把所有輸出數據給reducer,但reducer不會作任何處理,僅僅每一個reducer輸出一個文件。合適的reducer數量取決於將被寫到文件系統的數據量的大小和你想處理多少小文件。
Grep 做爲流行的文本過濾工具能夠追溯到unix和類unix系統中的使用。對一個文件進行行掃描,匹配指定的樣式就輸出。咱們要在大文本數據上並行作正則表達式搜索。在這個例子中,咱們將展現在MapReduce中怎樣應用正則表達式。
Mapper code。Mapper很簡單,由於使用java內建api處理正則表達式。若是文本行匹配樣式,就輸出這一行。不然忽略這一行。咱們使用setup方法獲取job配置的正則。
public static class GrepMapper extends Mapper<Object, Text, NullWritable, Text> { private String mapRegex = null; public void setup(Context context) throws IOException, InterruptedException { mapRegex = context.getConfiguration().get("mapregex"); } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { if (value.toString().matches(mapRegex)) { context.write(NullWritable.get(), value); } } }
只有map,沒有combiner,沒有reducer。全部輸出記錄被寫到本地文件系統。
在srs中,咱們想抽取大數據的每條記錄都同等機率被選擇的子數據集,這樣有效減少數據集,並在更易於管理的部分數據上作有表明性的分析工做。
實現srs做爲過濾操做不是filtering模式的一種程序,但結構是相同的。取代以記錄內容爲過濾條件的方法,這裏生成一個隨機數,用來對應一個值,保留對應的記錄。
Mapper code。在mapper代碼裏,從setup方法裏獲取過濾器的百分率配置值,在map方法裏會用到。
Map中,檢查隨機數的生成。隨機數在0到1之間,因此根據與臨界值的比較,能夠決定是否保留記錄。
public static class SRSMapper extends Mapper<Object, Text, NullWritable, Text> { private Random rands = new Random(); private Double percentage; protected void setup(Context context) throws IOException, InterruptedException { // Retrieve the percentage that is passed in via the configuration // like this: conf.set("filter_percentage", .5); // for .5% String strPercentage = context.getConfiguration().get("filter_percentage"); percentage = Double.parseDouble(strPercentage) / 100.0; } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { if (rands.nextDouble() < percentage) { context.write(NullWritable.get(), value); } } }
在這個只有map的job裏,沒有combiner或reducer。全部輸出記錄被寫到本地文件系統。當使用一個小的百分比,你會發現文件數據量小且文件數量多。若是是這樣,就把reducer數量設爲1,但不指定reducer類。這樣會告訴MapReduce框架執行默認的identity reducer把map的輸出收集爲一個文件。隨後能夠用hadoop fs –cat查看文件。
Bloom filtering跟前面的模式作一樣的事情,但對每條記錄的評估方法不同。
這種過濾器是指咱們保存預先定義的值得集合。若是輸出由一點錯誤是不要緊的,由於咱們還打算進一步檢查。這些預先定義的值叫熱點值。
對每條記錄,抽取一種特色。若是這種特色是預約義的值集合裏面的成員,保留,不然丟掉,或反之。
Bloom filtering在查看每一條記錄和決定是否保留問題上跟通用的filtering類似。然而,有兩個主要不一樣點區別於通用filtering。首先,咱們想使用熱點值,基於某種集合的隸屬關係過濾數據。例如:若是用戶字段是預先定義的用戶列表裏的,咱們保留或去除這條記錄。其次,集合的隸屬關係用bloom filter來評估,在附錄A有描述。在某種意義上,bloom filtering是一種不關心join右邊數據值得join操做。(左鏈接)
這種模式跟第五章的replicated join模式有點相像。拿一個列表跟另外一個比較,作一些join邏輯的排序,僅使用map任務。取代replicated join中用分佈式緩存複製熱點值列表到各處,咱們發送一個bloom filter對象到分佈式緩存。這樣就容許使用的bloom filter對象取代列表自己,這容許執行更大的數據集的操做,由於bloom filter更簡潔。並且不存在列表大小受內存限制的狀況,只受bloom filter定義的feature limitations限制。
使用bloom filter,在這種狀況下計算集合隸屬關係有一種後果:有時會獲得一種錯的判斷。就是說,一個值被判斷爲集合的元素而實際上不是。若是bloom filter判斷出一個值不是集合的成員,咱們必須保證它的正確性。關於爲何這種狀況發生的更多信息,參考附錄 A。然而,在一些狀況下,這不是大問題。這章最後的一個例子代碼中,咱們將會收集至關大量的有趣的單詞,若是一條記錄中包含了有趣單詞中的一個,保留該條記錄。咱們作這個的目的是想靠去掉不感興趣的內容而使咱們的數據更有意義。若是使用bloom filter表明單詞列表,有時一個單詞可能成爲列表的成員,雖然列表不該該有它。這種狀況下,若是咱們意外保存了一些記錄,咱們任然要達到咱們過濾掉大多數垃圾數據的目的。
下面是使用bloom filtering的條件:
·數據能被分割爲記錄,就像filtering裏的。
·能從每條記錄抽取的特性都在熱點值裏。
·要有一個預先設定的熱點值得條目的集合。
·能容忍一些錯誤。(例如不該該經過檢查的經過了)
Figure 3-2. The structure of the Bloom filtering pattern
圖3-2展現了bloom filtering的結構,和它是怎樣分紅兩個主要組件的。首先,要訓練出值得列表。結果被存在hdfs上。下面是filtering的MapReduce job,跟本章前一個模式相同,除了用到分佈式緩存。由於記錄被一條條分析而且沒有聚合要作,因此沒有reducer。
第一階段訓練值得列表。即從存儲的地方load數據並把每一個條目加到Bloom filter。訓練好的bloom filter對象存儲到已知的hdfs目錄。
第二步,作具體的過濾。Map任務啓動後,從分佈式緩存加載bloom filter。而後,在map方法裏,迭代記錄檢查時候知足隸屬熱點值列表。每條記錄或者經過或者沒經過隸屬關係的檢查。
當數據改變的時候,bloom filter須要從新訓練。所以使用懶加載模式設置bloom filter是合適的。
Job的輸出是那些經過了bloom filter資格測試的子數據集。你應該預料到輸出數據中的一些記錄可能並不在熱點值中,由於bloom filter有必定概率出錯。
Removing most of the nonwatched values
最簡單的例子是去除不經常使用的值。例如,你只對有含有10000個單詞的列表裏的單詞感興趣,用hadoop處理。你拿到這個數據列表,訓練出bloom filter,而後檢查文本數據,看看每條記錄是否命中其中的一個單詞,命中保存繼續執行,沒命中不保存。雖然可能獲得一些錯誤的記錄,但也沒多大問題,由於你已經去掉了大多數數據。
Prefiltering a data set for an expensive set membership check
有時,檢查某個值是不是集合的成員的代價是昂貴的。例如,你可能作涉及到webservice或外部數據庫去檢驗值是否在集合中。這種情形可能很是稀少,但可能忽然出現。替代週期性的把列表放到集羣,你能夠在數據源所在系統產生一個bloom filter並放進去。一旦在適當的位置部署了bloom filter並過濾掉大部分數據,你能夠對數據的來源作第二次檢查。若是bloom filter能去掉95%以上的數據,你將看到在外部只須要命中剩下的5%。使用這種途徑,能夠達到100%的準確率,並不會對外部系統帶來大量查詢的性能問題。
隨後的第五章,咱們會看到一種模式叫「Reduce Side Join with Bloom Filtering」,就是用bloom filter減小發送到reducers的數據量。提早決定記錄是不是咱們想要的,能顯著減小網絡帶寬。
Bloom filter在數據分析領域相對較新,極可能由於在大數據性能方面特別收益於這種以前方法論裏沒有的東西。在hive sql和pig裏,bloom filter能夠實現爲用戶自定義的方法,但做爲本書寫做時,並無當即可用的原生功能。
這種模式的性能很是相似於簡單filtering。從分佈式緩存加載bloom filter花不了多少時間,由於數據相對較小。根據bloom filter檢查值也是相對輕鬆的操做,由於每次都執行常數級別的時間。
bloom filter一個最基本的應用正如它所設計之目的:描述數據集。做爲本例,bloom filter用一些熱點關鍵詞列表訓練。咱們使用bloom filter測試評論裏的每一個單詞是否在熱點列表裏。若是返回true,整個記錄輸出。不然忽略。這裏咱們不關心bloom filter產生的不可避免的false誤報爲true的狀況。下一個例子詳細說明一種使用HBase驗證positive bloom filter的方法。
問題:給出用戶評論數據,過濾出絕大多數不包含指定關鍵詞的評論。
Bloom filter training。爲了演示怎樣使用hadoopbloom filters,下面的代碼段生成預先決定的單詞的集合。這是一個通用的程序,輸入參數爲一個gzip文件或含有gzip文件的目錄,文件裏元素的數量,但願的誤報率,最終輸出文件名。
public classBloomFilterDriver {
public staticvoid main(String[]args) throws Exception{
// Parse command line arguments
Path inputFile= newPath(args[0]);
intnumMembers =Integer.parseInt(args[1]);
floatfalsePosRate =Float.parseFloat(args[2]);
Path bfFile= newPath(args[3]);
// Calculate our vector size and optimal K value based on approximations
intvectorSize =getOptimalBloomFilterSize(numMembers,falsePosRate);
intnbHash =getOptimalK(numMembers,vectorSize);
// Create new Bloom filter
BloomFilter filter= newBloomFilter(vectorSize,nbHash,
Hash.MURMUR_HASH);
System.out.println("Training Bloom filter of size " + vectorSize
+" with " + nbHash + " hash functions, "+ numMembers
+" approximate number of records, and " + falsePosRate
+" false positive rate");
// Open file for read
String line= null;
intnumElements =0;
FileSystem fs= FileSystem.get(newConfiguration());
for(FileStatus status: fs.listStatus(inputFile)) {
BufferedReader rdr= newBufferedReader(newInputStreamReader(
newGZIPInputStream(fs.open(status.getPath()))));
System.out.println("Reading " + status.getPath());
while((line= rdr.readLine()) !=null) {
filter.add(newKey(line.getBytes()));
++numElements;
}
rdr.close();
}
System.out.println("Trained Bloom filter with " + numElements
+" entries.");
System.out.println("Serializing Bloom filter to HDFS at " + bfFile);
FSDataOutputStream strm= fs.create(bfFile);
filter.write(strm);
strm.flush();
strm.close();
System.exit(0);
}
}
代碼中,一個新的bloomfilter對象由最優矢量大小和基於輸入參數的最優數量的hash方法(k)。liststatus返回的每一個文件按行讀,每一行都用來訓練bloom filter。全部輸入文件處理完後,bloom filter以輸入參數爲名字序列化到文件中。由於bloomfilter也是writable對象,因此序列化不值一提。簡單的使用FileSystem對象建立一個流 FSDataOutputStream,把這個流傳給過濾器的寫方法,而後刷新並關閉流。
隨後bloom filter能很容易得從hdfs反序列化回來。僅僅使用FileSystem對象打開文件並傳給bloomfilter的readfilelds方法。Bloom filter的反序列化會在下面map代碼的setup方法裏演示。
Mapper code。Hadoop框架爲每一個mapper在執行大量的map調用以前,調用一次setup方法。這裏,在map方法使用以前,bloom filter從分佈式緩存反序列化回來。分佈式緩存是hadoop通用功能,可以保證hdfs上的文件也會在須要這個文件的每一個task的本地文件系統也存在。Bloom filter就被填充了一個熱點單詞列表。
在map方法裏,評論從每一個輸入記錄中提取。評論被標記爲單詞,每一個單詞清洗掉無關字符。清洗後的單詞就可以使用bloom filter測試。
Notice:bloom filter是在單詞的字節層次上訓練。單詞相同但大小寫不一樣,會被認爲不一樣。除非你的邏輯須要大小寫敏感,最好訓練和測試以前轉換成小寫。
public static classBloomFilteringMapper extends
Mapper<Object,Text, Text, NullWritable> {
privateBloomFilter filter =new BloomFilter();
protectedvoid setup(Context context)throws IOException,
InterruptedException{
// Get file from the DistributedCache
URI[]files = DistributedCache.getCacheFiles(context
.getConfiguration());
System.out.println("Reading Bloom filter from: "
+files[0].getPath());
// Open local file for read.
DataInputStream strm= newDataInputStream(newFileInputStream(
files[0].getPath()));
// Read into our Bloom filter.
filter.readFields(strm);
strm.close();
}
publicvoid map(Object key,Text value,Context context)
throwsIOException,InterruptedException {
Map<String,String> parsed = transformXmlToMap(value.toString());
// Get the value for the comment
String comment= parsed.get("Text");
StringTokenizer tokenizer= newStringTokenizer(comment);
// For each word in the comment
while(tokenizer.hasMoreTokens()) {
// If the word is in the filter, output the record and break
String word= tokenizer.nextToken();
if(filter.membershipTest(newKey(word.getBytes()))) {
context.write(value,NullWritable.get());
break;
}
}
}
}
Bloom filters能幫助費勁的任務減小沒必要要的代價。下面的例子,bloom filter使用至少有1500聲譽值的用戶id訓練。在查詢hbase獲取更多用戶信息以前,咱們使用bloom filter 作初始環境的測試。依靠消除沒必要要的查詢,咱們加速執行時間。
問題:給出用戶評論列表,過濾掉聲譽不超過1500的用戶的評論。
Mapper code。跟前面的例子同樣,用到了反序列化。這個bloom filter使用有聲譽的至少1500用戶的id訓練。僅僅是全部用戶的1.5%,因此將會過濾出大量不須要的查詢。除了bloom filter,hbase table的鏈接也會在setup裏獲取。
在map方法裏,抽取用戶的id,用bloom filter作檢查,若是檢查經過,hbase會用這個id去hbase table查詢出用戶的全部數據。這裏,靠驗證用戶真實的聲譽至少1500,來做廢可能出現的誤報錯誤。
public static classBloomFilteringMapper extends
Mapper<Object,Text, Text, NullWritable> {
privateBloomFilter filter =new BloomFilter();
privateHTable table =null;
protectedvoid setup(Context context)throws IOException,
InterruptedException{
// Get file from the Distributed Cache
URI[]files = DistributedCache.getCacheFiles(context
.getConfiguration());
System.out.println("Reading Bloom filter from: "
+files[0].getPath());
// Open local file for read.
DataInputStream strm= newDataInputStream(newFileInputStream(
files[0].getPath()));
// Read into our Bloom filter.
filter.readFields(strm);
strm.close();
// Get HBase table of user info
Configuration hconf= HBaseConfiguration.create();
table= newHTable(hconf,"user_table");
}
publicvoid map(Object key,Text value,Context context)
throwsIOException,InterruptedException {
Map<String,String> parsed = transformXmlToMap(value.toString());
// Get the value for the comment
String userid= parsed.get("UserId");
// If this user ID is in the set
if(filter.membershipTest(newKey(userid.getBytes()))) {
// Get the reputation from the HBase table
Result r= table.get(newGet(userid.getBytes()));
intreputation =Integer.parseInt(newString(r.getValue(
"attr".getBytes(),"Reputation".getBytes())));
// If the reputation is at least 1500,
// write the record to the file system
if(reputation>= 1500) {
context.write(value,NullWritable.get());
}
}
}
}
Query Buffer Optimization
前面查詢hbase的例子方法相對幼稚。這裏只是爲了演示怎麼應用這種模式,能夠進一步優化。Hbase提供了分批查詢,因此理想的狀況下,能夠預緩存必定大小的查詢結果。這個常數取決於內存能充裕的存多少查詢。而後把查詢刷進hbase執行進一步處理並返回結果。若是昂貴的操做能緩存,這是推薦的作法。只須要記得在mapper或reducer的cleanup方法裏刷新緩存。Context對象能用來寫輸出,就像map或reduce方法。
附: