Spark上數據的獲取、處理與準備

1、獲取公開數據集

  • **UCI機器學習知識庫:**包括近300個不一樣大小和類型的數據集,可用於分類、迴歸、聚類和推薦系統任務。數據集列表位於:http://archive.ics.uci.edu/ml/
  • ** Amazon AWS公開數據集:**包含的一般是大型數據集,可經過Amazon S3訪問。這些數據集包括人類基因組項目、Common Craw網頁語料庫、維基百科數據和Google Books Ngrams。相關信息可參見:http://aws.amazon.com/publicdatasets/
  • **Kaggle:**這裏集合了Kaggle舉行的各類機器學習競賽所用的數據集。它們覆蓋分類、迴歸、排名、推薦系統以及圖像分析領域,可從Competitions區域下載:http://www.kaggle.com/competitions
  • **KDnuggets:**這裏包含一個詳細的公開數據集列表,其中一些上面提到過的。該列表位於:http://www.kdnuggets.com/datasets/index.html

MovieLens 100k 數據集

MovieLens 100k數據集包含表示多個用戶對多部電影的10萬次評級數據,也包含電影元數據和用戶屬性信息。html

下載連接:http://files.grouplens.org/datasets/movielens/ml-100k.zippython

下載後解壓:程序員

unzip ml-100k.zip

其中重要文件有:u.user(用戶屬性文件)、u.item(電影元數據)和u.data(用戶對電影的評級)。關於數據集更多信息可從README得到,包括每一個數據文件裏的變量定義。正則表達式

可用head命令查看各個文件中的內容:算法

head -5 u.user

2、探索與可視化數據

工具:IPython 是針對Python的一個高級交互式殼程序,包含內置一系列實用功能的pylab,其中有NumPy和SciPy用於數值計算,以及matplotlib用於交互式繪圖和可視化。shell

安裝方法:http://ipython.org/install.html數組

Anaconda安裝方法: https://blog.csdn.net/zhdgk19871218/article/details/46502637bash

Ipython notebook使用教程: https://blog.csdn.net/worfs123456/article/details/53506351網絡

基於pyspark和scala的Jupyter notebook安裝: https://blog.csdn.net/xmo_jiao/article/details/72674687?utm_source=itdadao&utm_medium=referralapp

toree安裝Scala的notebook: https://blog.csdn.net/cafebar123/article/details/78636826

PySpark支持運行Python時可指定參數。在啓動PySpark終端時,咱們可使用IPython而非標準的Python shell。啓動時也能夠向IPython傳入其餘參數,包括讓它在啓動時也啓用pylab功能。

可在Spark主目錄下運行以下命令啓動IPython(https://blog.csdn.net/JavaMoo/article/details/77275515):

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="--pylab" ./bin/pyspark

2.1 探索用戶數據

啓動基於pyspark的Jupyter notebook:

cd ~/IPythonNotebook
bash /usr/local/spark/starths.sh  #啓動腳本
/usr/local/spark/bin/pyspark   #打開 jupyter notebook
%matplotlib inline
import numpy as np
from matplotlib.pylab import *
import matplotlib.pyplot as plt
from matplotlib.pyplot import hist

首先分析MovieLens用戶的特徵:

下述代碼生成RDD,每個記錄對應一個Python列表,各列表由 用戶ID(user ID) , 年齡(age),性別(gender),職業(occupation) 和 郵編(ZIP code) 五個屬性構成。

user_data = sc.textFile("/winnie/DataSets/ml-100k/u.user")  #hadoop文件
user_data.first()   #或take(1)

'1|24|M|technician|85711'

統計用戶,性別,職業和郵編的數目:

user_fields = user_data.map(lambda line: line.split("|"))   # 用 | 分隔各行數據
num_users = user_fields.map(lambda fields: fields[0]).count()  # 用戶數
num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()  # 性別數目,去重
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count() #職業數目
num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()  #郵編數目
print("Users: %d, genders: %d, occupations: %d, ZIP codes: %d" % (num_users, num_genders, num_occupations, num_zipcodes))

Users: 943, genders: 2, occupations: 21, ZIP codes: 795

用 matplotlib 的 hist 函數來建立一個直方圖,以分析用戶年齡的分佈狀況:

ages = user_fields.map(lambda x: int(x[1])).collect()  #提出年齡全部數據項
hist(ages, bins=20, color='lightblue', density=True)   #區間數爲20
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

用戶的年齡段分佈 瞭解用戶的職業分佈狀況。首先利用以前用到的MapReduce方法來計算數據集中各類職業的出現次數,而後matplotlib下的bar函數來繪製一個不一樣職業的數量的條形圖:

count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()  #職業出現次數
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
                                      
x_axis = x_axis1[np.argsort(y_axis1)]  #升序
y_axis = y_axis1[np.argsort(y_axis1)]

#建立條形圖
pos = np.arange(len(x_axis))
width = 1.0

ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)
                                      
plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)  #旋轉30°
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

用戶的職業分佈 Spark對RDD提供了一個名爲countByValue的便捷函數。它會計算RDD裏各不一樣值所分別出現的次數,並將其以Python dict函數的形式(或是Scala、Java下的Map函數)返回給驅動程序:

count_by_occupation2 = user_fields.map(lambda fields: fields[3]).countByValue()
print("Map-reduce approach:")
print(dict(count_by_occupation2))
print("")
print("countByValue approach:")
print(dict(count_by_occupation))

Map-reduce approach: {'technician': 27, 'other': 105, 'writer': 45, 'executive': 32, 'administrator': 79, 'student': 196, 'lawyer': 12, 'educator': 95, 'scientist': 31, 'entertainment': 18, 'programmer': 66, 'librarian': 51, 'homemaker': 7, 'artist': 28, 'engineer': 67, 'marketing': 26, 'none': 9, 'healthcare': 16, 'retired': 14, 'salesman': 12, 'doctor': 7} countByValue approach: {'other': 105, 'executive': 32, 'administrator': 79, 'student': 196, 'educator': 95, 'programmer': 66, 'homemaker': 7, 'artist': 28, 'engineer': 67, 'none': 9, 'retired': 14, 'doctor': 7, 'technician': 27, 'writer': 45, 'lawyer': 12, 'scientist': 31, 'entertainment': 18, 'librarian': 51, 'marketing': 26, 'healthcare': 16, 'salesman': 12}

2.2 探索電影數據

簡單看一下某行記錄,而後再統計電影總數:

movie_data = sc.textFile("/winnie/DataSets/ml-100k/u.item")
print(movie_data.first())
num_movies = movie_data.count()
print("Movies: %d" % num_movies)

1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0 Movies: 1682

電影數據中有些數據不規整,故須要一個函數來處理解析releasedate時可能的解析錯誤.這裏命名該函數爲convert_year.接着即可在調用電影數據進行map轉換時應用該函數,並取回其結果: 使用Spark的filter轉換操做過濾掉問題數據後,用當前年份減去發行年份,從而將電影發行年份列表轉換爲電影年齡.接着用countByValue來計算不一樣年齡電影的數目.最後繪製電影年齡直方圖(一樣會使用hist函數,且其values變量的值來自countByValue的結果,主鍵則爲bins變量):

def convert_year(x):
    try:
        return int(x[-4:])
    except:
        return 1900 #若數據缺失年份則將其年份設爲1900,在後續處理中會過濾掉這類數據。
    
movie_fields = movie_data.map(lambda lines: lines.split("|"))
years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))  #得到年份

years_filtered = years.filter(lambda x: x!= 1900)   #過濾掉缺失年份的電影數據

movie_ages = years_filtered.map(lambda yr: 1998-yr).countByValue()  #電影年齡(相對於如今1998年)

# values = movie_ages.values()    #不一樣年齡電影的個數
# bins = movie_ages.keys()    #不一樣電影的年齡
# 用了比較複雜的方法:升序排序,dict和list來回轉換,不然出錯

