python網絡編程基礎,第四版
pycharm實現,python版本2.7.5
第一部分 底層網絡
# coding=utf-8
# 第一章 (Chapter 1): minimal Gopher client, written for Python 2.
# Usage: <script> <host> <selector>
import socket
import sys

port = 70

# Dump the command line first so the expected argument order is visible.
print(len(sys.argv))
for arg in sys.argv:
    print(arg)

host = sys.argv[1]
filename = sys.argv[2]

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    print("old s is: %s" % s)
    print(host)
    s.connect((host, port))
    print("new s is: %s" % s)
    # The request is the selector followed by CRLF.
    s.sendall(filename + "\r\n")
    # recv() returns an empty string once the peer has closed the connection.
    while 1:
        buf = s.recv(2048)
        if not len(buf):
            break
        sys.stdout.write(buf)
finally:
    # Release the socket even when connect/send/recv raises.
    s.close()
加入錯誤處理
# Gopher client with connection error handling (Python 2).
import socket
import sys

port = 70
print(len(sys.argv))
for arg in sys.argv:
    print(arg)
host = sys.argv[1]
filename = sys.argv[2]

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print("old s is: %s" % s)
print(host)
try:
    s.connect((host, port))
except socket.gaierror as e:
    # Address-related failure. Must be listed before socket.error because
    # gaierror is a subclass of it.
    print("ERROR connection to server:%s" % e)
    sys.exit(1)
except socket.error as e:
    # Any other connect failure (refused, unreachable, ...).
    print("Connection error: %s" % e)
    sys.exit(1)

try:
    s.sendall(filename + "\r\n")
    while 1:
        buf = s.recv(2048)
        if not len(buf):
            break
        sys.stdout.write(buf)
finally:
    s.close()

# 文件接口類重寫 (the same client rewritten on the socket's file-like interface)
import socket
import sys

port = 70
host = sys.argv[1]
filename = sys.argv[2]

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
# Buffer size 0: the request is written to the socket immediately.
fd = s.makefile('rw', 0)
try:
    fd.write(filename + "\r\n")
    for line in fd.readlines():
        sys.stdout.write(line)
finally:
    fd.close()
    s.close()
基本服務器操作
# Basic TCP server: greets each client, reads one line and reports its length.
import socket

host = ''
port = 80

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
s.listen(1)

print("Server is running on port %d;press ctrl-c to terminate." % port)

while 1:
    clientsock, clientaddr = s.accept()
    clientfile = clientsock.makefile('rw', 0)
    try:
        # "\n" (not the literal "/n") so the greeting ends its own line.
        clientfile.write("welcome," + str(clientaddr) + "\n")
        clientfile.write("Please enter a string:")
        line = clientfile.readline().strip()
        clientfile.write("You entered %d characters.\n" % len(line))
    finally:
        # Always release the per-client resources, even if the client
        # disconnects in the middle of the exchange.
        clientfile.close()
        clientsock.close()
高級接口
# Gopher fetch through gopherlib.
# NOTE(review): gopherlib appears to be gone from recent Python 2 releases;
# confirm it is importable on the interpreter you target.
import gopherlib
import sys

host = sys.argv[1]
selector = sys.argv[2]  # renamed from `file`, which shadowed the builtin
f = gopherlib.send_selector(selector, host)
try:
    for line in f.readlines():
        sys.stdout.write(line)
finally:
    f.close()

# The same fetch through urllib, using a gopher:// URL.
import urllib
import sys

host = sys.argv[1]
selector = sys.argv[2]
f = urllib.urlopen('gopher://%s%s' % (host, selector))
try:
    for line in f.readlines():
        sys.stdout.write(line)
finally:
    f.close()

# Generic fetch: the URL given on the command line decides the protocol.
import urllib
import sys

f = urllib.urlopen(sys.argv[1])
try:
    while 1:
        buf = f.read(2048)
        if not len(buf):
            break
        sys.stdout.write(buf)
finally:
    f.close()
第二章
使用UDP
第三章 網絡服務器
import socket

# Every name exported by the socket module that starts with 'SO', sorted.
solist = sorted(x for x in dir(socket) if x.startswith('SO'))
for x in solist:
    print(x)
第四章 域名系統
# Print the socket address of the first getaddrinfo() result only.
import sys
import socket

result = socket.getaddrinfo(sys.argv[1], None)
print(result[0][4])

# Print the socket address of every result, numbered from 0.
import sys
import socket

result = socket.getaddrinfo(sys.argv[1], None)
for counter, item in enumerate(result):
    print("%-2d:%s" % (counter, item[4]))

# (book p.70) Same listing, restricted to SOCK_STREAM results.
import sys
import socket

result = socket.getaddrinfo(sys.argv[1], None, 0, socket.SOCK_STREAM)
for counter, item in enumerate(result):
    print("%-2d:%s" % (counter, item[4]))
執行反向查詢
# Reverse lookup: print the primary host name and the addresses for an IP.
import sys
import socket

try:
    # Only the lookup itself can raise herror, so only it is guarded.
    result = socket.gethostbyaddr(sys.argv[1])
except socket.herror as e:
    print("Couldn't look up name: %s" % e)
else:
    print("Primary hostname:")
    print(" " + result[0])
    print("\nAddresses:")
    for item in result[2]:
        print(" " + item)
正反向查詢
# Forward-confirmed reverse lookup: resolve an IP to a name, then check that
# the name resolves back to the same IP.
import sys
import socket


def getipaddrs(hostname):
    """Return the list of IP address strings that `hostname` resolves to."""
    result = socket.getaddrinfo(hostname, None, 0, socket.SOCK_STREAM)
    return [x[4][0] for x in result]


def gethostname(ipaddr):
    """Return the primary host name reported by a reverse lookup of `ipaddr`."""
    return socket.gethostbyaddr(ipaddr)[0]


if __name__ == '__main__':
    # The two lookups are guarded separately so that `hostname` is always
    # bound when the forward-lookup handler formats its message.
    try:
        hostname = gethostname(sys.argv[1])
    except socket.herror as e:
        print("NO host names available for %s;it may be normal" % sys.argv[1])
        sys.exit(0)

    try:
        ipaddrs = getipaddrs(hostname)
    except socket.gaierror as e:
        print("Got hostname %s,but it could not be forward-resolved:%s"
              % (hostname, str(e)))
        sys.exit(1)

    if sys.argv[1] not in ipaddrs:
        print("Got hostname %s,but on forward lookup," % hostname)
        print("original IP %s did not appear in IP address list" % sys.argv[1])
        sys.exit(1)

    print("Validated hostname: %s" % hostname)
得到完整域名,gethostname()得到主機名,getfqdn()得到完整信息,getaddrinfo()得到該域名對應的IP地址。
# Print this machine's host name, fully-qualified name and IP addresses.
import sys
import socket


