1 import pika 2 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost')) 4 #聲明一個管道 5 channel = connection.channel() 6 # 聲明隊列 7 channel.queue_declare(queue='hello',durable=True)#durable=True使隊列持久化 8 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 9 channel.basic_publish(exchange='', #定義
routing_key='hello',
body='Hello World!',
properties=pike.BasicProperties(delivery_mode=2,abc='alex'))#delivery_mode=2使消息持久化 10 print(" [x] Sent 'Hello World!'") 11 connection.close()
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 4 channel = connection.channel() 5 #若是確認這個隊列聲明過了,則能夠不在此聲明。若是不肯定消費者或者生產者先運行,那麼能夠在二者中都進行聲明。 6 channel.queue_declare(queue='hello',durable=True)#durable=True使隊列持久化 7 8 def callback(ch, method, properties, body): 9 #ch是管道內存對象地址 10 #method是包含給接收消息者等屬性 11 print(ch,method,properties) 12 print(" [x] Received %r" % body)
#客戶端對服務端的消息進行手動確認
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)#實現廣播效果,消息公平分發 13 #消費消息 14 channel.basic_consume(callback,#若是收到消息,就調用callback函數來處理消息 15 queue='hello')#從哪個隊列裏接收消息17 18 print(' [*] Waiting for messages. To exit press CTRL+C') 19 #開始接收消息 20 channel.start_consuming()
生產者javascript
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange='direct_logs', 8 type='direct') 9 10 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 11 message = ' '.join(sys.argv[2:]) or 'Hello World!' 12 channel.basic_publish(exchange='direct_logs', 13 routing_key=severity, 14 body=message) 15 print(" [x] Sent %r:%r" % (severity, message)) 16 connection.close()
消費者 html
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 channel.exchange_declare(exchange='direct_logs', 7 type='direct') 8 9 result = channel.queue_declare(exclusive=True) 10 queue_name = result.method.queue 11 12 severities = sys.argv[1:] 13 if not severities: 14 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 15 sys.exit(1) 16 17 for severity in severities: 18 channel.queue_bind(exchange='direct_logs', 19 queue=queue_name, 20 routing_key=severity) 21 22 print(' [*] Waiting for logs. To exit press CTRL+C') 23 24 25 def callback(ch, method, properties, body): 26 print(" [x] %r:%r" % (method.routing_key, body)) 27 28 29 channel.basic_consume(callback, 30 queue=queue_name, 31 no_ack=True) 32 33 channel.start_consuming()
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 channel.exchange_declare(exchange='logs',#轉發器 8 type='fanout')#設置模式 9 10 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 11 channel.basic_publish(exchange='logs', 12 routing_key='', 13 body=message) 14 print(" [x] Sent %r" % message) 15 connection.close()
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 4 channel = connection.channel() 5 6 channel.exchange_declare(exchange='logs',#轉發器 7 type='fanout') 8 #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 9 result = channel.queue_declare(exclusive=True) # exclusive排他,惟一的 10 11 queue_name = result.method.queue #獲取隊列名字 12 13 channel.queue_bind(exchange='logs',queue=queue_name) #將隊列綁定到名爲logs的exchange 14 15 print(' [*] Waiting for logs. To exit press CTRL+C') 16 17 def callback(ch, method, properties, body): 18 print(" [x] %r" % body) 19 20 channel.basic_consume(callback,queue=queue_name,no_ack=True) 21 channel.start_consuming()
生產者java
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 channel.exchange_declare(exchange='topic_logs', 7 type='topic') 8 9 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 10 message = ' '.join(sys.argv[2:]) or 'Hello World!' 11 channel.basic_publish(exchange='topic_logs', 12 routing_key=routing_key, 13 body=message) 14 print(" [x] Sent %r:%r" % (routing_key, message)) 15 connection.close()
消費者 python
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 channel.exchange_declare(exchange='topic_logs', 7 type='topic') 8 9 result = channel.queue_declare(exclusive=True) 10 queue_name = result.method.queue 11 12 binding_keys = sys.argv[1:] 13 # #號 表明收全部消息 14 # "kern.*" 表明收以kern開頭的消息 15 # "*.critical" 表明收全部critical結尾的消息 16 # "kern.*" "*.critical" 同時收以上二者 17 if not binding_keys: 18 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for binding_key in binding_keys: 22 channel.queue_bind(exchange='topic_logs', 23 queue=queue_name, 24 routing_key=binding_key) 25 26 print(' [*] Waiting for logs. To exit press CTRL+C') 27 28 29 def callback(ch, method, properties, body): 30 print(" [x] %r:%r" % (method.routing_key, body)) 31 32 33 channel.basic_consume(callback, 34 queue=queue_name, 35 no_ack=True) 36 37 channel.start_consuming()
1 import redis 2 #>>>>>>>>>>>>>>>>>>>>>鏈接方法1 3 #鏈接 4 r = redis.Redis(host='10.211.55.4', port=6379) 5 #存儲值 6 r.set('foo', 'Bar') 7 #獲取值 8 print(r.get('foo')) 9 10 #>>>>>>>>>>>>>>>>>>>>>鏈接方法2 11 #建立鏈接池 12 pool=redis.ConnectionPool(host='10.211.55.4', port=6379) 13 #鏈接 14 r=redis.Redis(connection_pool=pool) 15 #存儲值 16 r.set('foo', 'Bar') 17 #獲取值 18 print(r.get('foo'))
在Redis中設置值,不存在則建立,存在則修改
set(name, value, ex=None, px=None, nx=False, xx=False)
ex,過時時間(秒)
px,過時時間(毫秒)
nx,若是設置爲True,則只有name不存在時,當前set操做才執行
xx,若是設置爲True,則只有name存在時,當前set操做才執行
設置值,只有name不存在時,執行設置操做(添加)
time,過時時間(數字秒 或 timedelta對象)
time_ms,過時時間(數字毫秒 或 timedelta對象)
批量設置值
mset(k1=
'v1'
, k2=
'v2'
)或者mget({
'k1'
:
'v1'
,
'k2'
:
'v2'
})
獲取值
批量獲取 mget(
'ylr'
,
'wupeiqi'
)或者r.mget([
'ylr'
,
'wupeiqi'
])
設置新值並獲取返回原來的值(這個須要以前存在)
獲取子序列(根據字節獲取,非字符),能夠對字符串進行切片
start,起始位置(字節);end,結束位置(字節)
修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加)
offset,字符串的索引,字節(一個漢字三個字節)
value,要設置的值
對name對應值的二進制表示的位進行操做
offset,位的索引(將值變換成二進制後再進行索引)
value,值只能是 1 或 0
獲取name對應的值的二進制表示中的某位的值 (0或1)
獲取name對應的值的二進制表示中 1 的個數
start,位起始位置; end,位結束位置
返回name對應值的字節長度(一個漢字3個字節)
自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增
amount,自增數(必須是整數)
自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增
amount,自增數(浮點型)
自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減
amount,自減數(整數)
在redis name對應的值後面追加內容
value, 要追加的字符串
name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改)
key,name對應的hash中的key;value,name對應的hash中的value
hsetnx(name, key, value),當name對應的hash中不存在當前key時則建立(至關於添加)
在name對應的hash中批量設置鍵值對
mapping,字典,如:{'k1':'v1', 'k2': 'v2'},例如hmset('xx', {'k1':'v1', 'k2': 'v2'})
在name對應的hash中獲取根據key獲取value
在name對應的hash中獲取多個key的值
keys,要獲取key集合,如:['k1', 'k2', 'k3'];*args,要獲取的key,如:k1,k2,k3,例如r.hmget('xx', ['k1', 'k2'])或者r.hmget('xx', 'k1', 'k2')
獲取name對應
hash
的全部鍵值
獲取name對應的hash中鍵值對的個數
獲取name對應的hash中全部的key的值
獲取name對應的hash中全部的value的值
檢查name對應的hash是否存在當前傳入的key
將name對應的hash中指定key的鍵值對刪除
自增name對應的hash中的指定key的值,不存在則建立key=amount
自增name對應的hash中的指定key的值,不存在則建立key=amount
增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而防止內存溢出
cursor,遊標(基於遊標分批取獲取數據);match,匹配指定key,默認None 表示全部的key;count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
利用yield封裝hscan建立生成器,實現分批去redis中獲取數據
match,匹配指定key,默認None 表示全部的key;count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊
r.lpush('oo', 11,22,33),保存順序爲: 33,22,11
rpush(name, values) 表示從右向左操做
在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊
rpushx(name, value) 表示從右向左操做
name對應的list元素的個數
在name對應的列表的某一個值前或後插入一個新值
where,BEFORE或AFTER;refvalue,標杆值,即:在它先後插入數據;value,要插入的數據
對name對應的list中的某一個索引位置從新賦值
index,list的索引位置;value,要設置的值
在name對應的list中刪除指定的值
value,要刪除的值;num=0,刪除列表中全部的指定值;num=2,從前到後,刪除2個;
num=-2,從後向前,刪除2個
在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素
rpop(name) 表示從右向左操做
在name對應的列表中根據索引獲取列表元素
在name對應的列表分片獲取數據
在name對應的列表中移除沒有在start-end索引之間的值
從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊
將多個列表排列,按照從左到右去pop對應列表的元素
timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
brpop(keys, timeout),從右向左獲取數據
從一個列表的右側移除一個元素並將其添加到另外一個列表的左側
name對應的集合中添加元素
獲取name對應的集合中元素個數
在第一個name對應的集合中且不在其餘name對應的集合的元素集合
獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中
獲取多一個name對應集合的並集
獲取多一個name對應集合的並集,再講其加入到dest對應的集合中
檢查value是不是name對應的集合的成員
獲取name對應的集合的全部成員
將某個成員從一個集合中移動到另一個集合
從集合的右側(尾部)移除一個成員,並將其返回
從name對應的集合中隨機獲取 numbers 個元素
在name對應的集合中刪除某些值
獲取多一個name對應的集合的並集
獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中
同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
有序集合,在集合的基礎上,爲每元素排序;元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。
在name對應的有序集合中添加元素,例如:zadd('zz', 'n1', 1, 'n2', 2)或zadd('zz', n1=11, n2=22)
獲取name對應的有序集合元素的數量
獲取name對應的有序集合中分數 在 [min,max] 之間的個數
自增name對應的有序集合的 name 對應的分數
按照索引範圍獲取name對應的有序集合的元素
start,有序集合索引發始位置(非分數);end,有序集合索引結束位置(非分數);desc,排序規則,默認按照分數從小到大排序;withscores,是否獲取元素的分數,默認只獲取元素的值;score_cast_func,對分數進行數據轉換的函數
從大到小排序:zrevrange(name, start, end, withscores=False, score_cast_func=float)
按照分數範圍獲取name對應的有序集合的元素:zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
從大到小排序:zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
獲取某個值在 name對應的有序集合中的排行(從 0 開始)
zrevrank(name, value),從大到小排序
刪除name對應的有序集合中值是values的成員
根據排行範圍刪除
根據分數範圍刪除
獲取name對應有序集合中 value 對應的分數
獲取兩個有序集合的交集,若是遇到相同值不一樣分數,則按照aggregate進行操做
aggregate的值爲: SUM MIN MAX
獲取兩個有序集合的並集,若是遇到相同值不一樣分數,則按照aggregate進行操做
同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做
根據刪除redis中的任意數據類型
檢測redis的name是否存在
根據模型獲取redis的name
KEYS * 匹配數據庫中全部 key;KEYS h?llo 匹配 hello , hallo 和 hxllo 等;KEYS h*llo 匹配 hllo 和 heeeeello 等;KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
爲某個redis的某個name設置超時時間
對redis的name重命名爲
將redis的某個值移動到指定的db下
隨機獲取一個redis的name(不刪除)
獲取name對應值的類型
同字符串操做,用於增量迭代獲取key
1 import redis,time 2 #鏈接redis 3 pool = redis.ConnectionPool(host='10.211.55.4', port=6379) 4 r = redis.Redis(connection_pool=pool) 5 6 #啓動管道 7 pipe = r.pipeline() #或者pipe=r.pipeline() 8 9 #執行命令,兩條命令一塊兒執行 10 pipe.set('name', 'alex')
time.sleep(30) 11 pipe.set('role', 'sb') 12 13 pipe.execute()transaction=True
1 #redishelper.py 2 import redis 3 4 class RedisHelper: 5 def __init__(self): 6 self.__conn = redis.Redis(host='10.211.55.4') 7 self.chan_sub = 'fm104.5' 8 self.chan_pub = 'fm104.5' 9 10 def public(self, msg):#發佈者 11 self.__conn.publish(self.chan_pub, msg) 12 return True 13 14 def subscribe(self):#訂閱者 15 pub = self.__conn.pubsub()#打開收音機 16 pub.subscribe(self.chan_sub)#調頻道 17 pub.parse_response()#準備接收 18 return pub
1 #訂閱者.py 2 from redishelper import RedisHelper 3 4 obj = RedisHelper() 5 redis_sub = obj.subscribe() 6 7 while True: 8 msg = redis_sub.parse_response() 9 print(msg)
1 #發佈者.py 2 from redishelper import RedisHelper 3 obj = RedisHelper() 4 obj.public('hello')
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 4 channel = connection.channel() 5 channel.queue_declare(queue='rpc_queue') 6 7 def fib(n): #求斐波那契數列 8 if n == 0: 9 return 0 10 elif n == 1: 11 return 1 12 else: 13 return fib(n - 1) + fib(n - 2) 14 15 16 def on_request(ch, method, props, body): 17 n = int(body) 18 print(" [.] fib(%s)" % n) 19 response = fib(n) 20 ch.basic_publish(exchange='', 21 routing_key=props.reply_to, 22 properties=pika.BasicProperties(correlation_id=props.correlation_id), 23 body=str(response)) 24 ch.basic_ack(delivery_tag=method.delivery_tag) 25 26 channel.basic_qos(prefetch_count=1) 27 channel.basic_consume(on_request, queue='rpc_queue') 28 print(" [x] Awaiting RPC requests") 29 channel.start_consuming()
1 import pika,uuid 2 3 class FibonacciRpcClient(object): 4 def __init__(self): 5 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 6 self.channel = self.connection.channel() 7 result = self.channel.queue_declare(exclusive=True) 8 self.callback_queue = result.method.queue 9 self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue) 10 11 def on_response(self, ch, method, props, body): 12 if self.corr_id == props.correlation_id: 13 self.response = body 14 15 def call(self, n): 16 self.response = None 17 self.corr_id = str(uuid.uuid4()) 18 self.channel.basic_publish(exchange='', 19 routing_key='rpc_queue', 20 properties=pika.BasicProperties( 21 reply_to=self.callback_queue, 22 correlation_id=self.corr_id, 23 ), 24 body=str(n)) 25 while self.response is None: 26 self.connection.process_data_events() 27 return int(self.response) 28 29 30 fibonacci_rpc = FibonacciRpcClient() 31 print(" [x] Requesting fib(30)") 32 response = fibonacci_rpc.call(30) 33 print(" [.] Got %r" % response)