movieages=sorted(movie_ages.items(),key=lambda x:x[0])
values = list(dict(movieages).values())    #不一樣年齡電影的個數
bins = list(dict(movieages).keys())    #不一樣電影的年齡

hist(values, bins=bins, color='lightblue', density=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16,10)

電影的年齡分佈

2.3 探索評級數據

先來看一下評級數據:

rating_data_raw = sc.textFile("/winnie/DataSets/ml-100k/u.data")
print(rating_data_raw.first())
num_ratings = rating_data_raw.count() # 評級次數 10萬
print("Ratings: %d" % num_ratings)

196 242 3 881250949 Ratings: 100000

能夠看到評級次數共有10萬.另外和用戶數據與電影數據不一樣,評級記錄用'\t'分隔.下面作些基本統計,以及繪製評級值分佈的直方圖:

# 繪製評級值分佈的直方圖
rating_data = rating_data_raw.map(lambda line: line.split("\t"))
ratings = rating_data.map(lambda fields: int(fields[2]))  #獲取評級
max_rating = ratings.reduce(lambda x, y: max(x, y))   #獲取最大評級
min_rating = ratings.reduce(lambda x, y: min(x, y))   #獲取最小評級
mean_rating = ratings.reduce(lambda x, y: x + y) / num_ratings  #平均評級
median_rating = np.median(ratings.collect())  #評級中位數
ratings_per_user = num_ratings / num_users    #每一個用戶的平均評級次數
ratings_per_movie = num_ratings / num_movies  #每部電影的平均評級次數
print("Min rating: %d" % min_rating)
print("Max rating: %d" % max_rating)
print("Average rating: %2.2f" % mean_rating)
print("Median rating: %d" % median_rating)
print("Average # of ratings per user: %2.2f" % ratings_per_user)
print("Average # of ratings per movie: %2.2f" % ratings_per_movie)

Min rating: 1 Max rating: 5 Average rating: 3.53 Median rating: 4 Average # of ratings per user: 106.04 Average # of ratings per movie: 59.45

Spark對RDD也提供一個名爲states的函數.該函數包含一個數值變量用於作相似的統計(其中,stdev爲標準差):

ratings.stats()

(count: 100000, mean: 3.5298600000000024, stdev: 1.125667970762251, max: 5.0, min: 1.0)

能夠看出,用戶對電影的平均評級(mean)是3.5左右,而評級中位數(median)爲4.這就能期待說評級的分佈稍傾向高點的得分.要驗證這的,能夠建立一個評級值分佈的條行圖.

# 建立評級值分佈條形圖
count_by_rating1 = ratings.countByValue()  #評級計數
count_by_rating=sorted(count_by_rating1.items(),key=lambda x:x[0])   #排序
x_axis = dict(count_by_rating).keys()
y_axis = np.array([float(c) for c in dict(count_by_rating).values()])
# 這裏對y軸正則化,使它表示百分比
y_axis_normed = y_axis / y_axis.sum()
pos = np.arange(len(x_axis))
width = 1.0

ax = plt.axes()
# ax.set_xticks(pos + (width / 2))
ax.set_xticks(pos)
ax.set_xticklabels(x_axis)