def getipaddrs(hostname):
    """Return the list of IP address strings that `hostname` resolves to."""
    result = socket.getaddrinfo(hostname, None, 0, socket.SOCK_STREAM)
    return [x[4][0] for x in result]


hostname = socket.gethostname()
print("Host name: %s" % hostname)
print("Fully-qualified name: %s" % socket.getfqdn(hostname))
try:
    print("IP addresses: %s" % ",".join(getipaddrs(hostname)))
except socket.gaierror as e:
    print("Could not get IP addresses: %s" % e)

# Query every record type for a name through the DNS package.
import sys
import DNS

query = sys.argv[1]
DNS.DiscoverNameServers()
reqobj = DNS.Request()
# The keyword is `qtype`, as in the other req() calls in this listing.
answerobj = reqobj.req(name=query, qtype=DNS.Type.ANY)
if not len(answerobj.answers):
    print("NOT found.")
for item in answerobj.answers:
    print("%-5s %s" % (item['typename'], item['data']))

# nslookup-style query sent directly to the authoritative name servers.
import sys
import DNS


def hierquery(qstring, qtype):
    """Query `qstring`, then each parent domain, for records of `qtype`.

    Returns the list of 'data' values from the first level that yields
    matching answers, or None when the labels run out.
    """
    reqobj = DNS.Request()
    try:
        answerobj = reqobj.req(name=qstring, qtype=qtype)
        answers = [x['data'] for x in answerobj.answers if x['type'] == qtype]
    except DNS.Base.DNSError:
        answers = []
    if len(answers):
        return answers
    # Nothing at this level: drop the leftmost label and retry on the parent.
    remainder = qstring.split(".", 1)
    if len(remainder) == 1:
        return None
    return hierquery(remainder[1], qtype)


def findnameservers(hostname):
    """Return the NS record data found for `hostname` or a parent domain."""
    return hierquery(hostname, DNS.Type.NS)


def getrecordsfromnameserver(qstring, qtype, nslist):
    """Ask each server in `nslist` in turn; return the first non-empty answers.

    Servers that raise DNSError are skipped. Returns [] if none answers.
    """
    for ns in nslist:
        reqobj = DNS.Request(server=ns)
        try:
            answers = reqobj.req(name=qstring, qtype=qtype).answers
            if len(answers):
                return answers
        except DNS.Base.DNSError:
            pass
    return []


def nslookup(qstring, qtype, verbose=1):
    """Look up `qstring` on the name servers found by findnameservers().

    Raises RuntimeError when no name server can be found.
    """
    nslist = findnameservers(qstring)
    if nslist is None:
        raise RuntimeError("Could not find nameserver to use.")
    if verbose:
        print("using nameserver: %s" % ",".join(nslist))
    return getrecordsfromnameserver(qstring, qtype, nslist)


if __name__ == '__main__':
    query = sys.argv[1]
    DNS.DiscoverNameServers()
    answers = nslookup(query, DNS.Type.ANY)
    if not len(answers):
        print("not found.")
    for item in answers:
        print("%-5s %s" % (item['typename'], item['data']))
第五章
超時的用法
echoserver.py
# ---- echoserver.py ----
import socket
import traceback


def run_echo_server(host='', port=51432):
    """Serve TCP clients one at a time, echoing back whatever they send.

    Loops until interrupted. An error while accepting or serving a client is
    reported with a traceback and the server moves on to the next client.
    """
    # NOTE(review): this section of the article is about timeouts, yet no
    # socket timeout is set; a clientsock.settimeout(...) call may be missing.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((host, port))
    s.listen(1)
    while True:
        # `except Exception` lets KeyboardInterrupt and SystemExit propagate
        # without the explicit re-raise clauses.
        try:
            clientsock, clientaddr = s.accept()
        except Exception:
            traceback.print_exc()
            continue
        try:
            print("Got connection from %s" % (clientsock.getpeername(),))
            while True:
                data = clientsock.recv(4096)
                if not len(data):
                    break
                clientsock.sendall(data)
        except Exception:
            traceback.print_exc()
        try:
            clientsock.close()
        except Exception:
            traceback.print_exc()


if __name__ == '__main__':
    run_echo_server()

# ---- length-prefixed strings in network byte order ----
import struct
import sys


def htons(num):
    """Pack `num` as a 2-byte unsigned integer in network byte order."""
    return struct.pack('!H', num)


# Original (misspelled) name, kept so existing callers continue to work.
htones = htons


def htonl(num):
    """Pack `num` as a 4-byte unsigned integer in network byte order."""
    return struct.pack('!I', num)


def ntohs(data):
    """Unpack a 2-byte network-order unsigned integer from `data`."""
    return struct.unpack('!H', data)[0]


def ntohl(data):
    """Unpack a 4-byte network-order unsigned integer from `data`."""
    return struct.unpack('!I', data)[0]


def sendstring(data):
    """Return `data` prefixed with its length as a 4-byte network-order int."""
    return htonl(len(data)) + data


if __name__ == '__main__':
    print("Enter a string:")
    text = sys.stdin.readline().rstrip()  # not named `str`: that hides the builtin
    print(repr(sendstring(text)))

# ---- dump every getaddrinfo() result for a host/port pair ----
import socket
import sys


def describe_addrinfo(host, port):
    """Print family, socket type, protocol, canonical name and address of
    each SOCK_STREAM result that getaddrinfo() returns for host/port."""
    results = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
    for result in results:
        print("-" * 60)
        if result[0] == socket.AF_INET:
            print("Family: AF_INET")
        elif result[0] == socket.AF_INET6:
            print("Family: AF_INET6")
        else:
            print("Family: %s" % result[0])
        if result[1] == socket.SOCK_STREAM:
            print("Socket Type: SOCK_STREAM")
        elif result[1] == socket.SOCK_DGRAM:
            print("Socket Type: SOCK_DGRAM")
        print("Protocol: %s" % result[2])
        print("Canonical Name: %s" % result[3])
        print("Socket Address: %s" % (result[4],))


if __name__ == '__main__':
    host, port = sys.argv[1:]
    describe_addrinfo(host, port)
先找ipv4,再找ipv6
Connect Example with ipv6 Awareness ------------- ipv6connect.py
import socket
import sys


def getaddrinfo_pref(host, port, socktype, familypreference=socket.AF_INET):
    """Return one getaddrinfo() entry for host/port, preferring a family.

    The first result whose family equals `familypreference` (IPv4 by
    default) is returned; if none matches, the first result is returned.
    """
    results = socket.getaddrinfo(host, port, 0, socktype)
    for result in results:
        if result[0] == familypreference:
            return result
    return results[0]


