<!DOCTYPE html>css
<html> <head> <title>RabbitMQRedisMysql</title> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <style type="text/css"> /* GitHub stylesheet for MarkdownPad (http://markdownpad.com) */ /* Author: Nicolas Hery - http://nicolashery.com */ /* Version: b13fe65ca28d2e568c6ed5d7f06581183df8f2ff */ /* Source: https://github.com/nicolahery/markdownpad-github */html
/* RESET =============================================================================*/python
html, body, div, span, applet, object, iframe, h1, h2, h3, h4, h5, h6, p, blockquote, pre, a, abbr, acronym, address, big, cite, code, del, dfn, em, img, ins, kbd, q, s, samp, small, strike, strong, sub, sup, tt, var, b, u, i, center, dl, dt, dd, ol, ul, li, fieldset, form, label, legend, table, caption, tbody, tfoot, thead, tr, th, td, article, aside, canvas, details, embed, figure, figcaption, footer, header, hgroup, menu, nav, output, ruby, section, summary, time, mark, audio, video { margin: 0; padding: 0; border: 0; }mysql
/* BODY =============================================================================*/git
body { font-family: Helvetica, arial, freesans, clean, sans-serif; font-size: 14px; line-height: 1.6; color: #333; background-color: #fff; padding: 20px; max-width: 960px; margin: 0 auto; }github
body>*:first-child { margin-top: 0 !important; }web
body>*:last-child { margin-bottom: 0 !important; }redis
/* BLOCKS =============================================================================*/sql
p, blockquote, ul, ol, dl, table, pre { margin: 15px 0; }mongodb
/* HEADERS =============================================================================*/
h1, h2, h3, h4, h5, h6 { margin: 20px 0 10px; padding: 0; font-weight: bold; -webkit-font-smoothing: antialiased; }
h1 tt, h1 code, h2 tt, h2 code, h3 tt, h3 code, h4 tt, h4 code, h5 tt, h5 code, h6 tt, h6 code { font-size: inherit; }
h1 { font-size: 28px; color: #000; }
h2 { font-size: 24px; border-bottom: 1px solid #ccc; color: #000; }
h3 { font-size: 18px; }
h4 { font-size: 16px; }
h5 { font-size: 14px; }
h6 { color: #777; font-size: 14px; }
body>h2:first-child, body>h1:first-child, body>h1:first-child+h2, body>h3:first-child, body>h4:first-child, body>h5:first-child, body>h6:first-child { margin-top: 0; padding-top: 0; }
a:first-child h1, a:first-child h2, a:first-child h3, a:first-child h4, a:first-child h5, a:first-child h6 { margin-top: 0; padding-top: 0; }
h1+p, h2+p, h3+p, h4+p, h5+p, h6+p { margin-top: 10px; }
/* LINKS =============================================================================*/
a { color: #4183C4; text-decoration: none; }
a:hover { text-decoration: underline; }
/* LISTS =============================================================================*/
ul, ol { padding-left: 30px; }
ul li > :first-child, ol li > :first-child, ul li ul:first-of-type, ol li ol:first-of-type, ul li ol:first-of-type, ol li ul:first-of-type { margin-top: 0px; }
ul ul, ul ol, ol ol, ol ul { margin-bottom: 0; }
dl { padding: 0; }
dl dt { font-size: 14px; font-weight: bold; font-style: italic; padding: 0; margin: 15px 0 5px; }
dl dt:first-child { padding: 0; }
dl dt>:first-child { margin-top: 0px; }
dl dt>:last-child { margin-bottom: 0px; }
dl dd { margin: 0 0 15px; padding: 0 15px; }
dl dd>:first-child { margin-top: 0px; }
dl dd>:last-child { margin-bottom: 0px; }
/* CODE =============================================================================*/
pre, code, tt { font-size: 12px; font-family: Consolas, "Liberation Mono", Courier, monospace; }
code, tt { margin: 0 0px; padding: 0px 0px; white-space: nowrap; border: 1px solid #eaeaea; background-color: #f8f8f8; border-radius: 3px; }
pre>code { margin: 0; padding: 0; white-space: pre; border: none; background: transparent; }
pre { background-color: #f8f8f8; border: 1px solid #ccc; font-size: 13px; line-height: 19px; overflow: auto; padding: 6px 10px; border-radius: 3px; }
pre code, pre tt { background-color: transparent; border: none; }
kbd { -moz-border-bottom-colors: none; -moz-border-left-colors: none; -moz-border-right-colors: none; -moz-border-top-colors: none; background-color: #DDDDDD; background-image: linear-gradient(#F1F1F1, #DDDDDD); background-repeat: repeat-x; border-color: #DDDDDD #CCCCCC #CCCCCC #DDDDDD; border-image: none; border-radius: 2px 2px 2px 2px; border-style: solid; border-width: 1px; font-family: "Helvetica Neue",Helvetica,Arial,sans-serif; line-height: 10px; padding: 1px 4px; }
/* QUOTES =============================================================================*/
blockquote { border-left: 4px solid #DDD; padding: 0 15px; color: #777; }
blockquote>:first-child { margin-top: 0px; }
blockquote>:last-child { margin-bottom: 0px; }
/* HORIZONTAL RULES =============================================================================*/
hr { clear: both; margin: 15px 0; height: 0px; overflow: hidden; border: none; background: transparent; border-bottom: 4px solid #ddd; padding: 0; }
/* TABLES =============================================================================*/
table th { font-weight: bold; }
table th, table td { border: 1px solid #ccc; padding: 6px 13px; }
table tr { border-top: 1px solid #ccc; background-color: #fff; }
table tr:nth-child(2n) { background-color: #f8f8f8; }
/* IMAGES =============================================================================*/
img { max-width: 100% } </style>
</head> <body> <p>RabbitMQ 消息隊列<br /> python裏有threading QUEUE 只用於線程間交互,進程QUEUE 用於父進程與子進程或者是兄弟進程<br /> RabbitMQ採用消息輪詢的方式發送消息。一個一個的給每一個消費者<br /> 應用之間使用socket實現數據共享<br /> 連接幾個應用的中間商著名的有:<br /> 1. RabbitMQ 2. ZeroMQ 3. ActiveMQ<br /> RabbitMQ使用<br /> 生產者:<br /> 1. 引用pika模塊<br /> <code>import pika</code><br /> 2. 創建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 聲明一個管道<br /> <code>channel=connection.channel()</code><br /> 4. 在管道中聲明隊列<br /> <code>channel.queue_declare(queue='隊列名')</code><br /> 5. 經過管道發送消息 rounting<em>key就是隊列名字 <br /> <code>channel.basic_publish(exchange='',routing_key='隊列名',body='Hello World!')</code><br /> 6. 關閉隊列,不用關閉管道<br /> <code>connection.close()</code> <br /> 消費者(多是其餘機器,能夠跨機器) 1. 引用pika模塊<br /> <code>import pika</code><br /> 2. 創建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 聲明一個管道<br /> <code>channel=connection.channel()</code><br /> 4. 在管道中聲明隊列<br /> <code>channel.queue_declare(queue='隊列名')</code><br /> 5. 定義函數,標準的處理消息的函數都會帶下面四個參數ch管道聲明的對象地址,method包含你要發消息給誰的信息,properties包含發消息端的設置信息<br /> <code>def callback(ch,method,properties,body): print(ch,method,properties,body) print("[x] Received %r"%body) ch.basic_ack(delivery_tag=method.delivery_tag)手動確認消息處理完,否則消息一直不銷燬</code><br /> 6. 消費消息,定義函數的目的,若是收到消息,就用定義的函數處理消息no</em>ack參數消息確認,當爲True時消息不等消費者確認消息隊列就銷燬消息,爲False時須要等待消費者處理完消息的確認消息隊消息隊列才銷燬消息(判斷的是socket是否斷開)<br /> <code>channel.basic_consume('定義的函數名',queue='隊列名',no_ack=True)</code><br /> 7. 啓動管道接收消息,啓動後一直處於開啓狀態,沒有消息就等待。<br /> <code>channel.start_consuming()</code> </p> <h3>RabbitMQ消息分發輪詢</h3> <p>採用輪詢的方式,依次的發給每一個消費者<br /> 生產者會等待消費者肯定處理完消息的回覆纔會銷燬消息。<br /> 當消息執行到一半的時候,消費者斷開,消息會保留髮送給下一個消費者處理 </p> <h3>消息持久化</h3> <p>消息必須等消費者手動肯定後,才銷燬ch.basic<em>ack(delivery</em>tag=method.delivery_tag)手動確認消息處理完,否則消息一直不銷燬<br /> 當RabbitMQ服務中止,服務裏的消息隊列會銷燬。<br /> 若是想保持消息隊列的持久化,必須在聲明隊列的時候設置,durable=True。這樣當RabbitMQ服務斷開再重啓,消息隊列仍是存在,消息會銷燬<br /> <code>channel.queue_declare(queue='隊列名',durable=True)</code> 消息也持久化<br /> <code>channel.basic_publish(exchange='',routing_key='隊列名',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))</code> </p> <h3>廣播模式</h3> <p>消費者端加channel.basic<em>qos(prefetch</em>count=1),加過這句話實現了,不是按順序分發,而是看哪個是空閒的,才分發給空閒的消費者消息。多大本事幹多少活。<br /> 廣播是生產者發消息,全部消費者都收到。<br /> 用exchange實現廣播。 fanout:全部bind到此exchange的queue均可以接收消息<br /> direct:經過routingkey和exchange決定的那個惟一的queue能夠接收消息<br /> topic:全部符合routingkey(此時能夠是一個表達式)的routingkey所bind的queue能夠接受消息。 </p> <h5>fanout純廣播</h5> <p>設置管道的時候設置<br /> <code>channel.exchange_declare(exchange='logs',type='fanout')</code><br /> 不聲明queue<br /> 生產者:<br /> 1. 引用pika模塊<br /> <code>import pika</code><br /> 2. 創建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 聲明一個管道<br /> <code>channel=connection.channel()</code><br /> 4. 設置管道的時候設置<br /> <code>channel.exchange_declare(exchange='logs',exchange_type='fanout')</code> <br /> 5. 經過管道發送消息,廣播不須要管道名<br /> <code>channel.basic_publish(exchange='logs',routing_key='',body='Hello World!')</code><br /> 6. 關閉隊列,不用關閉管道<br /> <code>connection.close()</code> <br /> 消費者(多是其餘機器,能夠跨機器) 1. 引用pika模塊<br /> <code>import pika</code><br /> 2. 創建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 聲明一個管道<br /> <code>channel=connection.channel()</code><br /> 4. 設置管道的時候設置<br /> <code>channel.exchange_declare(exchange='logs',exchange_type='fanout')</code> 5. 生成隨機queue與exchange轉發器綁定。 ``` result=channel.queue<em>declare(exclusive=True)exclusive排他的,單獨的,生成隨街queue,綁定再exchange上 queue</em>name=result.method.queue<br /> channel.queue<em>bind(exchange='logs',queue=queue</em>name)綁定轉發器,接受轉發器裏的消息,exchange與隨機生成的queue綁定 </p> <p><code>6. 定義函數,標準的處理消息的函數都會帶下面四個參數ch管道聲明的對象地址,method包含你要發消息給誰的信息,properties</code> def callback(ch,method,properties,body): print(ch,method,properties,body) print("[x] Received %r"%body)<br /> ch.basic<em>ack(delivery</em>tag=method.delivery<em>tag)手動確認消息處理完,否則消息一直不銷燬 ``` 7. 消費消息,定義函數的目的,若是收到消息,就用定義的函數處理消息no</em>ack參數消息確認,當爲True時消息不等消費者確認消息隊列就銷燬消息,爲False時須要等待消費者處理完消息的確認消息隊消息隊列才銷燬消息(判斷的是socket是否斷開)<br /> <code>channel.basic_consume('定義的函數名',queue='隊列名',no_ack=True)</code><br /> 8. 啓動管道接收消息,啓動後一直處於開啓狀態,沒有消息就等待。<br /> <code>channel.start_consuming()</code> </p> <h5>direct廣播 info warning error 劃分消息</h5> <p>生產者:<br /> 1. 引用pika模塊<br /> <code>import pika</code><br /> 2. 創建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 聲明一個管道<br /> <code>channel=connection.channel()</code><br /> 4. 設置管道的時候設置<br /> <code>channel.exchange_declare(exchange='direct_logs',exchange_type='direct')</code> 5. 接受分發消息的級別<br /> <code>severity=sys.argv[1] if len(sys.argv)>1 else 'info' message=' '.join(sys.argv[2:]) or 'hello world!'</code> 5. 經過管道發送消息,廣播不須要管道名<br /> <code>channel.basic_publish(exchange='direct_logs', routing_key=severity,#相似指定queue body=message)</code><br /> 6. 關閉隊列,不用關閉管道<br /> <code>connection.close()</code><br /> 消費者<br /> 1. 引用pika模塊<br /> <code>import pika</code><br /> 2. 創建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 聲明一個管道<br /> <code>channel=connection.channel()</code><br /> 4. 設置管道的時候設置<br /> <code>channel.exchange_declare(exchange='direct_logs',exchange_type='direct')</code> 5. 生成隨機queue與exchange轉發器綁定。 ``` result=channel.queue<em>declare(exclusive=True)exclusive排他的,單獨的,生成隨街queue,綁定再exchange上 queue</em>name=result.method.queue<br /> severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1)</p> <p>for severity in severities: channel.queue<em>bind(exchange='direct</em>logs',queue=queue<em>name,routing</em>key=severity)</p> <p><code>6. 定義函數,標準的處理消息的函數都會帶下面四個參數ch管道聲明的對象地址,method包含你要發消息給誰的信息,properties</code> def callback(ch,method,properties,body): print(ch,method,properties,body) print("[x] Received %r"%body)<br /> ch.basic<em>ack(delivery</em>tag=method.delivery<em>tag)手動確認消息處理完,否則消息一直不銷燬 ``` 7. 消費消息,定義函數的目的,若是收到消息,就用定義的函數處理消息no</em>ack參數消息確認,當爲True時消息不等消費者確認消息隊列就銷燬消息,爲False時須要等待消費者處理完消息的確認消息隊消息隊列才銷燬消息(判斷的是socket是否斷開)<br /> <code>channel.basic_consume('定義的函數名',queue='隊列名',no_ack=True)</code><br /> 8. 啓動管道接收消息,啓動後一直處於開啓狀態,沒有消息就等待。<br /> <code>channel.start_consuming()</code> </p> <h5>topic廣播</h5> <p>更細緻的消息過濾,包括應用程序,#收全部消息,<em>.info接受帶有.info的消息,mysql.</em>接受帶mysql的消息 <br /> 生產者<br /> ``` import pika import sys</p> <p>connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()</p> <p>channel.exchange<em>declare(exchange='topic</em>logs', type='topic')</p> <p>routing<em>key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic</em>publish(exchange='topic<em>logs', routing<em>key=routing</em>key, body=message) print(" [x] Sent %r:%r" % (routing</em>key, message)) connection.close() <code>消費者</code> import pika import sys</p> <p>connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()</p> <p>channel.exchange<em>declare(exchange='topic</em>logs', type='topic')</p> <p>result = channel.queue<em>declare(exclusive=True) queue</em>name = result.method.queue</p> <p>binding<em>keys = sys.argv[1:] if not binding</em>keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1)</p> <p>for binding<em>key in binding</em>keys: channel.queue<em>bind(exchange='topic<em>logs', queue=queue</em>name, routing</em>key=binding_key)</p> <p>print(' [*] Waiting for logs. To exit press CTRL+C') </p> <p>def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) </p> <p>channel.basic<em>consume(callback, queue=queue</em>name, no_ack=True) </p> <p>channel.start_consuming() <br /> ```</p> <h3>RabbitMQ rpc(remote procedure call)遠程調用一個方法</h3> <p>即便生產者又是消費者 <br /> start<em>cosuming爲阻塞模式,rpc不用阻塞,rpc是執行一會這個再去執行另外一個。process</em>data_events()非阻塞方法,能收到消息就收,沒有消息不阻塞繼續往下執行<br /> 服務器端<br /> ```</p> <h1><em><em><em>coding:utf-8</em></em></em></h1> <p><strong>author</strong> = 'Alex Li' import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))</p> <p>channel = connection.channel()</p> <p>channel.queue<em>declare(queue='rpc</em>queue')</p> <p>def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2)</p> <p>def on_request(ch, method, props, body): n = int(body)</p> <pre><code>print(" [.] fib(%s)" % n)<br/> response = fib(n)<br/>
ch.basic_publish(exchange='',<br/> routing_key=props.reply_to,<br/> properties=pika.BasicProperties(correlation_id = <br/> props.correlation_id),<br/> body=str(response))<br/> ch.basic_ack(delivery_tag = method.delivery_tag)<br/> </code></pre><br/>
<p>channel.basic<em>qos(prefetch</em>count=1) channel.basic_consume(on<em>request, queue='rpc</em>queue')</p> <p>print(" [x] Awaiting RPC requests") channel.start_consuming() <code>客戶端</code> import pika import uuid</p> <p>class FibonacciRpcClient(object): def <strong>init</strong>(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))</p> <pre><code> self.channel = self.connection.channel()<br/> <br/> result = self.channel.queue_declare(exclusive=True)<br/> self.callback_queue = result.method.queue<br/>
self.channel.basic_consume(self.on_response, no_ack=True,<br/> queue=self.callback_queue)<br/>
def on_response(self, ch, method, props, body):<br/> if self.corr_id == props.correlation_id:<br/> self.response = body<br/>
def call(self, n):<br/> self.response = None<br/> self.corr_id = str(uuid.uuid4())<br/> self.channel.basic_publish(exchange='',<br/> routing_key='rpc_queue',<br/> properties=pika.BasicProperties(<br/> </code></pre>
<p>reply<em>to = self.callback</em>queue,#客戶端發送消息是,把接收返回消息的管道也告訴給服務器端 correlation<em>id = self.corr<em>id,#用於判斷服務器端返回的結果和個人請求是不是同一條。目的就是爲了客戶端能夠同時發兩條消息,當服務器端返回結果時,須要有判斷關於哪條請求的結果 ), body=str(n)) while self.response is None: self.connection.process</em>data</em>events() return int(self.response)</p> <p>fibonacci_rpc = FibonacciRpcClient()</p> <p>print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response) ``` </p> <h3>Redis</h3> <p>緩存中間商,用socket,例如:mongodb,redis,memcache<br /> * 鏈接方式<br /> * 鏈接池<br /> * 操做<br /> - String 操做<br /> - Hash 操做<br /> - List 操做<br /> - Set 操做 <br /> - Sort Set 操做<br /> * 管道<br /> * 發佈訂閱 </p> <h5>String操做</h5> <ol> <li> set(name, value, ex=None, px=None, nx=False, xx=False)<br /> 在Redis中設置值,默認,不存在則建立,存在則修改<br /> 參數:<br /> ex,過時時間(秒) <br /> px,過時時間(毫秒)<br /> nx,若是設置爲True,則只有name不存在時,當前set操做才執行<br /> xx,若是設置爲True,則只有name存在時,崗前set操做才執行
</li> <li>setnx(name, value) 設置值,只有name不存在時,執行設置操做(添加)</li> <li>setex(name, value, time) 設置值 參數:time,過時時間(數字秒 或 timedelta對象)</li> <li>psetex(name, time<em>ms, value) 設置值 參數:time</em>ms,過時時間(數字毫秒 或 timedelta對象) </li> <li>mset(*args, **kwargs) 批量設置值 如:mset(k1='v1', k2='v2')或mget({'k1': 'v1', 'k2': 'v2'}) </li> <li>get(name) 獲取值</li> <li>mget(keys, *args) 批量獲取 如:mget('ylr', 'wupeiqi')或r.mget(['ylr', 'wupeiqi']) </li> <li>getset(name, value) 設置新值並獲取原來的值</li> <li>getrange(key, start, end) 獲取子序列(根據字節獲取,非字符) 參數: name,Redis 的 name start,起始位置(字節) end,結束位置(字節) 如: "武沛齊" ,0-3表示 "武"</li> <li>setrange(name, offset, value) 修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加) 參數: offset,字符串的索引,字節(一個漢字三個字節) value,要設置的值 </li> <li>setbit(name, offset, value) BITCOUNT 統計二進制有多少個1 對name對應值的二進制表示的位進行操做 參數: name,redis的name offset,位的索引(將值變換成二進制後再進行索引) value,值只能是 1 或 0 注:若是在Redis中有一個對應: n1 = "foo", 那麼字符串foo的二進制表示爲:01100110 01101111 01101111 因此,若是執行 setbit('n1', 7, 1),則就會將第7位設置爲1, 那麼最終二進制則變成 01100111 01101111 01101111,即:"goo" 擴展,轉換二進制表示: source = "武沛齊" source = "foo" for i in source: num = ord(i) print bin(num).replace('b','') 特別的,若是source是漢字 "武沛齊"怎麼辦? 答:對於utf-8,每個漢字佔 3 個字節,那麼 "武沛齊" 則有 9個字節 對於漢字,for循環時候會按照 字節 迭代,那麼在迭代時,將每個字節轉換 十進制數,而後再將十進制數轉換成二進制 11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000 -------------------------- ----------------------------- ----------------------------- 武 沛 齊 </li> <li>getbit(name, offset) 獲取name對應的值的二進制表示中的某位的值 (0或1)</li> <li>bitcount(key, start=None, end=None) 獲取name對應的值的二進制表示中 1 的個數 參數: key,Redis的name start,位起始位置 end,位結束位置</li> <li> bitop(operation, dest, *keys) 獲取多個值,並將值作位運算,將最後的結果保存至新的name對應的值 參數: operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或) dest, 新的Redis的name *keys,要查找的Redis的name 如: bitop("AND", 'new<em>name', 'n1', 'n2', 'n3') 獲取Redis中n1,n2,n3對應的值,而後講全部的值作位運算(求並集),而後將結果保存 new</em>name 對應的值中 </li> <li>strlen(name) 返回name對應值的字節長度(一個漢字3個字節)</li> <li>incr(self, name, amount=1) 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。 參數: name,Redis的name amount,自增數(必須是整數) 注:同incrby</li> <li>incrbyfloat(self, name, amount=1.0) 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。 參數: name,Redis的name amount,自增數(浮點型)</li> <li>decr(self, name, amount=1) 自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減。 參數: name,Redis的name amount,自減數(整數)</li> <li>append(key, value) 在redis name對應的值後面追加內容 參數: key, redis的name value, 要追加的字符串 </li> </ol>
</body> </html> <!-- This document was created with MarkdownPad, the Markdown editor for Windows (http://markdownpad.com) -->