plt.bar(pos, y_axis_normed, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

電影評級的分佈

一樣也能夠求各個用戶評級次數的分佈狀況.先從rating_data RDD裏提取出以用戶ID爲主鍵,評級爲值的鍵值對.以後調用Spark的groupByKey函數,來對評級以用戶ID爲主鍵進行分組:

user_ratings_grouped = rating_data.map(lambda fields: (int(fields[0]), int(fields[2]))).groupByKey().sortByKey() #(用戶主鍵,評級)

# 求出每個主鍵(用戶ID)對應的評級集合的大小
user_ratings_byuser = user_ratings_grouped.map(lambda kv: (kv[0], len(kv[1])))
user_ratings_byuser.take(5)

[(1, 272), (2, 62), (3, 54), (4, 24), (5, 175)]

最後,用hist函數來繪製各用戶評級分佈的直方圖:

# 用hist函數繪製各用戶評級分佈的直方圖
user_ratings_byuser_local = user_ratings_byuser.map(lambda kv: kv[1]).collect() #評級次數
hist(user_ratings_byuser_local, bins=200, color='lightblue', density=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

各用戶的電影評級的分佈

3、處理與轉換數據

爲讓原始數據可用於機器學習算法,須要先對其進行清理,並可能須要將其進行各類轉換,以後才能從轉換後的數據裏提取有用的特徵.數據的轉換和特徵提取聯繫緊密.某些狀況下,一些轉換自己即是特徵提取的過程.

通常來講,現實中的數據會存在信息不規則,數據點缺失和異常值問題.理想狀況下,咱們會修復非規整數據.但不少數據集都源於一些難以重現的收集過程(好比網絡活動數據和傳感器數據),故實際上會難以修復.值缺失和異常也很常見,且處理方式可與處理非規整信息相似.大體處理方法以下:

  • **過濾掉或刪除非規整或有值缺失的數據:**這一般是必須的,但的確會損失這些數據裏那些好的信息.
  • **填充非規整或缺失的數據:**能夠根據其餘的數據來填充非規整或缺失的數據.方法包括用零值、全局指望或中值來填充,或是根據相鄰或相似的數據點來作插值(一般針對時序數據)等.選擇正確的方式並不容易,它會因數據、應用場景和我的經驗而不一樣.
  • **對異常值作魯棒處理:**異常值的主要問題在於即便它們是極值也不必定就是錯的.究竟是對是錯一般很難分辨.異常值可被移除或是填充,但的確存在某些統計技術(如魯棒迴歸)可用於處理異常值或是極值.
  • **對可能的異常值進行轉換:**另外一種處理異常值的或極值的方法是進行轉換.對那些可能存在異常值或值域覆蓋過大的特徵,利用如對數或高斯覈對其轉換.這類轉換有助於下降變量存在的值跳躍的影響,並將非線性關係變爲線性的.

非規整數據和缺失數據的填充

下面的代碼對發行日期有問題的數據採起填充策略,即用發行日期的中位數來填充問題數據:

# 對發行日期有問題的數據採起填充策略,即用發行日期的中位數來填充問題數據:
years_pre_processed = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x)).collect() #發行年份,缺失爲1900
years_pre_processed_array = np.array(years_pre_processed)

# 首先計算髮行年份的平均數和中位數(選取的數據不包含非規整數據)。
# 而後用numpy的函數來找出year_pre_processed_array中的非規整數據點的序號(1900爲非規則數據)。
# 最後經過該序號來將中位數做爲非規則數據的發行年份。
mean_year = np.mean(years_pre_processed_array[years_pre_processed_array!=1900]) #對規整數據求均值
median_year = np.median(years_pre_processed_array[years_pre_processed_array!=1900])  #對規整數據求中位數
index_bad_data = np.where(years_pre_processed_array==1900)[0] #若是是[0][0],只會替換第一個非規整數據
years_pre_processed_array[index_bad_data] = median_year
print("Mean year of release: %d" % mean_year)
print("Median year of release: %d" % median_year)
print("Index of '1900' after assigning median: %s" % np.where(years_pre_processed_array == 1900)[0])

Mean year of release: 1989 Median year of release: 1995 Index of '1900' after assigning median: []

4、從數據中提取有用特徵

在完成對數據的初步探索、處理和清理後,即可從中提取可供機器學習模型訓練用的特徵. **特徵(feature)**指那些用於模型訓練的變量.每一行數據包含可供提取到訓練樣本中的各類信息.從根本上說,幾乎全部機器學習模型都是與用向量表示的數值特徵打交道;所以,須要將原始數據轉換爲數值.