host = sys.argv[1]
port = 'http'
c = getaddrinfo_pref(host, port, socket.SOCK_STREAM)
print("Connecting to %s" % (c[4],))
s = socket.socket(c[0], c[1])
try:
    s.connect(c[4])
    # HTTP lines end in CRLF; a blank line terminates the request.
    s.sendall("HEAD / HTTP/1.0\r\n\r\n")
    while True:
        buf = s.recv(4096)
        if not len(buf):
            break
        sys.stdout.write(buf)
finally:
    s.close()

# Echo Server Bound to Specific Address -- bindserver.py
import socket
import traceback

host = '127.0.0.1'
port = 51423

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
s.listen(1)

while True:
    clientsock, clientaddr = s.accept()
    try:
        print("Got connection from %s" % (clientsock.getpeername(),))
        while True:
            data = clientsock.recv(4096)
            if not len(data):
                break
            clientsock.sendall(data)
    finally:
        clientsock.close()

# poll() client: waits for data while animating a spinner.
import socket
import sys
import select

port = 51423
host = 'localhost'

spinsize = 10
spinpos = 0
spindir = 1


def spin():
    """Draw one spinner frame on the current line and advance its position.

    The '|' marker moves by `spindir` per call and reverses direction when
    it runs off either end of the `spinsize`-wide track.
    """
    global spinsize, spinpos, spindir
    spinstr = '.' * spinpos + '|' + '.' * (spinsize - spinpos - 1)
    sys.stdout.write('\r' + spinstr + ' ')
    sys.stdout.flush()
    spinpos += spindir
    if spinpos < 0:
        spindir = 1
        spinpos = 1
    elif spinpos >= spinsize:
        spinpos -= 2
        spindir = -1


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
p = select.poll()
p.register(s.fileno(), select.POLLIN | select.POLLERR | select.POLLHUP)
while True:
    results = p.poll(50)
    if len(results):
        if results[0][1] == select.POLLIN:
            data = s.recv(4096)
            if not len(data):
                print("\rRemote end closed connection; exiting.")
                break
            sys.stdout.write("\rReceived: " + data)
            sys.stdout.flush()
        else:
            # Any event other than plain POLLIN is treated as a failure.
            print("\rProblem occurred; exiting.")
            sys.exit(0)
    spin()

# selectclient.py: the same client built on select().
import socket
import sys
import select

port = 51423
host = 'localhost'

spinsize = 10
spinpos = 0
spindir = 1


def spin():
    """Draw one spinner frame on the current line and advance its position."""
    global spinsize, spinpos, spindir
    spinstr = '.' * spinpos + '|' + '.' * (spinsize - spinpos - 1)
    sys.stdout.write('\r' + spinstr + ' ')
    sys.stdout.flush()
    spinpos += spindir
    if spinpos < 0:
        spindir = 1
        spinpos = 1
    elif spinpos >= spinsize:
        spinpos -= 2
        spindir = -1


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
while True:
    infds, outfds, errfds = select.select([s], [], [s], 0.05)
    if len(infds):
        data = s.recv(4096)
        if not len(data):
            print("\rRemote end closed connection; Exiting.")
            break
        sys.stdout.write("\rReceived: " + data)
        sys.stdout.flush()
    if len(errfds):
        print("\rProblem occurred; exiting.")
        sys.exit(0)
    spin()
服務端:
# coding=utf-8
# Server side: accept TCP connections and echo every message back.
from socket import *

myHost = ''        # empty host string, passed to bind() as-is
myPort = 50007

sockobj = socket(AF_INET, SOCK_STREAM)   # TCP socket object
sockobj.bind((myHost, myPort))           # bind it to the server port
sockobj.listen(5)                        # allow up to 5 pending connections

while True:                              # serve until the process is killed
    connection, address = sockobj.accept()   # wait for the next client
    print('Server connected by:', address)   # `connection` is a new socket
    try:
        while True:
            data = connection.recv(1024)     # read what the client sent
            if not data:                     # empty read: client closed
                break
            # sendall() keeps writing until the whole reply is sent;
            # send() may transmit only part of it. Payload must be bytes.
            connection.sendall(b'Echo get your message:' + data)
    finally:
        connection.close()
客戶端:
# coding=utf-8
# Client side: send each message to the echo server and print the reply.
import socket
import sys
from socket import *

serverHost = 'localhost'
serverPort = 50007
message = [b'hello network world']   # default payload

# Optional arguments: <server host> [message ...]
if len(sys.argv) > 1:
    serverHost = sys.argv[1]
    if len(sys.argv) > 2:
        message = (x.encode() for x in sys.argv[2:])

sockobj = socket(AF_INET, SOCK_STREAM)
try:
    sockobj.connect((serverHost, serverPort))
    for line in message:
        sockobj.sendall(line)         # send() may transmit only part of it
        data = sockobj.recv(1024)     # read up to 1024 bytes of reply
        print('Client received:', data)
finally:
    sockobj.close()
UDP請求端:
# coding=utf-8
# UDP client. Usage: <script> <host> <port number or service name>
import socket
import sys

host = sys.argv[1]
textpost = sys.argv[2]

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
    port = int(textpost)
except ValueError:
    # Not a number: treat the argument as a UDP service name.
    print("輸入錯入")
    port = socket.getservbyname(textpost, 'udp')

s.connect((host, port))
print("Enter data to transmit: ")
data = sys.stdin.readline().strip()
s.sendall(data)
print("Looking for replies.")
while 1:
    buf = s.recv(2048)
    if not len(buf):
        break
    sys.stdout.write(buf)

# UDP應答端 (UDP echo server)
# coding=utf-8
import socket
import traceback

host = ''
port = 54132

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))

while 1:
    try:
        message, address = s.recvfrom(8192)
        print("Got data from %s" % (address,))
        s.sendto(message, address)
    except Exception:
        # KeyboardInterrupt/SystemExit are not Exception subclasses, so they
        # still stop the server; everything else is logged and survived.
        traceback.print_exc()

# UDP查詢時間 服務端 (UDP time query, server side)
# coding=utf-8
import socket
import traceback
import time
import struct

host = ''
port = 51432

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))

while 1:
    try:
        message, address = s.recvfrom(8192)
        print("%s %s" % (message, address))
        secs = int(time.time())
        secs -= 60 * 60 * 24      # report the time one day in the past
        # NOTE(review): RFC 868 counts seconds from 1900, an offset of
        # 2208988800 from the Unix epoch. This constant has one digit fewer;
        # it matches the companion client, so change both together.
        secs += 220898800
        reply = struct.pack("!I", secs)
        s.sendto(reply, address)
    except Exception:
        traceback.print_exc()
客戶端:
# coding=utf-8
# UDP time client: send an empty datagram and decode the 4-byte reply.
import socket
import sys
import struct
import time

hostname = 'localhost'
port = 51432
host = socket.gethostbyname(hostname)

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.sendto('', (host, port))
print("Looking for replies")
buf = s.recvfrom(2048)[0]
if len(buf) != 4:
    print("Wrong-size reply %d:%s" % (len(buf), buf))
    sys.exit(1)
