Redis 與其餘 key - value 緩存產品有如下三個特色:java
Redis有着更爲複雜的數據結構而且提供對他們的原子性操做,這是一個不一樣於其餘數據庫的進化路徑。Redis的數據類型都是基於基本數據結構的同時對程序員透明,無需進行額外的抽象。python
Redis運行在內存中可是能夠持久化到磁盤,因此在對不一樣數據集進行高速讀寫時須要權衡內存,由於數據量不能大於硬件內存。在內存數據庫方面的另外一個優勢是,相比在磁盤上相同的複雜的數據結構,在內存中操做起來很是簡單,這樣Redis能夠作不少內部複雜性很強的事情。同時,在磁盤格式方面他們是緊湊的以追加的方式產生的,由於他們並不須要進行隨機訪問。mysql
下載地址:https://github.com/MSOpenTech/redis/releases。git
Redis 支持 32 位和 64 位。這個須要根據你係統平臺的實際狀況選擇,這裏咱們下載 Redis-x64-xxx.zip壓縮包到 C 盤,解壓後,將文件夾從新命名爲 redis程序員
打開一個 cmd 窗口 使用cd命令切換目錄到 C:\redis 運行 redis-server.exe redis.windows.conf 。github
若是想方便的話,能夠把 redis 的路徑加到系統的環境變量裏,這樣就免得再輸路徑了,後面的那個 redis.windows.conf 能夠省略,若是省略,會啓用默認的。輸入以後,會顯示以下界面:web
這時候另啓一個cmd窗口,原來的不要關閉,否則就沒法訪問服務端了。redis
切換到redis目錄下運行 redis-cli.exe -h 127.0.0.1 -p 6379 。sql
設置鍵值對 set myKey abcshell
取出鍵值對 get myKey
redis-server : 服務器
redis-cli :命令行客戶端
redis-benchmark 性能工具測試
redis-check-aof ADF文件修復工具
redis-check-dump RDB文件檢測工具
redis.conf是redis的配置文件
將配置文件中的daemonize yes 以守護進程的方式來使用 cd到redis的安裝目錄下 啓動和中止 啓動 : redis-server 中止:shutdown 命令返回值 1)狀態回覆 pong set test "this is a test" 2)錯誤回覆 (error) ERR unknown command 'testerror' 3)整數回覆: (integer) 4 4)字符串回覆 get 'test' (nil) 表明空的結果 5)多行字符串回覆 KEYS * 獲得當前數據庫中的存在的鍵值名
1>動態設置/獲取配置選項的值 獲取 CONFIG GET port 1)"post" 2)"6379" 動態設置 CONFIG GET warning 2>Redis配置文件redis.conf選項相關 ---- 鏈接選項 ---- port 6379 默認端口 bind 127.0.01 默認綁定的主機地址 timeout 0 客戶端閒置多久後自動關閉鏈接 0 表明沒有啓動這個選項 loglevel notice 日誌的記錄級別 debug 調試的很詳細的信息:適合開發和測試 verbose 包含不少不太有用的信息 notice 生產環境 warning 警告信息 logfile stdout 指定日誌的記錄方式 默認是標準輸出 database 16 設置默認數據庫的數量,默認是16個 SETECT 1 選擇數據庫 默認編號是0 ----- 快照相關 ------ 多少秒有多少次改變將其同步到磁盤中的數據文件中 save 900 1 900表明秒數 900秒內有一個更改就記錄到磁盤中 save 300 10 save 60 10000 rdbcompression yes 存儲本地數據庫是是否啓動壓縮, 默認yes dbfilename dump.db 指定本地數據庫的文件名
1 一個鍵最多存儲512MB 2 1》SET 設置key 對應的值爲value 3 4 語法:SET key value [EX seconds] [PX milliseconds] [NX|XX] 5 6 EX seconds 設置key的過時時間 SET key value EX seconds == SETEX 7 PX milliseconds 以毫秒的形式設置過時時間 SET key value PX milliseconds -- PSETEX 8 NX :只有鍵不存在的時候纔會設置成功 --SETNX 9 XX :只有key已經存在的時候才能夠設置 10 11 SET test16 'this is a test' EX 100 12 SET test17 'this is a test17' PX 20000 13 SET test18 'this is a test18' NX 14 SET test18 'this is a test18888' XX 15 16 SET test19 'this is a test19' EX 100 NX 17 18 SET test20 'this is a test20' EX 100 PX 300000 NX 19 20 21 注意: 若是key存在,同名會產生覆蓋 22 23 SET testStr1 "this is a test" 24 25 2》GET key 根據key找到對應的值 26 27 語法: GET key 28 29 注意:若是key不存在,返回nil 30 若是key不是字符串,會報錯 31 32 3》GETGANGE: 返回字符串中的一部分 33 34 語法:GETRANGE key start end 35 36 GETGANGET testStr2 0 4 37 GETGANGET testStr2 0 -3 38 GETGANGET testStr2 -4 -2 39 GETGANGET testStr2 0 1000 40 41 4》GETSET:設定指定能夠的值,返回原來的值 42 計數器的清零效果 43 44 語法:GETSET key value 45 SET testStr3 'king' 46 47 GET testStr3 48 49 GETSET testStr3 'queen' 50 51 注意:當key不存在返回nil 若是key不是字符串會報錯 52 53 5》MSET 一次設置多個 54 55 語法:MSET key value [key value...] 56 MSET testStr5 'king' testStr6 'sk' testStr7 'queen' 57 58 6》MGET 一次得到多個 59 60 語法:MGET key key 61 MGET testStr5 testStr6 testStr7 62 63 7》STRLEN:獲取key的字符串的長度 64 65 語法:STRLEN key 66 67 注意:對於不存在的key獲取其長度返回的是0 68 69 8》SETRANGE:至關於字符串替換的效果 70 71 語法:8》SETRANGE key offset value 72 73 注意:若是設置的key原來的字符串長度要比偏移量小,就會以零字節(\x00)來填充 74 75 SET testStr9 'hello king' 76 77 SETRANGE testStr9 6 'queen' 78 79 對於不存在的key使用SETRANGE 80 81 EXISTS testStr10 檢測key是否存在 82 83 SETRANGE testStr10 5 'king' 84 85 9》SETNX 只有key不存在才能設置成功 86 87 語法:SETNX key value 88 89 10》SETEX:key 設置key而且設置過時時間,以秒爲單位 90 91 語法:SETEX key seconds value 原子性操做 92 TTL 獲得鍵的生存週期 93 94 注意:SETEX 是原子性操做,至關於執行了SET key value ,又對這個key設置了過時時間 EXPIRE key seconds 95 96 97 SET expireStr1 'test1' 98 99 EXPIRE expireStr1 10 100 101 SETEX test12 1000 'a' 102 103 GET test12 104 105 11》 MSETNX 一次設置多個key-value ,只有全部的key都不存在的時候纔會成功 106 107 語法 MSETNX key value [key value] 108 109 MSETNX test13 'a' test14 'b' test15 'c' 110 111 12》PSETEX:以毫秒爲單位設置key的生存週期 112 113 語法:PSETEX key milliseconds value 114 115 PSETEX test16 2000 'hell0 king' 116 117 PTTL 118 13》INCR 對key中存儲的數字+1 119 120 語法:INCR key 121 122 SET counter 1 123 124 INCR counter 125 126 注意:key若是不存在會先初始化爲0,在進行INCR操做, 127 對於不是數值的值會報錯 128 129 14》INCR BY : 將key中存儲的數字加上指定增量 130 131 語法:INCRBY key INCREMENT 132 133 SET counter2 10 134 INCRBY counter2 5 135 INCRBY counter2 1.2 不能用浮點數 136 15》INCRBYFLOAT : 給key中存儲的數字加上指定的浮點數 137 138 語法:INCRBYFLOAT key increment 139 140 SET counter3 1 141 142 INCRBYFLOAT counter3 1.2 143 144 16》DECR:給將key中存儲的數字減1 145 146 語法:DECR key 147 148 17》DECYBY:將key中存儲的數值減去指定的值 149 150 語法:DECRBY key decrement 151 152 18》APPEND:將值追加到字符串的末尾 153 154 語法:APPEND key value
1 Hash類型(散列表) 2 在配置文件中能夠經過配置 3 hash-max-ziplist-entries 512 512個字節 4 hash-max-ziplist-value 64 字段數目 5 6 Hash相關命令 7 1》HSET : 將哈希表key中域field設置成指定的value 8 9 語法 : HSET key field value 10 11 HSET userInfo1 username 'king' 12 13 HSET userInfo1 password '123456' 14 15 HSET userInfo1 email '18368827317@163.com' 16 17 HSET username 'queen' 18 19 注意:若是哈希表中key中field不存在至關於新建field,設置成功返回1 20 若是哈希表中key中field存在,至關於從新賦值,成功返回0 21 22 2》HGET :返回哈希表中給定field的值 23 24 語法:HGET key field 25 26 HGET userInfo1 username 27 28 注意:若是key中field不存在,返回的是nil 29 30 3》HSETNX: 將哈希表中的field設定成指定的值,只有field不在的時候才能成功,若是filed存在,操做無效 31 32 語法:HSETNX key filed value 33 34 HSETNX testHash1 test 'a' 35 36 4》 HMSET :經過將多個field-value設置到hash表key中 37 38 語法:HMSET key value filed value ... 39 40 HMSET userInfo2 username(域) 'king'(值) kickname 'freeyman' 41 42 5》HMGET : 一次得到hash表key中多個filed的值 43 44 語法:HMGET key field field 45 46 注意:若是hash表中field不存在,會返回nil 47 48 6》HGETALL:返回哈希表key中全部的field和value 49 50 語法:HGETALL key 51 52 7》HKEYS:返回hash中key的全部的filed 53 54 語法: HKEYS key 55 56 8》HVALS :返回全部hash中key中field全部的對應的值 57 58 語法:HVALS key 59 60 9》HEXISTS :檢測hash表中key的field是否存在 61 62 語法:HEXISTS key field 63 64 HEXISTS userInfo2 username 65 66 HEXISTS userInfo2 notExists 67 68 10》HLEN: 返回hash表中keyfield的數量 69 70 語法:HLEN key 71 72 11》HINCRBY:給hash中key的field作增量操做 73 74 語法:HINCRBY key field increment 75 76 12》HINCRBYFLOAT: 給hash中key的field作增量操做 家浮點數 77 78 語法:HINCRBYFLOAT key field increment 79 80 HSET userInfo3 salary '12343.341' 81 82 HINCRBYFLOAT userInfo3 salary '0.214' 83 84 13》HDEL :刪除hash中key的指定域,能夠刪除一個也能夠刪除多個 85 86 語法:HDEL key field field 87 88 HGETALL userInfo2 89 90 HDEL userInfo2 username
1 雙向鏈表實現的兩邊的獲取速度快) 2 3 1》LPUSH:向列表左端添加元素 4 5 語法:LPUSH key value... 從左到右依次插入 6 7 LPUSH myList1 a b c 8 9 2》RPUSH :向列表右端添加元素 10 11 語法:RPUSH key value... 12 13 RPUSH myList1 test1 test2 test3 14 15 3》LPUSHX:向列表頭部添加元素,只有key存在在來添加 只能添加一個值, 16 17 語法:LPUSHX key value 18 19 LPUSH myList2 test4 20 21 4》RPUSHX:向列表尾部添加元素,只有key存在在來添加 22 23 語法:RPUSHX key value 24 25 RPUSH myList2 test4 26 27 5》LPOP :將列表頭部的元素彈出 28 29 語法:LPOP myList1 30 31 6》RPOP : 將列表尾部的元素彈出 32 33 語法:RPOP myList1 34 35 7》LLEN : 獲得列表的長度 36 37 語法:LLEN key 38 39 8》LRANGE : 獲取列表片斷 40 41 語法:LRANGE key start stop 42 43 LRANGE myList1 0 -1 44 45 注意:若是start下標比列表的最大的下標end大,返回空列表 46 若是stop比列表長度大,返回列表的末尾 47 48 9》LREM:刪除列表中指定的值 49 50 語法:LRME key count value 51 52 count>0 從列表的頭開始,向尾部搜索,移除與value相等的元素,移動count個 53 54 count<0 從列表的尾部向頭搜索,移除與value相等的元素,移除count個 55 56 count=0 移除列表全部與count相等的值 57 58 LPUSH myList3 a b c d a b c d b e f b g e b 59 60 LREM myList3 2 b 61 62 LREM myList3 -1 a 63 64 LREM myList3 0 e 65 66 10》LINDEX:獲取指定索引元素的值 67 68 語法:LINDEX key index 69 70 LINDEX myList3 3 71 72 LINDEX myList3 -2 73 74 11》LSET :設置指定索引元素的值 75 76 LSET key index value 77 78 12》LTRIM :只保留列表的片斷 79 80 語法:LTRIN key start stop 81 82 LPUSH myList4 log1 log2 log3 log4 log5 83 84 LRTIM myList4 0 1 85 86 LPUSH myList4 a b c d e f g 87 88 LTRIM myList4 1 -1 89 90 LTRIM myList4 1 91 92 13》LINSERT 向列表插入元素 93 94 語法:LINSERT key BEFORE|AFTER pivot value 95 96 LPUSH myList6 a b c d 97 98 LINSERT myList6 BEFORE "b" "king" 99 100 若是沒有成功返回-1 成功返回當前列表的長度 對於空列表返回0 不成功 101 102 14》RPOPLPUSH:將一個元素從一個列表轉移到另外一個列表(原子性操做) 103 104 語法:RPOPLPUSH source destination 105 106 RPOPLPUSH myList1 myList6 107 108 15》BLPOP:BLPOP是LPOP 的一個阻塞版本 109 110 語法: BLPOP key [key...] timeout 111 112 LPUSH myList9 a b c 113 114 LPUSH myList10 d e f 115 116 BLPOP myList8 myList9 myList10 0 117 118 BLPOP myList8 8 0
1 sns 和 博客系統 能夠經過集合類型實現 2 3 1》SADD :向集合中添加元素 4 5 語法:SADD key member [,...] 6 7 SADD web sunkai.clog.com 8 9 2》SMEMBERS :返回指定集合中的元素 10 11 語法:SMEMBERS key 12 13 3》SISMEMBER : 檢測member是不是集合中的成員 14 15 語法:SISMEMBER key member 16 17 4》SREM :刪除 18 19 語法:SREM key member 20 21 5》SPOP :隨機刪除並返回集合中的刪除的元素 22 23 語法:SPOP key 24 25 6》SRANDMEMBER :隨機返回集合中的元素 26 27 語法:SRANDMEMBER key counter 28 29 注意:count 爲正數,並且小於集合中的元素,返回的一個包含隨機元素的集合數組;count若是大於集合中的元素,這個時候會返回整個集合 30 count 爲負數,返回一個數組,數組中的成員可能出現重複,數組的長度是count取絕對值 31 32 7》SDIFF 返回集合間的差集 33 34 語法:SDIFF key key 35 36 SADD couser2 java PHP js jq Python 37 38 SADD couser1 iOS anzhuo Python 39 40 SDIFF couser2 couser1 41 42 8》SINTER 返回集合間的交集 43 44 語法:SINTER key key ... 45 46 SINTER couser2 couser1 47 48 9》SUNION :返回集合間的並集 49 50 語法:SUNION key key 51 52 10》SCARD :返回集合的長度 53 54 語法:SCARD 55 56 11》SDIFFSTORE :講差集結果保存到指定集合中 57 58 語法:SDIFFSTORE destination key key ... 59 60 SDIFFSTORE diffSET couser2 couser1 61 62 12》SINTERSTORE 63 64 13》SUNIONSTORE 65 66 14》SMOVE 將集合中的元素移動到另外一個集合中(原子性操做) 67 68 語法:SMOVE source destination member
1 1》ZADD :將元素及其分數添加到集合中 2 3 語法: ZADD key score member [score member] 4 5 ZADD PYTHONcourse 100 king 6 7 ZADD PYTHONcourse 98 queen 98 test 78 test1 8 9 ZADD PYTHONcourse +inf maxInt -inf minInx 正無窮大,負無窮大 10 11 2》ZSCORE :得到集合中的指定元素的分數 12 13 語法:ZSCORE key member 14 15 3》ZRANGE :按照元素分數從小到大的順序返回指定索引start到stop之間的全部元素(包含兩端) 16 17 語法:ZRANGE key start stop WITHSCORES 帶分數 18 19 注意:當元素的兩個元素的分數相同的時候,redis在排序按照字典的順序排列 20 21 4》ZREVRANGE: 和ZRANGE 相反,按照從大到小的順序返回 22 23 語法:ZREVRANGE key start stop [WITHSCORES] 24 25 5》ZRANGEBYSCORE :獲取指定分數範圍內的元素,按照從小到大的順序,返回的是分數在指定的min到max之間 26 27 語法:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] 28 29 得到分數80~90之間的全部元素 30 31 ZRANGEBYSCORE PYTHONcourse 80 90 32 33 ZRANGEBYSCORE PYTHONcourse 80 (90 不包含90 34 35 注意:經過左括號表明不包含端點 36 37 6:ZREVRANGEBYSCORE 和上面的相反 38 39 語法:ZREVRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] 40 41 7》ZINCRBY : 操做某個元素的分數,返回操做以後的分數 42 43 語法:ZINCRBY key increment member 44 45 8》ZCARD :得到集合中元素的數量 46 47 語法:ZCARD key 48 49 9》ZCOUNT 得到指定分數內元素的數量 50 51 語法:ZCOUNT PYTHONcourse 80 90 52 53 10》ZREM :刪除一個或多個元素,返回刪除元素的個數 54 55 語法:ZREM key member ... 56 57 11》ZREMRANGEBYRANK: 按照排名範圍刪除元素,按照分數從小到大的順序刪除全部指定的排名範圍內的全部元素 58 59 語法:ZREMRANGEBYRANK key start stop 60 61 12》ZREMRANGEBYSCORE:按照分數範圍內刪除元素 62 63 語法:ZREMRANGEBYSCORE key min max 64 65 13》ZRANK :得到指定元素的排名,根據分數從小到大的順序 66 67 語法:ZRANK key member 68 69 14》ZREVRANK:得到指定元素的排名,根據分數從大到小的順序 70 71 語法:ZREVRANK key member 72 73 15》ZINTERSTORE: 計算有序集合的交集,並將結果保存起來 74 75 語法:ZINTERSTORE destination numkeys key key ... WEIGHTS weight weight AGGREGATE [SUM|MIN|MAX] 76 77 ZADD testSortedSet1 1 a 2 b 3 c 78 79 ZADD testSortedSet2 10 a 20 b 30 c 80 81 ZINTERSTORE testSorted1 2 testSortedSet1 testSortedSet2 82 83 ZRANGE testSorted1 0 -1 WITHSCORES 84 85 ZINTERSTORE testSorted2 2 testSortedSet1 testSortedSet2 AGGREGATE SUM 86 87 ZINTERSTORE testSorted3 2 testSortedSet1 testSortedSet2 AGGREGATE MIN 88 89 16》ZUNIONSTORE :計算有序集合並集,並將結果保存起來 90 91 語法:ZUNIONSTORE destination numkeys key key ... WEIGHTS weight weight AGGREGATE [SUM|MIN|MAX]
未完待續。。。。。。
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。RabbitMQ能夠,多個程序同時使用RabbitMQ ,可是必須隊列名稱不同。採用erlang語言,屬於愛立信公司開發的。
消息中間件 --->就是消息隊列
異步方式:不須要立馬獲得結果,須要排隊
同步方式:須要實時得到數據,堅定不能排隊
subprocess 的Q也提供不一樣進程之間的溝通
應用場景:
電商秒殺活動
搶購小米手機
堡壘機批量發送文件
1、安裝erlang 依賴包: yum -y install gcc ncurses ncurses-base ncurses-devel ncurses-libs ncurses-static ncurses-term ocaml-curses ocaml-curses-devel openssl-devel zlib-devel openssl-devel perl xz xmlto kernel-devel m4 這是一行 一、下載otp_src_19.3.tar.gz 二、tar xvf otp_src_19.3.tar.gz 三、cd opt_src_19.3.tar.gz 四、./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac 五、make && make install 五、配置erlang環境: vi /etc/profile export PATH=$PATH:/usr/local/erlang/bin source /etc/profile # 環境變量重啓生效 2、安裝rabbitmq 一、下載rabbitmq-server-generic-unix-3.6.5.tar.xz 二、tar xvf rabbitmq-server-generic-unix-3.6.5.tar.xz 三、mv rabbitmq_server-3.6.5/ /usr/local/rabbitmq 四、啓動: #啓動rabbitmq服務 /usr/local/rabbitmq/sbin/rabbitmq-server #後臺啓動 /usr/local/rabbitmq/sbin/rabbitmq-server -detached #關閉rabbitmq服務 /usr/local/rabbitmq/sbin/rabbitmqctl stop 或 ps -ef | grep rabbit 和 kill -9 xxx 殺死服務 #開啓插件管理頁面 /usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management #建立用戶 /usr/local/rabbitmq/sbin/rabbitmqctl add_user rabbitadmin 123456 /usr/local/rabbitmq/sbin/rabbitmqctl set_user_tags rabbitadmin administrator ./rabbitmqctl set_permissions -p / rabbitadmin ".*" ".*" ".*" 爲這個用戶受權 五、登陸 #WEB登陸 http://10.10.3.63:15672 本身的IP地址 用戶名:rabbitadmin 密碼:123456
sender
import pika
# 認證
credentials = pika.PlainCredentials('rabbitadmin', '123456') # 必定要認證
# 鏈接這臺機器
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.14.38',credentials=credentials)) #主機IP 和驗證
channel = connection.channel() # 創建了rabbitmq的協議通道
# 聲明queue隊列
channel.queue_declare(queue='hello')
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# 發送消息
channel.basic_publish(exchange='', exchange表示交換器,能精確指定消息應該發送到哪一個隊列,
routing_key='hello', #
body='Hello World!')#routing_key設置爲隊列的名稱,body就是發送的內容
print(" [x] Sent 'Hello World!'")
connection.close()
import pika import time credentials = pika.PlainCredentials('rabbitadmin', '123456') # 鏈接這臺機器 connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.14.38',credentials=credentials)) channel = connection.channel() # 創建了rabbitmq的協議通道 channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print("received msg...start processing....",body) time.sleep(20) print(" [x] msg process done....",body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少。
消息提供者代碼
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 聲明queue channel.queue_declare(queue='task_queue') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. import sys message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time() channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent %r" % message) connection.close()
import pika, time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(20) print(" [x] Done") print("method.delivery_tag",method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='task_queue', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
此時,先啓動消息生產者,而後再分別啓動3個消費者,經過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上
雖然有了消息反饋機制,可是若是rabbitmq自身掛掉的話,那麼任務仍是會丟失。因此須要將任務持久化存儲起來。聲明持久化存儲:
將隊列(Queue)與消息(Message)都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是須要對這種小几率事件也要管理起來,那麼要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此不講解RabbitMQ相關的事務。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 創建了rabbit 協議的通道 # durable=True 聲明持久化存儲 channel.queue_declare(queue='task_queue', durable=True) channel.basic_publish(exchange='', routing_key='task_queue', body='Hello World!', # 在發送任務的時候,用delivery_mode=2來標記消息爲持久化存儲 properties=pika.BasicProperties( delivery_mode=2, )) print(" [x] Sent 'Hello World!'") connection.close() sender.py
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) def callback(ch, method, properties, body): print("received msg...start processing....",body) time.sleep(20) print(" [x] msg process done....", body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume( callback, queue='task_queue', no_ack=False # 默認爲False ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count
=
1
)
帶消息持久化+公平分發的完整代碼
生產者端
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消費者端
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,
交流是一個很是簡單的事情。一方面它收到消息從生產者和另外一邊推他們隊列。交換必須知道如何處理接收到的消息。應該是附加到一個特定的隊列嗎?應該是附加到多隊列?或者應該丟棄。交換的規則定義的類型。
Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
表達式符號說明:#表明一個或多個字符,*表明任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout
headers: 經過headers 來決定把消息發給哪些queue
消息publisher
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
消息subscriber
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.
In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.
topi: 意思是話題
To receive all the logs run:
python receive_logs_topic.py "#" #綁定#號,就是收全部消息,至關於廣播
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*" #以kern開頭
Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical" #以critical結尾
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical" #收kern開頭而且以critical結尾(至關於收兩個)
And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error" #發消息到kern.critical裏,內容是:
A critical kernel error
示例:
rabbit_topic_send.py (生產者是發送端)
1 import pika 2 import sys 3 4 credentials = pika.PlainCredentials('nulige', '123456') 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='192.168.1.118',credentials=credentials)) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='topic_logs',type='topic') #指定類型 11 12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 13 14 message = ' '.join(sys.argv[2:]) or 'Hello World!' #消息 15 16 channel.basic_publish(exchange='topic_logs', 17 routing_key=routing_key, 18 body=message) 19 print(" [x] Sent %r:%r" % (routing_key, message)) 20 connection.close()
rabbit_topic_recv.py (消費者是接收端)單向的
1 import pika 2 import sys 3 4 credentials = pika.PlainCredentials('nulige', '123456') 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='192.168.1.118',credentials=credentials)) 7 8 channel = connection.channel() 9 channel.exchange_declare(exchange='topic_logs',type='topic') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 binding_keys = sys.argv[1:] 15 if not binding_keys: 16 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 17 sys.exit(1) 18 19 for binding_key in binding_keys: 20 channel.queue_bind(exchange='topic_logs', 21 queue=queue_name, 22 routing_key=binding_key) 23 24 print(' [*] Waiting for logs. To exit press CTRL+C') 25 26 def callback(ch, method, properties, body): 27 print(" [x] %r:%r" % (method.routing_key, body)) 28 29 channel.basic_consume(callback,queue=queue_name) 30 31 channel.start_consuming()
執行結果:
1 #接收端 2 D:\python\day42>python3 rabbit_topic_recv.py error 3 [*] Waiting for logs. To exit press CTRL+C 4 [x] 'error':b'mysql has error' 5 6 7 D:\python\day42>python3 rabbit_topic_recv.py *.warning mysql.* 8 [*] Waiting for logs. To exit press CTRL+C 9 [x] 'mysql.error':b'mysql has error' 10 11 12 D:\python\day42>python3 rabbit_topic_send.py mysql.info "mysql has error" 13 [x] Sent 'mysql.info':'mysql has error' 14 15 16 D:\python\day42>python3 rabbit_topic_recv.py *.error.* 17 [*] Waiting for logs. To exit press CTRL+C 18 [x] 'mysql.error.':b'mysql has error' 19 20 21 #發送端 指定類型:error 消息內容 22 D:\python\day42>python3 rabbit_topic_send.py error "mysql has error" 23 [x] Sent 'error':'mysql has error' 24 25 26 D:\python\day42>python3 rabbit_topic_send.py mysql.error "mysql has error" 27 [x] Sent 'mysql.error':'mysql has error' 28 [x] 'mysql.info':b'mysql has error' 29 30 31 D:\python\day42>python3 rabbit_topic_send.py mysql.error. "mysql has error" 32 [x] Sent 'mysql.error.':'mysql has error'
To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:
fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print("fib(4) is %r" % result)
應用場景:
示例:實現RPC服務功能
rabbit_rpc_send.py(生產者是發送端) 1 import pika 2 import uuid 3 4 class SSHRpcClient(object): 5 def __init__(self): 6 credentials = pika.PlainCredentials('nulige', '123456') 7 self.connection = pika.BlockingConnection(pika.ConnectionParameters( 8 host='192.168.1.118',credentials=credentials)) 9 10 self.channel = self.connection.channel() 11 12 result = self.channel.queue_declare(exclusive=True) #客戶端的結果必需要返回到這個queue 13 self.callback_queue = result.method.queue 14 15 self.channel.basic_consume(self.on_response,queue=self.callback_queue) #聲明從這個queue裏收結果 16 17 def on_response(self, ch, method, props, body): 18 if self.corr_id == props.correlation_id: #任務標識符 19 self.response = body 20 print(body) 21 22 # 返回的結果,放在callback_queue中 23 def call(self, n): 24 self.response = None 25 self.corr_id = str(uuid.uuid4()) #惟一標識符 26 self.channel.basic_publish(exchange='', 27 routing_key='rpc_queue3', #聲明一個Q 28 properties=pika.BasicProperties( 29 reply_to=self.callback_queue, 30 correlation_id=self.corr_id, 31 ), 32 body=str(n)) 33 34 print("start waiting for cmd result ") 35 count = 0 36 while self.response is None: #若是命令沒返回結果 37 print("loop ",count) 38 count +=1 39 self.connection.process_data_events() #以不阻塞的形式去檢測有沒有新事件 40 #若是沒事件,那就什麼也不作, 若是有事件,就觸發on_response事件 41 return self.response 42 43 ssh_rpc = SSHRpcClient() 44 45 print(" [x] sending cmd") 46 response = ssh_rpc.call("ipconfig") 47 48 print(" [.] Got result ") 49 print(response.decode("gbk"))
rabbit_rpc_recv.py(消費端是接收端) 1 import pika 2 import time 3 import subprocess 4 5 credentials = pika.PlainCredentials('nulige', '123456') 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host='192.168.1.118', credentials=credentials)) 8 9 channel = connection.channel() 10 channel.queue_declare(queue='rpc_queue3') 11 12 def SSHRPCServer(cmd): 13 14 print("recv cmd:",cmd) 15 cmd_obj = subprocess.Popen(cmd.decode(),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) 16 17 result = cmd_obj.stdout.read() or cmd_obj.stderr.read() 18 return result 19 20 def on_request(ch, method, props, body): 21 22 print(" [.] fib(%s)" % body) 23 response = SSHRPCServer(body) 24 25 ch.basic_publish(exchange='', 26 routing_key=props.reply_to, 27 properties=pika.BasicProperties(correlation_id= \ 28 props.correlation_id), 29 body=response) 30 31 channel.basic_consume(on_request, queue='rpc_queue3') 32 print(" [x] Awaiting RPC requests") 33 channel.start_consuming()