特徵能夠歸納地分爲以下幾種:

  • **數值特徵(numerical feature):**這些特徵一般爲實數或整數,好比年齡.
  • **類別特徵(categorical feature):**它們的取值只能是可能狀態集合中的某一種.咱們數據集中的用戶性別、職業或電影類別即是這類.
  • **文本特徵(text feature):**它們派生自數據中的文本內容,好比電影名、描述或是評論.
  • **其餘特徵:**大部分其餘特徵都最終表示爲數值.好比圖像、視頻和音頻可被表示爲數值數據的集合.地理位置則可由經緯度或地理散列(geohash)表示.

4.1 數值特徵

原始的數值和一個數值特徵之間的區別是什麼? 實際上,任何數值數據都能做爲輸入變量.可是,機器學習模型中所學習的是各個特徵所對應的向量的權值.這些權值在特徵值到輸出或是目標變量(指在監督學習模型中)的映射過程當中扮演重要角色.

由此咱們會使用那些合理的特徵,讓模型能從這些特徵學到特徵值和目標變量之間的關係.好比年齡就是一個合理的特徵.年齡的增長和某項支出直接之間可能就存在直接關係.相似地,高度也是一個可直接使用的數值特徵.

當數值特徵仍處於原始形式時,其可用性相對較低,但能夠轉化爲更有用的表示形式.位置信息即是如此.若使用原始位置信息(好比使用經緯度表示的),咱們的模型可能學習不到該信息和某個輸出之間的有用關係,這就使得該信息的可用性不高,除非數據點的確很密集.然而若對位置進行聚合或挑選後(好比聚焦爲一個城市或國家),便容易和特定輸出之間存在某種關聯了.

4.2 類別特徵

當類別特徵仍爲原始形式時,其取值來自全部可能值所構成的集合而不是一個數字,故不能做爲輸入.如例子中的用戶職業即是一個類別特徵變量,其可能取值有學生,程序員等.

這樣的類別特徵也稱做名義(nominal)變量,即其各個可能取值之間沒有順序關係.相反,那些存在順序關係的(好比評級)則被稱爲有序(ordinal)變量

將類別特徵表示爲數字i形式,常可藉助k之1(1-of-k)方法進行.將名義變量表示爲可用於機器學習任務的形式,會須要藉助如k之1編碼這樣的方法.有序變量的原始值可能就能直接使用,但也常會通過和名義變量同樣的編碼處理.

假設變量可取的值有k個.若是對這些值用1到k編序,則能夠用長度爲k的二元向量來表示一個變量的取值.在這個向量裏,該取值對應的序號所在的元素爲1,其餘元素都爲0.

咱們能夠取回occupation的全部可能取值:

# 職業
all_occupations = user_fields.map(lambda fields: fields[3]).distinct().collect()
all_occupations.sort()
# print(all_occupations)

# 依次對各可能的職業分配序號(從0開始編號):
idx = 0
all_occupations_dict = {}
for o in all_occupations:
    all_occupations_dict[o] = idx
    idx += 1

# 看一下"k之1"編碼會對新的例子分配什麼值
print("Encoding of 'doctor': %d" % all_occupations_dict['doctor'])
print("Encoding of 'programmer': %d" % all_occupations_dict['programmer'])

# 編碼programmer的取值.首先需建立一個長度和可能的職業數目相同的numpy數組,其各元素值爲0,這可經過numpy的zeros函數實現.
# 以後提取單詞programmer的序號,並將數組中對應該序號的那個元素值賦爲1:
K = len(all_occupations_dict)
binary_x = np.zeros(K)
k_programmer = all_occupations_dict['programmer']
binary_x[k_programmer] = 1
print("Binary feature vector: %s" % binary_x)
print("Length of binary vectory: %d" % K)

Encoding of 'doctor': 2 Encoding of 'programmer': 14 Binary feature vector: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.] Length of binary vectory: 21

4.3 派生特徵

從現有的一個或多個變量派生出新的特徵經常是有幫助的.理想狀況下,派生出的特徵能比原始屬性帶來更多信息.

從原始數據派生特徵的例子包括計算平均值,中位值,方差,和,差,最大值或最小值以及計數.

數值特徵到類別特徵的轉換也很常見,好比劃分爲區間特徵.進行這類轉換的變量常見的有年齡,地理位置和時間.