secs = struct.unpack("!I", buf)[0]
# NOTE(review): must equal the offset the server adds. RFC 868 specifies
# 2208988800 (1900 -> 1970); this value has one digit fewer.
secs -= 220898800
print(time.ctime(int(secs)))

# 超時 (timeout): echo server
# coding=utf-8
import socket
import traceback

host = ''
port = 51432

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
s.listen(1)

while True:
    try:
        clientsock, clientaddr = s.accept()
    except Exception:
        traceback.print_exc()
        continue
    # NOTE(review): the heading says "timeout" but no timeout is set on
    # clientsock; a clientsock.settimeout(...) call may be missing here.
    try:
        print("Got connection from %s" % (clientsock.getpeername(),))
        while True:
            data = clientsock.recv(4096)
            if not len(data):
                break
            clientsock.sendall(data)
    except Exception:
        traceback.print_exc()
    try:
        clientsock.close()
    except Exception:
        traceback.print_exc()

# 4、ftp
# coding=utf-8
# Fetch a remote file over FTP and optionally open it.
import os
import sys
from getpass import getpass
from ftplib import FTP

try:
    ask = raw_input          # Python 2: input() would eval() the answer
except NameError:
    ask = input              # Python 3

nonpassive = False
filename = 'monkeys.jpg'
dirname = '.'
sitename = 'ftp.rmi.net'
userinfo = ('lutz', getpass('pwd?'))
if len(sys.argv) > 1:
    filename = sys.argv[1]

print('Connection...')
connection = FTP(sitename)
connection.login(*userinfo)
connection.cwd(dirname)
if nonpassive:
    connection.set_pasv(False)

print('Downloading...')
localfile = open(filename, 'wb')
try:
    # The FTP command is "RETR <name>": the space is required.
    connection.retrbinary('RETR ' + filename, localfile.write, 1024)
    connection.quit()
finally:
    localfile.close()

if ask('Open file?') in ['Y', 'y']:
    from PP4E.System.Media.playfile import playfile
    playfile(filename)
接收端:
# coding=utf-8
# Broadcast receiver: answers every datagram with "I am here".
import socket
import traceback

host = ''
port = 51423

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.bind((host, port))

while True:
    try:
        message, address = s.recvfrom(8192)
        print("Got data from  %s" % (address,))
        s.sendto("I am here", address)
    except Exception:
        # Log and continue; KeyboardInterrupt/SystemExit still propagate
        # because they do not derive from Exception.
        traceback.print_exc()
發送端:
# coding=utf-8
# Broadcast sender: shout "Hello" and list whoever answers.
import socket
import sys

dest = ('<broadcast>', 51423)

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Sending to a broadcast address requires SO_BROADCAST on the socket.
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.sendto("Hello", dest)

print("Looking for replies; press Ctrl-C to stop.")
while True:
    (buf, address) = s.recvfrom(2048)
    if not len(buf):
        break
    print("Received from %s: %s" % (address, buf))
urllib2擴展性更好
1.下載Web界面
2.在遠程HTTP服務器上驗證
3.提交表單(form)數據
4.處理錯誤
5.與非HTTP協議通訊
1.下載Web界面
(1)
# coding=utf-8
# Download the URL given on the command line and copy it to stdout.
import sys
import urllib2

req = urllib2.Request(sys.argv[1])
fd = urllib2.urlopen(req)
try:
    # Read in 1 KB chunks; an empty read marks the end of the document.
    while 1:
        data = fd.read(1024)
        if not len(data):
            break
        sys.stdout.write(data)
finally:
    fd.close()
sys.stdout 是標準輸出文件。write就是往這個文件寫數據。
合起來就是打印數據到標準輸出。相似print
運行結果:
D:\python\python.exe E:/code/python/unit6/dump_page.py
http://www.example.com<!doctype html> <html> <head> <title>Example Domain</title> <meta charset="utf-8" /> <meta http-equiv="Content-type" content="text/html; charset=utf-8" /> <meta name="viewport" content="width=device-width, initial-scale=1" /> <style type="text/css"> body { background-color: #f0f0f2; margin: 0; padding: 0; font-family: "Open Sans", "Helvetica Neue", Helvetica, Arial, sans-serif; } div { width: 600px; margin: 5em auto; padding: 50px; background-color: #fff; border-radius: 1em; } a:link, a:visited { color: #38488f; text-decoration: none; } @media (max-width: 700px) { body { background-color: #fff; } div { width: auto; margin: 0 auto; border-radius: 0; padding: 1em; } } </style> </head> <body> <div> <h1>Example Domain</h1> <p>This domain is established to be used for illustrative examples in documents. You may use this domain in examples without prior coordination or asking for permission.</p> <p><a href="http://www.iana.org/domains/example">More information...</a></p> </div> </body> </html>Process finished with exit code 0
(2)
# coding=utf-8
# Print the final URL and the response headers for the URL in argv[1].
import sys
import urllib2

req = urllib2.Request(sys.argv[1])
fd = urllib2.urlopen(req)
try:
    print("Retrieved %s" % fd.geturl())
    info = fd.info()
    for key, value in info.items():
        print("%s=%s" % (key, value))
finally:
    fd.close()
運行結果以下:
D:\python\python.exe E:/code/python/unit6/dump_info.py http://httpd.apache.org/dev
Retrieved http://httpd.apache.org/dev/
content-length=8870
accept-ranges=bytes
vary=Accept-Encoding
server=Apache/2.4.7 (Ubuntu)
last-modified=Wed, 25 Jan 2017 14:38:55 GMT
connection=close
etag="22a6-546ec313cb061"
date=Fri, 17 Mar 2017 06:29:52 GMT
content-type=text/html
Process finished with exit code 0
注:從geturl()獲得的值與傳入Request的對象不一樣,結尾處多了一條斜線,遠程服務器作了一個Http轉向,urllib自動跟隨了轉向。
其餘行顯示Http的header信息;
2.在遠程HTTP服務器上驗證
(1)
# coding=utf-8
import sys
import getpass

try:
    import urllib2                        # Python 2
except ImportError:
    import urllib.request as urllib2      # Python 3 home of the same classes


class TerminalPassword(urllib2.HTTPPasswordMgr):
    """Password manager that asks on the terminal when nothing is stored."""

    def find_user_password(self, realm, authuri):
        """Return (username, password) for `realm` at `authuri`.

        Credentials already registered with the manager are returned
        unchanged; otherwise the user is prompted on stdin.
        """
        ret = urllib2.HTTPPasswordMgr.find_user_password(self, realm, authuri)
        if ret[0] is None and ret[1] is None:
            # "\n" was missing its backslash, which glued a literal "n" to
            # the URI and left the prompt on the same line.
            sys.stdout.write("Login required for %s at %s\n" % (realm, authuri))
            sys.stdout.write("Username: ")
            username = sys.stdin.readline().rstrip()
            password = getpass.getpass().rstrip()
            return (username, password)
        return ret


