Redis是一個開源(BSD許可),內存存儲的數據結構服務器,可用做數據庫,高速緩存和消息隊列代理。Redis 是徹底開源免費的,遵照BSD協議,是一個高性能的key-value數據庫。它支持字符串、哈希表、列表、集合、有序集合,位圖,hyperloglogs等數據類型。內置複製、Lua腳本、LRU收回、事務以及不一樣級別磁盤持久化功能,同時經過Redis Sentinel提供高可用,經過Redis Cluster提供自動分區。html
REmote DIctionary Server(Redis) 是一個由Salvatore Sanfilippo寫的key-value存儲系統。前端
Redis是一個開源的使用ANSI C語言編寫、遵照BSD協議、支持網絡、可基於內存亦可持久化的日誌型、Key-Value數據庫,並提供多種語言的API。python
它一般被稱爲數據結構服務器,由於值(value)能夠是 字符串(String), 哈希(Map), 列表(list), 集合(sets) 和 有序集合(sorted sets)等類型。git
Redis教程:http://www.redis.net.cn/tutorial/3501.html程序員
Redis命令:github
http://www.redis.net.cn/order/redis
http://doc.redisfans.com/算法
Redis 與其餘 key - value 緩存產品有如下三個特色:mongodb
您能夠對這些類型進行原子操做,如附加到字符串;遞增哈希值;列表進棧;計算交集,差集和並集;在有序集合中獲取最大值。數據庫
爲了實現卓越的性能,Redis的數據存儲在內存中。你能夠不按期導出數據到磁盤,或把操做追加到日誌。
Redis主從複製,首次採用快速的非阻塞同步,網絡中斷等異常時會自動重連。
其餘特性有事務,Pub/Sub, Lua腳本,keys生存期,配置Redis爲cache。
Redis用ANSI C書寫,能夠無外部依賴地在和多數POSIX系統的,如Linux, * BSD ,Mac OS X運行。測試和開發多基於Linux和OSX,建議使用Linux。不支持windows。
Redis的外層由鍵、值映射的字典構成。與其餘非關係型數據庫主要不一樣在於:Redis中值的類型不只限於字符串,還支持以下抽象數據類型:
值的類型決定了值自己支持的操做。Redis支持高層的服務器原子操做,好比交集、並集、差集。
Redis的一般內存中的保存整個數據集。2.4以上版本可配置使用他們虛擬內存可是很快不推薦使用。目前經過兩種方式實現持久化:
默認狀況下,Redis的每2秒同步數據到磁盤,若是整個系統異常也知會丟失幾秒的數據。
Redis支持主從同步。數據能夠從主服務器向任意數量的從服務器上同步,從服務器可做爲其餘從服務器的主服務器。這使得Redis可執行單層樹複製。從盤能夠有意無心的數據不一致。因爲徹底實現了發佈/訂閱機制,使得從數據庫在任何地方同步樹根。複製用於讀(非寫)擴展和數據冗餘。
當數據不須要持久化時,Redis基於內存,寫與讀操做速度沒有明顯差異。Redis以單一的過程和單線程執行,單個的Redis實例不能執行存儲過程之類的併發操做。
Redis的項目有一個集羣規範,但集羣功能目前處於測試階段。據Redis的創造者聖菲利波,Redis的集羣的第一個測試版計劃在在2013年年末,將支持key空間自動分區和熱模式碎片重整,但只支持單key操做。未來Redis集羣會支持多達1000個節點,支持心跳的容錯和故障檢測,增量版本控制以防止衝突,slave選舉和推薦爲master,並全部羣集節點之間的發佈和訂閱。
下載地址:http://www.redis.net.cn/download/,下載最新文檔版本。
本教程使用的最新文檔版本爲 2.8.17,下載並安裝:
make完後 redis-2.8.17目錄下會出現編譯後的redis服務程序redis-server,還有用於測試的客戶端程序redis-cli
下面啓動redis服務.
注意這種方式啓動redis 使用的是默認配置。也能夠經過啓動參數告訴redis使用指定配置文件使用下面命令啓動。
redis.conf是一個默認的配置文件。咱們能夠根據須要使用本身的配置文件。
啓動redis服務進程後,就可使用測試客戶端程序redis-cli和redis服務交互了。 好比:
在 Ubuntu 系統安裝 Redi 可使用如下命令:
以上命令將打開如下終端:
127.0.0.1 是本機 IP ,6379 是 redis 服務端口。如今咱們輸入 PING 命令。
以上說明咱們已經成功安裝了redis。
Redis 的配置文件位於 Redis 安裝目錄下,文件名爲 redis.conf。
你能夠經過 CONFIG 命令查看或設置配置項。
Redis CONFIG 命令格式以下:
使用 * 號獲取全部配置項:
1、Redis安裝和基本使用
1
2
3
4
|
wget http:
/
/
download.redis.io
/
releases
/
redis
-
3.0
.
6.tar
.gz
tar xzf redis
-
3.0
.
6.tar
.gz
cd redis
-
3.0
.
6
make
|
啓動服務端
1
|
src
/
redis
-
server
|
啓動客戶端
1
2
3
4
5
|
src
/
redis
-
cli
redis>
set
foo bar
OK
redis> get foo
"bar"
|
2、Python操做Redis
1
2
3
4
5
6
7
|
sudo pip install redis
or
sudo easy_install redis
or
源碼安裝
詳見:https:
/
/
github.com
/
WoLpH
/
redis
-
py
|
API使用
redis-py 的API的使用能夠分類爲:
Redis支持五種數據類型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。
string是redis最基本的類型,你能夠理解成與Memcached如出一轍的類型,一個key對應一個value。
string類型是二進制安全的。意思是redis的string能夠包含任何數據。好比jpg圖片或者序列化的對象 。
#在Redis中設置值,默認不存在則建立,存在則修改 r.set('name', 'zhangsan') '''參數: set(name, value, ex=None, px=None, nx=False, xx=False) ex,過時時間(秒) px,過時時間(毫秒) nx,若是設置爲True,則只有name不存在時,當前set操做才執行,同setnx(name, value) xx,若是設置爲True,則只有name存在時,當前set操做才執行'''
setex(name, value, time)
#設置過時時間(秒) psetex(name, time_ms, value) #設置過時時間(豪秒)
string類型是Redis最基本的數據類型,一個鍵最大能存儲512MB。
在以上實例中咱們使用了 Redis 的 SET 和 GET 命令。鍵爲 name,對應的值爲redis.net.cn。
注意:一個鍵最大能存儲512MB。
mset()
#批量設置值 r.mset(name1='zhangsan', name2='lisi') #或 r.mget({"name1":'zhangsan', "name2":'lisi'})
get(name)
獲取值
mget(keys, *args)
#批量獲取 print(r.mget("name1","name2")) #或 li=["name1","name2"] print(r.mget(li))
getset(name, value)
#設置新值,打印原值 print(r.getset("name1","wangwu")) #輸出:zhangsan print(r.get("name1")) #輸出:wangwu
getrange(key, start, end)
#根據字節獲取子序列 r.set("name","zhangsan") print(r.getrange("name",0,3))#輸出:zhan
setrange(name, offset, value)
#修改字符串內容,從指定字符串索引開始向後替換,若是新值太長時,則向後添加 r.set("name","zhangsan") r.setrange("name",1,"z") print(r.get("name")) #輸出:zzangsan r.setrange("name",6,"zzzzzzz") print(r.get("name")) #輸出:zzangszzzzzzz
setbit(name, offset, value)
#對二進制表示位進行操做 ''' name:redis的name offset,位的索引(將值對應的ASCII碼變換成二進制後再進行索引) value,值只能是 1 或 0 ''' str="345" r.set("name",str) for i in str: print(i,ord(i),bin(ord(i)))#輸出 值、ASCII碼中對應的值、對應值轉換的二進制 ''' 輸出: 3 51 0b110011 4 52 0b110100 5 53 0b110101''' r.setbit("name",6,0)#把第7位改成0,也就是3對應的變成了0b110001 print(r.get("name"))#輸出:145
getbit(name, offset)
#獲取name對應值的二進制中某位的值(0或1) r.set("name","3") # 對應的二進制0b110011 print(r.getbit("name",5)) #輸出:0 print(r.getbit("name",6)) #輸出:1
bitcount(key, start=None, end=None)
#獲取對應二進制中1的個數 r.set("name","345")#0b110011 0b110100 0b110101 print(r.bitcount("name",start=0,end=1)) #輸出:7 ''' key:Redis的name start:字節起始位置 end:字節結束位置'''
strlen(name)
#返回name對應值的字節長度(一個漢字3個字節) r.set("name","zhangsan") print(r.strlen("name")) #輸出:8
incr(self, name, amount=1)
#自增mount對應的值,當mount不存在時,則建立mount=amount,不然,則自增,amount爲自增數(整數) print(r.incr("mount",amount=2))#輸出:2 print(r.incr("mount"))#輸出:3 print(r.incr("mount",amount=3))#輸出:6 print(r.incr("mount",amount=6))#輸出:12 print(r.get("mount")) #輸出:12
incrbyfloat(self, name, amount=1.0)
#相似 incr() 自增,amount爲自增數(浮點數)
decr(self, name, amount=1)
#自減name對應的值,當name不存在時,則建立name=amount,不然,則自減,amount爲自增數(整數)
append(name, value)
#在name對應的值後面追加內容 r.set("name","zhangsan") print(r.get("name")) #輸出:'zhangsan r.append("name","lisi") print(r.get("name")) #輸出:zhangsanlisi
Redis hash 是一個鍵值對集合。
Redis hash是一個string類型的field和value的映射表,hash特別適合用於存儲對象。
以上實例中 hash 數據類型存儲了包含用戶腳本信息的用戶對象。 實例中咱們使用了 Redis HMSET, HEGTALL 命令,user:1 爲鍵值。
每一個 hash 能夠存儲 232 - 1 鍵值對(40多億)。
redis中的Hash 在內存中相似於一個name對應一個dic來存儲
hset(name, key, value)
#name對應的hash中設置一個鍵值對(不存在,則建立,不然,修改) r.hset("dic_name","a1","aa")
hget(name,key)
r.hset("dic_name","a1","aa") #在name對應的hash中根據key獲取value print(r.hget("dic_name","a1"))#輸出:aa
hgetall(name)
#獲取name對應hash的全部鍵值 print(r.hgetall("dic_name"))
hmset(name, mapping)
#在name對應的hash中批量設置鍵值對,mapping:字典 dic={"a1":"aa","b1":"bb"} r.hmset("dic_name",dic) print(r.hget("dic_name","b1"))#輸出:bb
hmget(name, keys, *args)
# 在name對應的hash中獲取多個key的值 li=["a1","b1"] print(r.hmget("dic_name",li)) print(r.hmget("dic_name","a1","b1"))
hlen(name)、hkeys(name)、hvals(name)
dic={"a1":"aa","b1":"bb"} r.hmset("dic_name",dic) #hlen(name) 獲取hash中鍵值對的個數 print(r.hlen("dic_name")) #hkeys(name) 獲取hash中全部的key的值 print(r.hkeys("dic_name")) #hvals(name) 獲取hash中全部的value的值 print(r.hvals("dic_name"))
hexists(name, key)
#檢查name對應的hash是否存在當前傳入的key print(r.hexists("dic_name","a1"))#輸出:True
hdel(name,*keys)
#刪除指定name對應的key所在的鍵值對 r.hdel("dic_name","a1")
hincrby(name, key, amount=1)
#自增hash中key對應的值,不存在則建立key=amount(amount爲整數) print(r.hincrby("demo","a",amount=2))
hincrbyfloat(name, key, amount=1.0)
#自增hash中key對應的值,不存在則建立key=amount(amount爲浮點數)
hscan(name, cursor=0, match=None, count=None)
hscan_iter(name, match=None, count=None)
Redis 列表是簡單的字符串列表,按照插入順序排序。你能夠添加一個元素導列表的頭部(左邊)或者尾部(右邊)。
列表最多可存儲 232 - 1 元素 (4294967295, 每一個列表可存儲40多億)。
edis中的List在在內存中按照一個name對應一個List來存儲
lpush(name,values)
# 在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊 r.lpush("list_name",2) r.lpush("list_name",3,4,5)#保存在列表中的順序爲5,4,3,2
rpush(name,values)
#同lpush,但每一個新的元素都添加到列表的最右邊
lpushx(name,value)
#在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊
rpushx(name,value)
#在name對應的list中添加元素,只有name已經存在時,值添加到列表的最右邊
llen(name)
# name對應的list元素的個數 print(r.llen("list_name"))
linsert(name, where, refvalue, value))
# 在name對應的列表的某一個值前或後插入一個新值 r.linsert("list_name","BEFORE","2","SS")#在列表內找到第一個元素2,在它前面插入SS '''參數: name: redis的name where: BEFORE(前)或AFTER(後) refvalue: 列表內的值 value: 要插入的數據'''
r.lset(name, index, value)
#對list中的某一個索引位置從新賦值 r.lset("list_name",0,"bbb")
r.lrem(name, value, num)
#刪除name對應的list中的指定值 r.lrem("list_name","SS",num=0) ''' 參數: name: redis的name value: 要刪除的值 num: num=0 刪除列表中全部的指定值; num=2 從前到後,刪除2個; num=-2 從後向前,刪除2個'''
lpop(name)
#移除列表的左側第一個元素,返回值則是第一個元素 print(r.lpop("list_name"))
lindex(name, index)
#根據索引獲取列表內元素 print(r.lindex("list_name",1))
lrange(name, start, end)
#分片獲取元素 print(r.lrange("list_name",0,-1))
ltrim(name, start, end)
#移除列表內沒有在該索引以內的值 r.ltrim("list_name",0,2)
rpoplpush(src, dst)
# 從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊 #src 要取數據的列表 #dst 要添加數據的列表
brpoplpush(src, dst, timeout=0)
#同rpoplpush,多了個timeout, timeout:取數據的列表沒元素後的阻塞時間,0爲一直阻塞 r.brpoplpush("list_name","list_name1",timeout=0)
blpop(keys, timeout)
#將多個列表排列,按照從左到右去移除各個列表內的元素 r.lpush("list_name",3,4,5) r.lpush("list_name1",3,4,5) while True: print(r.blpop(["list_name","list_name1"],timeout=0)) print(r.lrange("list_name",0,-1),r.lrange("list_name1",0,-1)) '''keys: redis的name的集合 timeout: 超時時間,獲取完全部列表的元素以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞'''
r.brpop(keys, timeout)
#同blpop,將多個列表排列,按照從右像左去移除各個列表內的元素
Redis的Set是string類型的無序集合。
集合是經過哈希表實現的,因此添加,刪除,查找的複雜度都是O(1)。
添加一個string元素到,key對應的set集合中,成功返回1,若是元素以及在集合中返回0,key對應的set不存在返回錯誤。
注意:以上實例中 rabitmq 添加了兩次,但根據集合內元素的惟一性,第二次插入的元素將被忽略。
集合中最大的成員數爲 232 - 1 (4294967295, 每一個集合可存儲40多億個成員)。
Set集合就是不容許重複的列表
sadd(name,values)
#給name對應的集合中添加元素 r.sadd("set_name","aa") r.sadd("set_name","aa","bb")
smembers(name)
#獲取name對應的集合的全部成員
scard(name)
#獲取name對應的集合中的元素個數 r.scard("set_name")
sdiff(keys, *args)
#在第一個name對應的集合中且不在其餘name對應的集合的元素集合 r.sadd("set_name","aa","bb") r.sadd("set_name1","bb","cc") r.sadd("set_name2","bb","cc","dd") print(r.sdiff("set_name","set_name1","set_name2"))#輸出:{aa}
sdiffstore(dest, keys, *args)
#至關於把sdiff獲取的值加入到dest對應的集合中
sinter(keys, *args)
# 獲取多個name對應集合的並集 r.sadd("set_name","aa","bb") r.sadd("set_name1","bb","cc") r.sadd("set_name2","bb","cc","dd") print(r.sinter("set_name","set_name1","set_name2"))#輸出:{bb}
sinterstore(dest, keys, *args)
#獲取多個name對應集合的並集,再講其加入到dest對應的集合中
sismember(name, value)
#檢查value是不是name對應的集合內的元素
smove(src, dst, value)
#將某個元素從一個集合中移動到另一個集合
spop(name)
#從集合的右側移除一個元素,並將其返回
srandmember(name, numbers)
# 從name對應的集合中隨機獲取numbers個元素 print(r.srandmember("set_name2",2))
srem(name, values)
#刪除name對應的集合中的某些值 print(r.srem("set_name2","bb","dd"))
sunion(keys, *args)
#獲取多個name對應的集合的並集 r.sunion("set_name","set_name1","set_name2")
sunionstore(dest,keys, *args)
#獲取多個name對應的集合的並集,並將結果保存到dest對應的集合中
有序集合:
在集合的基礎上,爲每元素排序,元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。
zadd(name, *args, **kwargs)
# 在name對應的有序集合中添加元素 r.zadd("zset_name", "a1", 6, "a2", 2,"a3",5) #或 r.zadd('zset_name1', b1=10, b2=5)
zcard(name)
#獲取有序集合內元素的數量
zcount(name, min, max)
#獲取有序集合中分數在[min,max]之間的個數 print(r.zcount("zset_name",1,5))
zincrby(name, value, amount)
#自增有序集合內value對應的分數 r.zincrby("zset_name","a1",amount=2)#自增zset_name對應的有序集合裏a1對應的分數
zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
# 按照索引範圍獲取name對應的有序集合的元素 aa=r.zrange("zset_name",0,1,desc=False,withscores=True,score_cast_func=int) print(aa) '''參數: name redis的name start 有序集合索引發始位置 end 有序集合索引結束位置 desc 排序規則,默認按照分數從小到大排序 withscores 是否獲取元素的分數,默認只獲取元素的值 score_cast_func 對分數進行數據轉換的函數'''
zrevrange(name, start, end, withscores=False, score_cast_func=float)
#同zrange,集合是從大到小排序的
zrank(name, value)、zrevrank(name, value)
#獲取value值在name對應的有序集合中的排行位置(從0開始) print(r.zrank("zset_name", "a2")) print(r.zrevrank("zset_name", "a2"))#從大到小排序
zscore(name, value)
#獲取name對應有序集合中 value 對應的分數 print(r.zscore("zset_name","a1"))
zrem(name, values)
#刪除name對應的有序集合中值是values的成員 r.zrem("zset_name","a1","a2")
zremrangebyrank(name, min, max)
#根據排行範圍刪除
zremrangebyscore(name, min, max)
#根據分數範圍刪除
zinterstore(dest, keys, aggregate=None)
r.zadd("zset_name", "a1", 6, "a2", 2,"a3",5) r.zadd('zset_name1', a1=7,b1=10, b2=5) # 獲取兩個有序集合的交集並放入dest集合,若是遇到相同值不一樣分數,則按照aggregate進行操做 # aggregate的值爲: SUM MIN MAX r.zinterstore("zset_name2",("zset_name1","zset_name"),aggregate="MAX") print(r.zscan("zset_name2"))
zunionstore(dest, keys, aggregate=None)
#獲取兩個有序集合的並集並放入dest集合,其餘同zinterstore,
其餘經常使用操做
delete(*names)
#根據name刪除redis中的任意數據類型
exists(name)
#檢測redis的name是否存在
keys(pattern='*')
#根據* ?等通配符匹配獲取redis的name
expire(name ,time)
# 爲某個name設置超時時間
rename(src, dst)
# 重命名
move(name, db))
# 將redis的某個值移動到指定的db下
randomkey()
#隨機獲取一個redis的name(不刪除)
type(name)
# 獲取name對應值的類型
Redis zset 和 set 同樣也是string類型元素的集合,且不容許重複的成員。
不一樣的是每一個元素都會關聯一個double類型的分數。redis正是經過分數來爲集合中的成員進行從小到大的排序。
zset的成員是惟一的,但分數(score)卻能夠重複。
添加元素到集合,元素在集合中存在則更新對應score
應用場景 - 實時用戶統計 當咱們須要在頁面上顯示當前的在線用戶時,就可使用Redis來完成了。首先得到當前時間(以Unix timestamps方式)除以60,能夠基於這個值建立一個key。而後添加用戶到這個集合中。當超過你設定的最大的超時時間,則將這個集合設爲過時;而當須要查詢當前在線用戶的時候,則將最後N分鐘的集合交集在一塊兒便可。因爲redis鏈接對象是線程安全的,因此能夠直接使用一個全局變量來表示。
python操做redis
鏈接方式
redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類
#!/usr/bin/env python # -*- coding:utf-8 -*- import redis r = redis.Redis(host='192.168.0.110', port=6379,db=0) r.set('name', 'zhangsan') #添加 print (r.get('name')) #獲取
鏈接池
redis-py使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。
#!/usr/bin/env python # -*- coding:utf-8 -*- import redis pool = redis.ConnectionPool(host='192.168.0.110', port=6379) r = redis.Redis(connection_pool=pool) r.set('name', 'zhangsan') #添加 print (r.get('name')) #獲取
管道
redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。
#!/usr/bin/env python # -*- coding:utf-8 -*- import redis pool = redis.ConnectionPool(host='192.168.0.110', port=6379) r = redis.Redis(connection_pool=pool) pipe = r.pipeline(transaction=True) r.set('name', 'zhangsan') r.set('name', 'lisi') pipe.execute()
發佈和訂閱
首先定義一個RedisHelper類,鏈接Redis,定義頻道爲monitor,定義發佈(publish)及訂閱(subscribe)方法。
#!/usr/bin/env python #-*- coding:utf-8 -*- import redis class RedisHelper(object): def __init__(self): self.__conn = redis.Redis(host='192.168.0.110',port=6379)#鏈接Redis self.channel = 'monitor' #定義名稱 def publish(self,msg):#定義發佈方法 self.__conn.publish(self.channel,msg) return True def subscribe(self):#定義訂閱方法 pub = self.__conn.pubsub() pub.subscribe(self.channel) pub.parse_response() return pub
發佈者
#!/usr/bin/env python # -*- coding:utf-8 -*- #發佈 from RedisHelper import RedisHelper obj = RedisHelper() obj.publish('hello')#發佈
訂閱者
#!/usr/bin/env python # -*- coding:utf-8 -*- #訂閱 from RedisHelper import RedisHelper obj = RedisHelper() redis_sub = obj.subscribe()#調用訂閱方法 while True: msg= redis_sub.parse_response() print (msg)
打開瀏覽器,輸入地址,按下回車,打開了頁面。因而一個HTTP
請求(request
)就由客戶端發送到服務器,服務器處理請求,返回響應(response
)內容。
咱們天天都在瀏覽網頁,發送大大小小的請求給服務器。有時候,服務器接到了請求,會發現他也須要給另外的服務器發送請求,或者服務器也須要作另一些事情,因而最初們發送的請求就被阻塞了,也就是要等待服務器完成其餘的事情。
更多的時候,服務器作的額外事情,並不須要客戶端等待,這時候就能夠把這些額外的事情異步去作。從事異步任務的工具備不少。主要原理仍是處理通知消息,針對通知消息一般採起是隊列結構。生產和消費消息進行通訊和業務實現。
上述異步任務的實現,能夠抽象爲生產者消費模型。如同一個餐館,廚師在作飯,吃貨在吃飯。若是廚師作了不少,暫時賣不完,廚師就會休息;若是客戶不少,廚師快馬加鞭的忙碌,客戶則須要慢慢等待。實現生產者和消費者的方式用不少,下面使用Python
標準庫Queue
寫個小例子:
import random import time from Queue import Queue from threading import Thread queue = Queue(10) class Producer(Thread): def run(self): while True: elem = random.randrange(9) queue.put(elem) print "廚師 {} 作了 {} 飯 --- 還剩 {} 飯沒賣完".format(self.name, elem, queue.qsize()) time.sleep(random.random()) class Consumer(Thread): def run(self): while True: elem = queue.get() print "吃貨{} 吃了 {} 飯 --- 還有 {} 飯能夠吃".format(self.name, elem, queue.qsize()) time.sleep(random.random()) def main(): for i in range(3): p = Producer() p.start() for i in range(2): c = Consumer() c.start() if __name__ == '__main__': main()
大概輸出以下:
廚師 Thread-1 作了 1 飯 --- 還剩 1 飯沒賣完 廚師 Thread-2 作了 8 飯 --- 還剩 2 飯沒賣完 廚師 Thread-3 作了 3 飯 --- 還剩 3 飯沒賣完 吃貨Thread-4 吃了 1 飯 --- 還有 2 飯能夠吃 吃貨Thread-5 吃了 8 飯 --- 還有 1 飯能夠吃 吃貨Thread-4 吃了 3 飯 --- 還有 0 飯能夠吃 廚師 Thread-1 作了 0 飯 --- 還剩 1 飯沒賣完 廚師 Thread-2 作了 0 飯 --- 還剩 2 飯沒賣完 廚師 Thread-1 作了 1 飯 --- 還剩 3 飯沒賣完 廚師 Thread-1 作了 1 飯 --- 還剩 4 飯沒賣完 吃貨Thread-4 吃了 0 飯 --- 還有 3 飯能夠吃 廚師 Thread-3 作了 3 飯 --- 還剩 4 飯沒賣完 吃貨Thread-5 吃了 0 飯 --- 還有 3 飯能夠吃 吃貨Thread-5 吃了 1 飯 --- 還有 2 飯能夠吃 廚師 Thread-2 作了 8 飯 --- 還剩 3 飯沒賣完 廚師 Thread-2 作了 8 飯 --- 還剩 4 飯沒賣完
Python內置了一個好用的隊列結構。咱們也能夠是用redis實現相似的操做。並作一個簡單的異步任務。
Redis提供了兩種方式來做消息隊列。一個是使用生產者消費模式
模式,另一個方法就是發佈訂閱者模式
。前者會讓一個或者多個客戶端監聽消息隊列,一旦消息到達,消費者立刻消費,誰先搶到算誰的,若是隊列裏沒有消息,則消費者繼續監聽。後者也是一個或多個客戶端訂閱消息頻道,只要發佈者發佈消息,全部訂閱者都能收到消息,訂閱者都是ping的。
主要使用了redis提供的blpop獲取隊列數據,若是隊列沒有數據則阻塞等待,也就是監聽。
import redis class Task(object): def __init__(self): self.rcon = redis.StrictRedis(host='localhost', db=5) self.queue = 'task:prodcons:queue' def listen_task(self): while True: task = self.rcon.blpop(self.queue, 0)[1] print "Task get", task if __name__ == '__main__': print 'listen task queue' Task().listen_task()
使用redis的pubsub功能,訂閱者訂閱頻道,發佈者發佈消息到頻道了,頻道就是一個消息隊列。
import redis class Task(object): def __init__(self): self.rcon = redis.StrictRedis(host='localhost', db=5) self.ps = self.rcon.pubsub() self.ps.subscribe('task:pubsub:channel') def listen_task(self): for i in self.ps.listen(): if i['type'] == 'message': print "Task get", i['data'] if __name__ == '__main__': print 'listen task channel' Task().listen_task()
咱們分別實現了兩種異步任務的後端服務,直接啓動他們,就能監聽redis隊列或頻道的消息了。簡單的測試以下:
import redis import random import logging from flask import Flask, redirect app = Flask(__name__) rcon = redis.StrictRedis(host='localhost', db=5) prodcons_queue = 'task:prodcons:queue' pubsub_channel = 'task:pubsub:channel' @app.route('/') def index(): html = """ <br> <center><h3>Redis Message Queue</h3> <br> <a href="/prodcons">生產消費者模式</a> <br> <br> <a href="/pubsub">發佈訂閱者模式</a> </center> """ return html @app.route('/prodcons') def prodcons(): elem = random.randrange(10) rcon.lpush(prodcons_queue, elem) logging.info("lpush {} -- {}".format(prodcons_queue, elem)) return redirect('/') @app.route('/pubsub') def pubsub(): ps = rcon.pubsub() ps.subscribe(pubsub_channel) elem = random.randrange(10) rcon.publish(pubsub_channel, elem) return redirect('/') if __name__ == '__main__': app.run(debug=True)
啓動腳本,使用
siege -c10 -r 5 http://127.0.0.1:5000/prodcons siege -c10 -r 5 http://127.0.0.1:5000/pubsub
能夠分別在監聽的腳本輸入中看到異步消息。在異步的任務中,能夠執行一些耗時間的操做,固然目前這些作法並不知道異步的執行結果,若是須要知道異步的執行結果,能夠考慮設計協程任務或者使用一些工具如RQ
或者celery
等。
MemCache是什麼
MemCache是一個自由、源碼開放、高性能、分佈式的分佈式內存對象緩存系統,用於動態Web應用以減輕數據庫的負載。它經過在內存中緩存數據和對象來減小讀取數據庫的次數,從而提升了網站訪問的速度。MemCaChe是一個存儲鍵值對的HashMap,在內存中對任意的數據(好比字符串、對象等)所使用的key-value存儲,數據能夠來自數據庫調用、API調用,或者頁面渲染的結果。MemCache設計理念就是小而強大,它簡單的設計促進了快速部署、易於開發並解決面對大規模的數據緩存的許多難題,而所開放的API使得MemCache能用於Java、C/C++/C#、Perl、Python、PHP、Ruby等大部分流行的程序語言。
MemCache和MemCached的區別:
MemCache的官方網站爲http://memcached.org/
MemCache訪問模型
爲了加深理解,我模仿着原阿里技術專家李智慧老師《大型網站技術架構 核心原理與案例分析》一書MemCache部分,本身畫了一張圖:
特別澄清一個問題,MemCache雖然被稱爲」分佈式緩存」,可是MemCache自己徹底不具有分佈式的功能,MemCache集羣之間不會相互通訊(與之造成對比的,好比JBoss Cache,某臺服務器有緩存數據更新時,會通知集羣中其餘機器更新緩存或清除緩存數據),所謂的」分佈式」,徹底依賴於客戶端程序的實現,就像上面這張圖的流程同樣。
同時基於這張圖,理一下MemCache一次寫緩存的流程:
一、應用程序輸入須要寫緩存的數據
二、API將Key輸入路由算法模塊,路由算法根據Key和MemCache集羣服務器列表獲得一臺服務器編號
三、由服務器編號獲得MemCache及其的ip地址和端口號
四、API調用通訊模塊和指定編號的服務器通訊,將數據寫入該服務器,完成一次分佈式緩存的寫操做
讀緩存和寫緩存同樣,只要使用相同的路由算法和服務器列表,只要應用程序查詢的是相同的Key,MemCache客戶端老是訪問相同的客戶端去讀取數據,只要服務器中還緩存着該數據,就能保證緩存命中。
這種MemCache集羣的方式也是從分區容錯性的方面考慮的,假如Node2宕機了,那麼Node2上面存儲的數據都不可用了,此時因爲集羣中Node0和Node1還存在,下一次請求Node2中存儲的Key值的時候,確定是沒有命中的,這時先從數據庫中拿到要緩存的數據,而後路由算法模塊根據Key值在Node0和Node1中選取一個節點,把對應的數據放進去,這樣下一次就又能夠走緩存了,這種集羣的作法很好,可是缺點是成本比較大。
一致性Hash算法
從上面的圖中,能夠看出一個很重要的問題,就是對服務器集羣的管理,路由算法相當重要,就和負載均衡算法同樣,路由算法決定着究竟該訪問集羣中的哪臺服務器,先看一個簡單的路由算法。
一、餘數Hash
比方說,字符串str對應的HashCode是50、服務器的數目是3,取餘數獲得2,str對應節點Node2,因此路由算法把str路由到Node2服務器上。因爲HashCode隨機性比較強,因此使用餘數Hash路由算法就能夠保證緩存數據在整個MemCache服務器集羣中有比較均衡的分佈。
若是不考慮服務器集羣的伸縮性(什麼是伸縮性,請參見大型網站架構學習筆記),那麼餘數Hash算法幾乎能夠知足絕大多數的緩存路由需求,可是當分佈式緩存集羣須要擴容的時候,就難辦了。
就假設MemCache服務器集羣由3臺變爲4臺吧,更改服務器列表,仍然使用餘數Hash,50對4的餘數是2,對應Node2,可是str原來是存在Node1上的,這就致使了緩存沒有命中。若是這麼說不夠明白,那麼不妨舉個例子,原來有HashCode爲0~19的20個數據,那麼:
如今我擴容到4臺,加粗標紅的表示命中:
若是我擴容到20+的臺數,只有前三個HashCode對應的Key是命中的,也就是15%。固然這只是個簡單例子,現實狀況確定比這個複雜得多,不過足以說明,使用餘數Hash的路由算法,在擴容的時候會形成大量的數據沒法正確命中(其實不只僅是沒法命中,那些大量的沒法命中的數據還在原緩存中在被移除前佔據着內存)。這個結果顯然是沒法接受的,在網站業務中,大部分的業務數據度操做請求上事實上是經過緩存獲取的,只有少許讀操做會訪問數據庫,所以數據庫的負載能力是以有緩存爲前提而設計的。當大部分被緩存了的數據由於服務器擴容而不能正確讀取時,這些數據訪問的壓力就落在了數據庫的身上,這將大大超過數據庫的負載能力,嚴重的可能會致使數據庫宕機。
這個問題有解決方案,解決步驟爲:
(1)在網站訪問量低谷,一般是深夜,技術團隊加班,擴容、重啓服務器
(2)經過模擬請求的方式逐漸預熱緩存,使緩存服務器中的數據從新分佈
二、一致性Hash算法
一致性Hash算法經過一個叫作一致性Hash環的數據結構實現Key到緩存服務器的Hash映射,看一下我本身畫的一張圖:
具體算法過程爲:先構造一個長度爲232的整數環(這個環被稱爲一致性Hash環),根據節點名稱的Hash值(其分佈爲[0, 232-1])將緩存服務器節點放置在這個Hash環上,而後根據須要緩存的數據的Key值計算獲得其Hash值(其分佈也爲[0, 232-1]),而後在Hash環上順時針查找距離這個Key值的Hash值最近的服務器節點,完成Key到服務器的映射查找。
就如同圖上所示,三個Node點分別位於Hash環上的三個位置,而後Key值根據其HashCode,在Hash環上有一個固定位置,位置固定下以後,Key就會順時針去尋找離它最近的一個Node,把數據存儲在這個Node的MemCache服務器中。使用Hash環若是加了一個節點會怎麼樣,看一下:
看到我加了一個Node4節點,隻影響到了一個Key值的數據,原本這個Key值應該是在Node1服務器上的,如今要去Node4了。採用一致性Hash算法,的確也會影響到整個集羣,可是影響的只是加粗的那一段而已,相比餘數Hash算法影響了遠超一半的影響率,這種影響要小得多。更重要的是,集羣中緩存服務器節點越多,增長節點帶來的影響越小,很好理解。換句話說,隨着集羣規模的增大,繼續命中原有緩存數據的機率會愈來愈大,雖然仍然有小部分數據緩存在服務器中不能被讀到,可是這個比例足夠小,即便訪問數據庫,也不會對數據庫形成致命的負載壓力。
至於具體應用,這個長度爲232的一致性Hash環一般使用二叉查找樹實現,至於二叉查找樹,就是算法的問題了,能夠本身去查詢相關資料。
而後咱們來看一下MemCache的原理,MemCache最重要的莫不是內存分配的內容了,MemCache採用的內存分配方式是固定空間分配,仍是本身畫一張圖說明:
這張圖片裏面涉及了slab_class、slab、page、chunk四個概念,它們之間的關係是:
一、MemCache將內存空間分爲一組slab
二、每一個slab下又有若干個page,每一個page默認是1M,若是一個slab佔用100M內存的話,那麼這個slab下應該有100個page
三、每一個page裏面包含一組chunk,chunk是真正存放數據的地方,同一個slab裏面的chunk的大小是固定的
四、有相同大小chunk的slab被組織在一塊兒,稱爲slab_class
MemCache內存分配的方式稱爲allocator,slab的數量是有限的,幾個、十幾個或者幾十個,這個和啓動參數的配置相關。
MemCache中的value過來存放的地方是由value的大小決定的,value老是會被存放到與chunk大小最接近的一個slab中,好比slab[1]的chunk大小爲80字節、slab[2]的chunk大小爲100字節、slab[3]的chunk大小爲128字節(相鄰slab內的chunk基本以1.25爲比例進行增加,MemCache啓動時能夠用-f指定這個比例),那麼過來一個88字節的value,這個value將被放到2號slab中。放slab的時候,首先slab要申請內存,申請內存是以page爲單位的,因此在放入第一個數據的時候,不管大小爲多少,都會有1M大小的page被分配給該slab。申請到page後,slab會將這個page的內存按chunk的大小進行切分,這樣就變成了一個chunk數組,最後從這個chunk數組中選擇一個用於存儲數據。
若是這個slab中沒有chunk能夠分配了怎麼辦,若是MemCache啓動沒有追加-M(禁止LRU,這種狀況下內存不夠會報Out Of Memory錯誤),那麼MemCache會把這個slab中最近最少使用的chunk中的數據清理掉,而後放上最新的數據。針對MemCache的內存分配及回收算法,總結三點:
一、MemCache的內存分配chunk裏面會有內存浪費,88字節的value分配在128字節(緊接着大的用)的chunk中,就損失了30字節,可是這也避免了管理內存碎片的問題
二、MemCache的LRU算法不是針對全局的,是針對slab的
三、應該能夠理解爲何MemCache存放的value大小是限制的,由於一個新數據過來,slab會先以page爲單位申請一塊內存,申請的內存最多就只有1M,因此value大小天然不能大於1M了
簡單安裝:
1.分別把memcached和libevent下載回來,放到 /tmp 目錄下:
# cd /tmp
# wget http://www.danga.com/memcached/dist/memcached-1.2.0.tar.gz
# wget http://www.monkey.org/~provos/libevent-1.2.tar.gz
2.先安裝libevent:
# tar zxvf libevent-1.2.tar.gz
# cd libevent-1.2
# ./configure -prefix=/usr
# make (若是遇到提示gcc 沒有安裝則先安裝gcc)
# make install
3.測試libevent是否安裝成功:
# ls -al /usr/lib | grep libevent
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent-1.2.so.1 -> libevent-1.2.so.1.0.3
-rwxr-xr-x 1 root root 263546 11?? 12 17:38 libevent-1.2.so.1.0.3
-rw-r-r- 1 root root 454156 11?? 12 17:38 libevent.a
-rwxr-xr-x 1 root root 811 11?? 12 17:38 libevent.la
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent.so -> libevent-1.2.so.1.0.3
還不錯,都安裝上了。
4.安裝memcached,同時須要安裝中指定libevent的安裝位置:
# cd /tmp
# tar zxvf memcached-1.2.0.tar.gz
# cd memcached-1.2.0
# ./configure -with-libevent=/usr
# make
# make install
若是中間出現報錯,請仔細檢查錯誤信息,按照錯誤信息來配置或者增長相應的庫或者路徑。
安裝完成後會把memcached放到 /usr/local/bin/memcached ,
5.測試是否成功安裝memcached:
# ls -al /usr/local/bin/mem*
-rwxr-xr-x 1 root root 137986 11?? 12 17:39 /usr/local/bin/memcached
-rwxr-xr-x 1 root root 140179 11?? 12 17:39 /usr/local/bin/memcached-debug
啓動memcache服務
啓動Memcached服務:
1.啓動Memcache的服務器端:
# /usr/local/bin/memcached -d -m 8096 -u root -l 192.168.77.105 -p 12000 -c 256 -P /tmp/memcached.pid
-d選項是啓動一個守護進程,
-m是分配給Memcache使用的內存數量,單位是MB,我這裏是8096MB,
-u是運行Memcache的用戶,我這裏是root,
-l是監聽的服務器IP地址,若是有多個地址的話,我這裏指定了服務器的IP地址192.168.77.105,
-p是設置Memcache監聽的端口,我這裏設置了12000,最好是1024以上的端口,
-c選項是最大運行的併發鏈接數,默認是1024,我這裏設置了256,按照你服務器的負載量來設定,
-P是設置保存Memcache的pid文件,我這裏是保存在 /tmp/memcached.pid,
2.若是要結束Memcache進程,執行:
# cat /tmp/memcached.pid 或者 ps -aux | grep memcache (找到對應的進程id號)
# kill 進程id號
也能夠啓動多個守護進程,不過端口不能重複。
memcache 的鏈接
telnet ip port
注意鏈接以前須要再memcache服務端把memcache的防火牆規則加上
-A RH-Firewall-1-INPUT -m state --state NEW -m tcp -p tcp --dport 3306 -j ACCEPT
從新加載防火牆規則
service iptables restart
OK ,如今應該就能夠連上memcache了
在客戶端輸入stats 查看memcache的狀態信息
pid memcache服務器的進程ID
uptime 服務器已經運行的秒數
time 服務器當前的unix時間戳
version memcache版本
pointer_size 當前操做系統的指針大小(32位系統通常是32bit)
rusage_user 進程的累計用戶時間
rusage_system 進程的累計系統時間
curr_items 服務器當前存儲的items數量
total_items 從服務器啓動之後存儲的items總數量
bytes 當前服務器存儲items佔用的字節數
curr_connections 當前打開着的鏈接數
total_connections 從服務器啓動之後曾經打開過的鏈接數
connection_structures 服務器分配的鏈接構造數
cmd_get get命令 (獲取)總請求次數
cmd_set set命令 (保存)總請求次數
get_hits 總命中次數
get_misses 總未命中次數
evictions 爲獲取空閒內存而刪除的items數(分配給memcache的空間用滿後須要刪除舊的items來獲得空間分配給新的items)
bytes_read 讀取字節數(請求字節數)
bytes_written 總髮送字節數(結果字節數)
limit_maxbytes 分配給memcache的內存大小(字節)
threads 當前線程數