將時間戳轉爲類別特徵 下面以對評級時間的轉換爲例,說明如何將數值數據轉換爲類別特徵.該時間的格式爲Unix的時間戳.咱們能夠用Python的datetime模塊從中提取出日期,時間以及點鐘(hour)信息.其結果將是由各評級對應的點鐘數所構成的RDD.

# 定義一個函數將評級時間戳提取爲datatime的格式:
def extract_datetime(ts):
    import datetime
    return datetime.datetime.fromtimestamp(ts)

# 首先使用map將時間戳屬性轉換爲Python int 類型.
# 而後經過extract_datetime函數將各時間戳轉爲datetime類型的對象,進而提取其點鐘數
timestamps = rating_data.map(lambda fields: int(fields[3]))  #時間戳,好比 881250949
hour_of_day = timestamps.map(lambda ts: extract_datetime(ts).hour) # 時鐘
hour_of_day.take(5)

[23, 3, 15, 13, 13]

假設想要更爲精確的表示,能夠將點鐘數劃分到一天中的不一樣階段.建立一個以點鐘數爲輸入的函數來返回相應的時間段:

# 將點鐘數劃分到一天中的不一樣時段
def assign_tod(hr):
    times_of_day = {
        'morning' : range(7, 12),
        'lunch' : range(12, 14),
        'afternoon' : range(14, 18),
        'evening' : range(18, 24),
        'night' : range(0, 7)
    }
    for k, v in times_of_day.items():
        if hr in v:
            return k
        
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))
time_of_day.take(5)

['evening', 'night', 'afternoon', 'lunch', 'lunch']

咱們已將時間戳變量轉爲點鐘數,再接着轉爲了時間段,從而獲得了一個類別特徵.咱們能夠藉助以前提到的k之1編碼方法來生成其相應的二元特徵向量.

4.4 文本特徵

從某種意義上說,文本特徵也是一種類別特徵或派生特徵.

文本的處理方式有不少種.天然語言處理即是專一於文本內容的處理,表示和建模的一個領域.

在這裏僅介紹一種簡單且標準化的文本特徵提取方法.該方法被視爲**詞袋(bag-of word)**表示法.

詞袋法將一段文本視爲由其中的文本或數字組成的集合,其處理過程以下:

  • 分詞(tokenization):首先會應用某些分詞方法來將文本分隔爲一個由詞(通常如單詞,數字等)組成的集合.可用的方法如空白分隔法.這種方法在空白處對文本分隔並可能還刪除其餘如標點符號和其餘非字母或數字字符.
  • **刪除停用詞(stop words removal):**以後,它一般會刪除常見的單詞,好比the, and和but(這些詞被稱做停用詞).
  • **提取詞幹(stemming):**下一步則是詞幹的提取.這是指將各個詞簡化爲其基本的形式或者幹詞.常見的例子如複數變爲單數.提取的方法有不少種,文本處理算法庫中經常會包括多種詞幹提取方法.
  • 向量化(vectorization):最後一步就是用向量來表示處理好的詞.二元向量多是最爲簡單的表示方法.它用1和0來分別表11示是否存在某個詞.從根本上說,這與以前提到的k之1編碼相同.它須要一個詞的字典來實現詞到索引序號的映射.隨着遇到的詞增多,各類詞可能達樹百萬.由此,使用稀疏矩陣來表示就很掛機.這種表示只記錄某個詞是否出現過,從而節省內存和磁盤空間,以及計算時間.

提取簡單的文本特徵 以數據集中的電影標題爲例,來示範如何提取文本特徵爲二元矩陣.

# 首先需建立一個函數來過濾掉電影標題中可能存在的發行年月.
# 使用Python的正則表達式模塊re來尋找標題裏位於括號之間的年份.若是找到與表達式匹配的字段,咱們將提取標題中匹配起始位置(即左括號所在的位置)以前的部分.
def extract_title(raw):
    import re
    # 該表達式找尋括號之間的非單詞(數字)
    grps = re.search("\((\w+)\)", raw)  # \w:用於匹配字母,數字或下劃線字符;
    # 「+」元字符規定其前導字符必須在目標對象中連續出現一次或屢次
    if grps:
        # 只選取標題部分,並刪除末尾的空白字符
        return raw[:grps.start()].strip()   
    else:
        return raw

