高併發通常指的是,同一時刻有許多請求,PHP會開啓多個進程來響應對應的請求。html
大型的網站每每遇到的問題就是高併發事件,好比說秒殺、搶購等電商一類,那麼網站如何處理高併發問題呢?應該從哪幾個方面着手考慮?今天就與你們探討下,網站如何處理高併發。nginx
我我的認爲的一些解決方案:redis
1. 負載均衡,主要是以 nginx 代理服務器進行請求轉發。原理:啓動 nginx 有一個master主進程啓動,而後一個請求(request)來了會開啓一個子進程(worker),在正常的負載配置,worker進行請求平均轉發到A、B、C業務服務器去處理業務,而後由業務服務器進行結果返回給nginx服務器,這裏須要注意的是 worker 進行請求轉發後,會中止worker而釋放,存在listen程序等待業務服務器異步回調而後響應給客戶端。數據庫
2.HTML界面靜態化,請求網站消耗最小、相應最快的就是靜態化的文件,例如社區類能夠將帖子、文章一類靜態化,這樣能夠減小數據庫的請求,加載速度也會加快。緩存
3.能夠設置圖片服務器分離,無論是什麼服務器圖片都是最消耗資源的,能夠設置獨立的圖片服務器,這樣也能夠避免由於圖片問題而致使系統崩潰問題。服務器
4.在設計數據庫的時候,能夠再架構、擴張性上考慮,根據業務須要將數據庫進行分離,例如不一樣的模塊能夠對應不一樣的數據庫,這樣數據庫就具備很好的擴展性。網絡
5.網站開發可以使用緩存,各類語言都有本身的緩存模塊,好比說PHP有Cache模塊,Java的更多,另外還能夠考慮擴展redis、memcache等,對數據庫進行緩存和共享,加快響應速度。多線程
6.使用CDN加速技術,意思就是讓用戶能夠就近的獲取到所需的內容,提升用戶訪問網站的響應速度,根據用戶的請求地點和網絡等狀況,能夠將用戶的請求指定到離用戶最近的服務器上,這也是個比較經常使用的方式。架構
public function lock($key,$expire=5,$type='ex') { if($type != 'ex' && $type != 'px') { throw new \Exception("redis lock type must be nx or ex",1); } $this->random = rand(1,4294967295); return self::$_redis->set($key, $this->random, ['nx',$type=>$expire]); // 獲取鎖 }
刪除鎖也須要注意,先對比隨機數是否一致,若是一致,再刪除。加上watch命令,若是在刪除過程當中,發現key被修改,則刪除失敗。避免在A進程獲取key值以後,刪除key以前鎖過時,而這個時候B進程拿到鎖(修改key值),則A刪除失敗。併發
public function unlock($key) { self::$_redis->watch($key); // 監聽key if(self::$_redis->get($key) == $this->random) { // 若是是該對象的記錄,則刪除 self::$_redis->multi()->del($key)->exec(); return true; } else { self::$_redis->unwatch(); return true; } }
這是一個經典問題,請看代碼:
//redis中的某個鍵自增 $val = $this->redis->get($key); $val ++; $this->redis->set($val);
這段代碼邏輯沒有問題,就是先讀取數據,再修改數據,在寫回修改,這裏是但願每次訪問都遞增變量$val的值,但在併發狀況下,存在狀況是兩個進程都讀取到了同樣的初始值,而後都加1,最後寫回Redis,這種狀況就會統計數據比實際的少。這個問題應該有許多人遇到過,思考過怎麼解決這類問題。這裏給出一個統一的解決方案,就是儘可能保證操做的原子性,好比能夠用redis的incr命令來實現自增(能夠認爲redis的命令是原子的)。
由上面的問題再進一步,來探討一個你們經常使用的,爲一個操做進行加鎖。
問題場景以下:有一個商品,每一個用戶均可以去修改商品信息。假設用戶id分別爲6和8的用戶對id爲123的商品進行操做。
$key = '123'; $val = $this->redis->get($key); if(!$val){ $this->redis->set($key,'123'); $this->redis->expire($key,'4'); /**此處修改商品信息操做 ****** **/ $this->redis->del($key); }else{ echo '錯誤提示'; }
上面這個錯誤示例,
錯誤點1:set和expire是分開寫的,若是說程序執行中再執行了set()後出現崩潰,則這個就變成了永久鎖(雖然這是個小几率事件)。
錯誤點2:這個商品中設置的key是商品id,val也是商品id,不少人認爲只有一個key就能夠了,val是什麼無所謂。這就缺乏了鎖的標識,沒法判斷這個鎖的擁有者是誰,從而會帶來一系列影響以下。
針對錯誤1和錯誤2的第1點,咱們只須要去除read & write模式就能夠解決,解決方案爲
//同時設置val和過時時間,並使用setnx $status = $this->redis->setnx($key,$val,$expireTime); if($status){ /**此處修改商品信息操做 ****** **/ $this->redis->del($key); }else{ echo '錯誤提示'; }
setnx,能夠在設置時檢查是否存在鎖不存在則設置並返回1,若是存在不覆蓋並返回0。
針對錯誤2第2點,咱們須要爲每一個進程設置一個獨立的本身能夠識別的val,若是一個用戶只能開一個進程,這個val能夠爲用戶id,若是一個用戶能夠設置多個進程,那麼必須按照實際車狀況採用其餘方式來區分,這裏咱們以用戶id爲例,而且在刪除的時候只能刪除本身的鎖。那麼這裏問題又出現了,若是咱們寫成這樣:
//同時設置val和過時時間,並使用setnx $userId = 2; $status = $this->redis->setnx($key,$userId,$expireTime); if($status){ /**此處修改商品信息操做 ****** **/ if($this->redis->get($key) == $userId){ $this->redis->del($key); } }else{ echo '錯誤提示'; }
這種狀況看似沒有什麼問題,其實否則,你們注意我再設置所得時候,設置了一個過時時間,假如這個時間設置的是4秒,那麼若是進程A執行到刪除前一刻一不當心超過了4秒,那麼這個鎖就自動消失了。而另外一個進程B查到沒有鎖,就加了一把本身的鎖,此時進程A執行刪除,就把B的鎖給刪除了(極小機率事件)。
這裏解決方案有兩種
//同時設置val和過時時間,並使用setnx $userId = 2; $status = $this->redis->setnx($key,$userId,$expireTime); if($status){ /**此處修改商品信息操做 ****** **/ //由於寫這個博客的機器沒有裝redis,因此沒有驗證這個語法對不對。請你們見諒 $script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; $result = $this->redis->eval(script,array($key,$val),1); if ($result) { return true; } }else{ echo '錯誤提示'; }
這裏就把兩個操做變成了一個原子操做。解決的加鎖和解鎖可能出現的問題。
咱們來講一些題外話拓展:在進程有可能出現衝突的地方,通常咱們叫作臨界區(操做系統中也有這個概念,是經過另外一種叫作PV信號量的方式來解決的,其實能夠理解爲組織等待進程隊列,P操做不能獲取到資源使用權的則進入等待隊列,等待V操做釋放資源後,檢查是否有等待隊列,進行進程釋放。固然PV操做也是原子性的。因此說解決類似問題的辦法也有必定的類似性)。
本文版權歸博客園和做者本人吳雙共同全部 。轉載爬蟲請註明地址,博客園蝸牛 http://www.cnblogs.com/tdws/p/5712835.html
蝸牛Redis系列文章目錄http://www.cnblogs.com/tdws/tag/NoSql/
Redis Cluster http://www.cnblogs.com/tdws/p/7710545.html
其實說多線程修改數據也不合適,畢竟redis服務端是單線程的,全部命令串行執行,只是在客戶端併發發送命令的時候,致使串行的命令一些排列問題和網絡時間差等形成數據不一致。本文雖然是數字的加減,可是爲了說明鎖的狀況,故意不是用原子命令incr。也並不是分佈式鎖的正確實現,沒有考慮一些重入性等,稍後會整理一篇分佈式鎖的實踐。
Redis分佈式鎖 http://www.cnblogs.com/tdws/p/5808528.html
ZK+curator 分佈式鎖 http://www.cnblogs.com/tdws/p/5874686.html
先配上一個簡易的RedisHelper,一個set值,一個get值,一個設置併發鎖,以便在我後面的操做中,你能清楚我究竟作了什麼。
public class RedisHelper { public RedisClient client = new RedisClient("127.0.0.1", 6379); public void Set<T>(string key, T val) { client.Set(key, val); } public T Get<T>(string key) { var result = client.Get<T>(key); return result; } public IDisposable Acquire(string key) { return client.AcquireLock(key); } }
下面看一下併發代碼,我只new了兩個Thread。兩個線程同時想訪問同一個key,分別訪問五萬次,在併發條件下,咱們很難保證數據的準確性,請比較輸出結果。
static void Main(string[] args) { RedisHelper rds = new RedisHelper(); rds.Set<int>("mykey1", 0); Thread myThread1 = new Thread(AddVal); Thread myThread2 = new Thread(AddVal); myThread1.Start(); myThread2.Start(); Console.WriteLine("等待兩個線程結束"); Console.ReadKey(); } public static void AddVal() { RedisHelper rds = new RedisHelper(); for (int i = 0; i < 50000; i++) { int result = rds.Get<int>("mykey1"); rds.Set<int>("mykey1", result + 1); } Console.WriteLine("線程結束,輸出" + rds.Get<int>("mykey1")); }
是的,和咱們單線程,跑兩個50000,會輸出100000。如今是兩個併發線程同時跑在因爲併發形成的數據結果每每不是咱們想要的。那麼如何解決這個問題呢,Redis已經爲咱們準備好了!
你能夠看到我RedisHelper中有個方法是 public IDisposable Acquire(string key)。 也能夠看到他返回的是IDisposable,證實咱們須要手動釋放資源。方法內部的 AcquireLock正是關鍵之處,它像redis中索取一把鎖頭,被鎖住的資源,只能被單個線程訪問,不會被兩個線程同時get或者set,這兩個線程必定是交替着進行的,固然這裏的交替並非指你一次我一次,也多是你屢次,我一次,下面看代碼。
static void Main(string[] args) { RedisHelper rds = new RedisHelper(); rds.Set<int>("mykey1", 0); Thread myThread1 = new Thread(AddVal); Thread myThread2 = new Thread(AddVal); myThread1.Start(); myThread2.Start(); Console.WriteLine("等待兩個線程結束"); Console.ReadKey(); } public static void AddVal() { RedisHelper rds = new RedisHelper(); for (int i = 0; i < 50000; i++) { using (rds.Acquire("lock")) { int result = rds.Get<int>("mykey1"); rds.Set<int>("mykey1", result + 1); } } Console.WriteLine("線程結束,輸出" + rds.Get<int>("mykey1")); }
能夠看到我使用了using,調用個人Acquire方法獲取鎖。
輸出結果最後是100000,正是咱們要的正確結果。前面的8W+是由於兩個線程之一先執行結束了。
還有,在正式使用的過程當中,建議給咱們的鎖,使用後刪除掉,並加上一個過時時間,使用expire。
以避免程序執行期間意外退出,致使鎖一直存在,從此可能沒法更新或者獲取此被鎖住的數據。
你也能夠嘗試一下不設置expire,在程序剛開始執行時,關閉console,從新運行程序,而且在redis-cli的操做控制檯,get你鎖住的值,將會永遠獲取不到。
全部鏈接此redis實例的機器,同一時刻,只能有一個獲取指定name的鎖.
下面是StackExchange.Redis的寫法
var info = "name-"+Environment.MachineName; //若是5秒不釋放鎖 自動釋放。避免死鎖 if (db.LockTake("name", info, TimeSpan.FromSeconds(5))) { try { } catch (Exception ex) { } finally { db.LockRelease("name", token); } }