if __name__ == '__main__':
    req = urllib2.Request(sys.argv[1])
    opener = urllib2.build_opener(
        urllib2.HTTPBasicAuthHandler(TerminalPassword()))
    response = opener.open(req)
    print(response.read())
擴展urllib2.HTTPPasswordMgr類,允許程序在需要的時候向操作員詢問用戶名和密碼,
build_opener:允許指定額外的處理程序,代碼需要支持認證,所以把HTTPBasicAuthHandler加到處理鏈中
3.提交表單(form)數據
GET方法:把表單數據編碼至url,在給出請求的頁面後,加一個問號,接着是表單的元素。每一個鍵和值對用「&」分割,有些字符須要被避免。不適合數據量比較大的地方。
(1)
# Code:
# coding=utf-8
# Download the URL given on the command line and copy it to stdout.
import sys
import urllib2

req = urllib2.Request(sys.argv[1])
fd = urllib2.urlopen(req)
try:
    while 1:
        data = fd.read(1024)
        if not len(data):
            break
        sys.stdout.write(data)
finally:
    fd.close()
sys.stdout 是標準輸出文件。write就是往這個文件寫數據。
合起來就是打印數據到標準輸出。相似print
運行結果:
D:\python\python.exe E:/code/python/unit6/dump_page.py http://weixin.sogou.com/weixin?p=01030402&query=%E5%8D%9A%E5%AE%A2%E5%9B%AD&type=2&ie=utf8
<!doctype html>
注:必須給url加上引號
(2)
代碼:
# coding=utf-8
import sys

try:
    import urllib2                           # Python 2
    from urllib import urlencode
except ImportError:
    import urllib.request as urllib2         # Python 3
    from urllib.parse import urlencode


def addGETdata(url, data):
    """Return `url` with `data` appended as a URL-encoded query string.

    `data` is anything urlencode() accepts (a mapping or a sequence of
    (key, value) pairs). If `url` already has a query string, the new
    fields are joined with '&' instead of a second '?'.
    """
    separator = '&' if '?' in url else '?'
    return url + separator + urlencode(data)


if __name__ == '__main__':
    zipcode = sys.argv[1]
    url = addGETdata(
        'http://www.weather.com.cn/cgi-bin/findweather/getForecast',
        [('query', zipcode)])
    print("using URL %s" % url)
    req = urllib2.Request(url)
    fd = urllib2.urlopen(req)
    while 1:
        data = fd.read(1024)
        if not len(data):
            break
        sys.stdout.write(data)
注:函數addGETdata(url,data)負責在url結尾添加全部的數據。在內部,他在URL和經過urllib.urlencode()獲得的數據間添加問號。
POST方法:單獨部分發送。URL永遠不會被修改,附加信息經過第二個參數傳遞給urlopen().
(3)
代碼:
# coding=utf-8
# Submit the form with POST: the encoded fields are passed to urlopen() as
# its second argument and the URL itself is left untouched.
import sys
import urllib2
import urllib

zipcode = sys.argv[1]
# "getForecast": the earlier spelling "getForcecast" names a page that does
# not exist (the 404 shown in the error-handling run results).
url = 'http://www.wunderground.com/cgi-bin/findweather/getForecast'
postdata = urllib.urlencode([('query', zipcode)])

req = urllib2.Request(url)
fd = urllib2.urlopen(req, postdata)
try:
    while 1:
        chunk = fd.read(1024)
        if not len(chunk):
            break
        sys.stdout.write(chunk)
finally:
    fd.close()
4.處理錯誤
(1)
代碼:
# coding=utf-8
# Report a connection/HTTP failure instead of dying with a traceback.
import sys
import urllib2

req = urllib2.Request(sys.argv[1])
try:
    fd = urllib2.urlopen(req)
except urllib2.URLError as e:
    print("Error reteiveving data: %s" % e)
    sys.exit(1)

print("Retrieved %s" % fd.geturl())
info = fd.info()
for key, value in info.items():
    print("%s=%s" % (key, value))
運行結果:
D:\python\python.exe E:/code/python/unit6/error_basic.py
https://www.wunderground.com/cgi-bin/findweather/getForcecast
Error reteiveving data: HTTP Error 404: Not FoundProcess finished with exit code 1
(2)
代碼:
# coding=utf-8
# Distinguish HTTP errors (which carry an error document) from other
# URL errors.
import sys
import urllib2

req = urllib2.Request(sys.argv[1])
try:
    fd = urllib2.urlopen(req)
except urllib2.HTTPError as e:
    # Caught before URLError so that HTTP errors get their own handling.
    print("Error reteiveving data: %s" % e)
    print("Server error document follows:\n")
    # read must be *called*: printing the bare attribute only shows
    # "<bound method _fileobject.read ...>" instead of the error page.
    print(e.read())
    sys.exit(1)
except urllib2.URLError as e:
    print("Error retriveving data %s" % e)
    sys.exit(2)

print("Retrieved %s" % fd.geturl())
info = fd.info()
for key, value in info.items():
    print("%s=%s" % (key, value))
運行結果:
D:\python\python.exe E:/code/python/unit6/error_basic.py
https://www.wunderground.com/cgi-bin/findweather/getForcecast
Error reteiveving data: HTTP Error 404: Not Found
Server error document follows:<bound method _fileobject.read of <socket._fileobject object at
0x0216A5B0>>Process finished with exit code 1
注:如果產生了一個HTTPError的實例,會捕獲異常並打印細節。否則,若是urllib2.URLError類的實例,會顯示一條URLError信息。
讀取數據錯誤:
通訊錯誤,會使socket模塊調用read()函數時發生socket.error;(會經過系統層傳遞)
沒有通訊狀況下發送的文檔被刪節;
(3)
代碼:
# coding=utf-8
# Handle errors while opening the URL, while reading the body, and a body
# shorter or longer than the advertised Content-Length.
import sys
import urllib2
import socket

req = urllib2.Request(sys.argv[1])
try:
    fd = urllib2.urlopen(req)
except urllib2.HTTPError as e:
    print("Error retrieving data: %s" % e)
    print("Sever error document follows:\n")
    print(e.read())
    sys.exit(1)
except urllib2.URLError as e:
    print("Error retrieving data: %s" % e)
    sys.exit(2)

print("Retrieved %s" % fd.geturl())

bytesread = 0
while 1:
    try:
        data = fd.read(1024)
    except socket.error as e:
        print("Error reading data: %s" % e)
        sys.exit(3)
    if not len(data):
        break
    bytesread += len(data)
    sys.stdout.write(data)

