緩存,隊列(Redis,RabbitMQ)

1、Redis

一、簡介

Redis 與其餘 key - value 緩存產品有如下三個特色:java

  • Redis支持數據的持久化,能夠將內存中的數據保存在磁盤中,重啓的時候能夠再次加載進行使用。
  • Redis不只僅支持簡單的key-value類型的數據,同時還提供list,set,zset,hash等數據結構的存儲。
  • Redis支持數據的備份,即master-slave模式的數據備份。

二、優點

  • 性能極高 – Redis能讀的速度是110000次/s,寫的速度是81000次/s 。
  • 豐富的數據類型 – Redis支持二進制案例的 Strings, Lists, Hashes, Sets 及 Ordered Sets 數據類型操做。
  • 原子 – Redis的全部操做都是原子性的,意思就是要麼成功執行要麼失敗徹底不執行。單個操做是原子性的。多個操做也支持事務,即原子性,經過MULTI和EXEC指令包起來。
  • 豐富的特性 – Redis還支持 publish/subscribe, 通知, key 過時等等特性。

三、Redis與其餘key-value存儲有什麼不一樣?

  • Redis有着更爲複雜的數據結構而且提供對他們的原子性操做,這是一個不一樣於其餘數據庫的進化路徑。Redis的數據類型都是基於基本數據結構的同時對程序員透明,無需進行額外的抽象。python

  • Redis運行在內存中可是能夠持久化到磁盤,因此在對不一樣數據集進行高速讀寫時須要權衡內存,由於數據量不能大於硬件內存。在內存數據庫方面的另外一個優勢是,相比在磁盤上相同的複雜的數據結構,在內存中操做起來很是簡單,這樣Redis能夠作不少內部複雜性很強的事情。同時,在磁盤格式方面他們是緊湊的以追加的方式產生的,由於他們並不須要進行隨機訪問。mysql

四、Redis 安裝

下載地址:https://github.com/MSOpenTech/redis/releasesgit

Redis 支持 32 位和 64 位。這個須要根據你係統平臺的實際狀況選擇,這裏咱們下載 Redis-x64-xxx.zip壓縮包到 C 盤,解壓後,將文件夾從新命名爲 redis程序員

打開一個 cmd 窗口 使用cd命令切換目錄到 C:\redis 運行 redis-server.exe redis.windows.conf 。github

若是想方便的話,能夠把 redis 的路徑加到系統的環境變量裏,這樣就免得再輸路徑了,後面的那個 redis.windows.conf 能夠省略,若是省略,會啓用默認的。輸入以後,會顯示以下界面:web

這時候另啓一個cmd窗口,原來的不要關閉,否則就沒法訪問服務端了。redis

切換到redis目錄下運行 redis-cli.exe -h 127.0.0.1 -p 6379 。sql

設置鍵值對 set myKey abcshell

取出鍵值對 get myKey

五、安裝目錄的文件

redis-server : 服務器
redis-cli :命令行客戶端
redis-benchmark 性能工具測試
redis-check-aof ADF文件修復工具
redis-check-dump RDB文件檢測工具

redis.conf是redis的配置文件

將配置文件中的daemonize yes 以守護進程的方式來使用

cd到redis的安裝目錄下

啓動和中止

啓動 : redis-server 
中止:shutdown
命令返回值
1)狀態回覆 pong 
set test "this is a test"

2)錯誤回覆
(error) ERR unknown command 'testerror'

3)整數回覆:
(integer) 4

4)字符串回覆
get 'test' 

(nil) 表明空的結果

5)多行字符串回覆
KEYS * 獲得當前數據庫中的存在的鍵值名

 

六、Redis配置選項相關的內容

1>動態設置/獲取配置選項的值
		獲取
			CONFIG GET port
			1)"post"
			2)"6379"
		動態設置
			CONFIG GET warning
	2>Redis配置文件redis.conf選項相關
	
		---- 鏈接選項 ----
		
		port 6379 默認端口
		
		bind 127.0.01 默認綁定的主機地址
		
		timeout 0 客戶端閒置多久後自動關閉鏈接  0 表明沒有啓動這個選項
		
		loglevel notice 日誌的記錄級別
			debug  	調試的很詳細的信息:適合開發和測試
			verbose 包含不少不太有用的信息
			notice	生產環境
			warning 警告信息
			
		logfile stdout 指定日誌的記錄方式 默認是標準輸出
		
		database 16 設置默認數據庫的數量,默認是16個
			SETECT 1 選擇數據庫 默認編號是0
			
		----- 快照相關 ------
		
		多少秒有多少次改變將其同步到磁盤中的數據文件中
			save 900 1       900表明秒數  900秒內有一個更改就記錄到磁盤中
			save 300 10		 
			save 60 10000
		
 		rdbcompression yes   存儲本地數據庫是是否啓動壓縮, 默認yes
		
		dbfilename dump.db 指定本地數據庫的文件名

