機器學習

大數據和機器學習研究

[原]史上最直白的pca教程 之 二

pca的博文已經整理成一個完整的pdf文檔,在這裏下載:html

http://download.csdn.net/detail/u011539200/9305773html5

 

不須要積分,累計人品,^_^java

做者:u011539200 發表於2015/11/27 16:48:26  原文連接
閱讀:15 評論:0  查看評論
 
[原]史上最直白的pca教程 之 一

PCA理論推導

 

X=⎛⎝⎜⎜⎜⎜x1,1x2,1...xm,1x1,2x2,2...xm,2............x1,nx2,n...xm,n⎞⎠⎟⎟⎟⎟

 

XRm×n,式的每列是一個樣本,每一個樣本有m個屬性,一共有n個樣本。注意,這裏的每一個樣本都通過均值化處理。node

數據一般是含糊的,有噪聲的,不明確的。這種含糊和不明確,體如今它的協方差陣的多數元素都是非零值。好比,X的協方差陣就是: 
python

CX=1n1XXT

其中,1n1是一個實數係數,CXRm×m

 

須要從數據找到一個不含糊的,低噪聲的方向。這個需求,在本質上就是尋找一個矩陣,用它對X作變換,使得變換後的新矩陣的協方差陣大多數元素的值是零,最好的狀況是,只有主對角線非零,其餘都是零。令P表示這個矩陣,則: 
linux

Y=PX

其中,PRm×m,YRm×n

 

根據上兩式,令 git

A=XXT

則: 
 

 

A是一個對稱陣,對它進行對角化,能夠寫成A=EDET,其中,D是對角陣,{A,E,D}Rm×mweb

若是要將CY轉化成對角陣,觀察上式可知,若是令P=ET,根據矩陣對角化性質可知,P1=PTI表示單位陣,則上式就變成: 
算法

 

 

均值化

PCA理論推導裏的X通過均值化處理的。均值化過程以下: 
sql

Z=⎛⎝⎜⎜⎜⎜z1,1z2,1...zm,1z1,2z2,2...zm,2............z1,nz2,n...zm,n⎞⎠⎟⎟⎟⎟

 

其中,ZRm×n,是原始數據。

令: 

zi¯¯¯=1nj=1nzi,j

 

那麼,均值化就是:

 

Z¯¯¯=⎛⎝⎜⎜⎜⎜z1,1z1¯¯¯z2,1z2¯¯¯...zm,1zm¯¯¯¯z1,2z1¯¯¯z2,2z2¯¯¯...zm,2zm¯¯¯¯............z1,nz1¯¯¯z2,nz2¯¯¯...zm,nzm¯¯¯¯⎞⎠⎟⎟⎟⎟

 

實現

用Python2.7,matplotlib和numpy實現pca算法。

#!/usr/bin/env python #! -*- coding:utf-8 -*- import matplotlib.pyplot as plt from numpy import * #create two data set def create_dataset(n): data_r = random.randn(n, 2) #squeez y for i in range(data_r.shape[0]): data_r[i, 1] = 0.1*data_r[i, 1] #rotate theta = -0.25*3.14 tran = zeros((2, 2)) tran[1, 1] = tran[0, 0] = cos(theta) tran[0, 1] = -sin(theta) tran[1, 0] = sin(theta) data = dot(data_r, tran) #move data[:, 0] += 3 data[:, 1] += 1 return data.transpose() def do_mean(X): means = zeros((X.shape[0],1)) means[0,0] = mean(X[0,:]) means[1,0] = mean(X[1,:]) for i in range(X.shape[0]): X[i,:] -= means[i,0] return X def do_pca(X): A = dot(X, X.transpose()) _ , E = linalg.eig(A) P = E.transpose() return dot(P, X) def draw_x(X): plt.axis([-3, 6, -6, 6]) plt.plot([z for z in X[0, :]], [z for z in X[1, :]], 'r.') plt.show() def main(): X = create_dataset(1000) draw_x(X) do_mean(X) new_X = do_pca(X) draw_x(new_X) if __name__ == "__main__": main()

該實現建立出一組數據,它近似高斯分佈,且主方向在45度角方向的直線上。

這裏寫圖片描述

通過PCA算法處理後,數據X成爲以原點爲中心,水平方向是主方向的新數據。究其本質來講,new_X是數據X在以P爲基的二維空間的圖像。 
這裏寫圖片描述

做者:u011539200 發表於2015/11/27 16:43:48  原文連接
閱讀:68 評論:0  查看評論
 
[原]史上最直白的logistic regression教程 之 五

史上最直白的logistic regression教程整理稿,將4篇博文整理成一個完整的pdf文檔,且修改爲學術語境。

連接在這裏:

http://download.csdn.net/detail/u011539200/9290695

0積分下載,求rp,^_^

做者:u011539200 發表於2015/11/22 15:57:46  原文連接
閱讀:46 評論:0  查看評論
 
[原]史上最直白的logistic regression教程 之 四

接上篇,用python實現logisitic regression,代碼以下:

#!/usr/bin/env python #! -*- coding:utf-8 -*- import matplotlib.pyplot as plt from numpy import * #建立數據集 def load_dataset(): n = 100 X = [[1, 0.005*xi] for xi in range(1, 100)] Y = [2*xi[1] for xi in X] return X, Y def sigmoid(z): t = exp(z) return t/(1+t) #讓sigmodi函數向量化,能夠對矩陣求函數值,矩陣in,矩陣out sigmoid_vec = vectorize(sigmoid) #梯度降低法求解線性迴歸 def grad_descent(X, Y): X = mat(X) Y = mat(Y) row, col = shape(X) alpha = 0.05 maxIter = 5000 W = ones((1, col)) V = zeros((row, row), float32) for k in range(maxIter): L = sigmoid_vec(W*X.transpose()) for i in range(row): V[i, i] = L[0, i]*(L[0,i] - 1) W = W - alpha * (Y - L)*V*X return W def main(): X, Y = load_dataset() W = grad_descent(X, Y) print "W = ", W #繪圖 x = [xi[1] for xi in X] y = Y plt.plot(x, y, marker="*") xM = mat(X) y2 = sigmoid_vec(W*xM.transpose()) y22 = [y2[0,i] for i in range(y2.shape[1]) ] plt.plot(x, y22, marker="o") plt.show() if __name__ == "__main__": main()

跟前面相對,多了一點變化,sigmoid_vec是對sigmoid函數的向量化,以及計算對對V的計算。

咱們看看計算結果: 
迭代5次,擬合結果 
 
迭代50次,擬合結果 
 
迭代500次,擬合結果 
 
迭代5000次,擬合結果 
 
因爲sigmoid函數的緣由,擬合函數不是直線,這就是跟線性擬合的差異。

Logistic regression教程到此結束,就醬!

做者:u011539200 發表於2015/11/19 16:19:26  原文連接
閱讀:71 評論:0  查看評論
 
[原]史上最直白的logistic regression教程 之 三

在線性擬合的基礎上,咱們實現logistic regression。

如前所述,樣本集是 

{x1,y1},{x2,y2},...,{xn,yn}[1]

其中,xi=[1,xi,1,xi,2,xi,3,...,xi,k]T,且yi(0,1)。注意,這裏對yi有值上的要求,必須如此,若是值再也不這個區間,要以歸一化的方式調整到這個區間。對於分類問題,則yi的取值或者是0,或者是1,也就是yi{0,1}

 

固然,從嚴格的意義上說,logistic regression擬合後,yi的值只能無限地逼近0和1,而不能真正達到0和1,但在處理實際問題上,能夠設定成形如 ifyi>0.5thenyi=1ifyi<=0.5thenyi=0解決。

Logistic regression的擬合形式以下: 

yi=f(zi)[2]
zi=Wxi[3]

其中,f(z)=11+ez[4],也就是Logistic函數。

 

根據公式[2]和公式[3],則: 

yi=f(Wxi)[4]

 

那麼,若是用公式[4]擬合xiyi的關係,須要求解W,使得在公式[1]上偏差最小。對應的損失函數就是 

Loss=12i=1n(yif(Wxi))2[5]

 

跟前面的同樣,咱們用梯度降低法求解。 
因此,要對公式[5]wj的一階偏導,因而有 

Losswj=i=1n(yif(Wxi))×(1)×f(Wxi)wj=i=1n(yif(Wxi))×(1)×f(zi)zi×ziwj[6]

 

注意,問題來了,公式[6]的最後一步,其實是將Wxi視爲一個變量zi,分別求導。這一步是在高等數學有詳細描述了,不解釋。