# Compare what was actually read against the Content-Length header, if any.
headers = fd.info()
if 'Content-Length' in headers:
    expected = int(headers['Content-Length'])
    if expected != bytesread:
        print("Expected a document of size %d,but read %d bytes"
              % (expected, bytesread))
        sys.exit(4)
運行結果:
> D:\python\python.exe E:/code/python/unit6/erroe_all.py > https://www.wunderground.com/cgi-bin/findweather/getForcecast > Error retrieving data: HTTP Error 404: Not Found > Sever error document follows: > > > <!DOCTYPE html> > <!--[if IE 9]><html class="no-js ie9"> <![endif]--> > <!--[if gt IE 9]><!--> <html class="no-js "> <!--<![endif]--> > <head> > <title>Error | Weather Underground</title> > <link href="//icons.wxug.com/" rel="dns-prefetch" /> > <link href="//api-ak.wunderground.com/" rel="dns-prefetch" /> > <meta charset="utf-8"> > <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
第七章 解析Html 和XHtml p151-p168
1.提取標題
代碼:
# coding=utf-8
import sys

try:
    from HTMLParser import HTMLParser       # Python 2
except ImportError:
    from html.parser import HTMLParser      # Python 3


class TitleParser(HTMLParser):
    """Collect the text found between <title> and </title>."""

    def __init__(self):
        self.title = ''
        self.readingtitle = 0   # non-zero while inside <title>...</title>
        HTMLParser.__init__(self)

    def handle_starttag(self, tag, attrs):
        if tag == 'title':
            self.readingtitle = 1

    def handle_data(self, data):
        # Text outside the title element is ignored.
        if self.readingtitle:
            self.title += data

    def handle_endtag(self, tag):
        if tag == 'title':
            self.readingtitle = 0

    def gettitle(self):
        """Return the title text collected so far."""
        return self.title


if __name__ == '__main__':
    with open(sys.argv[1]) as fd:
        tp = TitleParser()
        tp.feed(fd.read())
    print("Title is: %s" % tp.gettitle())
運行結果:
D:\python\python.exe E:/code/python/unit7/basic_title.py
E:/code/python/unit7/faqs.html
Title is: Appendix?B. MySQL 5.6 Frequently Asked QuestionsProcess finished with exit code 0
注:從表中摘取數據,或
2.改進
代碼:
# coding=utf-8
import sys

try:
    from HTMLParser import HTMLParser            # Python 2
    from htmlentitydefs import entitydefs
except ImportError:
    from html.parser import HTMLParser           # Python 3
    from html.entities import entitydefs


class TitleParser(HTMLParser):
    """Collect the <title> text, expanding named entities such as &amp;."""

    def __init__(self):
        self.title = ''
        self.readingtitle = 0   # non-zero while inside <title>...</title>
        HTMLParser.__init__(self)

    def handle_starttag(self, tag, attrs):
        if tag == 'title':
            self.readingtitle = 1

    def handle_data(self, data):
        if self.readingtitle:
            self.title += data

    def handle_endtag(self, tag):
        if tag == 'title':
            self.readingtitle = 0

    def handle_entityref(self, name):
        """Feed the entity's replacement text, or the raw '&name;' if unknown."""
        if name in entitydefs:
            self.handle_data(entitydefs[name])
        else:
            self.handle_data('&' + name + ';')

    def gettitle(self):
        """Return the title text collected so far."""
        return self.title


if __name__ == '__main__':
    with open(sys.argv[1]) as fd:
        tp = TitleParser()
        tp.feed(fd.read())
    print("Title is: %s" % tp.gettitle())
etitle.html
<!DOCTYPE html> <html > <head> <title>Document Title &Intro</title> </head> <body> this is my text. </body> </html>
運行結果一:
D:\python\python.exe E:/code/python/unit7/basic_title.py
E:/code/python/unit7/etitle.html
Title is: Document Title Intro
Process finished with exit code 0
運行結果二:
D:\python\python.exe E:/code/python/unit7/etitle.py
E:/code/python/unit7/etitle.html
Title is: Document Title &IntroProcess finished with exit code 0
當一個實體出現時,代碼檢查該實體是否可以識別;可以,則轉換爲相應的值,否則原樣輸出輸入流中的文字;
3.轉換字符參考
代碼:
# coding=utf-8
import sys

try:
    from HTMLParser import HTMLParser            # Python 2
    from htmlentitydefs import entitydefs
except ImportError:
    from html.parser import HTMLParser           # Python 3
    from html.entities import entitydefs


class TitleParser(HTMLParser):
    """Collect the <title> text, expanding entity and character references."""

    def __init__(self):
        self.title = ''
        self.readingtitle = 0   # non-zero while inside <title>...</title>
        HTMLParser.__init__(self)

    def handle_starttag(self, tag, attrs):
        if tag == 'title':
            self.readingtitle = 1

    def handle_data(self, data):
        if self.readingtitle:
            self.title += data

    def handle_endtag(self, tag):
        if tag == 'title':
            self.readingtitle = 0

    def handle_entityref(self, name):
        """Feed the entity's replacement text, or the raw '&name;' if unknown."""
        if name in entitydefs:
            self.handle_data(entitydefs[name])
        else:
            self.handle_data('&' + name + ';')

    def handle_charref(self, name):
        """Feed the character for a numeric reference (&#65; or &#x41;).

        Malformed numbers and code points outside 1..255 are dropped.
        """
        try:
            if name[:1] in ('x', 'X'):
                charnum = int(name[1:], 16)
            else:
                charnum = int(name)
        except ValueError:
            return
        # 255 is the largest value chr() accepts on Python 2; the bound was
        # previously 225, which silently dropped characters 226-255.
        if charnum < 1 or charnum > 255:
            return
        self.handle_data(chr(charnum))

    def gettitle(self):
        """Return the title text collected so far."""
        return self.title


if __name__ == '__main__':
    with open(sys.argv[1]) as fd:
        tp = TitleParser()
        tp.feed(fd.read())
    print("Title is: %s" % tp.gettitle())
4.處理不均衡的標籤
代碼:
# coding=utf-8
import re
import sys

try:
    from HTMLParser import HTMLParser            # Python 2
    from htmlentitydefs import entitydefs
except ImportError:
    from html.parser import HTMLParser           # Python 3
    from html.entities import entitydefs