七、Redis的數據類型 

1》String類型

  1 一個鍵最多存儲512MB
  2         1》SET 設置key 對應的值爲value
  3         
  4             語法:SET key value [EX seconds] [PX milliseconds] [NX|XX]
  5             
  6             EX seconds 設置key的過時時間 SET key value EX seconds == SETEX
  7             PX milliseconds 以毫秒的形式設置過時時間 SET key value PX milliseconds -- PSETEX
  8             NX :只有鍵不存在的時候纔會設置成功 --SETNX
  9             XX :只有key已經存在的時候才能夠設置
 10                     
 11             SET test16 'this is a test' EX 100
 12             SET test17 'this is a test17' PX 20000
 13             SET test18 'this is a test18' NX
 14             SET test18 'this is a test18888' XX
 15             
 16             SET test19 'this is a test19' EX 100 NX
 17             
 18             SET test20 'this is a test20' EX 100 PX 300000 NX
 19             
 20             
 21             注意: 若是key存在,同名會產生覆蓋
 22             
 23                 SET testStr1 "this is a test"
 24         
 25         2》GET key 根據key找到對應的值
 26         
 27             語法: GET key
 28             
 29             注意:若是key不存在,返回nil
 30                   若是key不是字符串,會報錯
 31               
 32         3》GETGANGE: 返回字符串中的一部分
 33             
 34             語法:GETRANGE key start end
 35             
 36             GETGANGET testStr2 0 4
 37             GETGANGET testStr2 0 -3
 38             GETGANGET testStr2 -4 -2
 39             GETGANGET testStr2 0 1000
 40         
 41         4》GETSET:設定指定能夠的值,返回原來的值
 42             計數器的清零效果
 43         
 44             語法:GETSET key value
 45             SET testStr3 'king'
 46             
 47             GET testStr3
 48             
 49             GETSET testStr3 'queen' 
 50             
 51             注意:當key不存在返回nil 若是key不是字符串會報錯
 52         
 53         5》MSET 一次設置多個
 54             
 55             語法:MSET key value [key value...]
 56                 MSET testStr5 'king' testStr6 'sk' testStr7 'queen'
 57             
 58         6》MGET 一次得到多個
 59         
 60             語法:MGET key key 
 61                 MGET testStr5 testStr6 testStr7
 62         
 63         7》STRLEN:獲取key的字符串的長度
 64             
 65             語法:STRLEN key
 66             
 67             注意:對於不存在的key獲取其長度返回的是0
 68             
 69         8》SETRANGE:至關於字符串替換的效果
 70             
 71             語法:8》SETRANGE key  offset value
 72             
 73             注意:若是設置的key原來的字符串長度要比偏移量小,就會以零字節(\x00)來填充
 74             
 75             SET testStr9 'hello king'
 76             
 77             SETRANGE testStr9 6 'queen'
 78             
 79             對於不存在的key使用SETRANGE
 80             
 81             EXISTS testStr10 檢測key是否存在
 82             
 83             SETRANGE testStr10 5 'king'
 84             
 85         9》SETNX 只有key不存在才能設置成功
 86         
 87             語法:SETNX key value 
 88             
 89         10》SETEX:key 設置key而且設置過時時間,以秒爲單位
 90         
 91             語法:SETEX key  seconds value 原子性操做
 92                 TTL 獲得鍵的生存週期
 93             
 94             注意:SETEX 是原子性操做,至關於執行了SET key value ,又對這個key設置了過時時間 EXPIRE key seconds
 95                 
 96             
 97             SET expireStr1 'test1'
 98             
 99             EXPIRE expireStr1 10
