談到並行計算應用,會有人想到PageRank算法,咱們有成千上萬的網頁分析連接關係肯定排名前後,藉助並行計算完成是一個很好的場景。長期以來,google的創始發明PageRank算法吸引了不少人學習研究,聽說當年google創始者興奮的找到yahoo公司,說他們找到一種更好的搜索引擎算法,可是被yahoo公司技術人員潑了冷水,說他們關心的不是更好的技術,而是搜索的盈利。後來google包裝成了「更先進技術的新一代搜索引擎」的身份,逐漸取代了市場,並實現了盈利。 java
因爲PageRank算法有很是高的知名度和普及度,咱們接下來以PageRank算法爲例講述「並行計算+數據算法」的經典搭配,而且這種「海量數據並行處理、迭代多輪後收斂」的分析過程也跟其餘的數據挖掘或者機器學習算法應用相似,能起到很好的參考做用。 算法
下面是PageRank算法的公式:
咱們其實能夠直接闡述該公式自己,並介紹如何使用並行計算套用上面公式獲得各網頁的PageRank值,這樣雖然經過並行計算方式完成了PageRank計算,可是你們仍然不明白上面的PageRank公式是怎麼來的。 shell
咱們把這個PageRank算法公式先放在一邊,看看一個賭錢的遊戲:
有甲、乙、丙三我的賭錢,他們的輸贏關係以下:
甲的錢輸給乙和丙
乙的錢輸給丙
丙的錢輸給甲
例如,甲、乙、丙各有本錢100元,按照以上輸贏關係,玩一把下來:
甲輸給乙50元、輸給丙50元
乙輸給丙100元
丙輸給甲100元 數組
若是僅是玩一把的話很容易算出誰輸誰贏
但若是他們幾個維持這樣的輸贏關係,贏的錢又投進去繼續賭,這樣一輪一輪賭下去的話,最後會是什麼樣子呢? 機器學習
咱們能夠寫個單機程序看看,爲了方便計算,初始本錢都設爲1塊錢,用x1,x2,x3表明甲、乙、丙:
double x1=1.0,x2=1.0,x3=1.0;
用x1_income,x2_income,x3_income表明每賭一把後各人贏的錢,根據輸贏關係:
double x2_ income =x1/2.0;
double x3_ income =x1/2.0+x2;
double x1_ income =x3;
最後再把各人贏的錢覆蓋掉本錢,繼續往下算。完整程序以下: 分佈式
// Gamble單機程序 性能
public class Gamble { public static double x1=1.0,x2=1.0,x3=1.0; public static void playgame(){ double x2_income=x1/2.0; double x3_income=x1/2.0+x2; double x1_income=x3; x1=x1_income; x2=x2_income; x3=x3_income; System.out.println("x1:"+x1+", x2:"+x2+", x3:"+x3); } public static void main(String[] args){ for(int i=0;i<500;i++){ System.out.print("第"+i+"輪 "); playgame(); } } }
咱們發現,從107輪後,各人的輸贏結果就一直是
x1:1.2000000000000002, x2:0.6000000000000001, x3:1.2000000000000002
…...
可能你都沒想到會有這麼個規律,這樣一直賭下去,雖然各人每輪有輸有贏,可是多輪後的輸贏結果竟然保持平衡,維持不變了。用技術術語來講就是多輪迭代後產生了收斂,用俗話來說,就是玩下去甲和丙是不虧的,乙不服輸再繼續賭下去,也不會有扳本的機會的。 學習
咱們再把輸贏關係稍微改一下:丙的錢輸給甲和乙
double x2_income=x1/2.0+x3/2.0;
double x3_income=x1/2.0+x2;
double x1_income=x3/2.0; this
運行10000輪後,發現又收斂了:
x1:0.6666666666666667, x2:1.0, x3:1.3333333333333333
…
不過此次就變成了「甲是輸的,乙保本,丙是贏的」,咱們發現收斂的結果可用於排名,若是給他們作一個賭王排名的話,很顯然:「丙排第一,乙第2、甲第三」。 搜索引擎
那麼這樣的收斂是在全部狀況下都會發生嗎,什麼狀況不會收斂呢?
咱們回過頭觀察上面的輸贏關係,甲、乙、丙三人互相各有輸贏,致使錢沒有流走,因此他們三人才一直能夠賭下去,若是把輸贏關係改一下,讓甲只輸錢,不贏錢,以下:
double x2_income=x1/2.0+x3/2.0;
double x3_income=x1/2.0+x2;
double x1_income=0;
那麼運行下來會是什麼結果呢?
咱們發現不少輪後,所有爲0了。咱們分析一下過程,第一輪後,甲的錢就輸光了,沒有贏得一分錢。可是乙和丙各有輸贏,他們一直賭到2000多輪時,乙的錢所有輸光了,甲乙都沒錢投進來賭了,致使丙再也贏不到錢了,最後全部人結果都變爲0了。
咱們再分析一下輸贏關係,甲的錢所有輸給丙和乙後,丙跟乙賭,贏的多輸的少,因而全部的錢慢慢都被丙贏走了,致使最後沒法維持一個平衡的輸贏結果。所以,若是咱們要維持平衡和收斂,必須保證贏了錢的人不許走,必須又輸給別人才行,讓錢一直在三人圈裏轉不流失。換句話說,若是存在某人只輸不贏,那麼這個遊戲就玩不下去。
賭錢遊戲講完了,咱們再看看PageRank算法的公式:
上面的L(B)表明頁面B指向其餘頁面的鏈接數,咱們舉個例子:
假設有A、B、C三張網頁,他們的連接關係以下:
A包含B和C的連接
B包含C的連接
C包含A的連接
根據上面的公式,獲得各網頁PR值以下:
PR(B)=PR(A)/2;
PR(C)=PR(A)/2+PR(B);
PR(A)=PR(C);
能夠回過頭對照一下,把A、B、C改爲甲、乙、丙就是上面舉的賭錢遊戲例子。
那麼q是幹嘛的?公式裏的q叫作逃脫因子,名字很抽象,目的就是用於解決上面賭錢遊戲中「只輸不贏」不收斂的問題,1-q會保證其中一個PR值爲0時計算下來不會所有爲0,那麼加了這麼一個(…)*q+1-q的關係後,總體的PR值會變化嗎?
當每一個頁面的初始PR值爲1時,0<=q<=1(計算時一般取值0.8),咱們把全部頁面的PR值相加看看,假設有n張網頁:
PR(x1)+ PR(x2)+ …+PR(xn) =( (PR(x2)/ L(x2)+ … )*q+1-q) + … + ( (PR(x1)/ L(x1)+ … )*q+1-q) =(PR(x1)* L(x1)/L(x1) + PR(x2)* L(x2)/L(x2) + … + PR(xn)* L(xn)/L(xn))q + n(1-q) =( PR(x1) + PR(x2) + … + PR(xn))*q + n - n*q =n*q + n – n*q = n
因爲初始PR值爲1,因此最後全部頁面的PR值相加結果仍是爲n,保持不變,可是加上(…)*q+1-q的關係後,就避免了PR值爲0能夠尋求收斂進行排序。
固然實際應用中,這個公式還能夠設計的更復雜,並能夠經過高等代數矩陣旋轉求解,咱們這裏只是爲了理解原理,並非爲了作搜索算法,因此就再也不深刻下去了。
總結:世界的不少東西都是零和遊戲,就像炒股,股民賺的錢也就是機構虧的錢,機構賺的錢也就是股民虧的錢,也許股民們應該研究一下PageRank算法,看看股市起起落落的背後是否是收斂了,收斂了說明炒下去永遠別想解套,並且機構永遠不會虧。
如何使用並行計算方式求PR值:
咱們這裏經過fourinone提供的各類並行計算模式去設計,思路方法能夠有不少種。
第一次使用能夠參考分佈式計算上手demo指南,開發包下載地址:http://code.google.com/p/fourinone/
思路一:能夠採起工人互相合並的機制(工人互相合並及receive使用可參見sayhello demo),每一個工人分析當前網頁連接,對每一個連接進行一次PR值投票,經過receive直接投票到該連接對於網頁所在的工人機器上,這樣通過一輪工人的互相投票,而後再統計一下本機器各網頁所得的投票數獲得新的PR值。可是這種方式,對於每一個連接投票,都要調用一次receive到其餘工人機器,比較耗用帶寬,網頁數量龐大連接衆多時要調用不少次receive,致使性能不高。
思路二:因爲求PR值的特色是輸入數據大,輸出數據小,也就是網頁成千上萬佔空間多,可是算出來的PR值佔空間小,咱們姑且用內存能夠裝下。所以咱們優先考慮每一個工人統計各自機器上的網頁,計算各連接對應網頁的所得投票,而後返回工頭統一合併獲得各網頁的PR值。能夠採用最基本的「總—分—總」並行計算模式實現(請參考分佈式計算上手demo指南)。
並行計算的拆分和合並設計以下:
能夠看到:
工人負責統計各自機器上網頁的各個連接的PR得票。
工頭負責合併累加獲得各連接對應網頁的新PR值,並迭代計算。
程序實現:
PageRankWorker:是一個PageRank工人實現,爲了方便演示,它經過一個字符串數組表明包括的連接(實際上應該從本地網頁文件裏獲取)
links = new String[]{"B","C"};
而後對連接集合中的每一個連接進行PR投票
for(String p:links)
outhouse.setObj(p, pr/links.length);
PageRankCtor:是一個PageRank包工頭實現,它將A、B、C三個網頁的PageRank初始值設置爲1.00,而後經過doTaskBatch進行階段計算,doTaskBatch提供一個柵欄機制,等待每一個工人計算完成才返回,工頭將各工人返回的連接投票結果合併累加:
pagepr = pagepr+(Double)prwh.getObj(page);
獲得各網頁新的PR值(這裏取q值爲1進行計算),而後連續迭代500輪計算。
運行步驟:
一、 啓動ParkServerDemo(它的IP端口已經在配置文件指定)
java -cp fourinone.jar; ParkServerDemo
二、運行A、B、C三個PageRankWorker,傳入不一樣的IP和端口號
java -cp fourinone.jar; PageRankWorker localhost 2008 A
java -cp fourinone.jar; PageRankWorker localhost 2009 B
java -cp fourinone.jar; PageRankWorker localhost 2010 C
三、運行PageRankCtor
java -cp fourinone.jar; PageRankCtor
咱們能夠看到跟開始的單機程序的結果是同樣的,同時各工人窗口依次輸出了各自的PR值:
完整demo源碼以下:
// ParkServerDemo
import com.fourinone.BeanContext; public class ParkServerDemo { public static void main(String[] args) { BeanContext.startPark(); } }
// PageRankWorker
import com.fourinone.MigrantWorker; import com.fourinone.WareHouse; import com.fourinone.Workman; public class PageRankWorker extends MigrantWorker { public String page = null; public String[] links = null; public PageRankWorker(String page, String[] links){ this.page = page; this.links = links; } public WareHouse doTask(WareHouse inhouse) { Double pr = (Double)inhouse.getObj(page); System.out.println(pr); WareHouse outhouse = new WareHouse(); for(String p:links) outhouse.setObj(p, pr/links.length);//對包括的連接PR投票 return outhouse; } public static void main(String[] args) { String[] links = null; if(args[2].equals("A")) links = new String[]{"B","C"};//A頁面包括的連接 else if(args[2].equals("B")) links = new String[]{"C"}; else if(args[2].equals("C")) links = new String[]{"A"}; PageRankWorker mw = new PageRankWorker(args[2],links); mw.waitWorking(args[0],Integer.parseInt(args[1]),"pagerankworker"); } }
import com.fourinone.Contractor; import com.fourinone.WareHouse; import com.fourinone.WorkerLocal; import java.util.Iterator; public class PageRankCtor extends Contractor { public WareHouse giveTask(WareHouse inhouse) { WorkerLocal[] wks = getWaitingWorkers("pagerankworker"); System.out.println("wks.length:"+wks.length); for(int i=0;i<500;i++){//500輪 WareHouse[] hmarr = doTaskBatch(wks, inhouse); WareHouse prwh = new WareHouse(); for(WareHouse result:hmarr){ for(Iterator iter=result.keySet().iterator();iter.hasNext();){ String page = (String)iter.next(); Double pagepr = (Double)result.getObj(page); if(prwh.containsKey(page)) pagepr = pagepr+(Double)prwh.getObj(page); prwh.setObj(page,pagepr); } } inhouse = prwh;//迭代 System.out.println("No."+i+":"+inhouse); } return inhouse; } public static void main(String[] args) { PageRankCtor a = new PageRankCtor(); WareHouse inhouse = new WareHouse(); inhouse.setObj("A",1.00d);//A的pr初始值 inhouse.setObj("B",1.00d);//B的pr初始值 inhouse.setObj("C",1.00d);//C的pr初始值 a.giveTask(inhouse); a.exit(); } }