python網絡爬蟲(2)回顧Python編程

文件寫入

def storFile(data,fileName,method='a'):
    with open(fileName,method,newline ='') as f:
        f.write(data)
        pass
    pass

storFile('123', '1.txt')

文件讀取

with open('1.txt','r') as f:
    print(f.read())

序列化操做

把內存中的數據變爲可保存和共享,實現狀態保存。cPickle使用C語言編寫,效率高,優先使用。若是不存在則使用pickle。pickle使用dump和dumps實現序列化。html

try:
    import cPickle as pickle
except ImportError:
    import pickle
d=dict(url='index.html',title='1',content='2')
f=open('2.txt','wb')
pickle.dump(d,f)
f.close()
print(pickle.dumps(d))

反序列化操做

使用load實現反序列化python

try:
    import cPickle as pickle
except ImportError:
    import pickle
f=open('2.txt','rb')
d=pickle.load(f)
f.close()
print(d)

多進程建立

多進程使用os的fork複製徹底相同的進程,並對子進程返回0,對父進程返回子進程的pid。只在linux/unix中使用。linux

import os
if __name__ == '__main__':
  pid=os.fork()   if pid<0:    print('error pid')   elif pid==0:    print('child ,parent pid',os.getpid(),os.getppid())   else:    print('parent pid,create child ',os.getpid,pid)

使用multiprocessing模塊建立進程,使用start啓動進程,使用join同步。git

import os
from multiprocessing import Process
def run_proc(name):
    print('name ,child pid   running',name,os.getpid())
if __name__ == '__main__':
    print('parent pid',os.getpid())
    for i in range(5):
        p=Process(target=run_proc,args=(str(i),))
        print('Process will start')
        p.start()
    p.join()
    print('end')

使用multiprocessing模塊中的Pool限定進程數量github

import os
from multiprocessing import Process,Pool
import random,time
def run_proc(name):
    print('name ,child pid   running ',name,os.getpid())
    time.sleep(random.random()*10)
    print('name ,child pid   running end',name,os.getpid())
if __name__ == '__main__':
    print('parent pid',os.getpid())
    p=Pool(processes=3)
    for i in range(10):
        p.apply_async(run_proc,args=(i,))
    print('wait')
    p.close()
    p.join()
    print('end')

進程間通訊

Queue通訊

適用多進程間通訊,採用put和get方法。編程

import os
from multiprocessing import Process,Queue
import time,random
def write_proc(q,urls):
    print('w processing ',os.getpid(),'is running')
    for u in urls:
        q.put(u)
        print('put :',u)
        time.sleep(random.random())
    pass
def read_proc(q):
    print('r processing ',os.getpid(),'is running')
    while(True):
        u=q.get(True)
        print('get:',u)
    pass

if __name__ == '__main__':
    q=Queue()
    w1=Process(target=write_proc,args=(q,['u1','u2','u3']))
    w2=Process(target=write_proc,args=(q,['u4','u5','u6']))
    r1=Process(target=read_proc,args=(q,))
    w1.start()
    w2.start()
    r1.start()
    w1.join()
    w2.join()
    r1.terminate()
    pass

Pipe通訊

Pipe方法返回conn1和conn2,全雙工模式下都可收發(Pipe方法中duplex參數控制),經過send和recv控制。windows

import os
from multiprocessing import Process,Pipe
import time,random
def send_proc(p,urls):
    print('s processing ',os.getpid(),'is running')
    for u in urls:
        p.send(u)
        print('send :',u)
        time.sleep(random.random())
    pass
def receive_proc(p):
    print('r processing ',os.getpid(),'is running')
    while(True):
        u=p.recv()
        print('receive:',u)
    pass

if __name__ == '__main__':
    p=Pipe()
    p1=Process(target=send_proc,args=(p[0],['u1','u2','u3']))
    p2=Process(target=receive_proc,args=(p[1],))
    p1.start()
    p2.start()

    p1.join()
    p2.terminate()
    pass

多線程

一點理解。使用threading模塊建立多線程服務器

import time,random,threading

def run_proc(url):
    print('threading name',threading.current_thread().name)
    for u in url:
        print(threading.current_thread().name,'----->',u)
        time.sleep(random.random())
    print('end ',threading.current_thread().name)
    pass