100             
101             SETEX test12 1000 'a'
102             
103             GET test12
104             
105         11》 MSETNX 一次設置多個key-value ,只有全部的key都不存在的時候纔會成功
106         
107             語法 MSETNX key value [key value]
108             
109                 MSETNX test13 'a' test14 'b' test15 'c'
110         
111         12》PSETEX:以毫秒爲單位設置key的生存週期
112             
113             語法:PSETEX key milliseconds value
114             
115                 PSETEX test16 2000 'hell0 king'
116         
117                 PTTL
118         13》INCR 對key中存儲的數字+1
119             
120             語法:INCR key
121             
122             SET counter 1
123                 
124             INCR counter
125             
126             注意:key若是不存在會先初始化爲0,在進行INCR操做,
127                   對於不是數值的值會報錯
128                   
129         14》INCR BY : 將key中存儲的數字加上指定增量
130             
131             語法:INCRBY key INCREMENT
132                 
133                 SET counter2 10
134                 INCRBY counter2 5
135                 INCRBY counter2 1.2 不能用浮點數
136         15》INCRBYFLOAT : 給key中存儲的數字加上指定的浮點數
137             
138             語法:INCRBYFLOAT key increment
139 
140                 SET counter3 1
141                 
142                 INCRBYFLOAT counter3 1.2
143         
144         16》DECR:給將key中存儲的數字減1
145             
146             語法:DECR key
147         
148         17》DECYBY:將key中存儲的數值減去指定的值
149             
150             語法:DECRBY key decrement
151             
152         18》APPEND:將值追加到字符串的末尾
153             
154             語法:APPEND key value
string 類型

2》Hash類型

 1 Hash類型(散列表)
 2         在配置文件中能夠經過配置
 3         hash-max-ziplist-entries 512 512個字節
 4         hash-max-ziplist-value 64 字段數目
 5         
 6         Hash相關命令
 7         1》HSET : 將哈希表key中域field設置成指定的value
 8         
 9             語法 : HSET key field value 
10             
11             HSET userInfo1 username 'king'
12             
13             HSET userInfo1 password '123456'
14             
15             HSET userInfo1 email '18368827317@163.com'
16             
17             HSET username 'queen'
18             
19             注意:若是哈希表中key中field不存在至關於新建field,設置成功返回1
20                   若是哈希表中key中field存在,至關於從新賦值,成功返回0
21         
22         2》HGET :返回哈希表中給定field的值
23         
24             語法:HGET key field
25             
26             HGET userInfo1 username
27             
28             注意:若是key中field不存在,返回的是nil
29             
30         3》HSETNX: 將哈希表中的field設定成指定的值,只有field不在的時候才能成功,若是filed存在,操做無效
31         
32             語法:HSETNX key filed value
33             
34             HSETNX testHash1 test 'a'
35             
36         4》    HMSET :經過將多個field-value設置到hash表key中
37             
38             語法:HMSET key value filed value ...
39             
40             HMSET userInfo2 username(域) 'king'(值) kickname 'freeyman'
41                 
42         5》HMGET : 一次得到hash表key中多個filed的值
43             
44             語法:HMGET key field field
45             
46             注意:若是hash表中field不存在,會返回nil
47             
48         6》HGETALL:返回哈希表key中全部的field和value
49             
50             語法:HGETALL key 
51         
52         7》HKEYS:返回hash中key的全部的filed 
53             
54             語法: HKEYS key 
55             
56         8》HVALS :返回全部hash中key中field全部的對應的值
57             
58             語法:HVALS key 
59         
60         9》HEXISTS :檢測hash表中key的field是否存在
61         
62             語法:HEXISTS key field 
63             
64             HEXISTS userInfo2 username
65             
66             HEXISTS userInfo2 notExists
67             
68         10》HLEN: 返回hash表中keyfield的數量
69             
70             語法:HLEN key
71             
72         11》HINCRBY:給hash中key的field作增量操做
73             
74             語法:HINCRBY key field increment
75             
76         12》HINCRBYFLOAT:    給hash中key的field作增量操做 家浮點數
77             
78             語法:HINCRBYFLOAT key field increment
79                 
80             HSET userInfo3 salary '12343.341'
81             
82             HINCRBYFLOAT userInfo3 salary '0.214'
83             
84         13》HDEL :刪除hash中key的指定域,能夠刪除一個也能夠刪除多個
85             
86             語法:HDEL key field field
87             
88             HGETALL userInfo2 
89             
90             HDEL userInfo2 username 
Hash類型(散列表)