# 從movie_fields RDD裏提取出原始的電影標題:
raw_titles = movie_fields.map(lambda fields: fields[1])

# 用前5個原始標題來測試一下extract_title函數:
for raw_title in raw_titles.take(5):
    print(extract_title(raw_title))

Toy Story GoldenEye Four Rooms Get Shorty Copycat

# 簡單空白分詞法:
movie_titles = raw_titles.map(lambda m: extract_title(m))
title_terms = movie_titles.map(lambda t: t.split(" "))
print(title_terms.take(5))

[['Toy', 'Story'], ['GoldenEye'], ['Four', 'Rooms'], ['Get', 'Shorty'], ['Copycat']]

咱們須要建立一個字詞典,實現詞到一個整數序號的映射,以便能爲每個詞分配一個對應到向量元素的序號.

# 首先使用Spark的flatMap函數來擴展title_terms RDD中每一個記錄的字符串列表,以獲得一個新的字符串RDD.該RDD的每一個記錄是一個名爲all_terms的詞.
# 以後取回全部不一樣的詞,並給他們分配序號.其作法和以前對職業進行k之1編碼徹底相同:

#取回全部可能的詞,以便構建一個詞到序號的映射字典
all_terms = title_terms.flatMap(lambda x: x).distinct().collect()

# 建立一個新的字典來保存詞,並分配k之1序號
idx = 0
all_terms_dict = {}
for term in all_terms:
    all_terms_dict[term] = idx
    idx += 1
    
print("Total number of terms: %d" % len(all_terms_dict))
print("Index of term 'Dead': %d" % all_terms_dict['Dead'])
print("Index of term 'Rooms': %d" % all_terms_dict['Rooms'])

Total number of terms: 2645 Index of term 'Dead': 9 Index of term 'Rooms': 2

也能夠經過Spark的zipWithIndex函數來更高效獲得相同結果:以各值的RDD爲輸入,對值進行合併,生成新的鍵值對RDD,其主鍵爲詞,值爲詞在詞字典中的序號.用collectAsMap將該RDD以Python的dict函數形式返回到驅動程序.

all_terms_dict2 = title_terms.flatMap(lambda x: x).distinct().zipWithIndex().collectAsMap()
print("Index of term 'Dead': %d" % all_terms_dict2['Dead'])
print("Index of term 'Rooms': %d" % all_terms_dict2['Rooms'])

Index of term 'Dead': 9 Index of term 'Rooms': 2

最後一步是建立一個函數.該函數將一個詞集合轉換爲一個稀疏向量的表示. 具體實現時,會建立一個空白稀疏矩陣,該矩陣只有一行,列數爲字典的總詞數. 以後逐一檢查輸入集合中的每個詞,看它是否在詞字典中.若是在,就給矩陣裏相應序數位置的向量賦值1:

# 該函數輸入一個詞列表,並用k之1編碼相似的方式將其編碼爲一個scipy稀疏向量
def create_vector(terms, term_dict):
    from scipy import sparse as sp
    num_terms = len(term_dict)
    x = sp.csc_matrix((1, num_terms))
    for t in terms:
        if t in term_dict:
            idx = term_dict[t]
            x[0, idx] = 1
    return x

# 對提取出的各個詞的RDD的各記錄都應用該函數
all_terms_bcast = sc.broadcast(all_terms_dict) #該字典可能會極大
# print(title_terms.take(5))
# print(all_terms_bcast.value)
term_vectors = title_terms.map(lambda terms: create_vector(terms, all_terms_bcast.value))
term_vectors.take(5)
 
 