if __name__ == '__main__':
    print('running :',threading.current_thread().name)
    w1=threading.Thread(target=run_proc,name='T1',args=(['u1','u2','u3'],))
    w2=threading.Thread(target=run_proc,name='T2',args=(['u4','u5','u6'],))
    w1.start()
    w2.start()
    w1.join()
    w2.join()
    print('end')
    pass

使用threading.Thread繼承建立線程類:代碼源:https://github.com/qiyeboy/SpiderBook網絡

import random
import threading
import time
class myThread(threading.Thread):
    def __init__(self,name,urls):
        threading.Thread.__init__(self,name=name)
        self.urls = urls

    def run(self):
        print('Current %s is running...' % threading.current_thread().name)
        for url in self.urls:
                print('%s ---->>> %s' % (threading.current_thread().name,url))
                time.sleep(random.random())
        print('%s ended.' % threading.current_thread().name)
        
print('%s is running...' % threading.current_thread().name)
t1 = myThread(name='Thread_1',urls=['url_1','url_2','url_3'])
t2 = myThread(name='Thread_2',urls=['url_4','url_5','url_6'])
t1.start()
t2.start()
t1.join()
t2.join()
print('%s ended.' % threading.current_thread().name)

線程同步

線程同步以保護數據,主要有Lock和RLock兩種方案。參閱。另外,全局解釋鎖的存在,限制了線程資源訪問,在CPU密集場合傾向使用多進程。對於IO密集型場合,使用多線程。多線程

import threading
mylock = threading.RLock()
num=0
class myThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self,name=name)

    def run(self):
        global num
        while True:
            mylock.acquire()
            print( '%s locked, Number: %d'%(threading.current_thread().name, num))
            if num>=100:
                mylock.release()
                print( '%s released, Number: %d'%(threading.current_thread().name, num))
                break
            num+=1
            print( '%s released, Number: %d'%(threading.current_thread().name, num))
            mylock.release()

if __name__== '__main__':
    thread1 = myThread('Thread_1')
    thread2 = myThread('Thread_2')
    thread1.start()
    thread2.start()

協程 

from gevent import monkey; monkey.patch_all()
import gevent
import urllib.request as urllib2

def run_task(url):
    print('Visit --> %s' % url)
    try:
        response = urllib2.urlopen(url)
        data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception:
        print(Exception)
if __name__=='__main__':
    urls = ['https://github.com/','https://www.python.org/','http://www.cnblogs.com/']
    greenlets = [gevent.spawn(run_task, url) for url in urls  ]
    gevent.joinall(greenlets)

支持的池

from gevent import monkey
monkey.patch_all()
import urllib.request as urllib2
from gevent.pool import Pool
def run_task(url):
    print('Visit --> %s' % url)
    try:
        response = urllib2.urlopen(url)
        data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception:
        print(Exception)
    return 'url:%s --->finish'% url

if __name__=='__main__':
    pool = Pool(2)
    urls = ['https://github.com/','https://www.python.org/','http://www.cnblogs.com/']
    results = pool.map(run_task,urls)
    print(results)

分佈式進程

 建立服務進程(Windows)代碼源

import queue as Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
#任務個數
task_number = 10
#定義收發隊列
task_queue = Queue.Queue(task_number)
result_queue = Queue.Queue(task_number)
def get_task():
    return task_queue
def get_result():
     return result_queue
# 建立相似的QueueManager:
class QueueManager(BaseManager):
    pass
def win_run():
    #windows下綁定調用接口不能使用lambda,因此只能先定義函數再綁定
    QueueManager.register('get_task_queue',callable = get_task)
    QueueManager.register('get_result_queue',callable = get_result)
    #綁定端口並設置驗證口令,windows下須要填寫ip地址,linux下不填默認爲本地
    manager = QueueManager(address = ('127.0.0.1',8001),authkey = b'qiye')
    #啓動
    manager.start()
    try:
        #經過網絡獲取任務隊列和結果隊列
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        #添加任務
        for url in ["ImageUrl_"+str(i) for i in range(10)]:
            print('put task %s ...' %url)
            task.put(url)
        print('try get result...')
        for i in range(10):
            print('result is %s' %result.get(timeout=10))
    except:
        print('Manager error')
    finally:
        #必定要關閉,不然會爆管道未關閉的錯誤
        manager.shutdown()