3》List類型

  1 雙向鏈表實現的兩邊的獲取速度快)
  2         
  3         1》LPUSH:向列表左端添加元素
  4             
  5             語法:LPUSH key value... 從左到右依次插入
  6             
  7             LPUSH myList1 a b c 
  8             
  9         2》RPUSH :向列表右端添加元素
 10             
 11             語法:RPUSH key value...
 12             
 13             RPUSH myList1 test1 test2 test3
 14             
 15         3》LPUSHX:向列表頭部添加元素,只有key存在在來添加 只能添加一個值,
 16         
 17             語法:LPUSHX key value
 18             
 19             LPUSH myList2 test4 
 20             
 21         4》RPUSHX:向列表尾部添加元素,只有key存在在來添加
 22             
 23             語法:RPUSHX key value
 24             
 25             RPUSH myList2 test4 
 26             
 27         5》LPOP :將列表頭部的元素彈出
 28             
 29             語法:LPOP myList1
 30             
 31         6》RPOP : 將列表尾部的元素彈出
 32         
 33             語法:RPOP myList1
 34         
 35         7》LLEN : 獲得列表的長度
 36         
 37             語法:LLEN key 
 38         
 39         8》LRANGE : 獲取列表片斷
 40             
 41             語法:LRANGE key start stop
 42         
 43             LRANGE myList1 0 -1
 44             
 45             注意:若是start下標比列表的最大的下標end大,返回空列表
 46                 若是stop比列表長度大,返回列表的末尾
 47             
 48         9》LREM:刪除列表中指定的值
 49             
 50             語法:LRME key count value
 51             
 52             count>0 從列表的頭開始,向尾部搜索,移除與value相等的元素,移動count個
 53             
 54             count<0 從列表的尾部向頭搜索,移除與value相等的元素,移除count個
 55             
 56             count=0 移除列表全部與count相等的值
 57             
 58             LPUSH myList3 a b c d a b c d b e f b g e b  
 59             
 60             LREM myList3 2 b 
 61             
 62             LREM myList3 -1 a 
 63             
 64             LREM myList3 0 e 
 65             
 66         10》LINDEX:獲取指定索引元素的值
 67         
 68             語法:LINDEX key index
 69             
 70             LINDEX myList3 3
 71             
 72             LINDEX myList3 -2
 73             
 74         11》LSET :設置指定索引元素的值
 75         
 76             LSET key index value
 77         
 78         12》LTRIM :只保留列表的片斷
 79         
 80             語法:LTRIN key start stop
 81             
 82             LPUSH myList4 log1 log2 log3 log4 log5
 83             
 84             LRTIM myList4 0 1
 85             
 86             LPUSH myList4 a b c d e f g 
 87             
 88             LTRIM myList4 1 -1
 89             
 90             LTRIM myList4 1
 91         
 92         13》LINSERT 向列表插入元素    
 93             
 94             語法:LINSERT key BEFORE|AFTER pivot value 
 95             
 96             LPUSH myList6 a b c d
 97             
 98             LINSERT myList6 BEFORE "b" "king"
 99             
100             若是沒有成功返回-1 成功返回當前列表的長度 對於空列表返回0 不成功
101             
102         14》RPOPLPUSH:將一個元素從一個列表轉移到另外一個列表(原子性操做)
103             
104             語法:RPOPLPUSH source destination 
105             
106                 RPOPLPUSH myList1 myList6 
107         
108         15》BLPOP:BLPOP是LPOP 的一個阻塞版本
109             
110             語法: BLPOP key [key...] timeout 
111             
112             LPUSH myList9 a b c 
113             
114             LPUSH myList10 d e f 
115             
116             BLPOP myList8 myList9 myList10 0
117             
118             BLPOP myList8 8 0
List類型

4》Set類型

 1 sns 和 博客系統 能夠經過集合類型實現
 2         
 3         1》SADD :向集合中添加元素
 4             
 5             語法:SADD key member [,...]
 6             
 7             SADD web sunkai.clog.com
 8         
 9         2》SMEMBERS :返回指定集合中的元素