公式[6]中的f(zi)zi等價於f(z),由於只有一個自變量z。根據公式[4],能夠求出

f(z)=ez(ez+1)2[7]

對公式[7]能夠作一次變形,以方便求解: 
根據公式[4],能夠知道
ez=f(z)1f(z)[8]

將公式[8]代入到公式[7],就能夠獲得
f(z)=f(z)×(1f(z))[9]

也就是說,咱們能夠根據f(z)獲得f(z),並且計算量很小。

 

把公式[9]代入公式[6],就獲得 

Losswj=i=1n(yif(Wxi))×(1)×f(zi)zi×ziwj=i=1n(yif(Wxi))×(1)×f(zi)×(1f(zi))×ziwj=i=1n(yif(Wxi))×(1)×f(Wxi)×(1f(Wxi))×(Wxi)wj=i=1n(yif(Wxi))×(1)×f(Wxi)×(1f(Wxi))×(Wxi)wj=i=1n(yif(Wxi))×(1)×f(Wxi)×(1f(Wxi))×xi,j=i=1n(yif(Wxi))×f(Wxi)×(f(Wxi)1)×xi,j[10]

因而公式[10]能夠寫成 
Losswj=i=1n(yif(Wxi))f(Wxi)(f(Wxi)1)xi,j[11]

那麼,wj在梯度降低法的迭代公式就是 
wj=wj+wj=wjLosswj[12]

如今,咱們開始作最麻煩的一步,將公式[11]進行矩陣化 
令 
Y=[y1,y2,...,yn][13]

W=[w0,w1,w2,...,wk][14]

X=⎛⎝⎜⎜⎜⎜11...1x1,1x2,1...xn,1x1,2x2,2...xn,2............x1,kx2,k...xn,k⎞⎠⎟⎟⎟⎟[15]

V=⎛⎝⎜⎜⎜⎜f(Wx1)(f(Wx1)1)0...00f(Wx2)(f(Wx2)1)...0............00...f(Wxn)(f(Wxn)1)⎞⎠⎟⎟⎟⎟[16]

L=[f(Wx1),f(Wx2),...,f(Wxn)][17]

公式[16]略有一點複雜,它是對角矩陣。 
根據上述設定,公式[11]的矩陣化形式就是 
Losswj=(YL)V⎛⎝⎜⎜⎜⎜x1,jx2,j...xn,j⎞⎠⎟⎟⎟⎟[18]

那麼,對W而言,更新公式就是 
W=W(YL)VX[19]

到這裏,logisitci regression的梯度降低法推導就結束了。下一篇咱們用python實現求解過程。

 

做者:u011539200 發表於2015/11/19 16:09:49  原文連接
閱讀:74 評論:0  查看評論
 
[原]史上最直白的logistic regression教程 之 一
本系列前四篇是隨手塗鴉,只爲講清問題,有口語化,且有少數符號誤寫,以及重複絮叨,且不打算修改:) 第5篇提供了一個嚴謹的學術語言的完整pdf文檔,敬請下載!

Logistic Regession是什麼

Logistic Regression是線性迴歸,但最終是用做分類器:它從樣本集中學習擬合參數,將目標值擬合到[0,1]之間,而後對目標值進行離散化,實現分類。

爲何叫Logistic呢?由於它使用了Logisitic函數,形如: 

f(z)=ezez+1=11+ez

這個函數有一些頗有趣的性質,後面會談到。

 

Logistic regression有必定的複雜度。對新人來講,最好有一個完整的推導指南,而後反覆推導N遍(N>=5),直至能獨立推導,再用python或者java實現這個推導,而後用這個實現解決一個實際應用,這樣差很少算是掌握Logistic regression了。上述過程缺一不可,並且是成本最小的學習方案。

Logistic regression很重要,聽說google的Ads廣告使用的預測算法就是一個大Logistic regression模型。

Logistic regression涉及機器學習的多個重要概念,樣本集,特徵,向量,損失函數,最優化方法,梯度降低。若是對logistic regression能作到庖丁屠牛的程度,對將來進行模式識別和機器學習有事半功倍的收益。

咱們現從一個最簡單的問題開始,而後逐步增長功能,最終實現logistic regression。

先從一個最簡單的問題開始

假若有一組樣本,形如

{x1,y1},{x2,y2},...,{xi,yi},...{xn,yn}[1]

xi的值決定yi的值,也就是說xi是自變量,yi是因變量,每一個xi對應一個yi。從腳標能夠看出,這組樣本一共有n個。

 

xi是一個向量,也就是說,xi裏有多個元素,也就是能夠表示爲

xi=[xi,1,xi,2,...,xi,j,...xi,k]T[2]
顯然,k表示xi的第k維。

 

實際上xi也能夠寫成 
xi=[xi,1,xi,2,...,xi,j,...xi,k],若是這樣的話,後面的W和公式[6]就要作一點改動。若是推導過程很熟悉,能夠將WxiXyiY等根據需求隨意改變,不做限定。

咱們擬合xiyi的關係,有了擬合,就能夠根據xi計算yi。最簡單的擬合方式是線性擬合,也就是形如:

yi=w0+w1×xi,1+w2×xi,2+...+wk×xi,k[3]

 

w0看起來不夠和諧,跟其餘元素不太同樣,對xi作一點修改能夠解決這個問題,將全部的xi從新表示成 

xi=[1,xi,1,xi,2,...,xi,j,...,xi,k]T[4]

那麼,咱們就獲得: 
yi=w0×1+w1×xi,1+w2×xi,2+...+wk×xi,k[5]

這個公式的好處是,咱們能夠用向量方式表示了: 
yi=Wxi[6]

也就是說,
W=[w0,w1,...,wk]

注意,此時的xi再也不是k維了,而是k+1維,一樣W也是k+1維,以數學形式表示爲xiR(k+1)×1WR1×(k+1),後面咱們一直使用這種表示方式。

 

若是咱們在公式[1]的樣本集上作擬合,就要是公式[6]在公式[1]上偏差最小。一般選擇的偏差形式是平方和偏差,由於它求導方便,作梯度優化的時候計算便捷。偏差形式以下 

Loss=12i=1n(yiWxi)2[7]

公式[7]是二次方程,有最小值,當它取最小值的時候的W就是最優擬合參數。

 

求解優化問題

公式[7]的求解不難,它有精確解,但當樣本量很大的時候,精確解的求解是有問題的,好比矩陣是奇異陣不能求逆。因此一般會使用梯度降低法求解。

梯度降低法的方式是,先隨機給W賦值,而後沿着公式[7]一階偏導的反方向計算降低量值,屢次重複,最終會讓公式[7]收斂到一個極小值。那麼,這個更新公式就是: 

W=W+W=WLossW[8]

公式[8]有些複雜,用更簡單的方式,能夠寫成以下方式: 
wi=wi+wi=wiLosswi[9]

如今,咱們用最原始的方式求解Losswj

 

 

Losswj=12[(y1(w0×1+w1× x1,1+...+wk×x1,k))2+(y2(w0×1+w1× x2,1+...+wk×x2,k))2+...+(yn(w0×1+w1× xn,1+...+wk×xn,k))2]/wi=[(y1(w0×1+w1× x1,1+...+wk×x1,k))×w1,j+(y2(w0×1+w1× x2,1+...+wk×x2,k))×w2,j+...+(yn(w0×1+w1× xn,1+...+wk×xn,k))×wn,j]=i=1n(yi(w0×1+w1×xi,1+...+wk×xi,k))×xi,j=i=1n(yi×xi,j)+i=1nWxixi,j[10]

 

注意咱們要把公式[10]改寫成矩陣形式,由於矩陣計算更有效率,方便實現。 
yi寫成矩陣形式,令

Y=[y1,y2,...,yn][11]

xi寫成矩陣形式,令
X=⎛⎝⎜⎜⎜⎜11...1x1,1x2,1...xn,1x1,2x2,2...xn,2............x1,jx2,j...xn,j............x1,kx2,kxn,k⎞⎠⎟⎟⎟⎟[12]

顯然,XRn×(k+1)。 
那麼,公式[10]最後一個等式中的ni=1(yi×xi,j)就能夠寫成 
i=1n(yi×xi,j)=Y⎛⎝⎜⎜⎜⎜x1,jx2,j...xn,j⎞⎠⎟⎟⎟⎟[13]

ni=1Wxixi,j能夠寫成 
i=1nWxixi,j=WXT⎛⎝⎜⎜⎜⎜x1,jx2,j...xn,j⎞⎠⎟⎟⎟⎟[14]

