在 Spark 中使用 IPython Notebook

本文是從 IPython Notebook 轉化而來,效果沒有原本那麼好。html

主要爲體驗 IPython Notebook。至於題目,改爲《在 IPython Notebook 中使用 Spark》也能夠,沒什麼差異。爲何是 Spark?由於這兩天在看《Spark 機器學習》這本書第 3 章,因此就順便作個筆記。python

簡單介紹下,IPython notebook 對數據科學家來講是個交互地呈現科學和理論工做的必備工具,它集成了文本和 Python 代碼。Spark 是個通用的集羣計算框架,經過將大量數據集計算任務分配到多臺計算機上,提供高效內存計算。shell

搭建環境

一臺阿里雲服務器,配置以下,108 元/月,而後在 Windows 7 上使用 Putty 和遠程桌面操做服務器。apache

CPU:1核
內存:2048 MB
操做系統:Ubuntu 14.04 64位
固定帶寬:1Mbps數組

IPython Notebook 的安裝很簡單,強烈推薦一個預編譯的科學 Python 套件 Anaconda,按照官方網站安裝,而後在 Terminal 裏執行 ipython notebook 便可。bash

在個人 Ubuntu 服務器上打開 ipython notebook 時報錯了:socket.error Errno 99 Cannot assign requested address。
這個問題耗了我一天時間。關於這個問題, ,但只有下面兩種方法管用。服務器

一種是,在命令後加參數 --ip:ipython notebook --ip=127.0.0.1app

另外一種是,先生成 notebook 配置文件:命令行執行 jupyter notebook --generate-config,而後打開生成的文件: vi ~/.jupyter/jupyter_notebook_config.py,修改 c.NotebookApp.ip = '127.0.0.1'。框架

若是想外網也能夠訪問,ip 就設爲外網 IP 地址。我選擇的是第二種,設的外網 IP 地址,這樣就能夠在 Windows 上編輯 ipython notebook 文件了,很是方便。機器學習

Spark 的安裝也很簡單,具體安裝和使用可參考我以前的筆記官方網站

如何在 Spark 中使用 IPython Notebook,或者如何在 IPython Notebook 中使用 spark,也耗費了我一天時間。

網上不少文章都是建議:一、執行 ipython profile create spark;二、建立 ~/.ipython/profile_spark/startup/00-pyspark-setup.py 文件並修改;三、啓動 IPython notebook:ipython notebook --profile spark。
但這種方法在我這行不通,百般折騰,就是各類不行。
後來終於發現一種簡單可行的方法,那就是修改 ~/.bashrc 文件,添加如下內容:

export PYSPARK_DRIVER_PYTHON=ipython2 # As pyspark only works with python2 and not python3
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

而後source .bashrc,就能夠經過啓動 pyspark 來啓動 IPython Notebook 了。也就能夠在 IPython Notebook 中使用 pyspark 了。

下面咱們經過實例演示 Spark 在 IPython Notebook 中的使用。

Spark 上數據的探索和處理

MovieLens 100k 數據集

這裏要使用的是著名的 MovieLens 100k 數據集,該數據集包含用戶對電影的 10 萬次評分數據,也包含電影元數據和用戶屬性數據。數據集不大,壓縮文件不到 5M,經常使用於推薦系統研究。

可到官方網站下載,解壓後會建立一個名爲 ml-100k 的文件夾,該目錄中重要的文件有 u.user(用戶屬性文件)、u.item(電影元數據) 和 u.data(用戶對電影的評分)。

數據集的更多信息能夠從 README 得到,包括每一個數據文件裏的變量定義。咱們可使用 head 命令來查看各個文件的內容。

先來看 u.user:

#head -5 u.user
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213

能夠看到 u.user 文件包含 user id、age、gender、occupation 和 ZIP code 這些屬性,各屬性之間用管道符(|)隔開。

再來看 u.item:

# head -5 u.item
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0

u.item 文件包含 movie id、title、release date 以及若干與 IMDB Link 和電影分類相關的屬性,各屬性之間也用 | 符號分隔。電影分類的屬性有 unknown | Action | Adventure | Animation | Children's | Comedy | Crime | Documentary | Drama | Fantasy | Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi | Thriller | War | Western。

最後 u.data:

# head -5 u.data
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596

u.data 文件包含 user id、movie id、rating(從 1 到 5)和 timestamp 屬性。各屬性間用指標符分隔。timestamp 是從 1/1/1970 UTC 開始的秒數。

探索與可視化數據集

先來探索用戶數據

user_data = sc.textFile("ml-100k/u.user") 
user_data.first()  #此處如能輸出數據文件首行,則說明環境搭建沒問題
u'1|24|M|technician|85711'

sc 是 Spark shell 啓動時自動建立的一個 SparkContext 對象,shell 經過該對象來訪問 Spark。能夠經過下列方法輸出 sc 來查看它的類型。

sc
<pyspark.context.SparkContext at 0x7fb65c173450>