10             
11             語法:SMEMBERS key 
12             
13         3》SISMEMBER : 檢測member是不是集合中的成員
14         
15             語法:SISMEMBER key member 
16             
17         4》SREM :刪除
18         
19             語法:SREM key member
20             
21         5》SPOP :隨機刪除並返回集合中的刪除的元素
22             
23             語法:SPOP key
24         
25         6》SRANDMEMBER :隨機返回集合中的元素
26             
27             語法:SRANDMEMBER key counter
28             
29             注意:count 爲正數,並且小於集合中的元素,返回的一個包含隨機元素的集合數組;count若是大於集合中的元素,這個時候會返回整個集合
30                   count 爲負數,返回一個數組,數組中的成員可能出現重複,數組的長度是count取絕對值
31         
32         7》SDIFF 返回集合間的差集
33             
34             語法:SDIFF key key 
35             
36             SADD  couser2 java PHP js jq Python
37             
38             SADD  couser1 iOS anzhuo Python    
39                 
40             SDIFF couser2 couser1
41             
42         8》SINTER 返回集合間的交集
43             
44             語法:SINTER key key ...
45             
46             SINTER couser2 couser1
47             
48         9》SUNION :返回集合間的並集
49             
50             語法:SUNION key key
51                 
52         10》SCARD :返回集合的長度
53             
54             語法:SCARD
55         
56         11》SDIFFSTORE :講差集結果保存到指定集合中
57             
58             語法:SDIFFSTORE destination key key ...
59              
60             SDIFFSTORE diffSET couser2 couser1
61             
62         12》SINTERSTORE
63         
64         13》SUNIONSTORE
65         
66         14》SMOVE 將集合中的元素移動到另外一個集合中(原子性操做)
67             
68             語法:SMOVE source destination member
Set集合類型(元素不可重複)

5》Zset(sorted set)有序集合類型

 1 1》ZADD :將元素及其分數添加到集合中
 2         
 3             語法: ZADD key score member [score member]
 4             
 5             ZADD PYTHONcourse 100 king
 6             
 7             ZADD PYTHONcourse 98 queen 98 test 78 test1
 8             
 9             ZADD PYTHONcourse +inf maxInt -inf minInx 正無窮大,負無窮大