if __name__ == '__main__':
    #windows下多進程可能會有問題,添加這句能夠緩解
    freeze_support()
    win_run()

建立任務進程:python後續版本的修正

#coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 建立相似的QueueManager:
class QueueManager(BaseManager):
    pass
# 實現第一步:使用QueueManager註冊獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 實現第二步:鏈接到服務器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證口令注意保持與服務進程設置的徹底一致:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網絡鏈接:
m.connect()
# 實現第三步:獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 實現第四步:從task隊列取任務,並把結果寫入result隊列:
while(not task.empty()):
        image_url = task.get(True,timeout=5)
        print('run task download %s...' % image_url)
        time.sleep(1)
        result.put('%s--->success'%image_url)

# 處理結束:
print('worker exit.')

建立Linux版本的服務進程:(未測試)

服務進程(taskManager.py)(linux版)

import random,time,Queue
from multiprocessing.managers import BaseManager
#實現第一步:創建task_queue和result_queue,用來存聽任務和結果
task_queue=Queue.Queue()
result_queue=Queue.Queue()

class Queuemanager(BaseManager):
    pass
#實現第二步:把建立的兩個隊列註冊在網絡上,利用register方法,callable參數關聯了Queue對象,
# 將Queue對象在網絡中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)

#實現第三步:綁定端口8001,設置驗證口令‘qiye’。這個至關於對象的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')

#實現第四步:啓動管理,監聽信息通道
manager.start()

#實現第五步:經過管理實例的方法得到經過網絡訪問的Queue對象
task=manager.get_task_queue()
result=manager.get_result_queue()

#實現第六步:添加任務
for url in ["ImageUrl_"+str(i) for i in range(10)]:
    print 'put task %s ...' %url
    task.put(url) 
#獲取返回結果
print 'try get result...'
for i in range(10):
    print 'result is %s' %result.get(timeout=10)
#關閉管理
manager.shutdown()

TCP編程

建立服務端

#coding:utf-8
import socket
import threading
import time
def dealClient(sock, addr):
    #第四步:接收傳來的數據,併發送給對方數據
    print('Accept new connection from %s:%s...' % addr)
    sock.send(b'Hello,I am server!')
    while True:
        data = sock.recv(1024)
        time.sleep(1)
        if not data or data.decode('utf-8') == 'exit':
            break
        print('-->>%s!' % data.decode('utf-8'))
        sock.send(('Loop_Msg: %s!' % data.decode('utf-8')).encode('utf-8'))
    #第五步:關閉套接字
    sock.close()
    print('Connection from %s:%s closed.' % addr)
if __name__=="__main__":
    #第一步:建立一個基於IPv4和TCP協議的Socket
    # 套接字綁定的IP(127.0.0.1爲本機ip)與端口
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('127.0.0.1', 999))
    #第二步:監聽鏈接
    s.listen(5)
    print('Waiting for connection...')
    while True:
        # 第三步:接受一個新鏈接:
        sock, addr = s.accept()
        # 建立新線程來處理TCP鏈接:
        t = threading.Thread(target=dealClient, args=(sock, addr))
        t.start()

建立客戶端

#coding:utf-8
import socket
#初始化Socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#鏈接目標的ip和端口
s.connect(('127.0.0.1', 999))
# 接收消息
print('-->>'+s.recv(1024).decode('utf-8'))
# 發送消息
s.send(b'Hello,I am a client')
print('-->>'+s.recv(1024).decode('utf-8'))
s.send(b'exit')
#關閉套接字
s.close()

UDP編程

服務端與客戶端

import socket
#建立Socket,綁定指定的ip和端口
#SOCK_DGRAM指定了這個Socket的類型是UDP。綁定端口和TCP同樣。
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(('127.0.0.1', 9999))
print('Bind UDP on 9999...')
while True:
    # 直接發送數據和接收數據
    data, addr = s.recvfrom(1024)
    print('Received from %s:%s.' % addr)
    s.sendto(b'Hello, %s!' % data, addr)

import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for data in [b'Hello', b'World']:
    # 發送數據:
    s.sendto(data, ('127.0.0.1', 9999))
    # 接收數據:
    print(s.recv(1024).decode('utf-8'))
s.close()
相關文章
相關標籤/搜索