class TitleParser(HTMLParser):
    """Report the title and list items of a page whose tags may be unbalanced.

    Open tags are tracked on a stack (`taglevels`); a closing tag also
    closes every tag that was opened after its match and never closed.
    """

    def __init__(self):
        self.taglevels = []                       # stack of currently open tags
        self.handledtags = ['title', 'ul', 'li']  # tags whose text is collected
        self.processing = None                    # handled tag being collected
        self.data = ''                            # text of the tag in progress
        self.title = ''                           # set when </title> is handled
        HTMLParser.__init__(self)

    def handle_starttag(self, tag, attrs):
        # The same tag opened twice in a row (e.g. <li> after an unclosed
        # <li>) implicitly closes the previous one.
        if len(self.taglevels) and self.taglevels[-1] == tag:
            self.handle_endtag(tag)
        self.taglevels.append(tag)
        if tag in self.handledtags:
            self.data = ''
            self.processing = tag
            if tag == 'ul':
                print("List start")

    def handle_data(self, data):
        if self.processing:
            self.data += data

    def handle_endtag(self, tag):
        # A closing tag with no matching open tag is ignored.
        if tag not in self.taglevels:
            return
        # Unwind the stack down to (and including) the matching open tag.
        while len(self.taglevels):
            starttag = self.taglevels.pop()
            if starttag in self.handledtags:
                self.finishprocessing(starttag)
            if starttag == tag:
                break

    def cleanse(self):
        """Collapse every run of whitespace in the collected text to one space."""
        self.data = re.sub(r'\s+', ' ', self.data)

    def finishprocessing(self, tag):
        """Print the collected text for `tag` and stop collecting."""
        self.cleanse()
        if tag == 'title' and tag == self.processing:
            print("Dom title %s" % self.data)
            # Remember the title so gettitle() has something to return.
            self.title = self.data
        elif tag == 'ul':
            print("List ended")
        elif tag == 'li' and tag == self.processing:
            print("List item %s" % self.data)
        self.processing = None

    def gettitle(self):
        """Return the last title seen, or '' if none has been closed yet."""
        return self.title

    # 處理特殊值: entities found in the mapping table are replaced by the
    # mapped value; anything else is passed through literally.
    def handle_entityref(self, name):
        if name in entitydefs:
            self.handle_data(entitydefs[name])
        else:
            self.handle_data('&' + name + ';')

    def handle_charref(self, name):
        """Feed the character for a numeric reference (&#65; or &#x41;)."""
        try:
            if name[:1] in ('x', 'X'):
                charnum = int(name[1:], 16)
            else:
                charnum = int(name)
        except ValueError:
            return
        if charnum < 1 or charnum > 255:
            return
        self.handle_data(chr(charnum))


if __name__ == '__main__':
    with open(sys.argv[1]) as fd:
        tp = TitleParser()
        tp.feed(fd.read())
運行結果:
D:\python\python.exe E:/code/python/unit7/4un.py
E:/code/python/unit7/4un.html
Dom title DOCTYPE Title & Intro?
List start
List item First List item
List item second list item
List item second list item
List endedProcess finished with exit code 0
5.一個能夠實際工做的例子
P169-p190
展現XML文檔:tree,event.基於事件的解析器能夠掃描文檔,事件解析器能夠響應。
8.2 使用Dom
代碼:
# coding=utf-8
from xml.dom import minidom, Node


def scanNode(node, level=0):
    """Print the class name of `node` and, recursively, of its descendants.

    Element nodes also show their tag name. Each nesting level indents the
    line by four more spaces.
    """
    msg = node.__class__.__name__
    if node.nodeType == Node.ELEMENT_NODE:
        msg += ",tag" + node.tagName
    print("%s %s" % (" " * level * 4, msg))
    # hasChildNodes is a method: without the call the test is always true.
    if node.hasChildNodes():
        for child in node.childNodes:
            scanNode(child, level + 1)


if __name__ == '__main__':
    doc = minidom.parse("Sample.xml")
    scanNode(doc)
運行結果:
D:\python\python.exe E:/code/python/unit8/un1.py
Document > Element,tagbook > Text > Element,tagtitle > Text > Text > Element,tagauthor > Text > Element,tagname > Text > Element,tagfirst > Text > Text > Element,taglast > Text > Text > Text > Element,tagaffiliation > Text > Text > Text > Element,tagchapter > Text > Element,tagtitle > Text > Text > Element,tagpara > Text > Element,tagcompany > Text > Text > Text > Text
Process finished with exit code 0
sample.xml
<?xml version="1.0" encoding="UTF-8"?> <book> <title> Sample XML Thing </title> <author> <name> <first>Benjamin</first> <last>Smith</last> </name> <affiliation>Springy Widgets,Inc.</affiliation> </author> <chapter number = "1"> <title>First chapter</title> <para> I think widgets are great.you should buy lots of them from <company>Springy widgets,Inc</company> </para> </chapter> </book>
2.使用dom徹底解析
代碼:
# coding=utf-8
"""Re-format the sample XML document as plain text.

1. The node type decides how each node is processed.
2. Element nodes are dispatched on their tag name.
"""
from xml.dom import minidom, Node
import re
import textwrap


class SampleScanner:
    """Walk a parsed document and print a text rendering of its <book>."""

    def __init__(self, doc):
        for child in doc.childNodes:
            # Only the <book> element is processed.
            if child.nodeType == Node.ELEMENT_NODE and child.tagName == "book":
                self.handleBook(child)

    def gettext(self, nodelist):
        """Return the text under the nodes of `nodelist` as a single string.

        Text nodes contribute their text, other nodes the text of their
        children (recursively). Whitespace runs collapse to one space.
        """
        retlist = []
        for node in nodelist:
            if node.nodeType == Node.TEXT_NODE:
                retlist.append(node.wholeText)
            elif node.hasChildNodes():
                retlist.append(self.gettext(node.childNodes))
        return re.sub(r"\s+", " ", "".join(retlist))

    def handleBook(self, node):
        """Print the book title and process its author and chapter children.

        Non-element children are skipped.
        """
        for child in node.childNodes:
            if child.nodeType != Node.ELEMENT_NODE:
                continue
            if child.tagName == "title":
                print("Book title is : %s" % self.gettext(child.childNodes))
            if child.tagName == "author":
                self.handleAuthor(child)
            if child.tagName == "chapter":
                self.handleChapter(child)

    def handleAuthor(self, node):
        """Process <name> children and print the text of <affiliation>."""
        for child in node.childNodes:
            if child.nodeType != Node.ELEMENT_NODE:
                continue
            if child.tagName == "name":
                self.handleAuthorName(child)
            elif child.tagName == "affiliation":
                print("Author affiliation: %s" % self.gettext([child]))

    def handleAuthorName(self, node):
        """Print the author's <last> and <first> name, in that order."""
        surname = self.gettext(node.getElementsByTagName("last"))
        givenname = self.gettext(node.getElementsByTagName("first"))
        print("Author Name:%s %s " % (surname, givenname))

    def handleChapter(self, node):
        """Print the chapter heading, then each <para> child."""
        print("*** Start of Chapter %s,%s" % (
            node.getAttribute("number"),
            self.gettext(node.getElementsByTagName("title"))))
        for child in node.childNodes:
            if child.nodeType != Node.ELEMENT_NODE:
                continue
            if child.tagName == "para":
                self.handlePara(child)

    def handlePara(self, node):
        """Print the paragraph text re-wrapped by textwrap.fill()."""
        paratext = self.gettext([node])
        paratext = textwrap.fill(paratext)
        print(paratext)


