直接採樣的思想是,經過對均勻分佈採樣,實現對任意分佈的採樣。由於均勻分佈採樣好猜,咱們想要的分佈採樣很差採,那就採起必定的策略經過簡單採起求複雜採樣。
假設y服從某項分佈p(y),其累積分佈函數CDF爲h(y),有樣本z~Uniform(0,1),咱們令 z = h(y),即 y = h(z)^(-1),結果y即爲對分佈p(y)的採樣。python
直接採樣的核心思想在與CDF以及逆變換的應用。在原分佈p(y)中,若是某個區域[a, b]的分佈較多,而後對應在CDF曲線中,[h(a), h(b)]的曲線斜率便較大。那麼,通過逆變換以後,對y軸(z)進行均勻分佈採樣時,分佈多的部分(佔據y軸部分多)對應抽樣獲得的機率便更大,web
實際中,全部採樣的分佈都較爲複雜,CDF的求解和反函數的求解都不太可行。算法
拒絕採樣是由一個易於採樣的分佈出發,爲一個難以直接採樣的分佈產生採樣樣本的通用算法。既然 p(x) 太複雜在程序中無法直接採樣,那麼便一個可抽樣的分佈 q(x) 好比高斯分佈,而後按照必定的方法拒絕某些樣本,達到接近 p(x) 分佈的目的。網絡
設定一個方便抽樣的函數 q(x),以及一個常量 k,使得 p(x) 總在 k*q(x) 的下方。(參考上圖)app
重要性採樣主要是用於求一個複雜分佈p(x)的均值,最後並無獲得樣本。函數
重要性採樣的思想是藉助一個易於採樣的簡單分佈q(x),對這個簡單分佈q(x)所獲得的樣本所有接受。可是以此獲得的樣本確定不知足分佈p(x),所以須要對每個樣本附加相應的重要性權重。在重要性採樣中,以p(x0)/q(x0)的值做爲每一個樣本的權重。這樣,當樣本和分佈p(x)相近時,對應的權重大;與分佈p(x)相差過多時,對應的權重小。這個方法採樣獲得的是帶有重要性權重的服從q(z)分佈的樣本,這個權重乘以樣本以後的結果其實就是服從p(z)分佈的。學習
經過上述公式,咱們能夠知道重要性採樣能夠用於近似複雜分佈的均值。spa
假設有一個例子:E:吃飯、學習、打球;時間T:上午、下午、晚上;天氣W:晴朗、颳風、下雨。樣本(E,T,W)知足必定的機率分佈。現要對其進行採樣,如:打球+下午+晴朗。.net
問題是咱們不知道p(E,T,W),或者說,不知道三件事的聯合分佈。固然,若是知道的話,就沒有必要用吉布斯採樣了。可是,咱們知道三件事的條件分佈。也就是說,p(E|T,W), p(T|E,W), p(W|E,T)。如今要作的就是經過這三個已知的條件分佈,再用Gibbs sampling的方法,獲得聯合分佈。
具體方法:首先隨便初始化一個組合,i.e. 學習+晚上+颳風,而後依條件機率改變其中的一個變量。具體說,假設咱們知道晚上+颳風,咱們給E生成一個變量,好比,學習→吃飯。咱們再依條件機率改下一個變量,根據學習+颳風,把晚上變成上午。相似地,把颳風變成颳風(固然能夠變成相同的變量)。這樣學習+晚上+颳風→吃飯+上午+颳風。一樣的方法,獲得一個序列,每一個單元包含三個變量,也就是一個馬爾可夫鏈。而後跳過初始的必定數量的單元(好比100個),而後隔必定的數量取一個單元(好比隔20個取1個)。這樣sample到的單元,是逼近聯合分佈的。
蓄水池抽樣(Reservoir Sampling ),即可以在o(n)時間內對n個數據進行等機率隨機抽取,例如:從1000個數據中等機率隨機抽取出100個。另外,若是數據集合的量特別大或者還在增加(至關於未知數據集合總量),該算法依然能夠等機率抽樣。
馬氏鏈定理:若是一個非週期馬氏鏈具備轉移機率矩陣P,且它的任何兩個狀態是連通的,那麼\(\lim_{p\to\infty}P_{ij}^n\)存在且與i無關,記\(\lim_{p\to\infty}P_{ij}^n = \pi(j)\),咱們有:
其中\(\pi = [\pi(1), \pi(2), ... , \pi(j), ...], \sum_{i=0}^{\infty}\pi_i = 1, \pi\)稱爲馬氏鏈的平穩分佈。
全部的 MCMC(Markov Chain Monte Carlo) 方法都是以這個定理做爲理論基礎的。
說明:
針對一個新的分佈,如何構造對應的轉移矩陣?
對於一個分佈\(\pi(x)\),根據細緻平穩條件,若是構造的轉移矩陣P知足\(\pi(i)P_{ij} = \pi(j)P_{ji}\),那麼\(\pi(x)\)即爲該馬氏鏈的平穩分佈,所以能夠根據這個條件去構造轉移矩陣。
一般狀況下,初始的轉移矩陣\(P\)通常不知足細緻平穩條件,所以咱們經過引入接受率構造出新的轉移矩陣\(P'\),使其和\(\pi(x)\)知足細緻平穩條件。由此,咱們即可以用任何轉移機率矩陣(均勻分佈、高斯分佈)做爲狀態間的轉移機率。
若是咱們假設狀態之間的轉移機率是相同的,那麼在算法實現時,接收率能夠簡單得用\(\pi(j)/\pi(i)\)表示。
對於給定的機率分佈p(x),咱們但願能有便捷的方式生成它對應的樣本。因爲馬氏鏈能收斂到平穩分佈, 因而一個很的漂亮想法是:若是咱們能構造一個轉移矩陣爲P的馬氏鏈,使得該馬氏鏈的平穩分佈剛好是p(x), 那麼咱們從任何一個初始狀態x0出發沿着馬氏鏈轉移, 獲得一個轉移序列 x0,x1,x2,⋯xn,xn+1⋯,, 若是馬氏鏈在第n步已經收斂了,因而咱們就獲得了 π(x) 的樣本xn,xn+1⋯。
在馬爾科夫狀態鏈中,每個狀態表明一個樣本\(x_n\),即全部變量的賦值狀況。
經過分析MCMC源碼,能夠知道:假設狀態間的轉移機率相同,那麼下一個樣本的採樣會依賴於上一個樣本。假設上一個樣本所對應的原始分佈機率\(\pi(x)\)很小,那麼下一個樣本的接受率很大機率爲1;反之若是上一個樣本的原始分佈機率\(\pi(x)\)很大,那麼下一個樣本會有挺大機率被拒絕。這樣的機制保證了生成的樣本服從分佈\(\pi(x)\)。
從上述分析能夠看出,假如初始狀態的樣本對應的分佈機率很小,那麼在算法剛開始運行時所產生的樣本(即便是分佈機率很小的樣本)很大可能都會被接收,從而使得算法剛開始運行時採樣的樣本不知足原始分佈\(\pi(x)\)。只要算法採樣到分佈機率大的樣本(此時即爲收斂!),那麼以後所採樣獲得的樣本就會基本服從原始分佈。固然,從初始狀態遍歷到分佈機率大的狀態時須要運行一段時間,這段過程即爲收斂的過程。MCMC算法在收斂以後,保證了在分佈機率\(\pi(x)\)大的地方產生更多的樣本,在分佈機率\(\pi(x)\)小的地方產生較少的樣本。
一個馬爾可夫鏈須要通過屢次的狀態轉移過程採用達到一個穩定狀態,這時候採樣才比較接近真實的分佈。此過程稱爲burn in。通常可經過丟棄前面的N個採樣結果來達到burn in。
MCMC的收斂是什麼意思?這個過程當中是什麼參數會更新致使收斂?如何肯定什麼時候收斂?
收斂過程沒有參數會更新,收斂的思想相似於大數定理。應用MCMC算法採樣時,初始的樣本量少,服從的分佈可能和複雜分佈\(\pi(x)\)相差挺遠,但隨着狀態轉移數的增長(轉移矩陣P的應用),根據上述定理的證實,最終的樣本分佈會逐漸服從複雜分佈\(\pi(x)\)。
\(\pi\)是每一個狀態所對應的機率分佈嗎?若是是的話,初始選定一個狀態後,這個\(\pi\)如何設定?或則在MCMC證實過程當中,初始\(\pi\)的機率分佈如何設置?
在MCMC的證實過程當中,\(\pi\)是每一個狀態所對應的機率分佈。證實中所給定的初始\(\pi\)應該只是爲了證實不管初始樣本符合什麼分佈,在通過必定數量的轉移以後,獲得的樣本會服從複雜分佈\(\pi (x)\),在實際代碼實現中,不用對這個\(\pi\)進行設定。
import numpy as np import random import matplotlib.pyplot as plt import pandas as pdf
def f(x): if 0 <= x and x <= 0.25: y = 8 * x elif 0.25 < x and x <= 1: y = (1 - x) * 8/3 else: y = 0 return y
def g(x): if 0 <= x and x <= 1: y = 1 else: y = 0 return y
def plot(fun): X = np.arange(0, 1.0, 0.01) Y = [] for x in X: Y.append(fun(x)) plt.plot(X, Y) plt.xlabel("x") plt.ylabel("y") plt.show()
plot(f) plot(g)
def rejection_sampling(N=10000): M = 3 cnt = 0 samples = {} while cnt < N: x = random.random() acc_rate = f(x) / (M * g(x)) u = random.random() if acc_rate >= u: if samples.get(x) == None: samples[x] = 1 else: samples[x] = samples[x] + 1 cnt = cnt + 1 return samples
s = rejection_sampling(100000)
X = [] Y = [] for k, v in s.items(): X.append(k) Y.append(v)
plt.hist(X, bins=100, edgecolor='None')
Metropolis-Hastings Algorithm
PI = 3.1415926 def get_p(x): # 模擬pi函數 return 1/(2*PI)*np.exp(- x[0]**2 - x[1]**2) def get_tilde_p(x): # 模擬不知道怎麼計算Z的PI,20這個值對於外部採樣算法來講是未知的,對外只暴露這個函數結果 return get_p(x)
def domain_random(): #計算定義域一個隨機值 return np.random.random()*3.8-1.9
def metropolis(x): new_x = (domain_random(),domain_random()) #新狀態 #計算接收機率 acc = min(1,get_tilde_p((new_x[0],new_x[1]))/get_tilde_p((x[0],x[1]))) #使用一個隨機數判斷是否接受 u = np.random.random() if u<acc: return new_x return x
def testMetropolis(counts = 100,drawPath = False): plt.figure() #主要邏輯 x = (domain_random(),domain_random()) #x0 xs = [x] #採樣狀態序列 for i in range(counts): xs.append(x) x = metropolis(x) #採樣並判斷是否接受 #在各個狀態之間繪製跳轉的線條幫助可視化 X1 = [x[0] for x in xs] X2 = [x[1] for x in xs] if drawPath: plt.plot(X1, X2, 'k-',linewidth=0.5) ##繪製採樣的點 plt.scatter(X1, X2, c = 'g',marker='.') plt.show()
testMetropolis(5000)
def metropolis(x): new_x = domain_random() #計算接收機率 acc = min(1,f(new_x)/f(x)) #使用一個隨機數判斷是否接受 u = np.random.random() if u<acc: return new_x return x
def testMetropolis(counts = 100,drawPath = False): plt.figure() #主要邏輯 x = domain_random() xs = [x] #採樣狀態序列 for i in range(counts): xs.append(x) x = metropolis(x) #採樣並判斷是否接受 #在各個狀態之間繪製跳轉的線條幫助可視化 plt.hist(xs, bins=100, edgecolor='None') # plt.plot(xs) plt.show()
testMetropolis(100000)
def partialSampler(x,dim): xes = [] for t in range(10): #隨機選擇10個點 xes.append(domain_random()) tilde_ps = [] for t in range(10): #計算這10個點的未歸一化的機率密度值 tmpx = x[:] tmpx[dim] = xes[t] tilde_ps.append(get_tilde_p(tmpx)) #在這10個點上進行歸一化操做,而後按照機率進行選擇。 norm_tilde_ps = np.asarray(tilde_ps)/sum(tilde_ps) u = np.random.random() sums = 0.0 for t in range(10): sums += norm_tilde_ps[t] if sums>=u: return xes[t]
def gibbs(x): rst = np.asarray(x)[:] path = [(x[0],x[1])] for dim in range(2): #維度輪詢,這裏加入隨機也是能夠的。 new_value = partialSampler(rst,dim) rst[dim] = new_value path.append([rst[0],rst[1]]) #這裏最終只畫出一輪輪詢以後的點,但會把路徑都畫出來 return rst,path
def testGibbs(counts = 100,drawPath = False): plt.figure() x = (domain_random(),domain_random()) xs = [x] paths = [x] for i in range(counts): xs.append([x[0],x[1]]) x,path = gibbs(x) paths.extend(path) #存儲路徑 p1 = [x[0] for x in paths] p2 = [x[1] for x in paths] xs1 = [x[0] for x in xs] xs2 = [x[1] for x in xs] if drawPath: plt.plot(p1, p2, 'k-',linewidth=0.5) ##繪製採樣的點 plt.scatter(xs1, xs2, c = 'g',marker='.') plt.show()
testGibbs(5000)