10     
11         2》ZSCORE :得到集合中的指定元素的分數
12             
13             語法:ZSCORE key member 
14             
15         3》ZRANGE :按照元素分數從小到大的順序返回指定索引start到stop之間的全部元素(包含兩端)
16             
17             語法:ZRANGE key start stop WITHSCORES 帶分數
18             
19             注意:當元素的兩個元素的分數相同的時候,redis在排序按照字典的順序排列
20             
21         4》ZREVRANGE: 和ZRANGE 相反,按照從大到小的順序返回
22             
23             語法:ZREVRANGE key start stop [WITHSCORES]
24             
25         5》ZRANGEBYSCORE :獲取指定分數範圍內的元素,按照從小到大的順序,返回的是分數在指定的min到max之間
26             
27             語法:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
28             
29             得到分數80~90之間的全部元素
30             
31             ZRANGEBYSCORE PYTHONcourse 80 90 
32             
33             ZRANGEBYSCORE PYTHONcourse 80 (90 不包含90 
34             
35             注意:經過左括號表明不包含端點
36             
37         6:ZREVRANGEBYSCORE 和上面的相反    
38             
39             語法:ZREVRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
40             
41         7》ZINCRBY : 操做某個元素的分數,返回操做以後的分數
42             
43             語法:ZINCRBY key increment member
44         
45         8》ZCARD :得到集合中元素的數量
46             
47             語法:ZCARD key 
48         
49         9》ZCOUNT 得到指定分數內元素的數量
50             
51             語法:ZCOUNT PYTHONcourse 80 90 
52             
53         10》ZREM :刪除一個或多個元素,返回刪除元素的個數
54             
55             語法:ZREM key member ...
56             
57         11》ZREMRANGEBYRANK: 按照排名範圍刪除元素,按照分數從小到大的順序刪除全部指定的排名範圍內的全部元素
58             
59             語法:ZREMRANGEBYRANK key start stop 
60         
61         12》ZREMRANGEBYSCORE:按照分數範圍內刪除元素
62             
63             語法:ZREMRANGEBYSCORE key min max 
64             
65         13》ZRANK :得到指定元素的排名,根據分數從小到大的順序
66             
67             語法:ZRANK    key member 
68             
69         14》ZREVRANK:得到指定元素的排名,根據分數從大到小的順序
70             
71             語法:ZREVRANK key member
72             
73         15》ZINTERSTORE: 計算有序集合的交集,並將結果保存起來
74             
75             語法:ZINTERSTORE destination numkeys key key ... WEIGHTS weight weight AGGREGATE [SUM|MIN|MAX]
76             
77                 ZADD testSortedSet1 1 a 2 b 3 c
78                 
79                 ZADD testSortedSet2 10 a 20 b 30 c
80                 
81                 ZINTERSTORE testSorted1 2 testSortedSet1 testSortedSet2
82                 
83                 ZRANGE testSorted1 0 -1 WITHSCORES 
84                 
85                 ZINTERSTORE testSorted2 2 testSortedSet1 testSortedSet2 AGGREGATE SUM
86                 
87                 ZINTERSTORE testSorted3 2 testSortedSet1 testSortedSet2 AGGREGATE MIN
88                 
89         16》ZUNIONSTORE    :計算有序集合並集,並將結果保存起來
90                 
91             語法:ZUNIONSTORE destination numkeys key key ... WEIGHTS weight weight AGGREGATE [SUM|MIN|MAX]
Zset(sorted set)有序集合類型  

未完待續。。。。。。  

2、RabbitMQ

一、簡介

RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。RabbitMQ能夠,多個程序同時使用RabbitMQ ,可是必須隊列名稱不同。採用erlang語言,屬於愛立信公司開發的。

消息中間件 --->就是消息隊列

異步方式:不須要立馬獲得結果,須要排隊

同步方式:須要實時得到數據,堅定不能排隊

subprocess 的Q也提供不一樣進程之間的溝通

應用場景:

  電商秒殺活動

  搶購小米手機

  堡壘機批量發送文件

二、Centos6.x系統編譯安裝RabbitMQ

1、安裝erlang
依賴包:
yum -y install gcc ncurses ncurses-base ncurses-devel ncurses-libs ncurses-static ncurses-term ocaml-curses ocaml-curses-devel openssl-devel zlib-devel openssl-devel perl xz xmlto kernel-devel m4  這是一行


一、下載otp_src_19.3.tar.gz
二、tar xvf otp_src_19.3.tar.gz
三、cd opt_src_19.3.tar.gz
四、./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac
五、make && make install

五、配置erlang環境:

vi /etc/profile

export PATH=$PATH:/usr/local/erlang/bin

source /etc/profile # 環境變量重啓生效


2、安裝rabbitmq
一、下載rabbitmq-server-generic-unix-3.6.5.tar.xz
二、tar xvf rabbitmq-server-generic-unix-3.6.5.tar.xz
三、mv rabbitmq_server-3.6.5/ /usr/local/rabbitmq
四、啓動:
	#啓動rabbitmq服務
	/usr/local/rabbitmq/sbin/rabbitmq-server
	#後臺啓動
	/usr/local/rabbitmq/sbin/rabbitmq-server -detached
	#關閉rabbitmq服務
	/usr/local/rabbitmq/sbin/rabbitmqctl stop
	或
	ps -ef | grep rabbit 和 kill -9 xxx  殺死服務

	#開啓插件管理頁面
	/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management

	#建立用戶
	/usr/local/rabbitmq/sbin/rabbitmqctl add_user rabbitadmin 123456
	/usr/local/rabbitmq/sbin/rabbitmqctl set_user_tags rabbitadmin administrator
       ./rabbitmqctl set_permissions -p / rabbitadmin ".*" ".*" ".*" 爲這個用戶受權
        
    
五、登陸
	#WEB登陸
	http://10.10.3.63:15672  本身的IP地址
	用戶名:rabbitadmin
	密碼:123456

3、幾種隊列通訊

一、實現最簡單的隊列通訊

sender

import pika
# 認證
credentials = pika.PlainCredentials('rabbitadmin', '123456')  # 必定要認證
# 鏈接這臺機器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.14.38',credentials=credentials))    #主機IP 和驗證
channel = connection.channel() # 創建了rabbitmq的協議通道

# 聲明queue隊列
channel.queue_declare(queue='hello')

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# 發送消息
channel.basic_publish(exchange='', exchange表示交換器,能精確指定消息應該發送到哪一個隊列,
                      routing_key='hello', # 
                      body='Hello World!')#routing_key設置爲隊列的名稱,body就是發送的內容
print(" [x] Sent 'Hello World!'")
connection.close()
import pika
import time

credentials = pika.PlainCredentials('rabbitadmin', '123456')
# 鏈接這臺機器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.14.38',credentials=credentials))
channel = connection.channel() # 創建了rabbitmq的協議通道


channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):

    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....",body)


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