因此,公式[10]最終能夠表示成 
Losswj=(YWXT)⎛⎝⎜⎜⎜⎜x1,jx2,j...xn,j⎞⎠⎟⎟⎟⎟[15]

注意,如今有一個頗有意思的地方,(YWXT)是擬合偏差,在更新W的過程當中,每輪會進行一次計算。 
根據公式[15],對W而言,咱們有了一個更爲總體的計算方式: 
W=(YWXT)X[16]

因此,以梯度降低法計算最優W的更新公式是: 
W=W+(YWXT)X[17]

 

下一篇咱們用python實現公式[17]

做者:u011539200 發表於2015/11/17 15:11:38  原文連接
閱讀:187 評論:0  查看評論
 
[原]史上最直白的logistic regression教程 之 二

實現線性擬合

咱們用python2.7實現上一篇的推導結果。請先安裝python matplotlib包和numpy包。

具體代碼以下:

#!/usr/bin/env python #! -*- coding:utf-8 -*- import matplotlib.pyplot as plt from numpy import * #建立數據集 def load_dataset(): n = 100 X = [[1, 0.005*xi] for xi in range(1, 100)] Y = [2*xi[1] for xi in X] return X, Y #梯度降低法求解線性迴歸 def grad_descent(X, Y): X = mat(X) Y = mat(Y) row, col = shape(X) alpha = 0.001 maxIter = 5000 W = ones((1, col)) for k in range(maxIter): W = W + alpha * (Y - W*X.transpose())*X return W def main(): X, Y = load_dataset() W = grad_descent(X, Y) print "W = ", W #繪圖 x = [xi[1] for xi in X] y = Y plt.plot(x, y, marker="*") xM = mat(X) y2 = W*xM.transpose() y22 = [y2[0,i] for i in range(y2.shape[1]) ] plt.plot(x, y22, marker="o") plt.show() if __name__ == "__main__": main()

代碼超級簡單,load_dataset函數建立了一個y=2x的數據集,grad_descent函數求解優化問題。

在grad_descent裏多了兩個小東西,alpha是學習速率,通常取0.001~0.01,太大可能會致使震盪,求解不穩定。maxIter是最大迭代次數,它決定結果的精確度,一般是越大越好,但越大越耗時,因此一般須要試算如下,也能夠另外寫一個斷定標準,好比當YWXT小於多少的時候就再也不迭代。

咱們來看一下效果: 
當maxIter=5時,擬合結果是這樣的: 
 
若是maxIter=50,擬合結果是這樣的: 
 
若是maxIter=500,擬合結果是這樣的: 
 
若是maxIter=1000,擬合結果是這樣的: 
 
若是maxIter=5000,擬合結果是這樣的: 
 
5000次的結果幾乎完美,兩條曲線圖形重合。就醬。 
本篇到此結束,下一篇,咱們開始把logistic函數加進來,推導logistic regression。

做者:u011539200 發表於2015/11/17 15:02:48  原文連接
閱讀:79 評論:0  查看評論
 
[原]TensorFlow試用

Google發佈了開源深度學習工具TensorFlow。

 

根據官方教程  http://tensorflow.org/tutorials/mnist/beginners/index.md  試用。

 

操做系統是ubuntu 14.04,64位,python 2.7,已經安裝足夠的python包。

 

 

 

1. 安裝

    1.1 參考文檔 http://tensorflow.org/get_started/os_setup.md#binary_installation
    
    1.2 用pip安裝,須要用代理,不然連不上,這個是本地ssh到vps出去的。

    sudo pip install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.5.0-cp27-none-linux_x86_64.whl --proxy http://127.0.0.1:3128

    1.3 注意,個人py2.7已經安裝了足夠的包,如python-dev,numpy,swig等等。若是遇到缺乏相應包的問題,先安裝必須的包。

2. 第一個demo,test.py
------------------------------
import tensorflow as tf

hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
print sess.run(hello)

a = tf.constant(10)
b = tf.constant(32)
print sess.run(a+b)

------------------------------


3. mnist手寫識別
    3.1 下載數據庫 
    在http://yann.lecun.com/exdb/mnist/下載上面提到的4個gz文件,放到本地目錄如 /tmp/mnist

    3.2 下載input_data.py,放在/home/tim/test目錄下
    https://tensorflow.googlesource.com/tensorflow/+/master/tensorflow/g3doc/tutorials/mnist/input_data.py

    3.3 在/home/tim/test目錄下建立文件test_tensor_flow_mnist.py,內容以下
-----------------------
#!/usr/bin/env python 

import input_data
import tensorflow as tf

mnist = input_data.read_data_sets("/tmp/mnist", one_hot=True)

x = tf.placeholder("float", [None, 784])
W = tf.Variable(tf.zeros([784,10]))
b = tf.Variable(tf.zeros([10]))
y = tf.nn.softmax(tf.matmul(x,W) + b)
y_ = tf.placeholder("float", [None,10])
cross_entropy = -tf.reduce_sum(y_*tf.log(y))
train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)
init = tf.initialize_all_variables()
sess = tf.Session()
sess.run(init)

for i in range(1000):
    batch_xs, batch_ys = mnist.train.next_batch(100)
    sess.run(train_step, feed_dict={x: batch_xs, y_: batch_ys})

correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
print sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels})
-----------------------

3.4 運行。大概之須要幾秒鐘時間,輸出結果是91%左右。

 

 

4. 關於版本

4.1  pip version


pip 1.5.4 from /usr/lib/python2.7/dist-packages (python 2.7)


4.2 已經安裝的python包

    有一些是用easy_install安裝的,大部分是pip安裝的。

pip freeze


Jinja2==2.7.2
MarkupSafe==0.18
MySQL-python==1.2.3
PAM==0.4.2
Pillow==2.3.0
Twisted-Core==13.2.0
Twisted-Web==13.2.0
adium-theme-ubuntu==0.3.4
apt-xapian-index==0.45
argparse==1.2.1
beautifulsoup4==4.2.1
chardet==2.0.1
colorama==0.2.5
command-not-found==0.3
cvxopt==1.1.4
debtagshw==0.1
decorator==3.4.0
defer==1.0.6
dirspec==13.10
duplicity==0.6.23
fp-growth==0.1.2
html5lib==0.999
httplib2==0.8
ipython==1.2.1
joblib==0.7.1
lockfile==0.8
lxml==3.3.3
matplotlib==1.4.3
nose==1.3.1
numexpr==2.2.2
numpy==1.9.2
oauthlib==0.6.1
oneconf==0.3.7
openpyxl==1.7.0
pandas==0.13.1
patsy==0.2.1
pexpect==3.1
piston-mini-client==0.7.5
pyOpenSSL==0.13
pycrypto==2.6.1
pycups==1.9.66
pycurl==7.19.3
pygobject==3.12.0
pygraphviz==1.2
pyparsing==2.0.3
pyserial==2.6
pysmbc==1.0.14.1
python-apt==0.9.3.5
python-dateutil==2.4.2
python-debian==0.1.21-nmu2ubuntu2
pytz==2012c
pyxdg==0.25
pyzmq==14.0.1
reportlab==3.0
requests==2.2.1
scipy==0.13.3
sessioninstaller==0.0.0
simplegeneric==0.8.1
simplejson==3.3.1
six==1.10.0
software-center-aptd-plugins==0.0.0
ssh-import-id==3.21
statsmodels==0.5.0
sympy==0.7.4.1
system-service==0.1.6
tables==3.1.1
tensorflow==0.5.0
tornado==3.1.1
unity-lens-photos==1.0
urllib3==1.7.1
vboxapi==1.0
wheel==0.24.0
wsgiref==0.1.2
xdiagnose==3.6.3build2
xlrd==0.9.2
xlwt==0.7.5
zope.interface==4.0.5

 

 

做者:u011539200 發表於2015/11/10 16:10:42  原文連接
閱讀:829 評論:3  查看評論
 
[原]weka實戰005:基於HashSet實現的apriori關聯規則算法

這個一個apriori算法的演示版本,全部的代碼都在一個類。僅供研究算法參考

 

 

package test;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Vector;

//用set寫的apriori算法
public class AprioriSetBasedDemo {

	class Transaction {
		/*
		 * 購物記錄,用set保存多個貨物名
		 */
		private HashSet<String> pnSet = new HashSet<String>();

		public Transaction() {
			pnSet.clear();
		}

		public Transaction(String[] names) {
			pnSet.clear();
			for (String s : names) {
				pnSet.add(s);
			}
		}

		public HashSet<String> getPnSet() {
			return pnSet;
		}

