在程序系統中,例如外賣系統,訂單系統,庫存系統,優先級較高 發紅包,發郵件,發短信,app消息推送等任務優先級很低,很適合交給消息隊列去處理,以便於程序系統更快的處理其餘請求。
# 消息隊列通常有三個角色: 隊列服務端 隊列生產者 隊列消費者
RabbitMQ Erlang編寫的消息隊列產品,企業級消息隊列軟件,支持消息負載均衡,數據持久化等。 ZeroMQ saltstack軟件使用此消息,速度最快。 Redis key-value的系統,也支持隊列數據結構,輕量級消息隊列 Kafka 由Scala編寫,目標是爲處理實時數據提供一個統1、高通量、低等待的平臺
1)程序解耦 容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。 2)冗餘: 消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。 許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。 3)峯值處理能力: (大白話,就是原本公司業務只須要5臺機器,可是臨時的秒殺活動,5臺機器確定受不了這個壓力,咱們又不可能將總體服務器架構提高到10臺,那在秒殺活動後,機器不就浪費了嗎?所以引入消息隊列) 在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。 若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。 使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。 4)可恢復性: 系統的一部分組件失效時,不會影響到整個系統。 消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。 5)順序保證: 在大多使用場景下,數據處理的順序都很重要。 大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性) 6)緩衝: 有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。 7)異步通訊: 不少時候,用戶不想也不須要當即處理消息。好比發紅包,發短信等流程。 消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。
# RabbitMQ 是用來 接收,存儲,轉發數據的 # 消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。 # 消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而無論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。 # 官網學習:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
RabbitMQ 下載安裝啓動html
### rabbitmq-server服務端 1.下載centos源 wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.cloud.tencent.com/repo/centos7_base.repo 2.下載epel源 wget -O /etc/yum.repos.d/epel.repo http://mirrors.cloud.tencent.com/repo/epel-7.repo 3.清空yum緩存而且生成新的yum緩存 yum clean all yum makecache 4.安裝erlang $ yum -y install erlang 5.安裝RabbitMQ $ yum -y install rabbitmq-server 6. 開啓rabbitmq服務 systemctl start rabbitmq-server 7. 開啓rabbitmq管理平臺 rabbitmq-plugins enable rabbitmq-manager
#### 設置rabbitmq帳號密碼,以及角色權限設置 # 設置新用戶yugo 密碼123 sudo rabbitmqctl add_user yugo 123 # 設置用戶爲administrator角色 sudo rabbitmqctl set_user_tags yugo administrator ### 設置權限,容許對全部的隊列都有權限 對何種資源具備配置、寫、讀的權限經過正則表達式來匹配,具體命令以下: set_permissions [-p <vhostpath>] <user> <conf> <write> <read> sudo rabbitmqctl set_permissions -p "/" yugo ".*" ".*" ".*" ### 重啓服務生效設置 service rabbitmq-server start/stop/restart #### rabbitmq相關命令 // 新建用戶 rabbitmqctl add_user {用戶名} {密碼} // 設置權限 rabbitmqctl set_user_tags {用戶名} {權限} // 查看用戶列表 rabbitmqctl list_users // 爲用戶受權 添加 Virtual Hosts : rabbitmqctl add_vhost <vhost> // 刪除用戶 rabbitmqctl delete_user Username // 修改用戶的密碼 rabbitmqctl change_password Username Newpassword // 刪除 Virtual Hosts : rabbitmqctl delete_vhost <vhost> // 添加 Users : rabbitmqctl add_user <username> <password> rabbitmqctl set_user_tags <username> <tag> ... rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> // 刪除 Users : delete_user <username> // 使用戶user1具備vhost1這個virtual host中全部資源的配置、寫、讀權限以便管理其中的資源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' // 查看權限 rabbitmqctl list_user_permissions user1 rabbitmqctl list_permissions -p vhost1 // 清除權限 rabbitmqctl clear_permissions [-p VHostPath] User //清空隊列步驟 rabbitmqctl reset 須要提早關閉應用rabbitmqctl stop_app , 而後再清空隊列,啓動應用 rabbitmqctl start_app 此時查看隊列rabbitmqctl list_queues ### 其餘操做 查看全部的exchange: rabbitmqctl list_exchanges 查看全部的queue: rabbitmqctl list_queues 查看全部的用戶: rabbitmqctl list_users 查看全部的綁定(exchange和queue的綁定信息): rabbitmqctl list_bindings 查看消息確認信息: rabbitmqctl list_queues name messages_ready messages_unacknowledged 查看RabbitMQ狀態,包括版本號等信息:rabbitmqctl status ### 開啓web界面rabbitmq rabbitmq-plugins enable rabbitmq_management ### 訪問web界面 http://server-name:15672/
# RabbitMQ組件解釋 AMQP AMQP協議是一個高級抽象層消息通訊協議,RabbitMQ是AMQP協議的實現。它主要包括如下組件: 1.Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。 2.Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host 3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。 4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。 5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。 6.Binding:Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。 7.Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。 8.Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。 9.Command:AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。
// rabbitmq官方推薦的python客戶端pika模塊 pip3 install pika
# 安裝 pika 0.10.0 #### 生產 - 消費者 模型 ## touch send.py #!/usr/bin/env python import pika # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials("s14","123") # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 聲明一個隊列,用於接收消息,隊列名字叫「水許傳」 channel.queue_declare(queue='水許傳') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據 channel.basic_publish(exchange='', routing_key='水許傳', body='武松又去打老虎啦2') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close() ## touch receive.py import pika # 創建與rabbitmq的鏈接 credentials = pika.PlainCredentials("s14","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue="水許傳") def callbak(ch,method,properties,body): print("消費者接收到了任務:%r"%body.decode("utf8")) # 有消息來臨,當即執行callbak,沒有消息則夯住,等待消息 # 老百姓開始去郵箱取郵件啦,隊列名字是水許傳 channel.basic_consume(callbak,queue="水許傳",no_ack=True) # 開始消費,接收消息 channel.start_consuming()
# ACK機制用於保證消費者若是拿了隊列的消息,客戶端處理時出錯了,那麼隊列中仍然還存在這個消息,提供下一位消費者繼續取 ### 流程: 1.生產者無須變更,發送消息 2.消費者若是no_ack=True啊,數據消費後若是出錯就會丟失 反之no_ack=False,數據消費若是出錯,數據也不會丟失 3.ack機制在消費者代碼中演示 ### no-ack # 當 no-ack爲false時,表示 須要獲得ack的校驗 # 當 no-ack爲True時,表示 不須要獲得校驗
### send2.py #!/usr/bin/env python import pika # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials("s14","123") # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 新建一個hello隊列,用於接收消息 # 這個郵箱能夠收發各個班級的郵件,經過 channel.queue_declare(queue='金品沒') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據 channel.basic_publish(exchange='', routing_key='金品沒', body='潘金蓮又出去。。。') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close() ### receive2.py import pika credentials = pika.PlainCredentials("s14","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) channel = connection.channel() # 聲明一個隊列(建立一個隊列) channel.queue_declare(queue='金品沒') def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body.decode("utf-8")) # int('asdfasdf') # 我告訴rabbitmq服務端,我已經取走了消息 # 回覆方式在這 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉no_ack,表明給與服務端ack回覆,確認給與回覆 channel.basic_consume(callback,queue='金品沒',no_ack=False) channel.start_consuming()
1.執行生產者,向隊列寫入數據,產生一個新隊列queue 2.重啓服務端,隊列丟失 3.開啓生產者數據持久化後,重啓rabbitmq,隊列不丟失 4.依舊能夠讀取數據
#### chiuhua_send.py import pika # 無密碼 # connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61')) # 有密碼 credentials = pika.PlainCredentials("s14","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) channel = connection.channel() # 聲明一個隊列(建立一個隊列) # 默認此隊列不支持持久化,若是服務掛掉,數據丟失 # durable=True 開啓持久化,必須新開啓一個隊列,本來的隊列已經不支持持久化了 ''' 實現rabbitmq持久化條件 delivery_mode=2 使用durable=True聲明queue是持久化 ''' channel.queue_declare(queue='LOL',durable=True) channel.basic_publish(exchange='', routing_key='LOL', # 消息隊列名稱 body='德瑪西亞萬歲', # 支持數據持久化 properties=pika.BasicProperties( delivery_mode=2,#表明消息是持久的 2 ) ) connection.close()
### chijiuhua_receive.py import pika credentials = pika.PlainCredentials("s14","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) channel = connection.channel() # 確保隊列持久化 channel.queue_declare(queue='LOL',durable=True) ''' 必須確保給與服務端消息回覆,表明我已經消費了數據,不然數據一直持久化,不會消失 ''' def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body.decode("utf-8")) # 模擬代碼報錯 # int('asdfasdf') # 此處報錯,沒有給予回覆,保證客戶端掛掉,數據不丟失 # 告訴服務端,我已經取走了數據,不然數據一直存在 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉no_ack,表明給與回覆確認 channel.basic_consume(callback,queue='LOL',no_ack=False) channel.start_consuming()
#RabbitMQ參考博客: https://www.cnblogs.com/pyyu/p/10318053.html
# 什麼是 RPC RPC(Remote Procedure Call)——遠程過程調用,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。 # RPC的做用 RPC 就是爲解決服務之間信息交互而發明和存在的。 # rpc 簡易理解: 相似 WEB網絡請求。 RPC就是一種遠程調用函數接口的方式,說白了,就是一種遠程調用函數接口的方式,客戶端和服務端之間約定一種契約(函數接口),而後服務端一直等待客戶端的調用。
# RPC的交互 # RPC是兩個子系統之間進行的直接消息交互,使用操做系統提供的套接字做爲消息的載體 # 1. python的socket編程就是一種RPC通訊 #1.1 rpc_server.py import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("localhost", 8080)) sock.listen(1) # 監聽客戶端鏈接 while True: conn, addr = sock.accept() # 接收一個客戶端鏈接 print(conn.recv(1024)) # 從接收緩衝讀消息 recv buffer conn.sendall(b"world") # 將響應發送到發送緩衝 send buffer conn.close() # 關閉鏈接 #1.2 rpc_client.py import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(("localhost", 8080)) # 鏈接服務器 sock.sendall(b"hello") # 將消息輸出到發送緩衝 send buffer print(sock.recv(1024)) # 從接收緩衝 recv buffer 中讀響應 sock.close() # 關閉套接字...
# xmlrpc庫 python 內置的 # 服務端 from xmlrpc.server import SimpleXMLRPCServer # 調用函數 def respon_string(str): return "get string:%s"%str if __name__ == '__main__': # 初始化 獲得一個server對象 server = SimpleXMLRPCServer(('localhost', 8888)) # 註冊 被調用的函數 server.register_function(respon_string, "get_string") print ("Listening for Client") server.serve_forever() # 保持等待調用狀態 # 客戶端 from xmlrpc.client import ServerProxy if __name__ == '__main__': server = ServerProxy("http://localhost:8888") # 初始化服務器 print (server.get_string("oldboy_python6666")) # 調用函數並傳參
# 參考 屁眼魚: https://www.cnblogs.com/pyyu/p/9465608.html
# 什麼是 saltstack? saltstack是由thomas Hatch於2011年建立的一個開源項目,設計初衷是爲了實現一個快速的遠程執行系統。 # salt的做用 早期運維人員會根據本身的生產環境來寫特定腳本完成大量重複性工做,這些腳本複雜且難以維護。系統管理員面臨的問題主要是一、系統配置管理,二、遠程執行命令,所以誕生了不少開源軟件,系統維護方面有fabric、puppet、chef、ansible、saltstack等,這些軟件擅長維護系統狀態或方便的對大量主機進行批量的命令執行。 salt靈活性強大,能夠進行大規模部署,也能進行小規模的系統部署。salt的設計架構適用於任意數量的服務器,從少許本地網絡系統到跨越數個數據中心,拓撲架構都是c/s模型,配置簡單。 無論是幾臺、幾百臺、幾千臺服務器,均可以使用salt在一箇中心節點上進行管控,靈活定位任意服務器子集來運行命令。 Salt是python編寫的,支持用戶經過python自定義功能模塊,也提供了大量的python API接口,用戶能夠根據須要進行簡單快速的擴展。 # saltstack的運行方式 Local 本地運行,交付管理 Master/Minion <<< 經常使用方式 Salt SSH 不須要客戶端
在安裝salt以前,先理解salt架構中各個角色,主要區分是salt-master和salt-minion,顧名思義master是中心控制系統,minion是被管理的客戶端。 salt架構中的一種就是master > minion。
# 環境準備 服務器環境 centos7(master) centos7(minion) ip地址 192.168.178.131 192.168.178.132 身份 master slave 軟件包 salt-master salt-minion
# 1. 修改hosts文件 192.168.11.132 slave 192.168.11.131 master
# 2 . 關閉 防火牆 # 關閉firewalld systemctl disable firewalld systemctl stop firewalld # 關閉iptables iptables -F
# 3. 安裝 saltstack wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo yum clean all #清空緩存 yum makecache #生成yum緩存 # 3.1 master 節點上 安裝 yum install salt-master -y # 3.2 minion 節點上 安裝 yum install salt-minion -y
# 4. 修改配置文件 # 4.1 vim /etc/master # salt運行的用戶,影響到salt的執行權限 user: root #s alt的運行線程,開的線程越多通常處理的速度越快,但通常不要超過CPU的個數 worker_threads: 10 # master的管理端口 publish_port : 4505 # master跟minion的通信端口,用於文件服務,認證,接受返回結果等 ret_port : 4506 # 若是這個master運行的salt-syndic鏈接到了一個更高層級的master,那麼這個參數須要配置成鏈接到的這個高層級master的監聽端口 syndic_master_port : 4506 # 指定pid文件位置 pidfile: /var/run/salt-master.pid # 4.2 vim /etc/minion # minion的識別ID,能夠是IP,域名,或是能夠經過DNS解析的字符串 id: slave # salt運行的用戶權限 user: root # master的識別ID,能夠是IP,域名,或是能夠經過DNS解析的字符串 master : master # master通訊端口 master_port: 4506 # 備份模式,minion是本地備份,當進行文件管理時的文件備份模式 backup_mode: minion # 執行salt-call時候的輸出方式 output: nested # minion等待master接受認證的時間 acceptance_wait_time: 10 # 失敗重連次數,0表示無限次,非零會不斷嘗試到設置值後中止嘗試 acceptance_wait_time_max: 0 # 從新認證延遲時間,能夠避免由於master的key改變致使minion須要從新認證的syn風暴 random_reauth_delay: 60 # 日誌文件位置 log_file: /var/logs/salt_minion.log
# 5. 啓動salt-master和salt-minion systemctl start salt-minion systemctl start salt-master #檢查salt狀態 systemctl status salt-minion systemctl status salt-master
# 6. 在master上接收minion祕鑰 # 1. 查看祕鑰 salt-key -L # 2. 查看 minion祕鑰 master節點: salt-key -f slave minion節點: salt-call --local key.finger # 3. master添加minion祕鑰 salt-key -a slave
# 7. salt 命令 salt '*' test.ping # 7.1 test.fib生成斐波那契數列 salt "minion" test.fib 50 # 8. cmd是超級模塊,全部shell命令都能執行 salt 'slave' cmd.run 'ps -ef|grep python' # 9. 安裝軟件 salt 'slave' pkg.install "nginx" # 10. 輸出結果 --out # json salt --out=json '*' cmd.run_all 'hostname' { "slave": { "pid": 2268, "retcode": 0, "stderr": "", "stdout": "slave" } } # yaml salt --out=yaml '*' cmd.run_all 'hostname' slave: pid: 2289 retcode: 0 stderr: '' stdout: slave
### 在學習saltstack過程當中,第一要點就是States編寫技巧,簡稱SLS文件。這個文件遵循YAML語法。初學者看這玩意很容易懵逼,來,超哥拯救你學習YAML語法 # json xml yaml 數據序列化格式 # yaml容易被解析,應用於配置文件 ## salt的配置文件是yaml配置文件,不能用tab ## saltstack,k8s,ansible都用的yaml格式配置文件 # 語法規則: 大小寫敏感 使用縮進表示層級關係 縮進時禁止tab鍵,只能空格 縮進的空格數不重要,相同層級的元素左側對其便可 # 表示註釋行 ### yaml支持的數據結構 # 1. 對象: 鍵值對,也稱做映射 mapping 哈希hashes 字典 dict 冒號表示 key: value key冒號後必須有 # 2. 數組: 一組按次序排列的值,又稱爲序列sequence 列表list 短橫線 - list1 # 3. 純量: 單個不可再分的值 # 4. 對象:鍵值對 yaml first_key: second_key:second_value python { 'first_key':{ 'second_key':'second_value', } }
# 注意: YAML是YAML Ain't Markup Language的首字母縮寫,YAML的語法簡單, 結構體經過空格展現 項目使用 '-' 表明 鍵值對經過 ':' 分割 ### YAML語法遵循固定的縮進風格,表示數據層級結構關係,saltstack須要每一個縮進級別由2個空格組成,禁止用tabs!!!
# 1. salt 'slave' grains.items # 2. salt "*" grains.item ipv4 --out json >> salt-granis-computerIPV4-.json
# 1. 安裝模塊 pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple salt # 2. 進入python3交互模式 import salt.client local = salt.client.LocalClient() local.cmd('*','cmd.run',['poweroff']) #向全部minion發送關機命令