二、Work Queues (一個發消息,兩個收消息,收消息是公平的依次分發)

在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少。

消息提供者代碼

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
 
# 聲明queue
channel.queue_declare(queue='task_queue')
 
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent %r" % message)
connection.close()
import pika, time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
 
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag",method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)
 
 
channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=True
                      )
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

此時,先啓動消息生產者,而後再分別啓動3個消費者,經過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上  

三、消息持久化

雖然有了消息反饋機制,可是若是rabbitmq自身掛掉的話,那麼任務仍是會丟失。因此須要將任務持久化存儲起來。聲明持久化存儲:

將隊列(Queue)與消息(Message)都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是須要對這種小几率事件也要管理起來,那麼要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此不講解RabbitMQ相關的事務。

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  # 創建了rabbit 協議的通道

# durable=True 聲明持久化存儲
channel.queue_declare(queue='task_queue', durable=True)


channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body='Hello World!',
                      # 在發送任務的時候,用delivery_mode=2來標記消息爲持久化存儲
                      properties=pika.BasicProperties(
                          delivery_mode=2,  
                      ))

print(" [x] Sent 'Hello World!'")
connection.close()

sender.py
import pika
import time


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, properties, body):
    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    callback,
    queue='task_queue',
    no_ack=False  # 默認爲False
)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

四、消息公平分發

若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

channel.basic_qos(prefetch_count=1)

帶消息持久化+公平分發的完整代碼

生產者端

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

消費者端  

import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()

五、Publish\Subscribe(消息發佈\訂閱) 

以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,

交流是一個很是簡單的事情。一方面它收到消息從生產者和另外一邊推他們隊列。交換必須知道如何處理接收到的消息。應該是附加到一個特定的隊列嗎?應該是附加到多隊列?或者應該丟棄。交換的規則定義的類型。  

Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息


fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

   表達式符號說明:#表明一個或多個字符,*表明任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout 

headers: 經過headers 來決定把消息發給哪些queue

消息publisher

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消息subscriber

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue
 
channel.queue_bind(exchange='logs',
                   queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

六、有選擇的接收消息(exchange type=direct)

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

七、更細緻的消息過濾

Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).

That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

 

topi: 意思是話題

To receive all the logs run:

python receive_logs_topic.py "#"  #綁定#號,就是收全部消息,至關於廣播

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"   #以kern開頭

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"  #以critical結尾

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical" #收kern開頭而且以critical結尾(至關於收兩個) 

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error" #發消息到kern.critical裏,內容是:
A critical kernel error

示例:

rabbit_topic_send.py (生產者是發送端)

1 import pika
 2 import sys
 3 
 4 credentials = pika.PlainCredentials('nulige', '123456')
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6     host='192.168.1.118',credentials=credentials))
 7 
 8 channel = connection.channel()
 9 
10 channel.exchange_declare(exchange='topic_logs',type='topic') #指定類型
11 
12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
13 
14 message = ' '.join(sys.argv[2:]) or 'Hello World!'  #消息
15 
16 channel.basic_publish(exchange='topic_logs',
17                       routing_key=routing_key,
18                       body=message)
19 print(" [x] Sent %r:%r" % (routing_key, message))
20 connection.close()

rabbit_topic_recv.py (消費者是接收端)單向的

 1 import pika
 2 import sys
 3 
 4 credentials = pika.PlainCredentials('nulige', '123456')
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6     host='192.168.1.118',credentials=credentials))
 7 
 8 channel = connection.channel()
 9 channel.exchange_declare(exchange='topic_logs',type='topic')
10 
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13 
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18 
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange='topic_logs',
21                        queue=queue_name,
22                        routing_key=binding_key)
23 
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 channel.basic_consume(callback,queue=queue_name)
30 
31 channel.start_consuming()

執行結果:  

 1 #接收端
 2 D:\python\day42>python3 rabbit_topic_recv.py error
 3  [*] Waiting for logs. To exit press CTRL+C
 4  [x] 'error':b'mysql has error'
 5 
 6 
 7 D:\python\day42>python3 rabbit_topic_recv.py *.warning mysql.*
 8  [*] Waiting for logs. To exit press CTRL+C
 9  [x] 'mysql.error':b'mysql has error'