		public void addPname(String s) {
			pnSet.add(s);
		}

		public boolean containSubSet(HashSet<String> subSet) {
			return pnSet.containsAll(subSet);
		}

		@Override
		public String toString() {
			StringBuilder sb = new StringBuilder();
			Iterator<String> iter = pnSet.iterator();
			while (iter.hasNext()) {
				sb.append(iter.next() + ",");
			}
			return "Transaction = [" + sb.toString() + "]";
		}

	}

	class TransactionDB {
		// 記錄全部的Transaction
		private Vector<Transaction> vt = new Vector<Transaction>();

		public TransactionDB() {
			vt.clear();
		}

		public int getSize() {
			return vt.size();
		}

		public void addTransaction(Transaction t) {
			vt.addElement(t);
		}

		public Transaction getTransaction(int idx) {
			return vt.elementAt(idx);
		}

	}

	public class AssoRule implements Comparable<AssoRule> {
		private String ruleContent;
		private double confidence;

		public void setRuleContent(String ruleContent) {
			this.ruleContent = ruleContent;
		}

		public void setConfidence(double confidence) {
			this.confidence = confidence;
		}

		public AssoRule(String ruleContent, double confidence) {
			this.ruleContent = ruleContent;
			this.confidence = confidence;
		}

		@Override
		public int compareTo(AssoRule o) {
			if (o.confidence > this.confidence) {
				return 1;
			} else if (o.confidence == this.confidence) {
				return 0;
			} else {
				return -1;
			}
		}

		@Override
		public String toString() {
			return ruleContent + ", confidence=" + confidence * 100 + "%";
		}

	}

	public static String getStringFromSet(HashSet<String> set) {
		StringBuilder sb = new StringBuilder();
		Iterator<String> iter = set.iterator();
		while (iter.hasNext()) {
			sb.append(iter.next() + ", ");
		}
		if (sb.length() > 2) {
			sb.delete(sb.length() - 2, sb.length() - 1);
		}
		return sb.toString();
	}

	// 計算具備最小支持度的一項頻繁集 >= minSupport
	public static HashMap<String, Integer> buildMinSupportFrequenceSet(
			TransactionDB tdb, int minSupport) {
		HashMap<String, Integer> minSupportMap = new HashMap<String, Integer>();

		for (int i = 0; i < tdb.getSize(); i++) {
			Transaction t = tdb.getTransaction(i);
			Iterator<String> it = t.getPnSet().iterator();
			while (it.hasNext()) {
				String key = it.next();
				if (minSupportMap.containsKey(key)) {
					minSupportMap.put(key, minSupportMap.get(key) + 1);
				} else {
					minSupportMap.put(key, new Integer(1));
				}
			}
		}

		Iterator<String> iter = minSupportMap.keySet().iterator();
		Vector<String> toBeRemoved = new Vector<String>();
		while (iter.hasNext()) {
			String key = iter.next();
			if (minSupportMap.get(key) < minSupport) {
				toBeRemoved.add(key);
			}
		}

		for (int i = 0; i < toBeRemoved.size(); i++) {
			minSupportMap.remove(toBeRemoved.get(i));
		}

		return minSupportMap;
	}

	public void buildRules(TransactionDB tdb,
			HashMap<HashSet<String>, Integer> kItemFS, Vector<AssoRule> var,
			double ruleMinSupportPer) {

		// 若是kItemFS的成員數量不超過1不須要計算
		if (kItemFS.size() <= 1) {
			return;
		}

		// k+1項頻項集
		HashMap<HashSet<String>, Integer> kNextItemFS = new HashMap<HashSet<String>, Integer>();

		// 得到第k項頻項集
		@SuppressWarnings("unchecked")
		HashSet<String>[] kItemSets = new HashSet[kItemFS.size()];
		kItemFS.keySet().toArray(kItemSets);

		/*
		 * 根據k項頻項集,用兩重循環得到k+1項頻項集 而後計算有多少個tranction包含這個k+1項頻項集
		 * 而後支持比超過ruleMinSupportPer,就能夠生成規則,放入規則向量
		 * 而後,將k+1項頻項集及其支持度放入kNextItemFS,進入下一輪計算
		 */
		for (int i = 0; i < kItemSets.length - 1; i++) {
			HashSet<String> set_i = kItemSets[i];
			for (int j = i + 1; j < kItemSets.length; j++) {
				HashSet<String> set_j = kItemSets[j];
				// k+1 item set
				HashSet<String> kNextSet = new HashSet<String>();
				kNextSet.addAll(set_i);
				kNextSet.addAll(set_j);
				if (kNextSet.size() <= set_i.size()
						|| kNextSet.size() <= set_j.size()) {
					continue;
				}

				// 計算k+1 item set在全部transaction出現了幾回
				int count = 0;
				for (int k = 0; k < tdb.getSize(); k++) {
					if (tdb.getTransaction(k).containSubSet(kNextSet)) {
						count++;
					}
				}
				if (count <= 0) {
					continue;
				}

				Integer n_i = kItemFS.get(set_i);
				double per = 1.0 * count / n_i.intValue();
				if (per >= ruleMinSupportPer) {
					kNextItemFS.put(kNextSet, new Integer(count));
					HashSet<String> tmp = new HashSet<String>();
					tmp.addAll(kNextSet);
					tmp.removeAll(set_i);
					String s1 = "{" + getStringFromSet(set_i) + "}" + "(" + n_i
							+ ")" + "==>" + getStringFromSet(tmp).toString()
							+ "(" + count + ")";
					var.addElement(new AssoRule(s1, per));
				}
			}
		}

		// 進入下一輪計算
		buildRules(tdb, kNextItemFS, var, ruleMinSupportPer);
	}

	public void test() {
		// Transaction數據集
		TransactionDB tdb = new TransactionDB();

		// 添加Transaction交易記錄
		tdb.addTransaction(new Transaction(new String[] { "a", "b", "c", "d" }));
		tdb.addTransaction(new Transaction(new String[] { "a", "b" }));
		tdb.addTransaction(new Transaction(new String[] { "b", "c" }));
		tdb.addTransaction(new Transaction(new String[] { "b", "c", "d", "e" }));

		// 規則最小支持度
		double minRuleConfidence = 0.5;
		Vector<AssoRule> vr = computeAssociationRules(tdb, minRuleConfidence);
		// 輸出規則
		int i = 0;
		for (AssoRule ar : vr) {
			System.out.println("rule[" + (i++) + "]: " + ar);
		}
	}

	public Vector<AssoRule> computeAssociationRules(TransactionDB tdb,
			double ruleMinSupportPer) {
		// 輸出關聯規則
		Vector<AssoRule> var = new Vector<AssoRule>();

		// 計算最小支持度頻項
		HashMap<String, Integer> minSupportMap = buildMinSupportFrequenceSet(
				tdb, 2);

		// 計算一項頻項集
		HashMap<HashSet<String>, Integer> oneItemFS = new HashMap<HashSet<String>, Integer>();
		for (String key : minSupportMap.keySet()) {
			HashSet<String> oneItemSet = new HashSet<String>();
			oneItemSet.add(key);
			oneItemFS.put(oneItemSet, minSupportMap.get(key));
		}

		// 根據一項頻項集合,遞歸計算規則
		buildRules(tdb, oneItemFS, var, ruleMinSupportPer);
		// 將規則按照可信度排序
		Collections.sort(var);
		return var;
	}

	public static void main(String[] args) {
		AprioriSetBasedDemo asbd = new AprioriSetBasedDemo();
		asbd.test();
	}

}

運行結果以下:

 

rule[0]: {d }(2)==>b (2), confidence=100.0%
rule[1]: {d }(2)==>c (2), confidence=100.0%
rule[2]: {d, a }(1)==>c (1), confidence=100.0%
rule[3]: {d, a }(1)==>b (1), confidence=100.0%
rule[4]: {d, a }(1)==>b (1), confidence=100.0%
rule[5]: {d, c }(2)==>b (2), confidence=100.0%
rule[6]: {d, b, a }(1)==>c (1), confidence=100.0%
rule[7]: {d, b, a }(1)==>c (1), confidence=100.0%
rule[8]: {d, c, a }(1)==>b (1), confidence=100.0%
rule[9]: {b }(4)==>c (3), confidence=75.0%
rule[10]: {b, c }(3)==>d (2), confidence=66.66666666666666%
rule[11]: {b, c }(3)==>d (2), confidence=66.66666666666666%
rule[12]: {d }(2)==>a (1), confidence=50.0%
rule[13]: {b }(4)==>a (2), confidence=50.0%
rule[14]: {d, c }(2)==>b, a (1), confidence=50.0%
rule[15]: {d, b }(2)==>a (1), confidence=50.0%

 

