Python basics: UDP, processes, process pools, and the paramiko module

1 UDP sockets
1.1 Introduction
  UDP is connectionless and datagram-based; either end can be started first without any error.
  UDP server (outline):

import socket

sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # create the server socket
sk.bind(address)                                       # bind the server address
while True:                                            # server loop
    data, client_addr = sk.recvfrom(bufsize)           # receive, and sk.sendto(...) to reply
sk.close()                                             # close the server socket

  UDP client (outline):

import socket
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # create the client socket
while True:                                                # communication loop
    client.sendto(data, address)                           # send, and client.recvfrom(bufsize) to receive
client.close()                                             # close the client socket

1.2 Basic example
1.2.1 Server

import socket

udp_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_server.bind(('127.0.0.1', 9999))

while True:
    data, client_addr = udp_server.recvfrom(512)
    print(data, client_addr)
    udp_server.sendto(data.upper(), client_addr)

1.2.2 Client

import socket

udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

while True:
    msg = input('>>').strip()
    udp_client.sendto(msg.encode('utf-8'), ('127.0.0.1', 9999))
    data, server_addr = udp_client.recvfrom(512)
    print(data.decode('utf-8'))

1.3 UDP does not have the sticky-packet problem
  UDP is a datagram protocol: every send produces a complete datagram with its own header and body. TCP is stream-oriented, with no beginning or end to a message, while each UDP datagram has a clear start (the header) and end, so the sticky-packet phenomenon seen with TCP cannot occur.
1.3.1 Server

import socket

udp_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_server.bind(('127.0.0.1',9999))

info1, client_addr = udp_server.recvfrom(1)    # at most 1 byte of the first datagram is returned; the rest of that datagram is discarded
print('info1', info1)
info2, client_addr = udp_server.recvfrom(512)  # the second datagram arrives intact
print('info2', info2)

1.3.2 Client

import socket

udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

udp_client.sendto('welcome'.encode('utf-8'), ('127.0.0.1',9999))
udp_client.sendto('beijing'.encode('utf-8'), ('127.0.0.1',9999))

1.4 Concurrent UDP
1.4.1 Server

import socketserver

class MyUDPhandler(socketserver.BaseRequestHandler):
    def handle(self):
        print(self.request)  # for UDP, self.request is a (data, socket) tuple
        self.request[1].sendto(self.request[0].upper(), self.client_address)

if __name__ == '__main__':
    udp_server = socketserver.ThreadingUDPServer(('127.0.0.1', 8080), MyUDPhandler)
    udp_server.serve_forever()

1.4.2 Client

import socket

udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

while True:
    info = input('>>').strip()
    udp_client.sendto(info.encode('utf-8'), ('127.0.0.1', 8080))  # 8080 matches the server above
    body, server_addr = udp_client.recvfrom(512)
    print(body.decode('utf-8'))

2 Processes
2.1 Introduction
  A process is a running task: an abstraction of a program in execution.
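  A quick way to see this abstraction concretely (a small sketch, not from the original text): every process has its own pid, and a child started with Process reports the parent's pid as its ppid.

from multiprocessing import Process
import os

def show_pid():
    # runs in the child process: its pid differs from the parent's,
    # and os.getppid() is the pid of the process that started it
    print('child pid:', os.getpid(), 'parent pid:', os.getppid())

if __name__ == '__main__':
    print('main pid:', os.getpid())
    p = Process(target=show_pid)
    p.start()
    p.join()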

2.2 Starting processes
Example 1

from multiprocessing import Process
import time
def my_run(info):
    print('task <%s> is running' % info)
    time.sleep(0.5)
    print('task <%s> is done' % info)

if __name__ == '__main__':
    process1 = Process(target=my_run, args=('mary',))
    process2 = Process(target=my_run, args=('jack',))
    process1.start()
    process2.start()

Example 2

from multiprocessing import Process
import time
class MyMulProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):  # must be named run: start() calls it in the child process
        print('task <%s> is running' % self.name)
        time.sleep(0.5)
        print('task <%s> is done' % self.name)

if __name__ == '__main__':
    process = MyMulProcess('jack')
    process.run()    # calling run() directly just executes it in the current process
    process.start()  # start() spawns a child process that executes run()

2.3 Concurrent communication
2.3.1 Server

from multiprocessing import Pool
import os
import socket
tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # avoid "Address already in use"
tcp_server.bind(('127.0.0.1', 9999))
tcp_server.listen(5)

def work(conn, addr):
    print(os.getpid())
    print(addr)
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()

if __name__ == '__main__':
    pool = Pool(4)
    while True:
        conn, addr = tcp_server.accept()
        pool.apply_async(work, args=(conn, addr))
    tcp_server.close()

2.3.2 Client

import socket

tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_client.connect(('127.0.0.1',9999))

while True:
    info = input('>>').strip()
    if not info: continue
    tcp_client.send(info.encode('utf-8'))
    data = tcp_client.recv(1024)
    print(data.decode('utf-8'))

tcp_client.close()

2.4 The join method
  join makes the parent process wait until a child process has finished before it runs the code that follows.
  p.join() only works on a process launched with start(); it cannot join a process whose run() was merely called directly.
  In the example below, the parent waits for both children to finish before continuing.

from multiprocessing import Process
import time

def work(name):
    print('task <%s> is running' % name)
    time.sleep(0.5)
    print('task <%s> is done' % name)

if __name__ == '__main__':
    process1 = Process(target=work, args=('jack',))
    process2 = Process(target=work, args=('mary',))

    process_list = [process1, process2]
    for process in process_list:
        process.start()

    for process in process_list:
        process.join()

2.6 Daemon processes
  A daemon process ends as soon as the parent process's code has finished running.
  A daemon is created by the parent process, and:
  1. the daemon is terminated once the parent's code has finished executing;
  2. a daemon cannot start child processes of its own, otherwise an exception is raised (see the sketch after the first example below).

  In other words, the daemon guards the parent: when the parent's code ends, the daemon ends with it.

from multiprocessing import Process
import time
def work(name):
    print('task <%s> is running' % name)
    time.sleep(0.5)
    print('task <%s> is done' % name)

if __name__ == '__main__':
    p1 = Process(target=work, args=('jack',))
    p1.daemon = True  # must be set before the process is started
    p1.start()
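
  Rule 2 above can be checked with a small sketch (the function names here are illustrative): a daemon process that tries to start a child of its own raises an error (AssertionError in CPython).

from multiprocessing import Process
import time

def grandchild():
    print('grandchild running')

def child():
    try:
        Process(target=grandchild).start()  # not allowed inside a daemon process
    except AssertionError as e:
        print('cannot start a child from a daemon:', e)

if __name__ == '__main__':
    d = Process(target=child)
    d.daemon = True
    d.start()
    time.sleep(1)  # keep the parent alive long enough for the daemon to run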

  To repeat the key point about when a daemon ends: it ends when the parent process's code ends, as the next example shows.

from multiprocessing import Process
import time
def foo():
    print("from foo start")
    time.sleep(0.5)
    print("from foo end")

def bar():
    print("from bar start")
    time.sleep(0.8)
    print("from bar end")

if __name__ == '__main__':
    process1 = Process(target=foo)
    process2 = Process(target=bar)

    process1.daemon = True
    process1.start()
    process2.start()
    print("主進程")

# Once "主進程" is printed, the main process's code is finished, so the daemon process1
# is terminated. "from foo start" may still appear, because process1 may have had a
# chance to run before the print, but it is killed before it can finish.

2.7 Process synchronization locks
  Key idea: make sure one process finishes using a shared resource (here the terminal) before handing it to another process, so access is exclusive and output stays orderly.

2.7.1 Basic usage
  Adding a lock serializes execution so data does not get scrambled; it is a trade-off between efficiency and correctness.

from multiprocessing import Process,Lock
import time
def work(name, mutex):
    mutex.acquire()
    print('task <%s> is running' % name)
    time.sleep(0.5)
    print('task <%s> is done' % name)
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    process1 = Process(target=work, args=('jack', mutex))
    process2 = Process(target=work, args=('mary', mutex))
    process1.start()
    process2.start()

2.7.2 Simulating ticket purchase
  Checking the remaining ticket count does not need any ordering, but the actual purchase must be locked so the same ticket cannot be sold more than once.

import json, time, os
from multiprocessing import Process, Lock

def search():
    dic = json.load(open('ticket.txt'))  # ticket.txt must already exist, e.g. {"count": 3}
    print('\033[32m[%s] 看到剩餘票數<%s>\033[0m' % (os.getpid(), dic['count']))

def get_ticket():
    dic = json.load(open('ticket.txt'))
    time.sleep(0.5)  # simulate network latency when reading the database
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.5)  # simulate network latency when writing the database
        json.dump(dic, open('ticket.txt', 'w'))
        print('\033[31m%s 購票成功\033[0m' % os.getpid())

def work(mutex):
    search()
    mutex.acquire()
    get_ticket()
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    for index in range(10):
        process = Process(target=work, args=(mutex,))
        process.start()

2.7.3 Communicating through shared data
  When processes communicate through shared state (a Manager dict here), locking still has to be considered.

from multiprocessing import Process,Manager,Lock

def work(dic, mutex):
    with mutex:
        dic['count'] -= 1

if __name__ == '__main__':
    mutex = Lock()
    manager = Manager()
    dic = manager.dict({'count': 100})
    p_list = []
    for i in range(100):
        process = Process(target=work, args=(dic, mutex))
        p_list.append(process)
        process.start()

    for process in p_list:
        process.join()
    print(dic)

2.8 Inter-process communication
  Processes can communicate in many ways, for example pipes, shared data, or message queues; the recommended approach is a message queue.
  Queue: the commonly used methods are put and get. A queue is implemented as a pipe plus locks.

from multiprocessing import Queue

queue = Queue(3)  # maximum queue length is 3

queue.put('first')
queue.put('second')
queue.put('third')
queue.put('fourth')  # the queue is full, so this call blocks

print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())  # the queue is empty, so this call blocks until something is put

# Additional details
queue = Queue(3)
queue.put('first', block=False)
queue.put('second', block=False)
queue.put('third', block=False)
queue.put_nowait('fourth')      # equivalent to queue.put('fourth', block=False); the queue is full, so queue.Full is raised instead of blocking
queue.put('fourth', timeout=3)  # wait at most 3 seconds for free space, then raise queue.Full
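
  The get side has matching non-blocking variants; a short sketch of their behaviour (the Empty exception comes from the standard queue module):

from multiprocessing import Queue
import queue  # provides the Empty exception raised by non-blocking get

q = Queue(3)
q.put('only item')
print(q.get())           # 'only item'
try:
    q.get(block=False)   # the queue is now empty: raises queue.Empty instead of blocking
except queue.Empty:
    print('queue is empty')
try:
    q.get_nowait()       # equivalent to q.get(block=False)
except queue.Empty:
    print('still empty')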

3 Producers and consumers
  The pattern has two kinds of roles: producers and consumers.
3.1 A basic producer/consumer version

from multiprocessing import Process,Queue
import time,os
def producer(q, name):
    for i in range(5):
        time.sleep(0.5)
        res = '%s%s' % (name, i)
        q.put(res)
        print('\033[42m<%s> 製造 [%s]\033[0m' % (os.getpid(), res))

def consumer(q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(0.8)
        print('\033[31m<%s> 購買 [%s]\033[0m' % (os.getpid(), res))

if __name__ == '__main__':
    queue = Queue()
    # producers
    producer1 = Process(target=producer, args=(queue, '自行車'))
    producer2 = Process(target=producer, args=(queue, '汽車'))
    producer3 = Process(target=producer, args=(queue, '飛機'))

    # consumers
    consumer1 = Process(target=consumer, args=(queue,))
    consumer2 = Process(target=consumer, args=(queue,))

    producer1.start()
    producer2.start()
    producer3.start()
    consumer1.start()
    consumer2.start()

    producer1.join()
    producer2.join()
    producer3.join()
    queue.put(None)  # a None sentinel tells a consumer that production is finished
    queue.put(None)  # one sentinel per consumer is needed

3.2 Improving the producer/consumer model with JoinableQueue
  Each producer waits until the consumers have emptied everything from the queue, then exits; the consumers run as daemons, so they end when the main process ends.
  The main process joins a producer before running its remaining code, so by the time the main process's code finishes, every item has been consumed.
  The logic is a little involved: it combines JoinableQueue, daemon processes and join.

from multiprocessing import Process,JoinableQueue
import time,os
def producer(queue, name):
    for i in range(5):
        time.sleep(0.5)
        res = '%s%s' % (name, i)
        queue.put(res)
        print('\033[42m<%s> 製造 [%s]\033[0m' % (os.getpid(), res))
    queue.join()  # wait until every item put on the queue has been taken and marked done

def consumer(queue):
    while True:
        res = queue.get()
        if res is None: break  # never triggered here; the consumers exit as daemons instead
        time.sleep(0.8)
        print('\033[31m<%s> 購買 [%s]\033[0m' % (os.getpid(), res))
        queue.task_done()  # tell the queue that one item has been fully processed

if __name__ == '__main__':
    queue = JoinableQueue()
    # producers
    producer1 = Process(target=producer, args=(queue, '自行車'))
    producer2 = Process(target=producer, args=(queue, '汽車'))
    producer3 = Process(target=producer, args=(queue, '飛機'))

    # consumers
    consumer1 = Process(target=consumer, args=(queue,))
    consumer2 = Process(target=consumer, args=(queue,))

    consumer1.daemon = True  # the consumers are daemons, so they end when the main process ends
    consumer2.daemon = True
    producer1.start()
    producer2.start()
    producer3.start()
    consumer1.start()
    consumer2.start()

    producer1.join()  # returns only after producer1's queue.join(), i.e. once every queued item has been consumed

4 Process pools
  By default a pool starts as many worker processes as the machine has CPU cores.
4.1 Submitting tasks synchronously
  Synchronous submission (apply) sacrifices efficiency but keeps results ordered and safe.

from multiprocessing import Pool
import os,time

def task(n):
    print('task <%s> is running' % os.getpid())
    time.sleep(0.5)
    return n ** 3

if __name__ == '__main__':
    print(os.cpu_count())
    p = Pool(4)
    for index in range(10):
        res = p.apply(task, args=(index,))  # apply blocks until the task finishes and returns its result
        print(res)

4.2 Submitting tasks asynchronously
  apply_async returns immediately; once every task in the pool has finished, the results can be collected.
  pool.close() stops the pool from accepting new tasks; it must be called before pool.join().

from multiprocessing import Pool
import os,time

def task(n):
    print('task <%s> is running' % os.getpid())
    time.sleep(0.5)
    return n ** 3

if __name__ == '__main__':
    print(os.cpu_count())
    pool = Pool(4)
    res_list = []
    for index in range(10):
        res = pool.apply_async(task, args=(index,))  # just throws the task at the pool, without waiting for it to finish
        res_list.append(res)

    for res in res_list:
        print(res.get())  # get() blocks until that task's result is ready

    pool.close()
    pool.join()
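
  For uniform tasks like this, Pool.map is a common shorthand (a sketch, not part of the original text): it submits every input, blocks until all of them are done, and returns the results in input order.

from multiprocessing import Pool

def task(n):
    return n ** 3

if __name__ == '__main__':
    with Pool(4) as pool:                    # the with-block cleans the pool up on exit
        results = pool.map(task, range(10))  # blocks until every result is ready, in order
    print(results)                           # [0, 1, 8, 27, ...]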

4.3 Controlling concurrency with a process pool
  A process pool caps the number of processes that handle clients at the same time.
4.3.1 Server

from multiprocessing import Pool
import os
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 9999))
server.listen(5)

def work(conn, addr):
    print(os.getpid())
    print(addr)
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()

if __name__ == '__main__':
    pool = Pool(4)  # at most 4 clients are served concurrently
    while True:
        conn, addr = server.accept()
        pool.apply_async(work, args=(conn, addr))
    server.close()

4.3.2 Client

import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1',9999))

while True:
    info = input('>>').strip()
    if not info: continue
    client.send(info.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))

client.close()

4.4 Callback functions
  Let the download function run concurrently in the pool, and let the parsing function run serially through a callback.
  The time-consuming part (downloading) is submitted to the pool asynchronously; the callback processes each result.

import requests # pip3 install requests
import os,time
from multiprocessing import Pool
def get_info(url):
    print('<%s> get :%s' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}

def parse_page(dic):
    print('<%s> parse :%s' % (os.getpid(), dic['url']))
    time.sleep(0.5)
    parse_res = 'url:<%s> size:[%s]\n' % (dic['url'], len(dic['text']))  # simulate parsing the page content
    with open('reptile.txt', 'a') as f:
        f.write(parse_res)

if __name__ == '__main__':
    pool = Pool(4)
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com',
        'https://www.sina.com.cn',
    ]

    for url in urls:
        pool.apply_async(get_info, args=(url,), callback=parse_page)
        # the callback runs in the main process: when get_info finishes,
        # its return value is passed to parse_page as the argument

    pool.close()
    pool.join()

5 The paramiko module
5.1 Introduction
  paramiko is a module for remote control: with it you can run commands on a remote server or operate on files (a short SFTP sketch follows the example in 5.2). Notably, the remote management inside fabric and ansible is implemented with paramiko.

5.2 Connecting with a password

import paramiko

# create an SSH client object
ssh = paramiko.SSHClient()
# allow connecting to hosts that are not yet in known_hosts
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# connect to the server
ssh.connect(hostname='127.0.0.1', port=22, username='root', password='xxx')

# run a command
stdin, stdout, stderr = ssh.exec_command('df')
# read the command output
result = stdout.read()
print(result.decode('utf-8'))
# close the connection
ssh.close()
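
  The same module also handles file transfer over SFTP. A minimal sketch (host, credentials and paths below are placeholders):

import paramiko

transport = paramiko.Transport(('127.0.0.1', 22))
transport.connect(username='root', password='xxx')

sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put('local.txt', '/tmp/remote.txt')  # upload a local file to the server
sftp.get('/tmp/remote.txt', 'copy.txt')   # download a remote file
sftp.close()
transport.close()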

5.3 Connecting with a key
  Client side: the private key file id_rsa.
  Server side: there must be an authorized_keys file (after generating the key pair with ssh-keygen, create authorized_keys, for example with ssh-copy-id).

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('id_rsa')

# create an SSH client object
ssh = paramiko.SSHClient()
# allow connecting to hosts that are not yet in known_hosts
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# connect to the server
ssh.connect(hostname='127.0.0.1', port=22, username='root', pkey=private_key)

# run a command
stdin, stdout, stderr = ssh.exec_command('df -h')
# read the command output
result = stdout.read()
print(result.decode('utf-8'))
# close the connection
ssh.close()

  The private key can also be loaded from an in-memory string rather than a file:

import paramiko
from io import StringIO
key_str = """-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAl6OGU27q3YV1LMdAZH02PWFtPJaxQ68zbWIBmgP0NfnVQz30
Bvhe+rZCFEQCwLoeld4jj7vA+Vk8nKPoBhX1WVvq9MfSF+YRXeu1+XAXvL1pH+Q9
6gJ1l4NmwHq8DVgUCx3TsAdK0wOqEIUDSOu+2/nECK1lmKN1Cf5Va20r+B0CDjkT
RRA6C9MIb2WWwf4EiEWjBl+lnZ0vDXoB38oZQR71tdLsvrVumUDzyGvF79jWW3O6
9gc4F2ivoKVbXswyJfq1bVpH1N7aM6DzUiELv1mFsyIVveydMVzFIrDbUI8gCErx
NhI6esUbhNwfd3NfBOTzEQxBd5V3/a6rkC5MpwIDAQABAoIBAQCD9cJHiRbKgAFg
XmUjDfPNpqMxPtI0XJscbVWHejljX26/fYKHLk05ULJggG8E2PMU6KN5yaI9W/Lr
PZgE88b3ZI4rRlkGgyhJ234Y+/ssPIjnP/DBXDKJD8izaBuOYT/QDLzTSwVKbL3q
clZRdxY4yDpYcs0e7+BCOhqLyg2hdAYA8Z4VOOs4SQqRW6k9K+oXeNMhc4htozOf
tVsSM3XkFZ4hpug34S89+FKEwZ00RTJEEkK8IjBfLJ5w+RfLFXth8hTVMbeLhcv+
RqDYdUwq/JAXCrri0687hCwi5J06xTrY7BwgoKJznxlpz6tiyEPNrnqZ1vAayWqS
G/x+R/cBAoGBAMj49kKmEpKZesbbSD9KJvD1YnkIGwEheLvNZzRtx+2cwejHHWDZ
F0i+KzDTt+7QZ2zb+ABOgK1sq8Xhfn40M90xqixbqp0UzaFyMFnmiB2iyZ56I3Fi
Kqeerr+f3i/djewNhMJZZZhO/n+YxhCpQUBotepQ3tGA/G6vvkSSMe73AoGBAMEo
i9LaSDyJxk51mW8OmgiIyJ1376LKu3sHlUkn+Ca5dm2/iYNIuN5dC0YTPjo1A5It
jZTid5VBEb4OMOpKYygR4S9euAxv22Uxib1xGZLdJHKblwizdJnBazsqDQW6mPfN
o/BADQl/+h3pPpOWoIxSxi07xYq+gAToW2tc6TPRAoGBAKhGHRwtJbvuGqlKjjHA
Ct8S94LT0JifyBGnqNRzX0WLTal0nxqqax6TbGKTw5yIjzDM9dh74q5TIXismFdf
qlV48j31+uNPueWGUQnVRv9ZgGvbZLXZNlHnQfZdC5MUdXLC1vhMFg7zhZCdAKqO
rX4arsclM4xD7hlXuX580qZ9AoGALs5te43LnWfhdxfGM4Q9TT4gJxBuMGuiHMEM
quqVloSwrw2P/BE+QxwW5Ec7eA1qrRx+x4pNYgyfiQeVUODvwED86WaxgMoGRzJG
53IluVH/SApuAfzCj5OwMWkSOMYr1TiutkQ/JIMvj9n6gPcqNnbEcSefyew5x3aq
2IxuMlECgYEAtVuORz9C7WnJIVW6HwNiS4OBs7becOgXDHAOw0hnu+3mxVm/NIkX
yeGK7rP1KKbS4I3pbG+H0nWAQkfQtW6nQjU5nvoCTX8Yyk6ZNC0mhGNTqKRqpjI/
eXe8nUib71izC6g6kJY66+BTg2SCBsHUAqAB7L4gvFHGt8sq46TQ3jw=
-----END RSA PRIVATE KEY-----"""

private_key = paramiko.RSAKey(file_obj=StringIO(key_str))
transport = paramiko.Transport(('127.0.0.1', 22))
transport.connect(username='root', pkey=private_key)

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')
result = stdout.read()
print(result.decode('utf-8'))
transport.close()