10 
11 
12 D:\python\day42>python3 rabbit_topic_send.py mysql.info "mysql has error"
13  [x] Sent 'mysql.info':'mysql has error'
14 
15 
16 D:\python\day42>python3 rabbit_topic_recv.py *.error.*
17  [*] Waiting for logs. To exit press CTRL+C
18  [x] 'mysql.error.':b'mysql has error'
19 
20 
21 #發送端                                指定類型:error      消息內容
22 D:\python\day42>python3 rabbit_topic_send.py error "mysql has error"
23  [x] Sent 'error':'mysql has error'
24 
25 
26 D:\python\day42>python3 rabbit_topic_send.py mysql.error "mysql has error"
27  [x] Sent 'mysql.error':'mysql has error'
28  [x] 'mysql.info':b'mysql has error'
29 
30 
31 D:\python\day42>python3 rabbit_topic_send.py mysql.error. "mysql has error"
32  [x] Sent 'mysql.error.':'mysql has error'

八、Remote procedure call (RPC) 雙向的

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)  
 

應用場景:

示例:實現RPC服務功能

rabbit_rpc_send.py(生產者是發送端)

 1 import pika
 2 import uuid
 3 
 4 class SSHRpcClient(object):
 5     def __init__(self):
 6         credentials = pika.PlainCredentials('nulige', '123456')
 7         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
 8                             host='192.168.1.118',credentials=credentials))
 9 
10         self.channel = self.connection.channel()
11 
12         result = self.channel.queue_declare(exclusive=True) #客戶端的結果必需要返回到這個queue
13         self.callback_queue = result.method.queue
14 
15         self.channel.basic_consume(self.on_response,queue=self.callback_queue) #聲明從這個queue裏收結果
16 
17     def on_response(self, ch, method, props, body):
18         if self.corr_id == props.correlation_id: #任務標識符
19             self.response = body
20             print(body)
21 
22     # 返回的結果,放在callback_queue中
23     def call(self, n):
24         self.response = None
25         self.corr_id = str(uuid.uuid4()) #惟一標識符
26         self.channel.basic_publish(exchange='',
27                                    routing_key='rpc_queue3',  #聲明一個Q
28                                    properties=pika.BasicProperties(
29                                        reply_to=self.callback_queue,
30                                        correlation_id=self.corr_id,
31                                    ),
32                                    body=str(n))
33 
34         print("start waiting for cmd result ")
35         count = 0
36         while self.response is None: #若是命令沒返回結果
37             print("loop ",count)
38             count +=1
39             self.connection.process_data_events() #以不阻塞的形式去檢測有沒有新事件
40             #若是沒事件,那就什麼也不作, 若是有事件,就觸發on_response事件
41         return self.response
42 
43 ssh_rpc = SSHRpcClient()
44 
45 print(" [x] sending cmd")
46 response = ssh_rpc.call("ipconfig")
47 
48 print(" [.] Got result ")
49 print(response.decode("gbk"))
rabbit_rpc_recv.py(消費端是接收端)

 1 import pika
 2 import time
 3 import subprocess
 4 
 5 credentials = pika.PlainCredentials('nulige', '123456')
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7     host='192.168.1.118', credentials=credentials))
 8 
 9 channel = connection.channel()
10 channel.queue_declare(queue='rpc_queue3')
11 
12 def SSHRPCServer(cmd):
13 
14     print("recv cmd:",cmd)
15     cmd_obj = subprocess.Popen(cmd.decode(),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
16 
17     result = cmd_obj.stdout.read() or cmd_obj.stderr.read()
18     return result
19 
20 def on_request(ch, method, props, body):
21 
22     print(" [.] fib(%s)" % body)
23     response = SSHRPCServer(body)
24 
25     ch.basic_publish(exchange='',
26                      routing_key=props.reply_to,
27                      properties=pika.BasicProperties(correlation_id= \
28                                                          props.correlation_id),
29                      body=response)
30 
31 channel.basic_consume(on_request, queue='rpc_queue3')
32 print(" [x] Awaiting RPC requests")
33 channel.start_consuming()
相關文章
相關標籤/搜索