做者:u011539200 發表於2015/5/27 6:33:28  原文連接
閱讀:113 評論:0  查看評論
 
[原]weka實戰004:fp-growth關聯規則算法

apriori算法的計算量太大,若是數據集略大一些,會比較慢,很是容易內存溢出。

 

咱們能夠算一下複雜度:假設樣本數有N個,樣本屬性爲M個,每一個樣本屬性平均有K個nominal值。

1. 計算一項頻繁集的時間複雜度是O(N*M*K)。

2. 假設具備最小支持度的頻繁項是q個,根據它們則依次生成一項頻繁集,二項頻繁集,....,r項頻繁集合,它們的元素數量分別是:c(q, 1), c(q,2), ...,c(q, r)。那麼頻繁集的數量是極大的,單機確定不能支持,好比說,假設q=10000--其實很小,電商/零售商的數據比這大太多了--此時生成的二項頻繁集合的元素數量是5千萬,三項頻繁集超過1000億... 打住吧,不要再往下算了...

3. 若是transaction有100萬個,這也不算多,但計算二項頻繁集的關聯規則就要掃描數據庫100萬*5千萬。

 

因此快速算法是必須,不然搞不下去。

 

fp-growth就是一種快速算法,設計很是巧妙,它的流程是這樣的:

 

1. 計算最小支持度頻繁項,並按照支持度從大到小排列,形如{'f':100, 'c':84, 'd':75, 'a':43, 'q':19, ...}

 

2. 把transaction的全部記錄,都按照最小支持度頻繁項進行排列,若是沒有某個頻繁項,就空下來,因而,transaction就是以下的形式:

{'f', 'd', 'q', ....}  //前面是頻繁項,後面是非頻繁項

{'c', 'd', 'a', ...}

...

 

3. 而後,創建一個fp-tree,樹結構:

    3.1 樹的根節點是null

    3.2 把transaction的記錄向樹結構作插入:

        3.2.1 第一次插入{'f', 'd', 'q', ....},此時null的子節點沒有'f',那就創建一個名爲'f'的節點,將它的次數計爲1,而後將這個transaction的id存儲在節點。

        3.2.2 第二次插入{'c', 'd', 'a', ...},此時null的子節點沒有'c',那就創建一個名爲'f'的節點,將它的次數計爲1,而後將這個transaction的id存儲在節點。

        3.2.3 以此類推,繼續插入其餘全部記錄,若是遇到節點已經存在,把節點次數+1,再把transaction加入到節點。

        3.2.4 當全部的transaction被加入到fp-tree以後,fp-tree的第一層子節點有若干個,就把全部transaction的第一個元素進行了分類。

        3.2.5 再按照這個方式,再對全部transaction的第二個元素進行分類,也就是在fp-tree的根節點的子節點進行上述3.2.1~3.2.3的操做。

        3.2.6 知道將全部transaction分到不在有符合最小支持度的元素爲止。這樣fp-tree就建成了。

    3.3 計算關聯規則,這就是很簡單啦,凡是須要計算的頻繁項集合,都在fp-tree上按照支持度列出來了,從根節點挨個往下薅就好了,並且,不再須要遍歷全部的transaction了,計算量大大減小。

    3.4 fp-tree的結構,很容易拆分到並行或者分佈式計算。

 

4. 實際上,在原做paper,構造fp-tree的方式和本文的方式略有差異,它是深度優先的,好比說,對第一個transaction是一次性創建'f'-->'d'-->'q'三個節點,而後計數,其餘以此類推。本文的方式爲了方便理解。

 

 

 

 

 

 

 

做者:u011539200 發表於2015/5/24 21:45:38  原文連接
閱讀:307 評論:0  查看評論
 
[原]weka實戰003:apriori關聯規則算法的實現

weka實現的apriori算法是在weka.associations包的Apriror類。

 

在這個類,挖掘關聯規則的入口函數是public void buildAssociations(Instances instances),而instances就是數據集,檢查數據,設置參數,初始化變量,而後,用一個do-while循環計算關聯規則。若是你看過上一篇,就知道其實就是從一項頻繁集開始,逐級二項頻繁集... N項頻繁集,直到達到中止準則,終止循環。

 

在這個循環裏,根據條件有若干種計算關聯規則的方式,其中一種方式,是執行兩個函數:findLargeItemSets()和findRulesBruteForce()。

 

findLargeItemSets()函數的做用,就是計算出全部的超過最小支持度的頻繁集。全部的操做都是用集合來的,這裏最重要的類就是AprioriItemSet,它作各類集合操做,也計算關聯規則。

 

findRulesBruteForce()函數,就是暴力直接粗魯地一個個取出前面算好的AprioriItemSet,計算關聯規則。

 

整個過程就這麼簡單。

 

 

 

 

 

做者:u011539200 發表於2015/5/23 21:36:12  原文連接
閱讀:98 評論:0  查看評論
 
[原]weka實戰002:apriori關聯規則算法

關聯規則算法最出名的例子就是啤酒和尿布放一塊兒賣。

 

假如咱們去超市買東西,付款後,會拿到一張購物清單。這個清單就是一個Transaction。對關聯規則算法來講,每一個產品的購買數量是無心義的,不參與計算。

 

許許多多的人買東西,生成了N個購物清單,也就是N個Transaction。

 

那麼,這些Transaction上的貨物之間有什麼有用的關係呢?這些關係能夠用什麼方式表達出來呢?這就是關聯規則算法要解決的問題。

 

下面,咱們用一個具體的例子解釋這個問題:

 

1. 假設有三個Transaction分別是:

t1 = {'a', 'b', 'c', 'd'}

t2 = {'a', 'c', 'e'};

t3 = {'b', 'c', 'f'}

其中,abcdef都是貨物的ID,簡寫是爲了方便理解。

 

2. 咱們看一下,就知道只要買了'a',就可能會買'c',或者說,只要買了'c'就極可能買了'a',並且,在2個Transaction上都出現了。這個規律能夠表達成:

  'c' ==> 'a'(66.67%)

後面的66.67%叫支持度,也就是'a'和'c'在一塊兒出現的次數,處以c的次數,也就2/3=66.67%。

 

3. 這就是關聯規則,各類關聯規則算法要解決的是在樣本數據很大或者樣本數量不少的狀況下計算關聯規則,以及減小內存,提升計算速度。

 

4. 那麼,apriori算法是如何作的呢?算法流程是這樣的:

    4.1 先從全部的transaction遍歷出全部貨物id,也就{'a', 'b', 'c', 'd', 'e', 'f'}

    4.2 再計算每一個貨物id在全部transaction上出現次數總和,也就是{'a':2, 'b':2, 'c':3, 'd':1, 'e':1, 'f':1}

    4.3 有經驗的同窗能夠知道上述兩個步驟用HashMap能一次性搞定

    4.4 對4.2的結果,將出現次數少於一個最小支持數閾值的貨物id刪除,若是閾值是1,則剩下的結果就是{'a':2, 'b':2, 'c':3}

    4.5 對4.4的結果,生成一項頻繁集,也就是{{'a'}, {'b'}, {'c'}}

    4.6 到這裏爲止,就獲得了apriori算法的核心,頻繁集,之後的全部計算都是在頻繁集上進行:

        4.6.1 根據一項頻繁集,生成二項頻繁集,也就是{{'a','b'}, {'a','c'}, {'b','c'}},也就是任意兩個一項頻繁集的組合。

        4.6.2 計算二項頻繁集的貨物id同時在全部transaction上的出現次數:{{'a','b'}:1, {'a','c'}:2, {'b','c'}:2}

        4.6.3 根據最小支持數閾值=1,刪除4.6.2的低值二項頻繁集,其結果就是{{'a','c'}:2, {'b','c'}:2}

        4.6.4 根據二項頻繁集合計算關聯規則:

                  'c' ==> 'a'(66.67%)

                  'c' ==> 'b'(66.67%)

         4.6.5 根據二項頻繁集,計算三項頻繁集以及在三項頻繁集上的關聯規則,其步驟相似4.6.1~4.6.4。

         4.6.6 上述計算步驟,能夠寫成一個while循環,計算到高次頻繁集爲空,也就是不在有新規則產生爲止。而後輸出全部的規則。算法結束。

 