[<1x2645 sparse matrix of type '<class 'numpy.float64'>' with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<class 'numpy.float64'>' with 1 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<class 'numpy.float64'>' with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<class 'numpy.float64'>' with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<class 'numpy.float64'>' with 1 stored elements in Compressed Sparse Column format>]

 
 

如今每個電影標題都被轉換爲一個稀疏向量.能夠看到那些提取出了2個詞的標題所對應的向量裏也是2個非零元素,而只提取了1個詞的則只對應到了1個非零元素.

4.5 正則化特徵

在將特徵提取爲向量形式後,一種常見的預處理方式是將數值數據正則化(normalization).其背後的思想是將各個數值特徵進行轉換,以將它們的值域規範到一個標準區間內.正則化的方法有以下幾種:

  • **正則化特徵:**這其實是對數據集中的單個特徵進行轉換.好比減去平均值(特徵對齊)或是進行標準的正則轉換(以使得該特徵的平均值和標準差分別爲0和1).
  • **正則化特徵向量:**這一般是對數據中的某一行的全部特徵進行轉換,以讓轉換後的特徵向量的長度標準化.也就是縮放向量中的各個特徵以使得向量的範數爲1(常指一階或二階範數).

下面將用第二種狀況舉例說明:

# 向量正則化可經過numpy的norm函數來實現.
# 先計算一個隨機向量的二階範數,而後讓向量中的每個元素都除該範數,從而獲得正則化後的向量:
np.random.seed(42) # 若是不設置隨機種子值,則生成隨機數因時間差別而不一樣.
x = np.random.randn(10)
norm_x_2 = np.linalg.norm(x)
normalized_x = x / norm_x_2
print("x:\n%s" % x)
print("2-Norm of x: %2.4f" % norm_x_2)
print("Normalized x:\n%s" % normalized_x)
print("2-Norm of normalized_x: %2.4f" % np.linalg.norm(normalized_x))

x: [ 0.49671415 -0.1382643 0.64768854 1.52302986 -0.23415337 -0.23413696 1.57921282 0.76743473 -0.46947439 0.54256004] 2-Norm of x: 2.5908 Normalized x: [ 0.19172213 -0.05336737 0.24999534 0.58786029 -0.09037871 -0.09037237 0.60954584 0.29621508 -0.1812081 0.20941776] 2-Norm of normalized_x: 1.0000

用MLlib正則化特徵 Spark在其MLlib機器學習庫中內置了一些函數用於特徵的縮放和標準化.它們包括供標準正態變換的StandardScaler,以及提供與上述相同的特徵向量正則化的Normalizer.

from pyspark.mllib.feature import Normalizer
normalizer = Normalizer() # 導入所需類後,初始化
vector = sc.parallelize([x])

normalized_x_mllib = normalizer.transform(vector).first().toArray()

print("x:\n%s" % x)
print("2-Norm of x: %2.4f" % norm_x_2)
print("Normalized x MLlib:\n%s" % normalized_x_mllib)
print("2-Norm of normalized_x_mllib: %2.4f" % np.linalg.norm(normalized_x_mllib))

x: [ 0.49671415 -0.1382643 0.64768854 1.52302986 -0.23415337 -0.23413696 1.57921282 0.76743473 -0.46947439 0.54256004] 2-Norm of x: 2.5908 Normalized x MLlib: [ 0.19172213 -0.05336737 0.24999534 0.58786029 -0.09037871 -0.09037237 0.60954584 0.29621508 -0.1812081 0.20941776] 2-Norm of normalized_x_mllib: 1.0000

###4.6 用軟件包提取特徵 Spark支持Scala,Java和Python的綁定.咱們能夠經過這些語言所開發的軟件包,藉助其中完善的工具箱來實現特徵的處理和提取,以及向量表示.

特徵提取可藉助的軟件包有scikit-learn, gensim, scikit-image, matplotlib, Python的NLTK, Java編寫的OpenNLP以及用Scala編寫的Breeze和Chalk.實際上,Breeze自Spark 1.0開始就成爲Spark的一部分了.

相關文章
相關標籤/搜索