一旦有了 SparkContext,你就能夠用它來建立 RDD。RDD 是彈性分佈式數據集(Resilient Distributed Dataset),在 Spark 中,咱們經過對 RDD 的操做來表達咱們的計算意圖,這些計算會自動地在集羣上並行進行。

如上面代碼建立了一個名爲 user_data 的 RDD,而後使用 user_data.first() 輸出了 RDD 中的第一個元素。

下面用「|」字符來分隔各行數據。這將生成一個 RDD,其中每個記錄對應一個 Python 列表,各列表由用戶 ID、年齡、性別、職業和郵編五個屬性構成。以後再統計用戶、性別、職業和郵編的數目。可經過下列代碼實現。

user_fields = user_data.map(lambda line: line.split("|"))
num_users = user_fields.map(lambda fields: fields[0]).count()
num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()
print "Users: %d, genders: %d, occupations: %d, ZIP codes: %d" % (num_users, num_genders, num_occupations, num_zipcodes)
Users: 943, genders: 2, occupations: 21, ZIP codes: 795

接着用 Matplotlib 的 hist 函數來建立一個直方圖,以分析用戶年齡的分佈狀況。

%pylab inline
Populating the interactive namespace from numpy and matplotlib
ages = user_fields.map(lambda x: int(x[1])).collect()
hist(ages, bins=20, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

這裏 hist 函數的輸入參數有 ages 數組、直方圖的 bins 數目(即區間數,這裏爲 20),同時,還使用了 normed=True 參數來正則化直方圖,即讓每一個方條表示年齡在該區間內的數量佔總數量的比。

從中能夠看出 MovieLens 的大量用戶處於 15 到 55 之間。

若想了解用戶的職業分佈狀況,能夠用以下代碼來實現。首先利用 MapReduce 方法來計算數據集中各類職業的出現次數,而後用 matplotlib 的 bar 函數來會繪製一個不一樣職業的數量的條形圖。

count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
x_axis = x_axis1[np.argsort(y_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]

pos = np.arange(len(x_axis))
width = 1.0

ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)

plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

count_by_occupation
[(u'administrator', 79),
 (u'writer', 45),
 (u'retired', 14),
 (u'student', 196),
 (u'doctor', 7),
 (u'entertainment', 18),
 (u'marketing', 26),
 (u'executive', 32),
 (u'none', 9),
 (u'scientist', 31),
 (u'educator', 95),
 (u'lawyer', 12),
 (u'healthcare', 16),
 (u'technician', 27),
 (u'librarian', 51),
 (u'programmer', 66),
 (u'artist', 28),
 (u'salesman', 12),
 (u'other', 105),
 (u'homemaker', 7),
 (u'engineer', 67)]
x_axis1
array([u'administrator', u'writer', u'retired', u'student', u'doctor',
       u'entertainment', u'marketing', u'executive', u'none', u'scientist',
       u'educator', u'lawyer', u'healthcare', u'technician', u'librarian',
       u'programmer', u'artist', u'salesman', u'other', u'homemaker',
       u'engineer'], 
      dtype='<U13')
y_axis1
array([ 79,  45,  14, 196,   7,  18,  26,  32,   9,  31,  95,  12,  16,
        27,  51,  66,  28,  12, 105,   7,  67])

從中能夠看出,數量最多的職業是 student、other、educator、administrator、engineer 和 programmer。

Spark 對 RDD 提供了一個名爲 countByValue 的便捷函數,它會計算 RDD 裏各不一樣值所分別出現的次數,並將其以 Python dict 函數的形式返回給驅動程序。

count_by_occupation2 = user_fields.map(lambda fields: fields[3]).countByValue()
print "Map-reduce approach:"
print dict(count_by_occupation2)
print ""
print "countByValue approach:"
print dict(count_by_occupation)
Map-reduce approach:
{u'administrator': 79, u'retired': 14, u'lawyer': 12, u'healthcare': 16, u'marketing': 26, u'executive': 32, u'scientist': 31, u'student': 196, u'technician': 27, u'librarian': 51, u'programmer': 66, u'salesman': 12, u'homemaker': 7, u'engineer': 67, u'none': 9, u'doctor': 7, u'writer': 45, u'entertainment': 18, u'other': 105, u'educator': 95, u'artist': 28}

countByValue approach:
{u'administrator': 79, u'executive': 32, u'retired': 14, u'doctor': 7, u'entertainment': 18, u'marketing': 26, u'writer': 45, u'none': 9, u'healthcare': 16, u'scientist': 31, u'homemaker': 7, u'student': 196, u'educator': 95, u'technician': 27, u'librarian': 51, u'programmer': 66, u'artist': 28, u'salesman': 12, u'other': 105, u'lawyer': 12, u'engineer': 67}

能夠看到,上述兩種方式的結果相同。

接下來探索電影數據,跟以前同樣,先簡單看下第一行記錄,而後再統計電影總數。

movie_data = sc.textFile("ml-100k/u.item")
print movie_data.first()
num_movies = movie_data.count()
print "Movies: %d" % num_movies
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
Movies: 1682

在此要繪製電影年齡分佈圖,電影年齡即其發行年份相對於如今過了多少年(在本數據中如今是 1998 年)。電影數據有些不完整,須要一個函數來處理解析 release date 時可能出現的解析錯誤。這裏命名該函數爲 convert_year。

def convert_year(x):
    try:
        return int(x[-4:])
    except:
        return 1900 # there is a 'bad' data point with a blank year, which we set to 1900 and will filter out later

有了以上函數來解析發行年份後,即可在調用電影數據進行 map 轉換時應用該函數,並取回其結果。

movie_fields = movie_data.map(lambda lines: lines.split("|"))
years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))
# we filter out any 'bad' data points here
years_filtered = years.filter(lambda x: x != 1900)
# plot the movie ages histogram
movie_ages = years_filtered.map(lambda yr: 1998-yr).countByValue()
values = movie_ages.values()
bins = movie_ages.keys()
hist(values, bins=bins, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16,10)