5. 幾個問題

    5.1 關聯規則不能處理連續值屬性,全部要將連續值屬性轉化成nominal屬性進行計算。

    5.2. 若是樣本的屬性值不少,或者transaction總數不少,apriori算法會很慢,由於每一輪計算都須要查詢整個數據庫。爲此,學者們提出不少優化算法,剪切,並行,fp-growth等等。

 

 

 

 

 

 

做者:u011539200 發表於2015/5/22 7:30:15  原文連接
閱讀:107 評論:0  查看評論
 
[原]Hadoop 1.x的Task,ReduceTask,MapTask隨想

Hadoop的技術體系,最使人稱讚的是細節。它的基本原理是很是容易理解的,細節是魔鬼。

 

hadoop的hdfs是文件系統存儲,它有三類節點namenode, scondraynamenode, datanode,前兩種在集羣分別只有一個節點,而datanode在集羣有不少個。hdfs的解耦作的很是好,以致於它能夠單獨運行,作一個海量數據的文件存儲系統。它能夠跟mapreduce分別運行。

 

對mapreduce任務來講,它有兩類節點, jobtracker,tasktracker。前者每一個集羣以後一個,後者有許多個。顧名思義,tasktracker就是運行任務task。task有兩種,maptask和reducertask。

 

一個mapreduce任務job,要作拆分,拆分紅若干個inputsplit。每一個inputsplit對應一個maptask。maptask執行完,將結果傳給reducetask。而後reduecetask處理後將最終結果輸出到hdfs存儲。

 

MapTask和ReducerTask的基類是抽象類Task,它們在抽象的層次上近似,只是處理數據的流程不一樣。每一個tasktracker節點能夠同時運行這兩種task。

 

這裏有複雜的細節。tasktracker和jobtracker經過遠程rpc的方式進行心跳服務。心跳服務調用會帶上各類信息,有些是tasktracker報告本身的狀態和任務執行狀況,有些是jobtracker在應答裏讓tasktracker執行任務,不一而足。

 

每一個job有jobid,拆分紅若干個maptask和reducertask以後,又有taskid。每一個maptask執行結束,將結果寫入hdfs,又經過http的方式傳遞給reducertask。

 

因而:

1. 一切通信由遠程rpc調用實現。

2. hdfs是存儲,可單獨運行。

3. mapreduce是分佈式計算,它使用hdfs。

4. task是節點計算的核心。

5. 大量的細節實現以保證可靠性和穩定性。

做者:u011539200 發表於2015/5/14 21:05:39  原文連接
閱讀:349 評論:0  查看評論
 
[原]Hadoop 1.x的Shuffle源碼分析之3

shuffle有兩種,一種是在內存存儲數據,另外一種是在本地文件存儲數據,二者幾乎一致。

 

以本地文件進行shuffle的過程爲例:

 

mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
              compressedLength)

shuffleToDisk函數以下:

 

 

private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
                                      InputStream input,
                                      Path filename,
                                      long mapOutputLength) 
      throws IOException {
        // Find out a suitable location for the output on local-filesystem
        //在本地文件系統作輸出,輸出文件的path
        Path localFilename = 
          lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), 
                                         mapOutputLength, conf);

        //建立Map輸出
        MapOutput mapOutput = 
          new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), 
                        conf, localFileSys.makeQualified(localFilename), 
                        mapOutputLength);


        // Copy data to local-disk
        //從input讀取數據,寫入到本地文件,這個input是http鏈接建立的流式輸入
        OutputStream output = null;
        long bytesRead = 0;
        try {
          output = rfs.create(localFilename);
          
          byte[] buf = new byte[64 * 1024];
          int n = -1;
          try {
            n = input.read(buf, 0, buf.length);
          } catch (IOException ioe) {
            readError = true;
            throw ioe;
          }
          while (n > 0) {
            bytesRead += n;
            shuffleClientMetrics.inputBytes(n);
            output.write(buf, 0, n);

            // indicate we're making progress
            reporter.progress();
            try {
              n = input.read(buf, 0, buf.length);
            } catch (IOException ioe) {
              readError = true;
              throw ioe;
            }
          }

          LOG.info("Read " + bytesRead + " bytes from map-output for " +
              mapOutputLoc.getTaskAttemptId());

          //正常取完數據,關閉。
          output.close();
          input.close();
        } catch (IOException ioe) {
          LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), 
                   ioe);

          // Discard the map-output
          try {
            mapOutput.discard();
          } catch (IOException ignored) {
            LOG.info("Failed to discard map-output from " + 
                mapOutputLoc.getTaskAttemptId(), ignored);
          }
          mapOutput = null;

          // Close the streams
          IOUtils.cleanup(LOG, input, output);

          // Re-throw
          throw ioe;
        }

        // Sanity check
        //檢查讀取是否正常
        if (bytesRead != mapOutputLength) {
          try {
            mapOutput.discard();
          } catch (Exception ioe) {
            // IGNORED because we are cleaning up
            LOG.info("Failed to discard map-output from " + 
                mapOutputLoc.getTaskAttemptId(), ioe);
          } catch (Throwable t) {
            String msg = getTaskID() + " : Failed in shuffle to disk :" 
                         + StringUtils.stringifyException(t);
            reportFatalError(getTaskID(), t, msg);
          }
          mapOutput = null;

          throw new IOException("Incomplete map output received for " +
                                mapOutputLoc.getTaskAttemptId() + " from " +
                                mapOutputLoc.getOutputLocation() + " (" + 
                                bytesRead + " instead of " + 
                                mapOutputLength + ")"
          );
        }

        return mapOutput;

      }

因此說,這一段shuffle的本質就是,從http的輸入流讀取數據,而後存放在本地文件系統的磁盤文件,寫完以後,把taskId, jobid,本地文件名等等諸多參數放在MapOutput對象記錄下來,而後返回一個MapOutput對象。

 

 

java的代碼很直接,沒有花花繞的東東,除了略有一點冗長,實在沒什麼缺點  :)

做者:u011539200 發表於2015/5/13 21:59:11  原文連接
閱讀:296 評論:0  查看評論
 
[原]Hadoop 1.x的Shuffle源碼分析之2

ReduceTask類的內嵌類ReduceCopier的內嵌類MapOutputCopier的函數copyOutput是Shuffle裏最重要的一環,它以http的方式,從遠程主機取數據:建立臨時文件名,而後用http讀數據,再保存到內存文件系統或者本地文件系統。它讀取遠程文件的函數是getMapOutput。

 

getMapOutput函數以下:

 

private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
                                     Path filename, int reduce)
      throws IOException, InterruptedException {
        //創建http連接
        URL url = mapOutputLoc.getOutputLocation();
        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
        //建立輸入流
        InputStream input = setupSecureConnection(mapOutputLoc, connection);

        //檢查鏈接姿式是否正確
        int rc = connection.getResponseCode();
        if (rc != HttpURLConnection.HTTP_OK) {
          throw new IOException(
              "Got invalid response code " + rc + " from " + url +
              ": " + connection.getResponseMessage());
        }
 
        //從http連接獲取mapId
        TaskAttemptID mapId = null;
        try {
          mapId =
            TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
        } catch (IllegalArgumentException ia) {
          LOG.warn("Invalid map id ", ia);
          return null;
        }
</pre><pre code_snippet_id="665348" snippet_file_name="blog_20150513_3_7696491" name="code" class="java">        //檢查mapId是否一致
        TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
        if (!mapId.equals(expectedMapId)) {
          LOG.warn("data from wrong map:" + mapId +
              " arrived to reduce task " + reduce +
              ", where as expected map output should be from " + expectedMapId);
          return null;
        }
        //若是數據有壓縮,要獲取壓縮長度
        long decompressedLength = 
          Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
        long compressedLength = 
          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));

        if (compressedLength < 0 || decompressedLength < 0) {
          LOG.warn(getName() + " invalid lengths in map output header: id: " +
              mapId + " compressed len: " + compressedLength +
              ", decompressed len: " + decompressedLength);
          return null;
        }
        int forReduce =
          (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
        
        if (forReduce != reduce) {
          LOG.warn("data for the wrong reduce: " + forReduce +
              " with compressed len: " + compressedLength +
              ", decompressed len: " + decompressedLength +
              " arrived to reduce task " + reduce);
          return null;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
              ", decompressed len: " + decompressedLength);
        }

        //We will put a file in memory if it meets certain criteria:
        //1. The size of the (decompressed) file should be less than 25% of 
        //    the total inmem fs
        //2. There is space available in the inmem fs
        
        // Check if this map-output can be saved in-memory
        boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 

        // Shuffle
        MapOutput mapOutput = null;
        if (shuffleInMemory) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into RAM from " + mapOutputLoc.getTaskAttemptId());
          }
          //在內存作shuffle處理
          mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
                                      (int)decompressedLength,
                                      (int)compressedLength);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
          }
          //在本地作shuffle處理
          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
              compressedLength);
        }
        mapOutput.decompressedSize = decompressedLength;    
        return mapOutput;
      }

 