if __name__ == '__main__':
    doc = minidom.parse("Sample.xml")
    SampleScanner(doc)
運行結果:
D:\python\python.exe E:/code/python/unit8/un2.py
Book title is : Sample XML Thing
Author Name:Smith Benjamin
Author affiliation: Springy Widgets,Inc.
*** Start of Chapter 1,First chapter
I think widgets are great.you should buy lots of them from Springy
widgets,IncProcess finished with exit code 0
3.使用Dom產生文檔
代碼:
# coding=utf-8
"""Generate the sample XML document with minidom.

1. Create elements with createElement.
2. Attach children with appendChild.
3. Create text with createTextNode.
4. Set attributes with setAttribute.
"""
from xml.dom import minidom, Node

# Document root
doc = minidom.Document()

# <book>
book = doc.createElement("book")
doc.appendChild(book)

# <title>
title = doc.createElement("title")
text = doc.createTextNode("Sample XML Thing")
title.appendChild(text)
book.appendChild(title)

# <author> with its <name> (<first>, <last>)
author = doc.createElement("author")
name = doc.createElement("name")
first = doc.createElement("first")
first.appendChild(doc.createTextNode("Benjamin"))
name.appendChild(first)
last = doc.createElement("last")
last.appendChild(doc.createTextNode("Smith"))
name.appendChild(last)
author.appendChild(name)
book.appendChild(author)

# <chapter number="1"> with its own <title> and <para>
chapter = doc.createElement("chapter")
chapter.setAttribute("number", "1")
title = doc.createElement("title")
title.appendChild(doc.createTextNode("Fisrt Chapter"))
chapter.appendChild(title)
para = doc.createElement("para")
para.appendChild(doc.createTextNode(
    "I think widgets are great.you should buy lots "
    "of them from"))
company = doc.createElement("company")
company.appendChild(doc.createTextNode("Springy widgets,Inc"))
para.appendChild(company)
chapter.appendChild(para)
book.appendChild(chapter)

print(doc.toprettyxml(indent=" "))
運行結果:
D:\python\python.exe E:/code/python/unit8/un3.py
<?xml version="1.0" ?> <book> <title>Sample XML Thing</title> <author> <name> <first>Benjamin</first> <last>Smith</last> </name> </author> <chapter number="1"> <title>Fisrt Chapter</title> <para> I think widgets are great.you should buy lots of them from <company>Springy widgets,Inc</company> </para> </chapter> </book>Process finished with exit code 0
4.dom類型參考
8.3使用xml-rpc
# coding=utf-8
# List the category titles offered by a Meerkat XML-RPC server.
import xmlrpclib

# NOTE(review): this URL looks like a blog article rather than an XML-RPC
# endpoint; the later NewsSource example uses
# http://www.oreillynet.com/meerkat/xml-rpc/server.php -- confirm which
# one is intended.
url = 'http://liandesinian.blog.51cto.com/7737219/1565474'
s = xmlrpclib.ServerProxy(url)
catdata = s.meerkat.getCategories()
cattitles = sorted(item['title'] for item in catdata)
for item in cattitles:
    print(item)
運行結果:
D:\python\python.exe E:/code/python/unit8/un6.py
Process finished with exit code 0
# coding=utf-8
import sys
import textwrap

try:
    import xmlrpclib                        # Python 2
except ImportError:
    import xmlrpc.client as xmlrpclib       # Python 3


class NewsCat:
    """One news category (id and title); instances order by title."""

    def __init__(self, catdata):
        self.id = catdata['id']
        self.title = catdata['title']

    def __lt__(self, other):
        # Used by list.sort(); Python 3 has no __cmp__ fallback.
        return self.title < other.title

    def __cmp__(self, other):
        # Kept for Python 2 callers that use cmp() on categories.
        return (self.title > other.title) - (self.title < other.title)


class NewsSource:
    """Interactive browser for the categories and items of a Meerkat server."""

    def __init__(self, url='http://www.oreillynet.com/meerkat/xml-rpc/server.php'):
        self.s = xmlrpclib.ServerProxy(url)
        self.loadcats()

    def loadcats(self):
        """Fetch the category list from the server and sort it by title."""
        print("Loading categories....")
        catdata = self.s.meerkat.getCategories()
        self.cats = [NewsCat(item) for item in catdata]
        self.cats.sort()

    def displaycats(self):
        """Print the numbered category titles, three to a line."""
        numonline = 0
        i = 0
        for item in self.cats:
            sys.stdout.write("%2d:%20.20s" % (i + 1, item.title))
            i += 1
            numonline += 1
            if numonline % 3 == 0:
                sys.stdout.write("\n")
        if numonline != 0:
            sys.stdout.write("\n")

    def promotcat(self):
        """Show the categories and return the chosen 0-based index.

        Exits the program when the user enters 'q'.
        """
        self.displaycats()
        sys.stdout.write("select a catgory or q to quit")
        selection = sys.stdin.readline().strip()
        if selection == 'q':
            sys.exit(0)
        return int(selection) - 1

    def dispact(self, cat):
        """Browse the items of the category at index `cat` in self.cats."""
        # `cat` is the index returned by promotcat(); the server is queried
        # with the id stored on the corresponding category.
        # NOTE(review): the 'data' option and the item['data'] field printed
        # by dispitem() may be meant to be 'dates'/'date' -- confirm against
        # the Meerkat API.
        items = self.s.meerkat.getItems({'category': self.cats[cat].id,
                                         'ids': 1,
                                         'descriptions': 1,
                                         'categories': 1,
                                         'channels': 1,
                                         'data': 1,
                                         'num_items': 15})
        if not len(items):
            print("Sorry,no items in that category.")
            sys.stdout.write("Press Enter to continue:")
            sys.stdin.readline()
            return
        while 1:
            self.dispitemsummary(items)
            sys.stdout.write("select a catgory or q to quit")
            selection = sys.stdin.readline().strip()
            if selection == 'q':
                return
            self.dispitem(items[int(selection) - 1])

    def dispitemsummary(self, items):
        """Print a numbered one-line summary (the title) of each item."""
        for counter, item in enumerate(items):
            print("%2d:%s" % (counter + 1, item['title']))

    def dispitem(self, item):
        """Print one item in full and wait for Enter."""
        print("---%s---" % item['title'])
        print("Posted on %s" % item['data'])
        print("Description:")
        print(textwrap.fill(item['description']))
        print("\nlink: %s" % item['link'])
        sys.stdout.write("\nPress Enter to continue: ")
        sys.stdin.readline()


if __name__ == '__main__':
    n = NewsSource()
    while 1:
        cat = n.promotcat()
        n.dispact(cat)