從圖中能夠看到,大部分電影發行於 1998 年的前幾年。

現實的數據常常會有不規整的狀況,對其解析時就須要進一步處理。上面便是一個很好的例子。事實上,這也代表了數據探索的重要性所在,即它有助於發現數據在完整性和質量上的問題。

如今來探索評級數據

rating_data_raw = sc.textFile("ml-100k/u.data")
print rating_data_raw.first()
num_ratings = rating_data_raw.count()
print "Ratings: %d" % num_ratings
196 242 3   881250949
Ratings: 100000

能夠看到評級次數共有 10 萬條。另外和用戶與電影數據不一樣,評分記錄用「\t」分隔。

rating_data = rating_data_raw.map(lambda line: line.split("\t"))
ratings = rating_data.map(lambda fields: int(fields[2]))
max_rating = ratings.reduce(lambda x, y: max(x, y))
min_rating = ratings.reduce(lambda x, y: min(x, y))
mean_rating = ratings.reduce(lambda x, y: x + y) / float(num_ratings)
median_rating = np.median(ratings.collect())
ratings_per_user = num_ratings / num_users
ratings_per_movie = num_ratings / num_movies
print "Min rating: %d" % min_rating
print "Max rating: %d" % max_rating
print "Average rating: %2.2f" % mean_rating
print "Median rating: %d" % median_rating
print "Average # of ratings per user: %2.2f" % ratings_per_user
print "Average # of ratings per movie: %2.2f" % ratings_per_movie
Min rating: 1
Max rating: 5
Average rating: 3.53
Median rating: 4
Average # of ratings per user: 106.00
Average # of ratings per movie: 59.00

從中能夠看到,最低的評級爲 1,而最大的評級爲 5.這並不意外,由於評級的範圍即是從 1 到 5。

Spark 對 RDD 也提供一個名爲 states 的函數。該函數包含一個數值變量用於作相似的統計:

ratings.stats()
(count: 100000, mean: 3.52986, stdev: 1.12566797076, max: 5.0, min: 1.0)

能夠看出,用戶對電影的平均評級是 3.5 左右,而評分中位數爲 4,。說明評級的分佈稍傾向高分。要驗證這點,可建立一個評級值分佈的條形圖。

# create plot of counts by rating value
count_by_rating = ratings.countByValue()
x_axis = np.array(count_by_rating.keys())
y_axis = np.array([float(c) for c in count_by_rating.values()])
# we normalize the y-axis here to percentages
y_axis_normed = y_axis / y_axis.sum()

pos = np.arange(len(x_axis))
width = 1.0

ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)

plt.bar(pos, y_axis_normed, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

其特徵和以前的期待相同,評分分佈確實偏向中等以上。

一樣,也能夠求各個用戶評級次數的分佈狀況。計算各用戶評級次數的分佈時,先從 rating_data RDD 裏提取出以用戶 ID 爲主鍵、評級爲值的鍵值對。以後調用 Spark 的 groupByKey 函數,來對評級以用戶 ID 爲主鍵進行分組。

# to compute the distribution of ratings per user, we first group the ratings by user id
user_ratings_grouped = rating_data.map(lambda fields: (int(fields[0]), int(fields[2]))).groupByKey()

接着求出每個主鍵(用戶 ID)對應的評級集合的大小,這會給出各用戶評級的次數:

# then, for each key (user id), we find the size of the set of ratings, which gives us the # ratings for that user 
user_ratings_byuser = user_ratings_grouped.map(lambda (k, v): (k, len(v)))
user_ratings_byuser.take(5)
[(1, 272), (2, 62), (3, 54), (4, 24), (5, 175)]

最後,用 hist 來繪製用戶評級分佈的直方圖。

# and finally plot the histogram
user_ratings_byuser_local = user_ratings_byuser.map(lambda (k, v): v).collect()
hist(user_ratings_byuser_local, bins=200, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16,10)

能夠看出,大部分用戶的評級次數少於 100,但也代表仍然有較多用戶作出過上百次的評級。

參考資料

  1. Spark入門(Python版)
  2. Ipython-Spark setup for pyspark application
  3. Spark 機器學習》 第 3 章