做者:u011539200 發表於2015/5/13 7:59:01  原文連接
閱讀:406 評論:0  查看評論
 
[原]Hadoop 1.x的Shuffle源碼分析之1

先參考董西成的博文  http://dongxicheng.org/mapreduce/hadoop-shuffle-phase/   

Hadoop中shuffle階段流程分析

 

 

Hadoop的一個任務執行過程,分爲Map和Reduce兩個階段。而shuffle發生在Reducer階段。Hadoop 1.2.1裏,Reduce類的源碼在org.apache.hadoop.mapreduce包的Reducer.java文件,這裏有一份詳細的reduce過程的解釋。shuffle是reduce的第一個階段,以http的方式從map任務的輸出獲取數據。reduce的第二個階段是sort,根據key對來reducer的輸入進行排序,由於不一樣的map任務可能會產生相同key的輸出。reducer的第三個階段就是作最終處理,根據Reducer的reduce函數處理數據。

 

Reducer類只是提供了一個可Override的reduce函數,Shuffle實際上在ReduceTask類執行。ReduceTask類在org.apache.hadoop.mapred包裏。

 

ReduceTask類比較複雜,有7個內嵌類,有些內嵌類裏也有本身的內嵌類,畢其功於一役的作法。它的主要代碼在run函數執行:

這是reduce的三個階段:

 

if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }

shuffle階段在這裏

 

 

reduceCopier = new ReduceCopier(umbilical, job, reporter);
      if (!reduceCopier.fetchOutputs()) {
        if(reduceCopier.mergeThrowable instanceof FSError) {
          throw (FSError)reduceCopier.mergeThrowable;
        }
        throw new IOException("Task: " + getTaskID() + 
            " - The reduce copier failed", reduceCopier.mergeThrowable);
      }


 

fetchOutputs函數接近400行,比較長,略坑,在這個函數,

 

copiers = new ArrayList<MapOutputCopier>(numCopiers);

 

 

建立了一組MapOutputCopier,它是個線程類,負責取數據。

//start the on-disk-merge thread
      localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
      //start the in memory merger thread
      inMemFSMergeThread = new InMemFSMergeThread();
      localFSMergerThread.start();
      inMemFSMergeThread.start();
      
      // start the map events thread
      getMapEventsThread = new GetMapEventsThread();
      getMapEventsThread.start();

再建立幾個線程,有的負責取map任務的信息,有的負責對結果作歸併。

 

繼續往下,是一個while循環,這個循環處理取數據,有一百多行代碼,當數據取完或者達到失敗上限就終止循環。

 

循環結束後,依次終止 獲取map事件線程, 取數據線程,shuffl內存管理線程,排序線程,至此shuffle就結束了。

做者:u011539200 發表於2015/5/6 22:10:42  原文連接
閱讀:145 評論:0  查看評論
 
[原]weka實戰001:一篇博文簡單瞭解weka

weka是java寫的開源模式識別和數據挖掘軟件,已經有十多年的歷史了。weka的官網在http://www.cs.waikato.ac.nz/ml/weka/。

 

模式識別和數據挖掘有四個問題,

第一:問題是什麼

第二:數據是什麼

第三:如何學習

第四:學習結果可靠嗎?

 

第一個問題來自需求。分析需求是很難的:嚴密的邏輯,深刻了解行業宏觀和細節,熟悉技術領域和學術領域的進展,有多個成功項目的實踐經驗,這四個因素缺一不可,因此一般由一個團隊不一樣領域的精英合做完成。

 

weka不解決需求問題。

 

第二個問題是數據。每一個樣本對應一個weka的Instance,由多個樣本組成的數據集對應weka的Instances,這是存儲。對數據集,須要選擇各類樣本進行訓練和測試,這裏存在諸多的選擇方法。好比,只選擇部分樣本進行訓練和測試,處理屬性缺失的樣本,只選擇部分屬性進行訓練和測試,如何對樣本次序重排以改變訓練和測試效果。如何以有監督或者無監督的方式選擇樣本及其屬性。

 

第三個問題是學習。若是能精肯定義第一個問題,那麼第三個問題的答案也必然是清晰的。weka提供大量的算法,分類,迴歸,聚類,關聯規則等等。對初學者而言,選擇算法是個大問題,每種算法都各有好處,但又沒有一種算法在大多數指標上好過其餘算法。這裏的訣竅就是大量的作實驗並分析結果,作的多了天然就知道什麼是好的。

 

第四個問題是驗證學習器是否可靠。經常使用的方式就是交叉驗證,五倍交叉或者十倍交叉。再配合網格調參。常規問題就能夠解決了。

 

對大數據big data,weka的建議是,用命令行操做數據和訓練,若是有可能,本身用groovy或者jython實現算法,或者使用能夠增量學習的算法。它這麼說的意思其實代表,weka目前尚未對big data作好準備,因此最好用它解決單機能搞定的問題。

 

 

做者:u011539200 發表於2015/5/3 11:39:35  原文連接
閱讀:400 評論:0  查看評論
 
[原]HBase 二次開發 java api和demo
1. 試用thrift python/java以及hbase client api,結論以下:
    1.1 thrift的安裝和發佈繁瑣,可能會遇到未知的錯誤,且hbase.thrift的版本在變化中。優勢代碼簡單,須要打包的內容少。
    1.2 hbase client api,須要的jar不少,發佈版的容量也很大,打包後近百兆。優勢是,明確,無歧義。
 
2. 推薦用hbase client api的方式搞定。
 
3. 如下均爲技術細節。
 
4. 有一臺機器/一個集羣,在運行hadoop,也運行了基於這個hadoop集羣的hbase集羣,同時,也運行了一個zookeeper集羣,咱們統稱它是A。
 
5. 有一臺集羣負責開發,咱們在上面寫代碼,編譯代碼,運行代碼,咱們稱它是B。
 
6. 在B上,要修改/etc/hosts,把A的任意一臺zookeeper服務器的hostname和對應的ip地址放進去,由於hbase client須要鏈接到zookeeper,以便得到hbase的hmast信息---hbase集羣有多個hmast,一個是主hmast,其餘是備用hmaster,若是主hmaster掛了,備用的會頂上,避免單點故障問題。
 
7. 在B上開發,在elipse創建一個java項目,添加一個lib目錄,把A上的hadoop, hbase, zookeeper的全部jar包,注意,是全部jar包,各級子目錄的也算在內,都複製到lib目錄,大概有130個左右,90M。而後,再把它們添加到buildpath。這麼作的好處是,不用一點點找究竟哪一個類在哪一個包,生命短暫,不要把時間浪費在這裏,浪費點磁盤空間不要緊。
    若是hadoop,hbase, zookeeper都安裝在一個目錄下,能夠用一個shell語句搞定:
    for i in `find . -name "*.jar"`;      do cp $i ~/alljars;    done;
    而後再把alljars下的jar包都複製到B的lib目錄。
 
8. 寫一個最簡單的hbase demo,在hbase裏檢查一個表是否存在,若是不存在,就建立它。
-----------------------------------------
package hbasedemo;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;

public class Main {

public static void main(String[] args) throws IOException{
Configuration hbase_conf = new Configuration();
hbase_conf.set("hbase.zookeeper.quorum", "brianxxxooo"); //brianxxxooo是A裏的zookeeper機器的hostname
hbase_conf.set("hbase.zookeeper.property.clientPort","2181");
Configuration conf = HBaseConfiguration.create(hbase_conf);

String tablename="scores";
String[] familys = {"grade", "course"};

HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tablename)){
System.out.println("table exist, return!");
return;
}

HTableDescriptor td = new HTableDescriptor(TableName.valueOf(tablename));
for(int i = 0; i < familys.length; i++){
td.addFamily(new HColumnDescriptor(familys[i]));
}
admin.createTable(td);
System.out.println("create table "+tablename+" ok.");

}
-----------------------------------------
 
9. 注意事項,hbase client的版本變化甚多,具體api調用要根據版原本,有時候須要參考多個版原本。好比,0.96.x的HTableDescripter更接近 http://hbase.apache.org/apidocs/index.html  , 而不是0.94的api。但HBaseAdmin在0.94的api是有的,在2.0.0裏沒有。很是混亂。估計這個局面還要持續一段時間。
 
