Spark 數據ETL及部分代碼示例

問題導讀:




1.數據如何處理?
2.從數據中如何提取有用的特徵?
3.有哪些衍生特徵?









數據處理以及轉化


python

一、當咱們完成了一些對數據集的探索和分析,咱們知道了一些關於用戶數據以及電影數據的特徵,接下來咱們該作些什麼呢?


 

二、爲了讓原始數據可以在機器學習算法中變得有用,咱們首先須要清理以及在提取有用的特徵值以前使用各類方法儘量地轉化它。其中的轉化和特徵提取步驟是緊密鏈接的,並且在一些狀況下,特定的轉化就是一種特徵值提取的過程。


 

三、咱們已經看過了在電影數據集中須要清理數據的例子。一般,現實的數據集包含壞的數據、丟失的數據以及異常值。理想狀況下,咱們能夠糾正錯誤的數據;可是,這一般都是不可能的。由於不少數據集來源於那些不可以重複的集合操做。丟失的數據以及異常值也是很常見的,它們能夠用相似於壞數據的處理方法處理。總的來講,歸結爲如下普遍的處理方法:


 

過濾掉或者移除壞數據以及丟失的數據:


 

有時候這是不可避免的;然而這也意味着丟失掉大部分壞的或丟失的記錄。


 

填充壞掉或者丟失的數據:


 

咱們能夠盡力地依據剩下的數據來給壞掉的或者丟失的數據賦值。好比賦給0值、平均值、中位數、附近的值或者類似值等方法。選擇正確的方法一般是一件棘手的任務,這取決於數據、狀況和本身的經驗。


 

應用成熟的技術到異常值:


 

異常值的主要問題在於它們的值多是正確的,儘管它們是極端值。它們也有多是錯誤的。因此很難知道咱們處理的是哪一種狀況。異常值也能夠被移除或者填充。不過幸運的是,有是統計技術(如穩健迴歸)來處理異常值和極端值。


 

轉化潛在的異常值:


 

另外一個處理異常值或者極端值得方法是轉化。例如對數或者高斯內核轉化,計算出潛在的異常值,或者顯示大範圍的潛在數據。這些類型的轉換抑制了變量大尺度變化的影響並將非線性關係轉化爲一個線性的。



填充壞的或丟失的數據:

程序員

咱們以前已經見過過濾壞數據的例子了。咱們接着以前的代碼,下面的代碼段對壞數據應用了填充的方法,經過賦給數據點以相等於year中值的值。



 

[Python]   純文本查看   複製代碼
?
1
2
3
years_pre_processed = movie_fields. map ( lambda fields: fields[ 2 ]). 
map ( lambda x: convert_year(x)).collect() 
years_pre_processed_array = np.array(years_pre_processed) 





 

首先,咱們將在選擇全部的發佈年限後計算year的平均值和中位數,除了那些壞的數據。以後使用numpy函數,從years_pre_processed_array中查找壞數據的索引(參考以前咱們賦予1900給數據點)。最後,咱們使用這個索引來賦予中值給壞的數據:

[Python]   純文本查看   複製代碼
?
01
02
03
04
05
06
07
08
09
10
mean_year = np.mean(years_pre_processed_array[years_pre_processed_ 
array! = 1900 ]) 
median_year = np.median(years_pre_processed_array[years_pre_processed_ 
array! = 1900 ]) 
index_bad_data = np.where(years_pre_processed_array = = 1900 )[ 0 ][ 0
years_pre_processed_array[index_bad_data] = median_year 
print "Mean year of release: %d" % mean_year 
print "Median year of release: %d" % median_year 
print "Index of '1900' after assigning median: %s" % np.where(years_ 
pre_processed_array = = 1900 )[ 0 ]


 

打印結果應該相似於以下:

[Python]   純文本查看   複製代碼
?
1
2
3
Mean year of release: 1989 
Median year of release: 1995 
Index of '1900' after assigning median: []



 

在這裏咱們計算出year的平均值和中位數,從輸出結果中咱們能夠看出,year的中位數由於year的傾斜分佈要比平均值高許多。儘管直接決定使用一個精確的值去填充數據不是常見的作法,可是因爲數據的傾斜,使用中位數去賦值是一種可行的方法。




從數據中提取有用的特徵

算法

一、當咱們完成了對數據初始的處理和清洗,咱們就能夠準備從數據中提取一些實際有用的特徵,這些特徵數據能夠用於之後的機器學習模型中的訓練。


 

二、特徵數據是指咱們用於訓練模型的一些變量。每行數據都有可能包含能夠提取用於訓練的樣例。幾乎全部的機器學習模型都是工做在以數字爲技術的向量數據上。所以,咱們須要將粗糙的數據轉化爲數字。


特徵數據能夠分爲如下幾類:

express

數字特徵


 

這類特徵數據是指一些數值類型的數據。


 

分類特徵


 

這類特徵數據表明一些相同特性的,能夠歸爲一類的一些數據。例如用戶的性別、職位或者電影的類型。


 

文本特徵


 

這類特徵數據是從數據中的文本內容中派生出來的,例如電影名稱,描述,以及評論。


 

其餘特徵


 

這類特徵數據都會轉化爲以數字爲表明的特徵,例如圖片,視頻,音頻均可以表示爲數字數據的集合。地理位置能夠表明爲經度、緯度或者經緯度之差。



數字特徵

編程

一、舊數字和提取的新的特徵數值有什麼區別呢?其實,在現實生活中,任何的數值數據均可以做爲輸入變量,但在機器學習模型中,咱們學習的是每一個特徵的向量權重,例如監督學習模型。


 

二、所以,咱們須要使用那些有意義的特徵數據,那些模型能夠從特徵值與目標數據之間學習關係的特徵數據。例如,年齡就是一個合理的特徵數據,好比年齡的增加和產出有着直接的關係,一樣,身高也是能夠直接使用的數值特徵。



分類特徵

數組

一、分類特徵數據不能直接使用它們原有的粗糙的格式做爲輸入使用,由於它們不是數字。可是它們其中的一些衍生值能夠做爲輸入的變量。好比以前所說的職位就能夠有學生、程序員等。


 

二、這些分類變量只是名義上的變量,由於它們不存在變量值之間的順序的概念。相反,當變量之間存順序概念時,咱們會傾向於使用這些常見有序的變量。


 

三、爲了把這些分類變量轉化爲數字表示,咱們可使用經常使用的方法,例如1-of-k編碼。這種方法須要把那些名義上的變量轉化爲對機器學習任務有用的數據。常見那些粗糙格式的數據都會以名義上的變量形式編碼爲有意義的數據。


 

四、咱們假設這裏有k個值能夠供變量獲取,若是咱們能夠給每一個值都賦予1到k中的索引,而後咱們就可使用程度爲k的二進制向量表示一個值了。初始的實體中,向量表示的二進制值都是0,當咱們賦予變量一個狀態的時候,所對應的二進制向量中對應的索引值由0變成1。


 

例如,咱們先獲取上面所說的職位的全部類別變量:

[Python]   純文本查看   複製代碼
?
1
2
3
all_occupations = user_fields. map ( lambda fields: fields[ 3 ]). 
distinct().collect() 
all_occupations.sort()



 

接着咱們能夠賦給每一個可能的職位類別一個值(值得索引從零開始,由於在Python、Scala、Java數組中索引都是從0開始的)

[Python]   純文本查看   複製代碼
?
1
2
3
4
5
6
7
8
9
idx = 0 
all_occupations_dict = {} 
for o in all_occupations: 
     all_occupations_dict[o] = idx 
idx + = 1 
# try a few examples to see what "1-of-k" encoding is assigned 
print "Encoding of 'doctor': %d" % all_occupations_dict[ 'doctor'
print "Encoding of 'programmer': %d" % all_occupations_ 
dict [ 'programmer' ]


 

你將看到以下打印結果:

[Python]   純文本查看   複製代碼
?
1
2
Encoding of 'doctor' : 2 
Encoding of 'programmer' : 14


 

最後咱們能夠對上面打印的結果中programmer進行編碼,咱們能夠首先建立一個長度爲k(在這個案例中)的numpy數組而且值所有填0(咱們將使用numpy數組中的zeros函數建立這個數組)。


 

咱們將提取單詞programmer的索引並賦予1給數組的這個索引:

[Python]   純文本查看   複製代碼
?
1
2
3
4
5
6
K = len (all_occupations_dict) 
binary_x = np.zeros(K) 
k_programmer = all_occupations_dict[ 'programmer'
binary_x[k_programmer] = 1 
print "Binary feature vector: %s" % binary_x 
print "Length of binary vector: %d" % K



 

上面結果將呈現給咱們長度爲21的二進制特徵的向量:

[Python]   純文本查看   複製代碼
?
1
2
3
Binary feature vector: [ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 
                          0. 0. 1. 0. 0. 0. 0. 0. 0.
Length of binary vector: 21

衍生特徵

app

一、一般會從一或多個可得到的變量中計算出衍生特徵是頗有用的,咱們但願那些衍生特徵能夠相比於原來粗糙格式的變量添加更多的信息。


 

二、例如,咱們能夠計算全部電影評分數據中的用戶平均評分,用戶平均評分將提供針對用戶截差的模型。咱們已經獲取了粗糙的評分數據,而且建立了新的可讓咱們學習更好模型的特徵。


 

三、從粗糙數據中獲取衍生特徵數據的例子包括平均值、中位數、求和、最大值、最小值以及總數等。好比在電影數據中,咱們能夠經過如今的年限減去電影發佈年限得到電影的年齡。


 

四、一般,這些轉化用來產生數值數據以便於更好的讓模型去學習。


 

五、把數字特徵值轉化爲分類特徵值也很常見,好比



轉化timestamps值爲分類特徵值


dom

爲了演示怎樣從數字特徵值衍生爲分類特徵值,咱們將使用電影評分數據中的評分時間。這些時間都是Unix timestamps格式。咱們能夠用Python的datetime模塊去從timestamp中獲取date和time,而後提取day中的hour。這將爲每一個評分中day的hour成一個RDD。


 

咱們將須要一個函數去提取表明評分timestamp的datetime:

[Python]   純文本查看   複製代碼
?
1
2
3
def extract_datetime(ts): 
     import datetime 
     return datetime.datetime.fromtimestamp(ts)


 

咱們繼續使用以前例子之中計算出的rating_data RDD


 

首先,咱們使用map轉化提取timestamp列,把它轉化爲Python中int類型。對每一個timestamp應用extract_datetime方法,而後從結果datetime對象中提取hour:
[Python]   純文本查看   複製代碼
?
1
2
3
timestamps = rating_data. map ( lambda fields: int (fields[ 3 ])) 
hour_of_day = timestamps. map ( lambda ts: extract_datetime(ts).hour) 
hour_of_day.take( 5 )


 

若是咱們從結果RDD中獲取前五條記錄,咱們將看到如下輸出結果:

[Python]   純文本查看   複製代碼
?
1
[ 17 , 21 , 9 , 7 , 7 ]


 

至此咱們已經將粗糙的時間數據轉化爲了評分數據中表明day中hour的分類特徵數據


 

如今,咱們說的這種轉化可能優勢粗糙,也許咱們想更加貼切地定義轉化。咱們能夠將天天中的小時轉化爲表明天天時間中的塊。例如咱們能夠定義morning是從7 am到 11 am、lunch是從11 am到 1am等。使用這些塊,咱們能夠建立方法給天天中的時間賦值,下面將day中的hour做爲輸入:
[Python]   純文本查看   複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
def assign_tod(hr): 
     times_of_day =
         'morning' : range ( 7 , 12 ), 
         'lunch' : range ( 12 , 14 ), 
         'afternoon' : range ( 14 , 18 ), 
         'evening' : range ( 18 , 23 ), 
         'night' : range ( 23 , 7
    
for k, v in times_of_day.iteritems(): 
     if hr in v: 
     return k


 

如今,咱們能夠將assign_tod函數應用到存在於hour_of_day RDD中的每一個評分記錄中的hour上。

[Python]   純文本查看   複製代碼
?
1
2
time_of_day = hour_of_day. map ( lambda hr: assign_tod(hr)) 
time_of_day.take( 5 )


 

若是咱們獲取這個RDD的前5條記錄,咱們將看到以下轉化後的值:

[Python]   純文本查看   複製代碼
?
1
[ 'afternoon' , 'evening' , 'morning' , 'morning' , 'morning' ]



 

到此,咱們已經將timestamp變量轉化爲24小時格式的hours變量,以及自定義的天天中的時間值。所以咱們已經有了分類特徵值,可使用以前介紹的1-of-k編碼方法去生成二進制特徵的向量。



文本特徵值

機器學習

一、在某些狀況下,文本特徵值是以分類以及衍生特徵存在的。咱們拿電影的描述信息做爲例子。這裏,粗糙的數據不能被直接使用,即便是做爲分類特徵,由於若是每一個文本都有值,那將會產生無限種可能組合的單詞。咱們的模型幾乎不會出現兩種相同特徵,就算有那麼學習效率也不會高。所以,咱們但願將原始文本變成一種更適合機器學習的形式。


 

二、有不少的方法能夠處理文本,並且天然語言領域處理致力於處理、呈現和模型化文本內容。咱們將介紹簡單和標準的方法來實現文本特徵提取,這個方法就是詞袋模型表示。


 

三、詞袋模型將文本塊視爲單詞的集合以及可能存在的數字,詞袋方法的處理以下:


 

標記:首先,一些形式的標記用於將文本分割爲標記的集合(通常是單詞,數字等)。例如常見的空格標記,將文本按照每一個空格分隔,還有其餘的一些標點和非字母數字的標記。


 

能夠移除的中止詞:通常咱們會移除文本中很是常見的詞,例如」the」、」and」、」but」(這些都稱爲中止詞)。


 

詞幹提取:接下來的操做包括詞幹提取,一種獲取輸入項,而後將其提取爲其最基礎的值。一個常見的例子就是複數編程單數,或者dogs變成dog。有不少方法能夠實現詞幹提取,有不少文本處理庫也包含各類詞幹提取算法。


 

向量化:最後一步是將處理項轉化爲向量表示形式。最簡單的形式也許就是二進制的向量表示形式,若是一個處理項包含在文本中,咱們就給它賦值爲1,若是沒有就賦值爲0。本質上是咱們以前提到的分類的1-of-k編碼。相似1-of-k編碼,這裏須要一個字典將這些項映射爲一個個索引。也許你會想到,這裏可能存在幾百萬單獨項。所以,使用稀疏向量表示形式是很是嚴格的,只在那些處理項已被保存的狀況下使用,這樣能夠節省內存、磁盤空間以及處理時間。



簡單文本特徵提取

函數

咱們使用電影評分數據中的電影名稱演示以二進制向量方法提取文本特徵值。


 

首先咱們建立函數去除每部電影的發佈年限,僅留下電影名稱。


 

電影數據示例:

[Python]   純文本查看   複製代碼
?
1
1 |Toy Story ( 1995 )| 01 - Jan - 1995 ||[url = http: / / us.imdb.com / M / title - exact?Toy % 20Story % 20 ]http: / / us.imdb.com / M / title - exact?Toy % 20Story % 20 [ / url]( 1995 )| 0 | 0 | 0 | 1 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0



 

咱們將使用Python的 regular expression模塊re,去搜索出存在於電影名稱列的電影發佈年限。當咱們匹配到這個regular expression,咱們將只提取出電影名稱,示例:

[Python]   純文本查看   複製代碼
?
01
02
03
04
05
06
07
08
09
10
def extract_title(raw): 
     import re 
     # this regular expression finds the non-word (numbers) between 
     parentheses 
     grps = re.search( "(\w+)" , raw) 
     if grps: 
     # we take only the title part, and strip the trailingwhite spacefrom the remaining text, below 
         return raw[:grps.start()].strip() 
     else
         return raw



 

接下來,咱們將從movie_fields RDD中提取出粗糙的電影名稱:

[Python]   純文本查看   複製代碼
?
1
2
/ / 包含電影發佈年限,格式:Toy Story ( 1995
raw_titles = movie_fields. map ( lambda fields: fields[ 1 ])


 

而後咱們經過下面的代碼提取5條記錄測試extract_title函數的功能:

[Python]   純文本查看   複製代碼
?
1
2
for raw_title in raw_titles.take( 5 ): 
     print extract_title(raw_title)



 

經過打印結果咱們能夠驗證函數執行狀況,打印結果示例:

[Python]   純文本查看   複製代碼
?
1
2
3
4
5
Toy Story 
GoldenEye 
Four Rooms 
Get Shorty 
Copycat


 

咱們將應用函數以及標記模式來提取電影名稱爲單個元素,下面咱們使用簡單地空格標記來分離電影名稱。

[Python]   純文本查看   複製代碼
?
1
2
3
4
5
movie_titles = raw_titles. map ( lambda m: extract_title(m)) 
# next we tokenize the titles into terms. We'll use simple whitespace 
tokenization 
title_terms = movie_titles. map ( lambda t: t.split( " " )) 
print title_terms.take( 5 )


 

打印結果:

[Python]   純文本查看   複製代碼
?
1
[[u 'Toy' , u 'Story' ], [u 'GoldenEye' ], [u 'Four' , u 'Rooms' ], [u 'Get' ,u 'Shorty' ], [u 'Copycat' ]]



 

如今咱們能夠看出電影名稱以及被按照空格分離爲單個的標記了。


 

爲了給每一項賦值一個向量的索引,咱們須要建立詞典,將每一項都映射到一個整數索引。


 

首先,咱們將使用Spark的flatMap函數來擴張title_terms RDD中每條記錄的list字符串,轉化爲每條記錄都是一項的名爲all_terms的RDD。


 

咱們獲取全部的惟一項,而後賦值索引,就像以前的對職位操做的1-of-k編碼。

[Python]   純文本查看   複製代碼
?
01
02
03
04
05
06
07
08
09
10
# next we would like to collect all the possible terms, in order to 
build out dictionary of term < - > index mappings 
all_terms = title_terms.flatMap( lambda x: x).distinct().collect() 
# create a new dictionary to hold the terms, and assign the "1-of-k" 
indexes 
idx = 0 
all_terms_dict = {} 
for term in all_terms: 
     all_terms_dict[term] = idx 
idx + = 1



 

咱們打印出惟一項的總數來測試咱們的map功能是否正常工做:

[Python]   純文本查看   複製代碼
?
1
2
3
print "Total number of terms: %d" % len (all_terms_dict) 
print "Index of term 'Dead': %d" % all_terms_dict[ 'Dead'
print "Index of term 'Rooms': %d" % all_terms_dict[ 'Rooms' ]


 

打印結果:

[Python]   純文本查看   複製代碼
?
1
2
3
Total number of terms: 2645 
Index of term 'Dead' : 147 
Index of term 'Rooms' : 1963



 

咱們也可使用Spark的zipWithIndex函數來更加有效地實現上面的結果,這個函數獲取values的RDD而後經過索引合併它們而且建立一個新的key-value對RDD,這個新的RDD的key就是惟一項,value是這個項的字典索引。咱們經過使用collectAsMap函數來將這個key-value RDD做爲Python字典方法傳入driver。

[Python]   純文本查看   複製代碼
?
1
2
3
4
all_terms_dict2 = title_terms.flatMap( lambda x: x).distinct(). 
zipWithIndex().collectAsMap() 
print "Index of term 'Dead': %d" % all_terms_dict2[ 'Dead'
print "Index of term 'Rooms': %d" % all_terms_dict2[ 'Rooms' ]



 

打印結果:

[Python]   純文本查看   複製代碼
?
1
2
Index of term 'Dead' : 147 
Index of term 'Rooms' : 1963


 

最後一步是建立一個函數將惟一項的集合轉化爲一個稀疏的向量表示形式。爲了達到效果,咱們將建立一個空的,有一行以及和字典中惟一項總數的列的稀疏矩陣。而後咱們將經過輸入列表中的每一項來檢查這一項是否存在於咱們的惟一項字典中。若是是,咱們將給這個字典中對應的這個惟一項的索引賦值爲1。


 

[Python]   純文本查看   複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
# this function takes a list of terms and encodes it as a scipy sparse 
vector using an approach 
# similar to the 1-of-k encoding 
def create_vector(terms, term_dict): 
     from scipy import sparse as sp 
         num_terms = len (term_dict) 
         x = sp.csc_matrix(( 1 , num_terms)) 
         for t in terms: 
相關文章
相關標籤/搜索