10. 更詳細的例子
------------------------------------------
package hbasedemo;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class Main {

public static void main(String[] args) throws IOException{
Configuration hbase_conf = new Configuration();
hbase_conf.set("hbase.zookeeper.quorum", "brianvxxxxooooo");
hbase_conf.set("hbase.zookeeper.property.clientPort","2181");
Configuration conf = HBaseConfiguration.create(hbase_conf);

String tablename="scores";
String[] familys = {"grade", "course"};

HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tablename)){
System.out.println("table exist!");
}else{
HTableDescriptor td = new HTableDescriptor(TableName.valueOf(tablename));
for(int i = 0; i < familys.length; i++){
td.addFamily(new HColumnDescriptor(familys[i]));
}
admin.createTable(td);
System.out.println("create table "+tablename+" ok.");
}

HTable table = new HTable(conf, "scores");
Put put = new Put(Bytes.toBytes("row1"));

//create
put.add(Bytes.toBytes("grade"), Bytes.toBytes("g1"), Bytes.toBytes(781));
put.add(Bytes.toBytes("grade"), Bytes.toBytes("g2"), Bytes.toBytes("this is test"));
table.put(put);

//read
Get get = new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("grade"), Bytes.toBytes("g1"));
Result result = table.get(get);
byte[] val = result.getValue(Bytes.toBytes("grade"), Bytes.toBytes("g1"));
System.out.println(Bytes.toInt(val));


}
------------------------------------------
 
其餘各類操做於此類似,再也不一一列出。
做者:u011539200 發表於2014/11/12 11:23:49  原文連接
閱讀:858 評論:0  查看評論
 
[原]apache oozie安裝試用
oozie是hadoop的工做流Scheduler,最新的版本到4.0.1了。試用了下,小坑還蠻多的。

1. 編譯
個人主機上跑的是Hadoop 1.1.2,選的是oozie 3.3.0版本,下載源碼,解壓縮。
首先,要把源碼裏的javaversion從1.6改爲1.7,主機是用jdk1.7。
編譯oozie,命令是'./bin/mkdistro.sh -DskipTests -Dhadoop.version=1.0.1',跳過測試,另外,無論Hadoop的版本是多少,只要它是1.x的,就只能是-Dhaoop.version=1.0.1,填其餘版本號編譯不經過。若是是Hadoop 2.x,只能填'-Dhadoop.version=2.0.0-alpha',填其餘版本號編譯不經過。

2. 安裝
編譯結果在oozie-3.3.0/distro/target/oozie-3.3.0-distro/oozie-3.3.0,這個目錄有
--------------------
bin lib oozie-core oozie-sharelib-3.3.0.tar.gz
conf libtools oozie-examples.tar.gz oozie.war
docs.zip oozie-client-3.3.0.tar.gz oozie-server release-log.txt
--------------------
把這些文件複製到安裝目錄/usr/local/lib/oozie-3.3.0,或者其餘地方好比個人是~/usr/oozie-3.3.0。之後的操做都是安裝目錄進行。

3. 配置hadoop
在hadoop的core-site.xml裏添加oozie的配置
--------------------
  <!-- OOZIE -->
  <property>
    <name>hadoop.proxyuser.[youname].hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.brian.groups</name>
    <value>*</value>
  </property>
--------------------
把[youname]替換成你的當前用戶名。
而後啓動Hadoop。

4. 官檔提到要把oozie-3.3.0目錄下hadooplibs tar.gz解壓縮,但若是你的系統已經安裝hadoop了,編譯的時候不會生成這個文件,這一步能夠忽略。

5. 建立oozie-3.3.0/libext目錄。下載http://extjs.com/deploy/ext-2.2.zip放到這個目錄,不須要解壓,再把主機上hadoop-1.1.2的幾個主jar文件複製到這個目錄,也就是:
---------------------------
hadoop-client-1.1.2.jar hadoop-examples-1.1.2.jar hadoop-test-1.1.2.jar
hadoop-ant-1.1.2.jar hadoop-core-1.1.2.jar hadoop-minicluster-1.1.2.jar hadoop-tools-1.1.2.jar
---------------------------

6. 在oozie-3.3.0目錄下,運行'./bin/oozie-setup.sh',執行安裝步驟。

7. 在oozie-3.3.0目錄下,運行'./bin/ooziedb.sh create -sqlfile oozie.sql -run',建立數據庫。

8. 在ooize-3.3.0目錄下,運行'./bin/oozie-start.sh',啓動oozie。

9. 在ooize-3.3.0目錄下,運行'./bin/oozie admin -oozie http://localhost:11000/oozie -status',檢查oozie是否啓動成功,正常狀況下,輸出值是normal。

10.在ooize-3.3.0目錄下,解壓縮oozie-sharelib-3.3.0.tar.gz,獲得一個目錄share,用'hadoop fs -put share share'將它放到hdfs上。

11.在ooize-3.3.0目錄下,解壓縮oozie-examples.tar.gz,獲得目錄examples。修改examples/apps/mao-reduce/jobproperties文件的前兩句,替換成:
------------
nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
------------
這是下一步要運行的demo,修改namenode和jobtracker的配置,默認值不對。
而後將examples目錄也put到hdfs上。

12. 在oozie-3.3.0目錄下,運行'./bin/oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run',注意端口號是11000,官檔是8080,參數不對。執行後,輸出hadoop jod id,形如‘0000000-140826104216537-oozie-cke-W’

13. 根據job id檢查運行結果'./bin/oozie job -oozie http://localhost:11000/oozie -info 0000000-140826104216537-oozie-cke-W',看到success即代表成功。

14. 顯示運行結果:'hadoop fs -cat examples/output-data/map-reduce/part-00000'

流程是完整的,步驟是最簡的,解釋是忽略的 :),每一步的具體解釋請參考官檔。
做者:u011539200 發表於2014/8/26 16:07:05  原文連接
閱讀:1087 評論:0  查看評論
 
[原]一個網站的誕生10--自動化部署

所謂自動部署就是說,若是用10臺機器跑tornado程序提供Web服務,它們上面的代碼都是同樣的,這也叫生產環境。在公司寫新代碼,寫好了,測試經過,這叫開發環境。而後執行自動部署程序,它把新代碼提交到版本管理服務器,而後連上生產環境的10臺服務器,讓它們更新代碼,再重啓tornado程序,新代碼就上線了,用戶看到的就是新發布的網站。

自動部署的關鍵是兩個東西,一個是版本服務器,一個是遠程操做。

推薦用git版本服務器,推薦寥雪峯的git教程,寫得很是清晰,是目前爲止我見到的最容易入手的,連接在這裏http://www.liaoxuefeng.com/wiki/0013739516305929606dd18361248578c67b8067c8c017b000

推薦用python fabric遠程操做,http://www.fabfile.org/

作自動化部署,第一步是建立一個公網IP的git版本服務器,這樣10臺web server能訪問git服務器。而後在開發環境把代碼提交到git版本服務器。這個過程請參考寥同窗的教程,這裏就不重複了。

第二步,ssh登陸到任意一臺web server,手動執行命令,從git服務器更新代碼,而後把tornado程序開起來,記下這個過程的全部步驟。

第三部,用fabric把第二步的步驟寫一遍,填上10臺web server的ip地址,未來就能夠一次性更新10臺機器了。

zuijiacanting.com的自動化部署腳本相似以下,remote_deploy.py:

#!/usr/bin/env python
#! -*- coding:utf-8 -*-

from __future__ import with_statement
import os
from fabric.api import *

#ip of web server
env.hosts=['106.111.111.111']

def commit_to_remote():
    d = os.path.abspath("xx/yy")
    local("cd %s;git push origin master" % d)

def deploy_zjct():
    code_dir="/home/xxx/yyy"
    with cd(code_dir):
        run("sudo git pull origin master")
        run("sudo reboot")
在開發環境下,若是新代碼已經準備好了,執行命令"fab -f remote_deploy.py commit_to_remote",把新代碼提交到git版本服務器,而後再執行"fab -f remote_deploy.py deploy_zjct",fabric以ssh的方式登陸到web server,更新代碼,而後重啓。重啓運行新代碼。不重啓也能夠,kill掉tornado進程,自動監控會重啓tornado進程,重啓後也是運行新代碼。若是要更新多臺機器,在env.hosts把ip地址加進去就能夠了。

fabric能作的事情很是多,不只僅是上面這麼簡單,能夠實現各類酷炫的特效。

做者:u011539200 發表於2014/8/20 13:02:28  原文連接
閱讀:850 評論:0  查看評論
相關文章
相關標籤/搜索