在電腦中忽然發現一個這麼好的資料,雪松大神製做,不敢獨享,特與你們共享。連他的廣告也一併複製了吧!php
python實例手冊 #encoding:utf8 # 設定編碼-支持中文 0說明 手冊製做: 雪松 littlepy reboot 更新日期: 2014-10-29 歡迎系統運維加入Q羣: 198173206 # 加羣請回答問題 歡迎運維開發加入Q羣: 365534424 # 不按期技術分享 請使用"notepad++"打開此文檔,"alt+0"將函數摺疊後方便查閱 請勿刪除信息,轉載請說明出處,抵制不道德行爲。 錯誤在所不免,還望指正! # python實例手冊下載地址: http://hi.baidu.com/quanzhou722/item/cf4471f8e23d3149932af2a7 # shell實例手冊最新下載地址: http://hi.baidu.com/quanzhou722/item/f4a4f3c9eb37f02d46d5c0d9 # LazyManage運維批量管理軟件下載[shell]: http://hi.baidu.com/quanzhou722/item/4ccf7e88a877eaccef083d1a # LazyManage運維批量管理軟件下載[python]: http://hi.baidu.com/quanzhou722/item/4213db3626a949fe96f88d3c 1 基礎 查看幫助 import os for i in dir(os): print i # 模塊的方法 help(os.path) # 方法的幫助 調試 python -m trace -t aaaaaa.py pip模塊安裝 yum install python-pip # centos安裝pip sudo apt-get install python-pip # ubuntu安裝pip pip官方安裝腳本 wget https://raw.github.com/pypa/pip/master/contrib/get-pip.py python get-pip.py 加載環境變量 vim /etc/profile export PATH=/usr/local/python27/bin:$PATH . /etc/profile pip install Package # 安裝包 pip install requests pip show --files Package # 查看安裝包時安裝了哪些文件 pip show --files Package # 查看哪些包有更新 pip install --upgrade Package # 更新一個軟件包 pip uninstall Package # 卸載軟件包 變量 r=r'\n' # 輸出時原型打印 u=u'中文' # 定義爲unicode編碼 global x # 全局變量 a = 0 or 2 or 1 # 布爾運算賦值,a值爲True既不處理後面,a值爲2. None、字符串''、空元組()、空列表[],空字典{}、0、空字符串都是false name = raw_input("input:").strip() # 輸入字符串變量 num = int(raw_input("input:").strip()) # 輸入字符串str轉爲int型 locals() # 全部局部變量組成的字典 locals().values() # 全部局部變量值的列表 os.popen("date -d @{0} +'%Y-%m-%d %H:%M:%S'".format(12)).read() # 特殊狀況引用變量 {0} 表明第一個參數 打印 # 字符串 %s 整數 %d 浮點 %f 原樣打印 %r print '字符串: %s 整數: %d 浮點: %f 原樣打印: %r' % ('aa',2,1.0,'r') print 'abc', # 有逗號,表明不換行打印,在次打印會接着本行打印 列表 # 列表元素的個數最多 536870912 shoplist = ['apple', 'mango', 'carrot', 'banana'] shoplist[2] = 'aa' del shoplist[0] shoplist.insert('4','www') shoplist.append('aaa') shoplist[::-1] # 倒着打印 對字符翻轉串有效 shoplist[2::3] # 從第二個開始每隔三個打印 shoplist[:-1] # 排除最後一個 '\t'.join(li) # 將列表轉換成字符串 sys.path[1:1]=[5] # 在位置1前面插入列表中一個值 list(set(['qwe', 'as', '123', '123'])) # 將列表經過集合去重複 eval("['1','a']") # 將字符串當表達式求值,獲得列表 元組 # 不可變 zoo = ('wolf', 'elephant', 'penguin') 字典 ab = { 'Swaroop' : 'swaroopch@byteofpython.info', 'Larry' : 'larry@wall.org', } ab['c'] = 80 # 添加字典元素 del ab['Larry'] # 刪除字典元素 ab.keys() # 查看全部鍵值 ab.values() # 打印全部值 ab.has_key('a') # 查看鍵值是否存在 ab.items() # 返回整個字典列表 複製字典 a = {1: {1: 2, 3: 4}} b = a b[1][1] = 8888 # a和b都爲 {1: {1: 8888, 3: 4}} import copy c = copy.deepcopy(a) # 再次賦值 b[1][1] = 9999 拷貝字典爲新的字典,互不干擾 a[2] = copy.deepcopy(a[1]) # 複製出第二個key,互不影響 {1: {1: 2, 3: 4},2: {1: 2, 3: 4}} 流程結構 if判斷 # 布爾值操做符 and or not 實現多重判斷 if a == b: print '==' elif a < b: print b else: print a fi while循環 while True: if a == b: print "==" break print "!=" else: print 'over' count=0 while(count<9): print count count += 1 for循環 sorted() # 返回一個序列(列表) zip() # 返回一個序列(列表) enumerate() # 返回循環列表序列 for i,v in enumerate(['a','b']): reversed() # 反序迭代器對象 dict.iterkeys() # 經過鍵迭代 dict.itervalues() # 經過值迭代 dict.iteritems() # 經過鍵-值對迭代 randline() # 文件迭代 iter(obj) # 獲得obj迭代器 檢查obj是否是一個序列 iter(a,b) # 重複調用a,直到迭代器的下一個值等於b for i in range(1, 5): print i else: print 'over' list = ['a','b','c','b'] for i in range(len(list)): print list[i] for x, Lee in enumerate(list): print "%d %s Lee" % (x+1,Lee) # enumerate 使用函數獲得索引值和對應值 for i, v in enumerate(['tic', 'tac', 'toe']): print(i, v) 流程結構簡寫 [ i * 2 for i in [8,-2,5]] [16,-4,10] [ i for i in range(8) if i %2 == 0 ] [0,2,4,6] tab補全 # vim /usr/lib/python2.7/dist-packages/tab.py # python startup file import sys import readline import rlcompleter import atexit import os # tab completion readline.parse_and_bind('tab: complete') # history file histfile = os.path.join(os.environ['HOME'], '.pythonhistory') 函數 def printMax(a, b = 1): if a > b: print a return a else: print b return b x = 5 y = 7 printMax(x, y) def update(*args,**kwargs): p='' for i,t in kwargs.items(): p = p+ '%s=%s,' %(i,str(t)) sql = "update 'user' set (%s) where (%s)" %(args[0],p) print sql update('aaa',uu='uu',id=3) 模塊 # Filename: mymodule.py def sayhi(): print 'mymodule' version = '0.1' # 使用模塊中方法 import mymodule from mymodule import sayhi, version mymodule.sayhi() # 使用模塊中函數方法 類對象的方法 class Person: # 實例化初始化的方法 def __init__(self, name ,age): self.name = name self.age = age print self.name # 有self此函數爲方法 def sayHi(self): print 'Hello, my name is', self.name # 對象消逝的時候被調用 def __del__(self): print 'over' # 實例化對象 p = Person('Swaroop') # 使用對象方法 p.sayHi() # 繼承 class Teacher(Person): def __init__(self, name, age, salary): Person.__init__(self, name, age) self.salary = salary print '(Initialized Teacher: %s)' % self.name def tell(self): Person.tell(self) print 'Salary: "%d"' % self.salary t = Teacher('Mrs. Shrividya', 40, 30000) 執行模塊類中的全部方法 # moniItems.py import sys, time import inspect class mon: def __init__(self, n): self.name = n self.data = dict() def run(self): print 'hello', self.name return self.runAllGet() def getDisk(self): return 222 def getCpu(self): return 111 def runAllGet(self): for fun in inspect.getmembers(self, predicate=inspect.ismethod): print fun[0], fun[1] if fun[0][:3] == 'get': self.data[fun[0][3:]] = fun[1]() print self.data return self.data # 模塊導入使用 from moniItems import mon m = mon() m.runAllGet() 文件處理 # 模式: 讀'r' 寫[清空整個文件]'w' 追加[文件須要存在]'a' 讀寫'r+' 二進制文件'b' 'rb','wb','rb+' 寫文件 i={'ddd':'ccc'} f = file('poem.txt', 'a') f.write("string") f.write(str(i)) f.flush() f.close() 讀文件 f = file('/etc/passwd','r') c = f.read().strip() # 讀取爲一個大字符串,並去掉最後一個換行符 for i in c.spilt('\n'): # 用換行符切割字符串獲得列表循環每行 print i f.close() 讀文件1 f = file('/etc/passwd','r') while True: line = f.readline() # 返回一行 if len(line) == 0: break x = line.split(":") # 冒號分割定義序列 #x = [ x for x in line.split(":") ] # 冒號分割定義序列 #x = [ x.split("/") for x in line.split(":") ] # 先冒號分割,在/分割 打印x[6][1] print x[6],"\n", f.close() 讀文件2 f = file('/etc/passwd') c = f.readlines() # 讀入全部文件內容,可反覆讀取,大文件時佔用內存較大 for line in c: print line.rstrip(), f.close() 讀文件3 for i in open('b.txt'): # 直接讀取也可迭代,並有利於大文件讀取,但不可反覆讀取 print i, 追加日誌 log = open('/home/peterli/xuesong','a') print >> log,'faaa' log.close() with讀文件 with open('a.txt') as f: for i in f: print i print f.read() # 打印全部內容爲字符串 print f.readlines() # 打印全部內容按行分割的列表 csv讀配置文件 192.168.1.5,web # 配置文件按逗號分割 list = csv.reader(file('a.txt')) for line in list: print line # ['192.168.1.5', 'web'] 內建函數 dir(sys) # 顯示對象的屬性 help(sys) # 交互式幫助 int(obj) # 轉型爲整形 str(obj) # 轉爲字符串 len(obj) # 返回對象或序列長度 open(file,mode) # 打開文件 #mode (r 讀,w 寫, a追加) range(0,3) # 返回一個整形列表 raw_input("str:") # 等待用戶輸入 type(obj) # 返回對象類型 abs(-22) # 絕對值 random # 隨機數 choice() # 隨機返回給定序列的一個元素 divmod(x,y) # 函數完成除法運算,返回商和餘數。 round(x[,n]) # 函數返回浮點數x的四捨五入值,如給出n值,則表明舍入到小數點後的位數 strip() # 是去掉字符串兩端多於空格,該句是去除序列中的全部字串兩端多餘的空格 del # 刪除列表裏面的數據 cmp(x,y) # 比較兩個對象 #根據比較結果返回一個整數,若是x<y,則返回-1;若是x>y,則返回1,若是x==y則返回0 max() # 字符串中最大的字符 min() # 字符串中最小的字符 sorted() # 對序列排序 reversed() # 對序列倒序 enumerate() # 返回索引位置和對應的值 sum() # 總和 list() # 變成列表可用於迭代 eval('3+4') # 將字符串當表達式求值 獲得7 exec 'a=100' # 將字符串按python語句執行 exec(a+'=new') # 將變量a的值做爲新的變量 tuple() # 變成元組可用於迭代 #一旦初始化便不能更改的數據結構,速度比list快 zip(s,t) # 返回一個合併後的列表 s = ['11','22'] t = ['aa','bb'] [('11', 'aa'), ('22', 'bb')] isinstance(object,int) # 測試對象類型 int xrange([lower,]stop[,step]) # 函數與range()相似,但xrnage()並不建立列表,而是返回一個xrange對象 字符串相關模塊 string # 字符串操做相關函數和工具 re # 正則表達式 struct # 字符串和二進制之間的轉換 c/StringIO # 字符串緩衝對象,操做方法相似於file對象 base64 # Base16\32\64數據編解碼 codecs # 解碼器註冊和基類 crypt # 進行單方面加密 difflib # 找出序列間的不一樣 hashlib # 多種不一樣安全哈希算法和信息摘要算法的API hma # HMAC信息鑑權算法的python實現 md5 # RSA的MD5信息摘要鑑權 rotor # 提供多平臺的加解密服務 sha # NIAT的安全哈希算法SHA stringprep # 提供用於IP協議的Unicode字符串 textwrap # 文本包裝和填充 unicodedate # unicode數據庫 列表類型內建函數 list.append(obj) # 向列表中添加一個對象obj list.count(obj) # 返回一個對象obj在列表中出現的次數 list.extend(seq) # 把序列seq的內容添加到列表中 list.index(obj,i=0,j=len(list)) # 返回list[k] == obj 的k值,而且k的範圍在i<=k<j;不然異常 list.insert(index.obj) # 在索引量爲index的位置插入對象obj list.pop(index=-1) # 刪除並返回指定位置的對象,默認是最後一個對象 list.remove(obj) # 從列表中刪除對象obj list.reverse() # 原地翻轉列表 list.sort(func=None,key=None,reverse=False) # 以指定的方式排序列表中成員,若是func和key參數指定,則按照指定的方式比較各個元素,若是reverse標誌被置爲True,則列表以反序排列 序列類型操做符 seq[ind] # 獲取下標爲ind的元素 seq[ind1:ind2] # 得到下標從ind1到ind2的元素集合 seq * expr # 序列重複expr次 seq1 + seq2 # 鏈接seq1和seq2 obj in seq # 判斷obj元素是否包含在seq中 obj not in seq # 判斷obj元素是否不包含在seq中 字符串類型內建方法 string.expandtabs(tabsize=8) # tab符號轉爲空格 #默認8個空格 string.endswith(obj,beg=0,end=len(staring)) # 檢測字符串是否已obj結束,若是是返回True #若是beg或end指定檢測範圍是否已obj結束 string.count(str,beg=0,end=len(string)) # 檢測str在string裏出現次數 f.count('\n',0,len(f)) 判斷文件行數 string.find(str,beg=0,end=len(string)) # 檢測str是否包含在string中 string.index(str,beg=0,end=len(string)) # 檢測str不在string中,會報異常 string.isalnum() # 若是string至少有一個字符而且全部字符都是字母或數字則返回True string.isalpha() # 若是string至少有一個字符而且全部字符都是字母則返回True string.isnumeric() # 若是string只包含數字字符,則返回True string.isspace() # 若是string包含空格則返回True string.isupper() # 字符串都是大寫返回True string.islower() # 字符串都是小寫返回True string.lower() # 轉換字符串中全部大寫爲小寫 string.upper() # 轉換字符串中全部小寫爲大寫 string.lstrip() # 去掉string左邊的空格 string.rstrip() # 去掉string字符末尾的空格 string.replace(str1,str2,num=string.count(str1)) # 把string中的str1替換成str2,若是num指定,則替換不超過num次 string.startswith(obj,beg=0,end=len(string)) # 檢測字符串是否以obj開頭 string.zfill(width) # 返回字符長度爲width的字符,原字符串右對齊,前面填充0 string.isdigit() # 只包含數字返回True string.split("分隔符") # 把string切片成一個列表 ":".join(string.split()) # 以:做爲分隔符,將全部元素合併爲一個新的字符串 序列類型相關的模塊 array # 一種受限制的可變序列類型,元素必須相同類型 copy # 提供淺拷貝和深拷貝的能力 operator # 包含函數調用形式的序列操做符 operator.concat(m,n) re # perl風格的正則表達式查找 StringIO # 把長字符串做爲文件來操做 如: read() \ seek() cStringIO # 把長字符串做爲文件來操,做速度更快,但不能被繼承 textwrap # 用做包裝/填充文本的函數,也有一個類 types # 包含python支持的全部類型 collections # 高性能容器數據類型 字典內建方法 dict.clear() # 刪除字典中全部元素 dict copy() # 返回字典(淺複製)的一個副本 dict.fromkeys(seq,val=None) # 建立並返回一個新字典,以seq中的元素作該字典的鍵,val作該字典中全部鍵對的初始值 dict.get(key,default=None) # 對字典dict中的鍵key,返回它對應的值value,若是字典中不存在此鍵,則返回default值 dict.has_key(key) # 若是鍵在字典中存在,則返回True 用in和not in代替 dicr.items() # 返回一個包含字典中鍵、值對元組的列表 dict.keys() # 返回一個包含字典中鍵的列表 dict.iter() # 方法iteritems()、iterkeys()、itervalues()與它們對應的非迭代方法同樣,不一樣的是它們返回一個迭代子,而不是一個列表 dict.pop(key[,default]) # 和方法get()類似.若是字典中key鍵存在,刪除並返回dict[key] dict.setdefault(key,default=None) # 和set()類似,但若是字典中不存在key鍵,由dict[key]=default爲它賦值 dict.update(dict2) # 將字典dict2的鍵值對添加到字典dict dict.values() # 返回一個包含字典中全部值得列表 dict([container]) # 建立字典的工廠函數。提供容器類(container),就用其中的條目填充字典 len(mapping) # 返回映射的長度(鍵-值對的個數) hash(obj) # 返回obj哈希值,判斷某個對象是否可作一個字典的鍵值 集合方法 s.update(t) # 用t中的元素修改s,s如今包含s或t的成員 s |= t s.intersection_update(t) # s中的成員是共用屬於s和t的元素 s &= t s.difference_update(t) # s中的成員是屬於s但不包含在t中的元素 s -= t s.symmetric_difference_update(t) # s中的成員更新爲那些包含在s或t中,但不是s和t共有的元素 s ^= t s.add(obj) # 在集合s中添加對象obj s.remove(obj) # 從集合s中刪除對象obj;若是obj不是集合s中的元素(obj not in s),將引起KeyError錯誤 s.discard(obj) # 若是obj是集合s中的元素,從集合s中刪除對象obj s.pop() # 刪除集合s中的任意一個對象,並返回它 s.clear() # 刪除集合s中的全部元素 s.issubset(t) # 若是s是t的子集,則返回True s <= t s.issuperset(t) # 若是t是s的超集,則返回True s >= t s.union(t) # 合併操做;返回一個新集合,該集合是s和t的並集 s | t s.intersection(t) # 交集操做;返回一個新集合,該集合是s和t的交集 s & t s.difference(t) # 返回一個新集合,改集合是s的成員,但不是t的成員 s - t s.symmetric_difference(t) # 返回一個新集合,該集合是s或t的成員,但不是s和t共有的成員 s ^ t s.copy() # 返回一個新集合,它是集合s的淺複製 obj in s # 成員測試;obj是s中的元素 返回True obj not in s # 非成員測試:obj不是s中元素 返回True s == t # 等價測試 是否具備相同元素 s != t # 不等價測試 s < t # 子集測試;s!=t且s中全部元素都是t的成員 s > t # 超集測試;s!=t且t中全部元素都是s的成員 序列化 #!/usr/bin/python import cPickle obj = {'1':['4124','1241','124'],'2':['12412','142','1241']} pkl_file = open('account.pkl','wb') cPickle.down(obj,pkl_file) pkl_file.close() pkl_file = open('account.pkl','rb') account_list = cPickle.load(pkl_file) pkl_file.close() 文件對象方法 file.close() # 關閉文件 file.fileno() # 返回文件的描述符 file.flush() # 刷新文件的內部緩衝區 file.isatty() # 判斷file是不是一個類tty設備 file.next() # 返回文件的下一行,或在沒有其餘行時引起StopIteration異常 file.read(size=-1) # 從文件讀取size個字節,當未給定size或給定負值的時候,讀取剩餘的全部字節,而後做爲字符串返回 file.readline(size=-1) # 從文件中讀取並返回一行(包括行結束符),或返回最大size個字符 file.readlines(sizhint=0) # 讀取文件的全部行做爲一個列表返回 file.xreadlines() # 用於迭代,可替換readlines()的一個更高效的方法 file.seek(off, whence=0) # 在文件中移動文件指針,從whence(0表明文件起始,1表明當前位置,2表明文件末尾)偏移off字節 file.tell() # 返回當前在文件中的位置 file.truncate(size=file.tell()) # 截取文件到最大size字節,默認爲當前文件位置 file.write(str) # 向文件寫入字符串 file.writelines(seq) # 向文件寫入字符串序列seq;seq應該是一個返回字符串的可迭代對象 文件對象的屬性 file.closed # 表示文件已被關閉,不然爲False file.encoding # 文件所使用的編碼 當unicode字符串被寫入數據時,它將自動使用file.encoding轉換爲字節字符串;若file.encoding爲None時使用系統默認編碼 file.mode # Access文件打開時使用的訪問模式 file.name # 文件名 file.newlines # 未讀取到行分隔符時爲None,只有一種行分隔符時爲一個字符串,當文件有多種類型的行結束符時,則爲一個包含全部當前所遇到的行結束符的列表 file.softspace # 爲0表示在輸出一數據後,要加上一個空格符,1表示不加 異常處理 # try 中使用 sys.exit(2) 會被捕獲,沒法退出腳本,可以使用 os._exit(2) 退出腳本 class ShortInputException(Exception): # 繼承Exception異常的類,定義本身的異常 def __init__(self, length, atleast): Exception.__init__(self) self.length = length self.atleast = atleast try: s = raw_input('Enter something --> ') if len(s) < 3: raise ShortInputException(len(s), 3) # 觸發異常 except EOFError: print '\nWhy did you do an EOF on me?' except ShortInputException, x: # 捕捉指定錯誤信息 print 'ShortInputException: %d | %d' % (x.length, x.atleast) except Exception as err: # 捕捉全部其它錯誤信息內容 print str(err) #except urllib2.HTTPError as err: # 捕捉外部導入模塊的錯誤 #except: # 捕捉全部其它錯誤 不會看到錯誤內容 # print 'except' finally: # 不管什麼狀況都會執行 關閉文件或斷開鏈接等 print 'finally' else: # 無任何異常 沒法和finally同用 print 'No exception was raised.' 不可捕獲的異常 NameError: # 嘗試訪問一個未申明的變量 ZeroDivisionError: # 除數爲零 SyntaxErrot: # 解釋器語法錯誤 IndexError: # 請求的索引元素超出序列範圍 KeyError: # 請求一個不存在的字典關鍵字 IOError: # 輸入/輸出錯誤 AttributeError: # 嘗試訪問未知的對象屬性 ImportError # 沒有模塊 IndentationError # 語法縮進錯誤 KeyboardInterrupt # ctrl+C SyntaxError # 代碼語法錯誤 ValueError # 值錯誤 TypeError # 傳入對象類型與要求不符合 內建異常 BaseException # 全部異常的基類 SystemExit # python解釋器請求退出 KeyboardInterrupt # 用戶中斷執行 Exception # 常規錯誤的基類 StopIteration # 迭代器沒有更多的值 GeneratorExit # 生成器發生異常來通知退出 StandardError # 全部的內建標準異常的基類 ArithmeticError # 全部數值計算錯誤的基類 FloatingPointError # 浮點計算錯誤 OverflowError # 數值運算超出最大限制 AssertionError # 斷言語句失敗 AttributeError # 對象沒有這個屬性 EOFError # 沒有內建輸入,到達EOF標記 EnvironmentError # 操做系統錯誤的基類 IOError # 輸入/輸出操做失敗 OSError # 操做系統錯誤 WindowsError # windows系統調用失敗 ImportError # 導入模塊/對象失敗 KeyboardInterrupt # 用戶中斷執行(一般是ctrl+c) LookupError # 無效數據查詢的基類 IndexError # 序列中沒有此索引(index) KeyError # 映射中沒有這個鍵 MemoryError # 內存溢出錯誤(對於python解釋器不是致命的) NameError # 未聲明/初始化對象(沒有屬性) UnboundLocalError # 訪問未初始化的本地變量 ReferenceError # 若引用試圖訪問已經垃圾回收了的對象 RuntimeError # 通常的運行時錯誤 NotImplementedError # 還沒有實現的方法 SyntaxError # python語法錯誤 IndentationError # 縮進錯誤 TabError # tab和空格混用 SystemError # 通常的解釋器系統錯誤 TypeError # 對類型無效的操做 ValueError # 傳入無效的參數 UnicodeError # Unicode相關的錯誤 UnicodeDecodeError # Unicode解碼時的錯誤 UnicodeEncodeError # Unicode編碼時的錯誤 UnicodeTranslateError # Unicode轉換時錯誤 Warning # 警告的基類 DeprecationWarning # 關於被棄用的特徵的警告 FutureWarning # 關於構造未來語義會有改變的警告 OverflowWarning # 舊的關於自動提高爲長整形的警告 PendingDeprecationWarning # 關於特性將會被廢棄的警告 RuntimeWarning # 可疑的運行時行爲的警告 SyntaxWarning # 可疑的語法的警告 UserWarning # 用戶代碼生成的警告 觸發異常 raise exclass # 觸發異常,從exclass生成一個實例(不含任何異常參數) raise exclass() # 觸發異常,但如今不是類;經過函數調用操做符(function calloperator:"()")做用於類名生成一個新的exclass實例,一樣也沒有異常參數 raise exclass, args # 觸發異常,但同時提供的異常參數args,能夠是一個參數也能夠是元組 raise exclass(args) # 觸發異常,同上 raise exclass, args, tb # 觸發異常,但提供一個跟蹤記錄(traceback)對象tb供使用 raise exclass,instance # 經過實例觸發異常(一般是exclass的實例) raise instance # 經過實例觸發異常;異常類型是實例的類型:等價於raise instance.__class__, instance raise string # 觸發字符串異常 raise string, srgs # 觸發字符串異常,但觸發伴隨着args raise string,args,tb # 觸發字符串異常,但提供一個跟蹤記錄(traceback)對象tb供使用 raise # 從新觸發前一個異常,若是以前沒有異常,觸發TypeError 跟蹤異常棧 # traceback 獲取異常相關數據都是經過sys.exc_info()函數獲得的 import traceback import sys try: s = raw_input() print int(s) except ValueError: # sys.exc_info() 返回值是元組,第一個exc_type是異常的對象類型,exc_value是異常的值,exc_tb是一個traceback對象,對象中包含出錯的行數、位置等數據 exc_type, exc_value, exc_tb = sys.exc_info() print "\n%s \n %s \n %s\n" %(exc_type, exc_value, exc_tb ) traceback.print_exc() # 打印棧跟蹤信息 抓取所有錯誤信息存如字典 import sys, traceback try: s = raw_input() int(s) except: exc_type, exc_value, exc_traceback = sys.exc_info() traceback_details = { 'filename': exc_traceback.tb_frame.f_code.co_filename, 'lineno' : exc_traceback.tb_lineno, 'name' : exc_traceback.tb_frame.f_code.co_name, 'type' : exc_type.__name__, 'message' : exc_value.message, } del(exc_type, exc_value, exc_traceback) print traceback_details f = file('test1.txt', 'a') f.write("%s %s %s %s %s\n" %(traceback_details['filename'],traceback_details['lineno'],traceback_details['name'],traceback_details['type'],traceback_details['message'], )) f.flush() f.close() 調試log # cgitb覆蓋了默認sys.excepthook全局異常攔截器 def func(a, b): return a / b if __name__ == '__main__': import cgitb cgitb.enable(format='text') func(1, 0) 函數式編程的內建函數 apply(func[,nkw][,kw]) # 用可選的參數來調用func,nkw爲非關鍵字參數,kw爲關鍵字參數;返回值是函數調用的返回值 filter(func,seq) # 調用一個布爾函數func來迭代遍歷每一個seq中的元素;返回一個使func返回值爲true的元素的序列 map(func,seq1[,seq2]) # 將函數func做用於給定序列(s)的每一個元素,並用一個列表來提供返回值;若是func爲None,func表現爲一個身份函數,返回一個含有每一個序列中元素集合的n個元組的列表 reduce(func,seq[,init]) # 將二元函數做用於seq序列的元素,每次攜帶一堆(先前的結果以及下一個序列元素),連續地將現有的結果和下一個值做用在得到的隨後的結果上,最後減小咱們的序列爲一個單一的返回值;若是初始值init給定,第一個比較會是init和第一個序列元素而不是序列的頭兩個元素 # filter 即經過函數方法只保留結果爲真的值組成列表 def f(x): return x % 2 != 0 and x % 3 != 0 f(3) # 函數結果是False 3被filter拋棄 f(5) # 函數結果是True 5被加入filter最後的列表結果 filter(f, range(2, 25)) [5, 7, 11, 13, 17, 19, 23] # map 經過函數對列表進行處理獲得新的列表 def cube(x): return x*x*x map(cube, range(1, 11)) [1, 8, 27, 64, 125, 216, 343, 512, 729, 1000] # reduce 經過函數會先接收初始值和序列的第一個元素,而後是返回值和下一個元素,依此類推 def add(x,y): return x+y reduce(add, range(1, 11)) # 結果55 是1到10的和 x的值是上一次函數返回的結果,y是列表中循環的值 re正則 compile(pattern,flags=0) # 對正則表達式模式pattern進行編譯,flags是可選標識符,並返回一個regex對象 match(pattern,string,flags=0) # 嘗試用正則表達式模式pattern匹配字符串string,flags是可選標識符,若是匹配成功,則返回一個匹配對象;不然返回None search(pattern,string,flags=0) # 在字符串string中搜索正則表達式模式pattern的第一次出現,flags是可選標識符,若是匹配成功,則返回一個匹配對象;不然返回None findall(pattern,string[,flags]) # 在字符串string中搜索正則表達式模式pattern的全部(非重複)出現:返回一個匹配對象的列表 # pattern=u'\u4e2d\u6587' 表明UNICODE finditer(pattern,string[,flags]) # 和findall()相同,但返回的不是列表而是迭代器;對於每一個匹配,該迭代器返回一個匹配對象 split(pattern,string,max=0) # 根據正則表達式pattern中的分隔符把字符string分割爲一個列表,返回成功匹配的列表,最多分割max次(默認全部) sub(pattern,repl,string,max=0) # 把字符串string中全部匹配正則表達式pattern的地方替換成字符串repl,若是max的值沒有給出,則對全部匹配的地方進行替換(subn()會返回一個表示替換次數的數值) group(num=0) # 返回所有匹配對象(或指定編號是num的子組) groups() # 返回一個包含所有匹配的子組的元組(若是沒匹配成功,返回一個空元組) 例子 re.findall(r'a[be]c','123abc456eaec789') # 返回匹配對象列表 ['abc', 'aec'] re.findall("(.)12[34](..)",a) # 取出匹配括號中內容 a='qedqwe123dsf' re.search("(.)123",a ).group(1) # 搜索匹配的取第1個標籤 re.match("^(1|2) *(.*) *abc$", str).group(2) # 取第二個標籤 re.match("^(1|2) *(.*) *abc$", str).groups() # 取全部標籤 re.sub('[abc]','A','alex') # 替換 for i in re.finditer(r'\d+',s): # 迭代 print i.group(),i.span() # 搜索網頁中UNICODE格式的中文 QueryAdd='http://www.anti-spam.org.cn/Rbl/Query/Result' Ip='222.129.184.52' s = requests.post(url=QueryAdd, data={'IP':Ip}) re.findall(u'\u4e2d\u56fd', s.text, re.S) 編碼轉換 a='中文' # 編碼未定義按輸入終端utf8或gbk u=u'中文' # 定義爲unicode編碼 u值爲 u'\u4e2d\u6587' u.encode('utf8') # 轉爲utf8格式 u值爲 '\xe4\xb8\xad\xe6\x96\x87' print u # 結果顯示 中文 print u.encode('utf8') # 轉爲utf8格式,當顯示終端編碼爲utf8 結果顯示 中文 編碼不一致則亂碼 print u.encode('gbk') # 當前終端爲utf8 故亂碼 ord('4') # 字符轉ASCII碼 chr(52) # ASCII碼轉字符 遍歷遞歸 [os.path.join(x[0],y) for x in os.walk('/root/python/5') for y in x[2]] for i in os.walk('/root/python/5/work/server'): print i 2 經常使用模塊 sys sys.argv # 取參數列表 sys.exit(2) # 退出腳本返回狀態 會被try截取 sys.exc_info() # 獲取當前正在處理的異常類 sys.version # 獲取Python解釋程序的版本信息 sys.maxint # 最大的Int值 9223372036854775807 sys.maxunicode # 最大的Unicode值 sys.modules # 返回系統導入的模塊字段,key是模塊名,value是模塊 sys.path # 返回模塊的搜索路徑,初始化時使用PYTHONPATH環境變量的值 sys.platform # 返回操做系統平臺名稱 sys.stdout # 標準輸出 sys.stdin # 標準輸入 sys.stderr # 錯誤輸出 sys.exec_prefix # 返回平臺獨立的python文件安裝的位置 sys.stdin.readline() # 從標準輸入讀一行 sys.stdout.write("a") # 屏幕輸出a os # 相對sys模塊 os模塊更爲底層 os._exit() try沒法抓取 os.popen('id').read() # 執行系統命令獲得返回結果 os.system() # 獲得返回狀態 返回沒法截取 os.name # 返回系統平臺 Linux/Unix用戶是'posix' os.getenv() # 讀取環境變量 os.putenv() # 設置環境變量 os.getcwd() # 當前工做路徑 os.chdir() # 改變當前工做目錄 os.walk('/root/') # 遞歸路徑 文件處理 mkfifo()/mknod() # 建立命名管道/建立文件系統節點 remove()/unlink() # 刪除文件 rename()/renames() # 重命名文件 *stat() # 返回文件信息 symlink() # 建立符號連接 utime() # 更新時間戳 tmpfile() # 建立並打開('w+b')一個新的臨時文件 walk() # 遍歷目錄樹下的全部文件名 目錄/文件夾 chdir()/fchdir() # 改變當前工做目錄/經過一個文件描述符改變當前工做目錄 chroot() # 改變當前進程的根目錄 listdir() # 列出指定目錄的文件 getcwd()/getcwdu() # 返回當前工做目錄/功能相同,但返回一個unicode對象 mkdir()/makedirs() # 建立目錄/建立多層目錄 rmdir()/removedirs() # 刪除目錄/刪除多層目錄 訪問/權限 saccess() # 檢驗權限模式 chmod() # 改變權限模式 chown()/lchown() # 改變owner和groupID功能相同,但不會跟蹤連接 umask() # 設置默認權限模式 文件描述符操做 open() # 底層的操做系統open(對於穩健,使用標準的內建open()函數) read()/write() # 根據文件描述符讀取/寫入數據 按大小讀取文件部份內容 dup()/dup2() # 複製文件描述符號/功能相同,可是複製到另外一個文件描述符 設備號 makedev() # 從major和minor設備號建立一個原始設備號 major()/minor() # 從原始設備號得到major/minor設備號 os.path模塊 os.path.expanduser('~/.ssh/key') # 家目錄下文件的全路徑 分隔 os.path.basename() # 去掉目錄路徑,返回文件名 os.path.dirname() # 去掉文件名,返回目錄路徑 os.path.join() # 將分離的各部分組合成一個路徑名 os.path.spllt() # 返回(dirname(),basename())元組 os.path.splitdrive() # 返回(drivename,pathname)元組 os.path.splitext() # 返回(filename,extension)元組 信息 os.path.getatime() # 返回最近訪問時間 os.path.getctime() # 返回文件建立時間 os.path.getmtime() # 返回最近文件修改時間 os.path.getsize() # 返回文件大小(字節) 查詢 os.path.exists() # 指定路徑(文件或目錄)是否存在 os.path.isabs() # 指定路徑是否爲絕對路徑 os.path.isdir() # 指定路徑是否存在且爲一個目錄 os.path.isfile() # 指定路徑是否存在且爲一個文件 os.path.islink() # 指定路徑是否存在且爲一個符號連接 os.path.ismount() # 指定路徑是否存在且爲一個掛載點 os.path.samefile() # 兩個路徑名是否指向同一個文件 相關模塊 base64 # 提供二進制字符串和文本字符串間的編碼/解碼操做 binascii # 提供二進制和ASCII編碼的二進制字符串間的編碼/解碼操做 bz2 # 訪問BZ2格式的壓縮文件 csv # 訪問csv文件(逗號分隔文件) csv.reader(open(file)) filecmp # 用於比較目錄和文件 fileinput # 提供多個文本文件的行迭代器 getopt/optparse # 提供了命令行參數的解析/處理 glob/fnmatch # 提供unix樣式的通配符匹配的功能 gzip/zlib # 讀寫GNU zip(gzip)文件(壓縮須要zlib模塊) shutil # 提供高級文件訪問功能 c/StringIO # 對字符串對象提供類文件接口 tarfile # 讀寫TAR歸檔文件,支持壓縮文件 tempfile # 建立一個臨時文件 uu # uu格式的編碼和解碼 zipfile # 用於讀取zip歸檔文件的工具 environ['HOME'] # 查看系統環境變量 子進程 os.fork() # 建立子進程,並複製父進程全部操做 經過判斷pid = os.fork() 的pid值,分別執行父進程與子進程操做,0爲子進程 os.wait() # 等待子進程結束 跨平臺os模塊屬性 linesep # 用於在文件中分隔行的字符串 sep # 用來分隔文件路徑名字的字符串 pathsep # 用於分割文件路徑的字符串 curdir # 當前工做目錄的字符串名稱 pardir # 父目錄字符串名稱 commands commands.getstatusoutput('id') # 返回元組(狀態,標準輸出) commands.getoutput('id') # 只返回執行的結果, 忽略返回值 commands.getstatus('file') # 返回ls -ld file執行的結果 文件和目錄管理 import shutil shutil.copyfile('data.db', 'archive.db') # 拷貝文件 shutil.move('/build/executables', 'installdir') # 移動文件或目錄 文件通配符 import glob glob.glob('*.py') # 查找當前目錄下py結尾的文件 隨機模塊 import random random.choice(['apple', 'pear', 'banana']) # 隨機取列表一個參數 random.sample(xrange(100), 10) # 不重複抽取10個 random.random() # 隨機浮點數 random.randrange(6) # 隨機整數範圍 發送郵件 發送郵件內容 #!/usr/bin/python #encoding:utf8 # 導入 smtplib 和 MIMEText import smtplib from email.mime.text import MIMEText # 定義發送列表 mailto_list=["272121935@qq.com","272121935@163.com"] # 設置服務器名稱、用戶名、密碼以及郵件後綴 mail_host = "smtp.163.com" mail_user = "mailuser" mail_pass = "password" mail_postfix="163.com" # 發送郵件函數 def send_mail(to_list, sub): me = mail_user + "<"+mail_user+"@"+mail_postfix+">" fp = open('context.txt') msg = MIMEText(fp.read(),_charset="utf-8") fp.close() msg['Subject'] = sub msg['From'] = me msg['To'] = ";".join(to_list) try: send_smtp = smtplib.SMTP() send_smtp.connect(mail_host) send_smtp.login(mail_user, mail_pass) send_smtp.sendmail(me, to_list, msg.as_string()) send_smtp.close() return True except Exception, e: print str(e) return False if send_mail(mailto_list,"標題"): print "測試成功" else: print "測試失敗" 發送附件 #!/usr/bin/python #encoding:utf8 import smtplib from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders def send_mail(to_list, sub, filename): me = mail_user + "<"+mail_user+"@"+mail_postfix+">" msg = MIMEMultipart() msg['Subject'] = sub msg['From'] = me msg['To'] = ";".join(to_list) submsg = MIMEBase('application', 'x-xz') submsg.set_payload(open(filename,'rb').read()) encoders.encode_base64(submsg) submsg.add_header('Content-Disposition', 'attachment', filename=filename) msg.attach(submsg) try: send_smtp = smtplib.SMTP() send_smtp.connect(mail_host) send_smtp.login(mail_user, mail_pass) send_smtp.sendmail(me, to_list, msg.as_string()) send_smtp.close() return True except Exception, e: print str(e)[1] return False # 設置服務器名稱、用戶名、密碼以及郵件後綴 mail_host = "smtp.163.com" mail_user = "xuesong" mail_pass = "mailpasswd" mail_postfix = "163.com" mailto_list = ["272121935@qq.com","quanzhou722@163.com"] title = 'check' filename = 'file_check.html' if send_mail(mailto_list,title,filename): print "發送成功" else: print "發送失敗" 解壓縮 gzip壓縮 import gzip f_in = open('file.log', 'rb') f_out = gzip.open('file.log.gz', 'wb') f_out.writelines(f_in) f_out.close() f_in.close() gzip壓縮1 File = 'xuesong_18.log' g = gzip.GzipFile(filename="", mode='wb', compresslevel=9, fileobj=open((r'%s.gz' %File),'wb')) g.write(open(r'%s' %File).read()) g.close() gzip解壓 g = gzip.GzipFile(mode='rb', fileobj=open((r'xuesong_18.log.gz'),'rb')) open((r'xuesong_18.log'),'wb').write(g.read()) 壓縮tar.gz import os import tarfile tar = tarfile.open("/tmp/tartest.tar.gz","w:gz") # 建立壓縮包名 for path,dir,files in os.walk("/tmp/tartest"): # 遞歸文件目錄 for file in files: fullpath = os.path.join(path,file) tar.add(fullpath) # 建立壓縮包 tar.close() 解壓tar.gz import tarfile tar = tarfile.open("/tmp/tartest.tar.gz") #tar.extract("/tmp") # 所有解壓到指定路徑 names = tar.getnames() # 包內文件名 for name in names: tar.extract(name,path="./") # 解壓指定文件 tar.close() zip壓縮 import zipfile,os f = zipfile.ZipFile('filename.zip', 'w' ,zipfile.ZIP_DEFLATED) # ZIP_STORE 爲默認表不壓縮. ZIP_DEFLATED 表壓縮 #f.write('file1.txt') # 將文件寫入壓縮包 for path,dir,files in os.walk("tartest"): # 遞歸壓縮目錄 for file in files: f.write(os.path.join(path,file)) # 將文件逐個寫入壓縮包 f.close() zip解壓 if zipfile.is_zipfile('filename.zip'): # 判斷一個文件是否是zip文件 f = zipfile.ZipFile('filename.zip') for file in f.namelist(): # 返回文件列表 f.extract(file, r'/tmp/') # 解壓指定文件 #f.extractall() # 解壓所有 f.close() 時間 import time time.time() # 時間戳[浮點] time.localtime()[1] - 1 # 上個月 int(time.time()) # 時間戳[整s] tomorrow.strftime('%Y%m%d_%H%M') # 格式化時間 time.strftime('%Y-%m-%d_%X',time.localtime( time.time() ) ) # 時間戳轉日期 time.mktime(time.strptime('2012-03-28 06:53:40', '%Y-%m-%d %H:%M:%S')) # 日期轉時間戳 判斷輸入時間格式是否正確 #encoding:utf8 import time while 1: atime=raw_input('輸入格式如[14.05.13 13:00]:') try: btime=time.mktime(time.strptime('%s:00' %atime, '%y.%m.%d %H:%M:%S')) break except: print '時間輸入錯誤,請從新輸入,格式如[14.05.13 13:00]' 上一個月最後一天 import datetime lastMonth=datetime.date(datetime.date.today().year,datetime.date.today().month,1)-datetime.timedelta(1) lastMonth.strftime("%Y/%m") 前一天 (datetime.datetime.now() + datetime.timedelta(days=-1) ).strftime('%Y%m%d') 兩日期相差天數 import datetime d1 = datetime.datetime(2005, 2, 16) d2 = datetime.datetime(2004, 12, 31) (d1 - d2).days 向後加10個小時 import datetime d1 = datetime.datetime.now() d3 = d1 + datetime.timedelta(hours=10) d3.ctime() 參數[optparse] import os, sys import time import optparse # python aaa.py -t file -p /etc/opt -o aaaaa def do_fiotest( type, path, output,): print type, path, output, def main(): parser = optparse.OptionParser() parser.add_option('-t', '--type', dest = 'type', default = None, help = 'test type[file, device]') parser.add_option('-p', '--path', dest = 'path', default = None, help = 'test file path or device path') parser.add_option('-o', '--output', dest = 'output', default = None, help = 'result dir path') (o, a) = parser.parse_args() if None == o.type or None == o.path or None == o.output: print "No device or file or output dir" return -1 if 'file' != o.type and 'device' != o.type: print "You need specify test type ['file' or 'device']" return -1 do_fiotest(o.type, o.path, o.output) print "Test done!" if __name__ == '__main__': main() hash import md5 m = md5.new('123456').hexdigest() import hashlib m = hashlib.md5() m.update("Nobody inspects") # 使用update方法對字符串md5加密 m.digest() # 加密後二進制結果 m.hexdigest() # 加密後十進制結果 hashlib.new("md5", "string").hexdigest() # 對字符串加密 hashlib.new("md5", open("file").read()).hexdigest() # 查看文件MD5值 隱藏輸入密碼 import getpass passwd=getpass.getpass() string打印a-z import string string.lowercase # a-z小寫 string.uppercase # A-Z大小 paramiko [ssh客戶端] 安裝 sudo apt-get install python-setuptools easy_install sudo apt-get install python-all-dev sudo apt-get install build-essential paramiko實例(帳號密碼登陸執行命令) #!/usr/bin/python #ssh import paramiko import sys,os host = '10.152.15.200' user = 'peterli' password = '123456' s = paramiko.SSHClient() # 綁定實例 s.load_system_host_keys() # 加載本地HOST主機文件 s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 容許鏈接不在know_hosts文件中的主機 s.connect(host,22,user,password,timeout=5) # 鏈接遠程主機 while True: cmd=raw_input('cmd:') stdin,stdout,stderr = s.exec_command(cmd) # 執行命令 cmd_result = stdout.read(),stderr.read() # 讀取命令結果 for line in cmd_result: print line, s.close() paramiko實例(傳送文件) #!/usr/bin/evn python import os import paramiko host='127.0.0.1' port=22 username = 'peterli' password = '123456' ssh=paramiko.Transport((host,port)) privatekeyfile = os.path.expanduser('~/.ssh/id_rsa') mykey = paramiko.RSAKey.from_private_key_file( os.path.expanduser('~/.ssh/id_rsa')) # 加載key 不使用key可不加 ssh.connect(username=username,password=password) # 鏈接遠程主機 # 使用key把 password=password 換成 pkey=mykey sftp=paramiko.SFTPClient.from_transport(ssh) # SFTP使用Transport通道 sftp.get('/etc/passwd','pwd1') # 下載 兩端都要指定文件名 sftp.put('pwd','/tmp/pwd') # 上傳 sftp.close() ssh.close() paramiko實例(密鑰執行命令) #!/usr/bin/python #ssh import paramiko import sys,os host = '10.152.15.123' user = 'peterli' s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) privatekeyfile = os.path.expanduser('~/.ssh/id_rsa') # 定義key路徑 mykey = paramiko.RSAKey.from_private_key_file(privatekeyfile) # mykey=paramiko.DSSKey.from_private_key_file(privatekeyfile,password='061128') # DSSKey方式 password是key的密碼 s.connect(host,22,user,pkey=mykey,timeout=5) cmd=raw_input('cmd:') stdin,stdout,stderr = s.exec_command(cmd) cmd_result = stdout.read(),stderr.read() for line in cmd_result: print line, s.close() ssh併發(Pool控制最大併發) #!/usr/bin/env python #encoding:utf8 #ssh_concurrent.py import multiprocessing import sys,os,time import paramiko def ssh_cmd(host,port,user,passwd,cmd): msg = "-----------Result:%s----------" % host s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: s.connect(host,22,user,passwd,timeout=5) stdin,stdout,stderr = s.exec_command(cmd) cmd_result = stdout.read(),stderr.read() print msg for line in cmd_result: print line, s.close() except paramiko.AuthenticationException: print msg print 'AuthenticationException Failed' except paramiko.BadHostKeyException: print msg print "Bad host key" result = [] p = multiprocessing.Pool(processes=20) cmd=raw_input('CMD:') f=open('serverlist.conf') list = f.readlines() f.close() for IP in list: print IP host=IP.split()[0] port=int(IP.split()[1]) user=IP.split()[2] passwd=IP.split()[3] result.append(p.apply_async(ssh_cmd,(host,port,user,passwd,cmd))) p.close() for res in result: res.get(timeout=35) ssh併發(取文件狀態併發送郵件) #!/usr/bin/python #encoding:utf8 #config file: ip.list import paramiko import multiprocessing import smtplib import sys,os,time,datetime,socket,re from email.mime.text import MIMEText # 配置文件(IP列表) Conf = 'ip.list' user_name = 'peterli' user_pwd = 'passwd' port = 22 PATH = '/home/peterli/' # 設置服務器名稱、用戶名、密碼以及郵件後綴 mail_host = "smtp.163.com" mail_user = "xuesong" mail_pass = "mailpasswd" mail_postfix = "163.com" mailto_list = ["272121935@qq.com","quanzhou722@163.com"] title = 'file check' DATE1=(datetime.datetime.now() + datetime.timedelta(days=-1) ).strftime('%Y%m%d') file_path = '%s%s' %(PATH,DATE1) def Ssh_Cmd(file_path,host_ip,user_name,user_pwd,port=22): s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: s.connect(hostname=host_ip,port=port,username=user_name,password=user_pwd) stdin,stdout,stderr = s.exec_command('stat %s' %file_path) stat_result = '%s%s' %(stdout.read(),stderr.read()) if stat_result.find('No such file or directory') == -1: file_status = 'OK\t' stdin,stdout,stderr = s.exec_command('du -sh %s' %file_path) cmd1_result = '%s_%s' %(stat_result.split()[32],stat_result.split()[33].split('.')[0]) cmd2_result = ('%s%s' %(stdout.read(),stderr.read())).split()[0] else: file_status = '未生成\t' cmd1_result = 'null' cmd2_result = 'null' q.put(['Login successful']) s.close() except socket.error: file_status = '主機或端口錯誤' cmd1_result = '-' cmd2_result = '-' except paramiko.AuthenticationException: file_status = '用戶或密碼錯誤' cmd1_result = '-' cmd2_result = '-' except paramiko.BadHostKeyException: file_status = 'Bad host key' cmd1_result = '-' cmd2_result = '-' except: file_status = 'ssh異常' cmd1_result = '-' cmd2_result = '-' r.put('%s\t-\t%s\t%s\t%s\t%s\n' %(time.strftime('%Y-%m-%d_%H:%M'),host_ip,file_status,cmd2_result,cmd1_result)) def Concurrent(Conf,file_path,user_name,user_pwd,port): # 執行總計 total = 0 # 讀取配置文件 f=open(Conf) list = f.readlines() f.close() # 併發執行 process_list = [] log_file = file('file_check.log', 'w') log_file.write('檢查時間\t\t業務\tIP\t\t文件狀態\t大小\t生成時間\n') for host_info in list: # 判斷配置文件中註釋行跳過 if host_info.startswith('#'): continue # 取變量,其中任意變量未取到就跳過執行 try: host_ip=host_info.split()[0].strip() #user_name=host_info.split()[1] #user_pwd=host_info.split()[2] except: log_file.write('Profile error: %s\n' %(host_info)) continue #try: # port=int(host_info.split()[3]) #except: # port=22 total +=1 p = multiprocessing.Process(target=Ssh_Cmd,args=(file_path,host_ip,user_name,user_pwd,port)) p.start() process_list.append(p) for j in process_list: j.join() for j in process_list: log_file.write(r.get()) successful = q.qsize() log_file.write('執行完畢。 總執行:%s 登陸成功:%s 登陸失敗:%s\n' %(total,successful,total - successful)) log_file.flush() log_file.close() def send_mail(to_list, sub): me = mail_user + "<"+mail_user+"@"+mail_postfix+">" fp = open('file_check.log') msg = MIMEText(fp.read(),_charset="utf-8") fp.close() msg['Subject'] = sub msg['From'] = me msg['To'] = ";".join(to_list) try: send_smtp = smtplib.SMTP() send_smtp.connect(mail_host) send_smtp.login(mail_user, mail_pass) send_smtp.sendmail(me, to_list, msg.as_string()) send_smtp.close() return True except Exception, e: print str(e)[1] return False if __name__ == '__main__': q = multiprocessing.Queue() r = multiprocessing.Queue() Concurrent(Conf,file_path,user_name,user_pwd,port) if send_mail(mailto_list,title): print "發送成功" else: print "發送失敗" LazyManage併發批量操做(判斷非root交互到root操做) #!/usr/bin/python #encoding:utf8 # LzayManage.py # config file: serverlist.conf import paramiko import multiprocessing import sys,os,time,socket,re def Ssh_Cmd(host_ip,Cmd,user_name,user_pwd,port=22): s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname=host_ip,port=port,username=user_name,password=user_pwd) stdin,stdout,stderr = s.exec_command(Cmd) Result = '%s%s' %(stdout.read(),stderr.read()) q.put('successful') s.close() return Result.strip() def Ssh_Su_Cmd(host_ip,Cmd,user_name,user_pwd,root_name,root_pwd,port=22): s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname=host_ip,port=port,username=user_name,password=user_pwd) ssh = s.invoke_shell() time.sleep(0.1) ssh.send('su - %s\n' %(root_name)) buff = '' while not buff.endswith('Password: '): resp = ssh.recv(9999) buff +=resp ssh.send('%s\n' %(root_pwd)) buff = '' while True: resp = ssh.recv(9999) buff +=resp if ': incorrect password' in buff: su_correct='passwd_error' break elif buff.endswith('# '): su_correct='passwd_correct' break if su_correct == 'passwd_correct': ssh.send('%s\n' %(Cmd)) buff = '' while True: resp = ssh.recv(9999) if resp.endswith('# '): buff +=re.sub('\[.*@.*\]# $','',resp) break buff +=resp Result = buff.lstrip('%s' %(Cmd)) q.put('successful') elif su_correct == 'passwd_error': Result = "\033[31mroot密碼錯誤\033[m" s.close() return Result.strip() def Send_File(host_ip,PathList,user_name,user_pwd,Remote='/tmp',port=22): s=paramiko.Transport((host_ip,port)) s.connect(username=user_name,password=user_pwd) sftp=paramiko.SFTPClient.from_transport(s) for InputPath in PathList: LocalPath = re.sub('^\./','',InputPath.rstrip('/')) RemotePath = '%s/%s' %( Remote , os.path.basename( LocalPath )) try: sftp.rmdir(RemotePath) except: pass try: sftp.remove(RemotePath) except: pass if os.path.isdir(LocalPath): sftp.mkdir(RemotePath) for path,dirs,files in os.walk(LocalPath): for dir in dirs: dir_path = os.path.join(path,dir) sftp.mkdir('%s/%s' %(RemotePath,re.sub('^%s/' %LocalPath,'',dir_path))) for file in files: file_path = os.path.join(path,file) sftp.put( file_path,'%s/%s' %(RemotePath,re.sub('^%s/' %LocalPath,'',file_path))) else: sftp.put(LocalPath,RemotePath) q.put('successful') sftp.close() s.close() Result = '%s \033[32m傳送完成\033[m' % PathList return Result def Ssh(host_ip,Operation,user_name,user_pwd,root_name,root_pwd,Cmd=None,PathList=None,port=22): msg = "\033[32m-----------Result:%s----------\033[m" % host_ip try: if Operation == 'Ssh_Cmd': Result = Ssh_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,port=port) elif Operation == 'Ssh_Su_Cmd': Result = Ssh_Su_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,port=port) elif Operation == 'Ssh_Script': Send_File(host_ip=host_ip,PathList=PathList,user_name=user_name,user_pwd=user_pwd,port=port) Script_Head = open(PathList[0]).readline().strip() LocalPath = re.sub('^\./','',PathList[0].rstrip('/')) Cmd = '%s /tmp/%s' %( re.sub('^#!','',Script_Head), os.path.basename( LocalPath )) Result = Ssh_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,port=port) elif Operation == 'Ssh_Su_Script': Send_File(host_ip=host_ip,PathList=PathList,user_name=user_name,user_pwd=user_pwd,port=port) Script_Head = open(PathList[0]).readline().strip() LocalPath = re.sub('^\./','',PathList[0].rstrip('/')) Cmd = '%s /tmp/%s' %( re.sub('^#!','',Script_Head), os.path.basename( LocalPath )) Result = Ssh_Su_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,port=port) elif Operation == 'Send_File': Result = Send_File(host_ip=host_ip,PathList=PathList,user_name=user_name,user_pwd=user_pwd,port=port) else: Result = '操做不存在' except socket.error: Result = '\033[31m主機或端口錯誤\033[m' except paramiko.AuthenticationException: Result = '\033[31m用戶名或密碼錯誤\033[m' except paramiko.BadHostKeyException: Result = '\033[31mBad host key\033[m[' except IOError: Result = '\033[31m遠程主機已存在非空目錄或沒有寫權限\033[m' except: Result = '\033[31m未知錯誤\033[m' r.put('%s\n%s\n' %(msg,Result)) def Concurrent(Conf,Operation,user_name,user_pwd,root_name,root_pwd,Cmd=None,PathList=None,port=22): # 讀取配置文件 f=open(Conf) list = f.readlines() f.close() # 執行總計 total = 0 # 併發執行 for host_info in list: # 判斷配置文件中註釋行跳過 if host_info.startswith('#'): continue # 取變量,其中任意變量未取到就跳過執行 try: host_ip=host_info.split()[0] #user_name=host_info.split()[1] #user_pwd=host_info.split()[2] except: print('Profile error: %s' %(host_info) ) continue try: port=int(host_info.split()[3]) except: port=22 total +=1 p = multiprocessing.Process(target=Ssh,args=(host_ip,Operation,user_name,user_pwd,root_name,root_pwd,Cmd,PathList,port)) p.start() # 打印執行結果 for j in range(total): print(r.get() ) if Operation == 'Ssh_Script' or Operation == 'Ssh_Su_Script': successful = q.qsize() / 2 else: successful = q.qsize() print('\033[32m執行完畢[總執行:%s 成功:%s 失敗:%s]\033[m' %(total,successful,total - successful) ) q.close() r.close() def Help(): print(''' 1.執行命令 2.執行腳本 \033[32m[位置1腳本(必須帶腳本頭),後可帶執行腳本所須要的包\文件\文件夾路徑,空格分隔]\033[m 3.發送文件 \033[32m[傳送的包\文件\文件夾路徑,空格分隔]\033[m 退出: 0\exit\quit 幫助: help\h\? 注意: 發送文件默認爲/tmp下,如已存在同名文件會被強制覆蓋,非空目錄則中斷操做.執行腳本先將本地腳本及包發送遠程主機上,發送規則同發送文件 ''') if __name__=='__main__': # 定義root帳號信息 root_name = 'root' root_pwd = 'peterli' user_name='peterli' user_pwd='<++(3Ie' # 配置文件 Conf='serverlist.conf' if not os.path.isfile(Conf): print('\033[33m配置文件 %s 不存在\033[m' %(Conf) ) sys.exit() Help() while True: i = raw_input("\033[35m[請選擇操做]: \033[m").strip() q = multiprocessing.Queue() r = multiprocessing.Queue() if i == '1': if user_name == root_name: Operation = 'Ssh_Cmd' else: Operation = 'Ssh_Su_Cmd' Cmd = raw_input('CMD: ').strip() if len(Cmd) == 0: print('\033[33m命令爲空\033[m') continue Concurrent(Conf=Conf,Operation=Operation,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,Cmd=Cmd) elif i == '2': if user_name == root_name: Operation = 'Ssh_Script' else: Operation = 'Ssh_Su_Script' PathList = raw_input('\033[36m本地腳本路徑: \033[m').strip().split() if len(PathList) == 0: print('\033[33m路徑爲空\033[m') continue if not os.path.isfile(PathList[0]): print('\033[33m本地路徑 %s 不存在或不是文件\033[m' %(PathList[0]) ) continue for LocalPath in PathList[1:]: if not os.path.exists(LocalPath): print('\033[33m本地路徑 %s 不存在\033[m' %(LocalPath) ) break else: Concurrent(Conf=Conf,Operation=Operation,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,PathList=PathList) elif i == '3': Operation = 'Send_File' PathList = raw_input('\033[36m本地路徑: \033[m').strip().split() if len(PathList) == 0: print('\033[33m路徑爲空\033[m') continue for LocalPath in PathList: if not os.path.exists(LocalPath): print('\033[33m本地路徑 %s 不存在\033[m' %(LocalPath) ) break else: Concurrent(Conf=Conf,Operation=Operation,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,PathList=PathList) elif i == '0' or i == 'exit' or i == 'quit': print("\033[34m退出LazyManage腳本\033[m") sys.exit() elif i == 'help' or i == 'h' or i == '?': Help() pysnmp #!/usr/bin/python from pysnmp.entity.rfc3413.oneliner import cmdgen cg = cmdgen.CommandGenerator() # 注意IP 端口 組默認public oid值 varBinds = cg.getCmd( cmdgen.CommunityData('any-agent', 'public',0 ), cmdgen.UdpTransportTarget(('10.10.76.42', 161)), (1,3,6,1,4,1,2021,10,1,3,1), ) print varBinds[3][0][1] 3 socket socket.gethostname() # 獲取主機名 from socket import * # 避免 socket.socket() s=socket() s.bind() # 綁定地址到套接字 s.listen() # 開始TCP監聽 s.accept() # 被動接受TCP客戶端鏈接,等待鏈接的到來 s.connect() # 主動初始化TCP服務器鏈接 s.connect_ex() # connect()函數的擴展版本,出錯時返回出錯碼,而不是跑出異常 s.recv() # 接收TCP數據 s.send() # 發送TCP數據 s.sendall() # 完整發送TCP數據 s.recvfrom() # 接收UDP數據 s.sendto() # 發送UDP數據 s.getpeername() # 鏈接到當前套接字的遠端的地址(TCP鏈接) s.getsockname() # 當前套接字的地址 s.getsockopt() # 返回指定套接字的參數 s.setsockopt() # 設置指定套接字的參數 s.close() # 關閉套接字 s.setblocking() # 設置套接字的阻塞與非阻塞模式 s.settimeout() # 設置阻塞套接字操做的超時時間 s.gettimeout() # 獲得阻塞套接字操做的超時時間 s.filen0() # 套接字的文件描述符 s.makefile() # 建立一個與該套接字關聯的文件對象 socket.AF_UNIX # 只可以用於單一的Unix系統進程間通訊 socket.AF_INET # 服務器之間網絡通訊 socket.AF_INET6 # IPv6 socket.SOCK_STREAM # 流式socket , for TCP socket.SOCK_DGRAM # 數據報式socket , for UDP socket.SOCK_RAW # 原始套接字,普通的套接字沒法處理ICMP、IGMP等網絡報文,而SOCK_RAW能夠;其次,SOCK_RAW也能夠處理特殊的IPv4報文;此外,利用原始套接字,能夠經過IP_HDRINCL套接字選項由用戶構造IP頭。 socket.SOCK_RDM # 是一種可靠的UDP形式,即保證交付數據報但不保證順序。SOCK_RAM用來提供對原始協議的低級訪問,在須要執行某些特殊操做時使用,如發送ICMP報文。SOCK_RAM一般僅限於高級用戶或管理員運行的程序使用。 socket.SOCK_SEQPACKET # 可靠的連續數據包服務 SocketServer #!/usr/bin/python #server.py import SocketServer import os class MyTCP(SocketServer.BaseRequestHandler): def handle(self): while True: self.data=self.request.recv(1024).strip() if self.data == 'quit' or not self.data:break cmd=os.popen(self.data).read() if cmd == '':cmd= self.data + ': Command not found' self.request.sendall(cmd) if __name__ == '__main__': HOST,PORT = '10.0.0.119',50007 server = SocketServer.ThreadingTCPServer((HOST,PORT),MyTCP) server.serve_forever() SocketClient #!/usr/bin/python #client.py import socket HOST='10.0.0.119' PORT=50007 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect((HOST,PORT)) while True: while True: cmd=raw_input('CMD:').strip() if cmd != '':break s.sendall(cmd) data=s.recv(1024).split('\n') print 'cmd:' for line in data:print line s.close() ftp ftpserver #!/usr/bin/python #ftpserver.py import SocketServer import os import cPickle import md5 from time import sleep def filer(file1): try: f = file(file1,'rb') return cPickle.load(f) except IOError: return {} except EOFError: return {} f.close() def filew(file1,content): f = file(file1,'wb') cPickle.dump(content,f) f.close() class MyTCP(SocketServer.BaseRequestHandler): def handle(self): i = 0 while i<3: user=self.request.recv(1024).strip() userinfo=filer('user.pkl') if userinfo.has_key(user.split()[0]): if md5.new(user.split()[1]).hexdigest() == userinfo[user.split()[0]]: results='login successful' self.request.sendall(results) login='successful' break else: i = i + 1 results='Error:password not correct' self.request.sendall(results) continue else: i = i + 1 results='Error:password not correct' self.request.sendall(results) continue break else: results = 'Error:Wrong password too many times' self.request.sendall(results) login='failure' home_path = os.popen('pwd').read().strip() + '/' + user.split()[0] current_path = '/' print home_path while True: if login == 'failure': break print 'home_path:%s=current_path:%s' %(home_path,current_path) cmd=self.request.recv(1024).strip() print cmd if cmd == 'quit': break elif cmd == 'dir': list=os.listdir('%s%s' %(home_path,current_path)) if list: dirlist,filelist = '','' for i in list: if os.path.isdir('%s%s%s' %(home_path,current_path,i)): dirlist = dirlist + '\033[32m' + i + '\033[m\t' else: filelist = filelist + i + '\t' results = dirlist + filelist else: results = '\033[31mnot find\033[m' self.request.sendall(results) elif cmd == 'pdir': self.request.sendall(current_path) elif cmd.split()[0] == 'mdir': if cmd.split()[1].isalnum(): tmppath='%s%s%s' %(home_path,current_path,cmd.split()[1]) os.makedirs(tmppath) self.request.sendall('\033[32mcreating successful\033[m') else: self.request.sendall('\033[31mcreate failure\033[m') elif cmd.split()[0] == 'cdir': if cmd.split()[1] == '/': tmppath='%s%s' %(home_path,cmd.split()[1]) if os.path.isdir(tmppath): current_path = cmd.split()[1] self.request.sendall(current_path) else: self.request.sendall('\033[31mnot_directory\033[m') elif cmd.split()[1].startswith('/'): tmppath='%s%s' %(home_path,cmd.split()[1]) if os.path.isdir(tmppath): current_path = cmd.split()[1] + '/' self.request.sendall(current_path) else: self.request.sendall('\033[31mnot_directory\033[m') else: tmppath='%s%s%s' %(home_path,current_path,cmd.split()[1]) if os.path.isdir(tmppath): current_path = current_path + cmd.split()[1] + '/' self.request.sendall(current_path) else: self.request.sendall('\033[31mnot_directory\033[m') elif cmd.split()[0] == 'get': if os.path.isfile('%s%s%s' %(home_path,current_path,cmd.split()[1])): f = file('%s%s%s' %(home_path,current_path,cmd.split()[1]),'rb') self.request.sendall('ready_file') sleep(0.5) self.request.send(f.read()) f.close() sleep(0.5) elif os.path.isdir('%s%s%s' %(home_path,current_path,cmd.split()[1])): self.request.sendall('ready_dir') sleep(0.5) for dirpath in os.walk('%s%s%s' %(home_path,current_path,cmd.split()[1])): dir=dirpath[0].replace('%s%s' %(home_path,current_path),'',1) self.request.sendall(dir) sleep(0.5) for filename in dirpath[2]: self.request.sendall(filename) sleep(0.5) f = file('%s/%s' %(dirpath[0],filename),'rb') self.request.send(f.read()) f.close() sleep(0.5) self.request.sendall('file_get_done') sleep(0.5) else: self.request.sendall('dir_get_done') sleep(0.5) else: self.request.sendall('get_failure') continue self.request.sendall('get_done') elif cmd.split()[0] == 'send': if os.path.exists('%s%s%s' %(home_path,current_path,cmd.split()[1])): self.request.sendall('existing') action=self.request.recv(1024) if action == 'cancel': continue self.request.sendall('ready') msg=self.request.recv(1024) if msg == 'ready_file': f = file('%s%s%s' %(home_path,current_path,cmd.split()[1]),'wb') while True: data=self.request.recv(1024) if data == 'file_send_done':break f.write(data) f.close() elif msg == 'ready_dir': os.system('mkdir -p %s%s%s' %(home_path,current_path,cmd.split()[1])) while True: dir=self.request.recv(1024) if dir == 'get_done':break os.system('mkdir -p %s%s%s' %(home_path,current_path,dir)) while True: filename=self.request.recv(1024) if filename == 'dir_send_done':break f = file('%s%s%s/%s' %(home_path,current_path,dir,filename),'wb') while True: data=self.request.recv(1024) if data == 'file_send_done':break f.write(data) f.close() self.request.sendall('%s/%s\t\033[32mfile_done\033[m' %(dir,filename)) self.request.sendall('%s\t\033[32mdir_done\033[m' %(dir)) elif msg == 'unknown_file': continue else: results = cmd.split()[0] + ': Command not found' self.request.sendall(results) if __name__ == '__main__': HOST,PORT = '10.152.14.85',50007 server = SocketServer.ThreadingTCPServer((HOST,PORT),MyTCP) server.serve_forever() ftpmanage #!/usr/bin/python #manage_ftp.py import cPickle import sys import md5 import os import getpass def filer(file1): try: f = file(file1,'rb') return cPickle.load(f) except IOError: return {} except EOFError: return {} f.close() def filew(file1,content): f = file(file1,'wb') cPickle.dump(content,f) f.close() while True: print ''' 1.add user 2.del user 3.change password 4.query user 0.exit ''' i = raw_input(':').strip() userinfo=filer('user.pkl') if i == '': continue elif i == '1': while True: user=raw_input('user name:').strip() if user.isalnum(): i = 0 while i<3: passwd=getpass.getpass('passwd:').strip() if passwd == '': continue else: passwd1=getpass.getpass('Confirm password:').strip() if passwd == passwd1: mpasswd = md5.new(passwd).hexdigest() userinfo[user] = mpasswd os.system('mkdir -p %s' %user) print '%s creating successful ' %user break else: print "Passwords don't match " i = i + 1 continue else: print 'Too many wrong' continue break else: print 'user not legal' continue elif i == '2': user=raw_input('user name:').strip() if userinfo.has_key(user): del userinfo[user] print 'Delete users successfully' else: print 'user not exist' continue elif i == '3': user=raw_input('user name:').strip() if userinfo.has_key(user): i = 0 while i<3: passwd=getpass.getpass('passwd:').strip() if passwd == '': continue else: passwd1=getpass.getpass('Confirm password:').strip() if passwd == passwd1: mpasswd = md5.new(passwd).hexdigest() userinfo[user] = mpasswd print '%s password is changed' %user break else: print "Passwords don't match " i = i + 1 continue else: print 'Too many wrong' continue else: print 'user not exist' continue elif i == '4': print userinfo.keys() elif i == '0': sys.exit() else: print 'select error' continue filew('user.pkl',content=userinfo) ftpclient #!/usr/bin/python #ftpclient.py import socket import os import getpass from time import sleep HOST='10.152.14.85' PORT=50007 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect((HOST,PORT)) while True: user = raw_input('user:').strip() if user.isalnum(): while True: passwd = getpass.getpass('passwd:').strip() s.sendall(user + ' ' + passwd) servercmd=s.recv(1024) if servercmd == 'login successful': print '\033[32m%s\033[m' %servercmd break else: print servercmd while True: cmd=raw_input('FTP>').strip() if cmd == '': continue if cmd.split()[0] == 'get': if cmd == 'get':continue for i in cmd.split()[1:]: if os.path.exists(i): confirm = raw_input("\033[31mPlease confirm whether the cover %s(Y/N):\033[m" %(i)).upper().startswith('Y') if not confirm: print '%s cancel' %i continue s.sendall('get ' + i) servercmd=s.recv(1024) if servercmd == 'inexistence': print '%s \t\033[32minexistence\033[m' %i continue elif servercmd == 'ready_file': f = file(i,'wb') while True: data=s.recv(1024) if data == 'get_done':break f.write(data) f.close() print '%s \t\033[32mfile_done\033[m' %(i) elif servercmd == 'ready_dir': try: os.makedirs(i) except: pass while True: serverdir=s.recv(1024) if serverdir == 'get_done':break os.system('mkdir -p %s' %serverdir) print '%s \t\033[32mdir_done\033[m' %(serverdir) while True: serverfile=s.recv(1024) if serverfile == 'dir_get_done':break f = file('%s/%s' %(serverdir,serverfile),'wb') while True: data=s.recv(1024) if data == 'file_get_done':break f.write(data) f.close() print '%s/%s \t\033[32mfile_done\033[m' %(serverdir,serverfile) elif cmd.split()[0] == 'send': if cmd == 'send':continue for i in cmd.split()[1:]: if not os.path.exists(i): print '%s\t\033[31minexistence\033[m' %i continue s.sendall('send ' + i) servercmd=s.recv(1024) if servercmd == 'existing': confirm = raw_input("\033[31mPlease confirm whether the cover %s(Y/N):\033[m" %(i)).upper().startswith('Y') if confirm: s.sendall('cover') servercmd=s.recv(1024) else: s.sendall('cancel') print '%s\tcancel' %i continue if os.path.isfile(i): s.sendall('ready_file') sleep(0.5) f = file(i,'rb') s.send(f.read()) sleep(0.5) s.sendall('file_send_done') print '%s\t\033[32mfile done\033[m' %(cmd.split()[1]) f.close() elif os.path.isdir(i): s.sendall('ready_dir') sleep(0.5) for dirpath in os.walk(i): dir=dirpath[0].replace('%s/' %os.popen('pwd').read().strip(),'',1) s.sendall(dir) sleep(0.5) for filename in dirpath[2]: s.sendall(filename) sleep(0.5) f = file('%s/%s' %(dirpath[0],filename),'rb') s.send(f.read()) f.close() sleep(0.5) s.sendall('file_send_done') msg=s.recv(1024) print msg else: s.sendall('dir_send_done') msg=s.recv(1024) print msg else: s.sendall('unknown_file') print '%s\t\033[31munknown type\033[m' %i continue sleep(0.5) s.sendall('get_done') elif cmd.split()[0] == 'cdir': if cmd == 'cdir':continue s.sendall(cmd) data=s.recv(1024) print data continue elif cmd == 'ls': list=os.popen(cmd).read().strip().split('\n') if list: dirlist,filelist = '','' for i in list: if os.path.isdir(i): dirlist = dirlist + '\033[32m' + i + '\033[m\t' else: filelist = filelist + i + '\t' results = dirlist + filelist else: results = '\033[31mnot find\033[m' print results continue elif cmd == 'pwd': os.system(cmd) elif cmd.split()[0] == 'cd': try: os.chdir(cmd.split()[1]) except: print '\033[31mcd failure\033[m' elif cmd == 'dir': s.sendall(cmd) data=s.recv(1024) print data continue elif cmd == 'pdir': s.sendall(cmd) data=s.recv(1024) print data continue elif cmd.split()[0] == 'mdir': if cmd == 'mdir':continue s.sendall(cmd) data=s.recv(1024) print data continue elif cmd.split()[0] == 'help': print ''' get [file] [dir] send [file] [dir] dir mdir cdir pdir pwd md cd ls help quit ''' continue elif cmd == 'quit': break else: print '\033[31m%s: Command not found,Please see the "help"\033[m' %cmd else: continue break s.close() 掃描主機開放端口 #!/usr/bin/env python import socket def check_server(address,port): s=socket.socket() try: s.connect((address,port)) return True except socket.error,e: return False if __name__=='__main__': from optparse import OptionParser parser=OptionParser() parser.add_option("-a","--address",dest="address",default='localhost',help="Address for server",metavar="ADDRESS") parser.add_option("-s","--start",dest="start_port",type="int",default=1,help="start port",metavar="SPORT") parser.add_option("-e","--end",dest="end_port",type="int",default=1,help="end port",metavar="EPORT") (options,args)=parser.parse_args() print 'options: %s, args: %s' % (options, args) port=options.start_port while(port<=options.end_port): check = check_server(options.address, port) if (check): print 'Port %s is on' % port port=port+1 4 mysql #apt-get install mysql-server #apt-get install python-MySQLdb help(MySQLdb.connections.Connection) # 查看連接參數 conn=MySQLdb.connect(host='localhost',user='root',passwd='123456',db='fortress',port=3306) # 定義鏈接 #conn=MySQLdb.connect(unix_socket='/var/run/mysqld/mysqld.sock',user='root',passwd='123456') # 使用socket文件連接 cur=conn.cursor() # 定義遊標 conn.select_db('fortress') # 選擇數據庫 sqlcmd = 'insert into user(name,age) value(%s,%s)' # 定義sql命令 cur.executemany(sqlcmd,[('aa',1),('bb',2),('cc',3)]) # 插入多條值 cur.execute('delete from user where id=20') # 刪除一條記錄 cur.execute("update user set name='a' where id=20") # 更細數據 sqlresult = cur.fetchall() # 接收所有返回結果 conn.commit() # 提交 cur.close() # 關閉遊標 conn.close() # 關閉鏈接 import MySQLdb def mydb(dbcmdlist): try: conn=MySQLdb.connect(host='localhost',user='root',passwd='123456',db='fortress',port=3306) cur=conn.cursor() cur.execute('create database if not exists fortress;') # 建立數據庫 conn.select_db('fortress') # 選擇數據庫 cur.execute('drop table if exists log;') # 刪除表 cur.execute('CREATE TABLE log ( id BIGINT(20) NOT NULL AUTO_INCREMENT, loginuser VARCHAR(50) DEFAULT NULL, remoteip VARCHAR(50) DEFAULT NULL, PRIMARY KEY (id) );') # 建立表 result=[] for dbcmd in dbcmdlist: cur.execute(dbcmd) # 執行sql sqlresult = cur.fetchall() # 接收所有返回結果 result.append(sqlresult) conn.commit() # 提交 cur.close() conn.close() return result except MySQLdb.Error,e: print 'mysql error msg: ',e sqlcmd=[] sqlcmd.append("insert into log (loginuser,remoteip)values('%s','%s');" %(loginuser,remoteip)) mydb(sqlcmd) sqlcmd=[] sqlcmd.append("select * from log;") result = mydb(sqlcmd) for i in result[0]: print i 5 處理信號 信號的概念 信號(signal): 進程之間通信的方式,是一種軟件中斷。一個進程一旦接收到信號就會打斷原來的程序執行流程來處理信號。 發送信號通常有兩種緣由: 1(被動式) 內核檢測到一個系統事件.例如子進程退出會像父進程發送SIGCHLD信號.鍵盤按下control+c會發送SIGINT信號 2(主動式) 經過系統調用kill來向指定進程發送信號 操做系統規定了進程收到信號之後的默認行爲,能夠經過綁定信號處理函數來修改進程收到信號之後的行爲,有兩個信號是不可更改的 SIGTOP 和 SIGKILL 若是一個進程收到一個SIGUSR1信號,而後執行信號綁定函數,第二個SIGUSR2信號又來了,第一個信號沒有被處理完畢的話,第二個信號就會丟棄。 進程結束信號 SIGTERM 和 SIGKILL 的區別: SIGTERM 比較友好,進程能捕捉這個信號,根據您的須要來關閉程序。在關閉程序以前,您能夠結束打開的記錄文件和完成正在作的任務。在某些狀況下,假如進程正在進行做業並且不能中斷,那麼進程能夠忽略這個SIGTERM信號。 常見信號 kill -l # 查看linux提供的信號 SIGHUP 1 A # 終端掛起或者控制進程終止 SIGINT 2 A # 鍵盤終端進程(如control+c) SIGQUIT 3 C # 鍵盤的退出鍵被按下 SIGILL 4 C # 非法指令 SIGABRT 6 C # 由abort(3)發出的退出指令 SIGFPE 8 C # 浮點異常 SIGKILL 9 AEF # Kill信號 馬上中止 SIGSEGV 11 C # 無效的內存引用 SIGPIPE 13 A # 管道破裂: 寫一個沒有讀端口的管道 SIGALRM 14 A # 鬧鐘信號 由alarm(2)發出的信號 SIGTERM 15 A # 終止信號,可以讓程序安全退出 kill -15 SIGUSR1 30,10,16 A # 用戶自定義信號1 SIGUSR2 31,12,17 A # 用戶自定義信號2 SIGCHLD 20,17,18 B # 子進程結束自動向父進程發送SIGCHLD信號 SIGCONT 19,18,25 # 進程繼續(曾被中止的進程) SIGSTOP 17,19,23 DEF # 終止進程 SIGTSTP 18,20,24 D # 控制終端(tty)上按下中止鍵 SIGTTIN 21,21,26 D # 後臺進程企圖從控制終端讀 SIGTTOU 22,22,27 D # 後臺進程企圖從控制終端寫 缺省處理動做一項中的字母含義以下: A 缺省的動做是終止進程 B 缺省的動做是忽略此信號,將該信號丟棄,不作處理 C 缺省的動做是終止進程並進行內核映像轉儲(dump core),內核映像轉儲是指將進程數據在內存的映像和進程在內核結構中的部份內容以必定格式轉儲到文件系統,而且進程退出執行,這樣作的好處是爲程序員提供了方便,使得他們能夠獲得進程當時執行時的數據值,容許他們肯定轉儲的緣由,而且能夠調試他們的程序。 D 缺省的動做是中止進程,進入中止情況之後還能從新進行下去,通常是在調試的過程當中(例如ptrace系統調用) E 信號不能被捕獲 F 信號不能被忽略 Python提供的信號 import signal dir(signal) ['NSIG', 'SIGABRT', 'SIGALRM', 'SIGBUS', 'SIGCHLD', 'SIGCLD', 'SIGCONT', 'SIGFPE', 'SIGHUP', 'SIGILL', 'SIGINT', 'SIGIO', 'SIGIOT', 'SIGKILL', 'SIGPIPE', 'SIGPOLL', 'SIGPROF', 'SIGPWR', 'SIGQUIT', 'SIGRTMAX', 'SIGRTMIN', 'SIGSEGV', 'SIGSTOP', 'SIGSYS', 'SIGTERM', 'SIGTRAP', 'SIGTSTP', 'SIGTTIN', 'SIGTTOU', 'SIGURG', 'SIGUSR1', 'SIGUSR2', 'SIGVTALRM', 'SIGWINCH', 'SIGXCPU', 'SIGXFSZ', 'SIG_DFL', 'SIG_IGN', '__doc__', '__name__', 'alarm', 'default_int_handler', 'getsignal', 'pause', 'signal'] 綁定信號處理函數 #encoding:utf8 import os,signal from time import sleep def onsignal_term(a,b): print 'SIGTERM' # kill -15 signal.signal(signal.SIGTERM,onsignal_term) # 接收信號,執行相應函數 def onsignal_usr1(a,b): print 'SIGUSR1' # kill -10 signal.signal(signal.SIGUSR1,onsignal_usr1) while 1: print 'ID',os.getpid() sleep(10) 經過另一個進程發送信號 import os,signal os.kill(16175,signal.SIGTERM) # 發送信號,16175是綁定信號處理函數的進程pid,須要自行修改 os.kill(16175,signal.SIGUSR1) 父進程接收子進程結束髮送的SIGCHLD信號 #encoding:utf8 import os,signal from time import sleep def onsigchld(a,b): print '收到子進程結束信號' signal.signal(signal.SIGCHLD,onsigchld) pid = os.fork() # 建立一個子進程,複製父進程全部資源操做 if pid == 0: # 經過判斷子進程os.fork()是否等於0,分別同時執行父進程與子進程操做 print '我是子進程,pid是',os.getpid() sleep(2) else: print '我是父進程,pid是',os.getpid() os.wait() # 等待子進程結束 接收信號的程序,另一端使用多線程向這個進程發送信號,會遺漏一些信號 #encoding:utf8 import os import signal from time import sleep import Queue QCOUNT = Queue.Queue() # 初始化隊列 def onsigchld(a,b): '''收到信號後向隊列中插入一個數字1''' print '收到SIGUSR1信號' sleep(1) QCOUNT.put(1) # 向隊列中寫入 signal.signal(signal.SIGUSR1,onsigchld) # 綁定信號處理函數 while 1: print '個人pid是',os.getpid() print '如今隊列中元素的個數是',QCOUNT.qsize() sleep(2) 多線程發信號端的程序 #encoding:utf8 import threading import os import signal def sendusr1(): print '發送信號' os.kill(17788, signal.SIGUSR1) # 這裏的進程id須要寫前一個程序實際運行的pid WORKER = [] for i in range(1, 7): # 開啓6個線程 threadinstance = threading.Thread(target = sendusr1) WORKER.append(threadinstance) for i in WORKER: i.start() for i in WORKER: i.join() print '主線程完成' 6 緩存數據庫 python使用memcache easy_install python-memcached # 安裝(python2.7+) import memcache mc = memcache.Client(['10.152.14.85:12000'],debug=True) mc.set('name','luo',60) mc.get('name') mc.delete('name1') 保存數據 set(key,value,timeout) # 把key映射到value,timeout指的是何時這個映射失效 add(key,value,timeout) # 僅當存儲空間中不存在鍵相同的數據時才保存 replace(key,value,timeout) # 僅當存儲空間中存在鍵相同的數據時才保存 獲取數據 get(key) # 返回key所指向的value get_multi(key1,key2,key3) # 能夠非同步地同時取得多個鍵值, 比循環調用get快數十倍 python使用mongodb 原文: http://blog.nosqlfan.com/html/2989.html easy_install pymongo # 安裝(python2.7+) import pymongo connection=pymongo.Connection('localhost',27017) # 建立鏈接 db = connection.test_database # 切換數據庫 collection = db.test_collection # 獲取collection # db和collection都是延時建立的,在添加Document時才真正建立 文檔添加, _id自動建立 import datetime post = {"author": "Mike", "text": "My first blog post!", "tags": ["mongodb", "python", "pymongo"], "date": datetime.datetime.utcnow()} posts = db.posts posts.insert(post) ObjectId('...') 批量插入 new_posts = [{"author": "Mike", "text": "Another post!", "tags": ["bulk", "insert"], "date": datetime.datetime(2009, 11, 12, 11, 14)}, {"author": "Eliot", "title": "MongoDB is fun", "text": "and pretty easy too!", "date": datetime.datetime(2009, 11, 10, 10, 45)}] posts.insert(new_posts) [ObjectId('...'), ObjectId('...')] 獲取全部collection db.collection_names() # 至關於SQL的show tables 獲取單個文檔 posts.find_one() 查詢多個文檔 for post in posts.find(): post 加條件的查詢 posts.find_one({"author": "Mike"}) 高級查詢 posts.find({"date": {"$lt": "d"}}).sort("author") 統計數量 posts.count() 加索引 from pymongo import ASCENDING, DESCENDING posts.create_index([("date", DESCENDING), ("author", ASCENDING)]) 查看查詢語句的性能 posts.find({"date": {"$lt": "d"}}).sort("author").explain()["cursor"] posts.find({"date": {"$lt": "d"}}).sort("author").explain()["nscanned"] python使用redis https://pypi.python.org/pypi/redis pip install redis OR easy_install redis import redis r = redis.StrictRedis(host='localhost', port=6379, db=0) r.set('foo', 'bar') r.get('foo') r.save() 分片 # 沒搞懂 redis.connection.Connection(host='localhost', port=6379, db=0, parser_class=<class 'redis.connection.PythonParser'>) redis.ConnectionPool( connection_class=<class 'redis.connection.Connection'>, max_connections=None, **connection_kwargs) python使用kestrel隊列 # pykestrel import kestrel q = kestrel.Client(servers=['127.0.0.1:22133'],queue='test_queue') q.add('some test job') job = q.get() # 從隊列讀取工做 job = q.peek() # 讀取下一份工做 # 讀取一組工做 while True: job = q.next(timeout=10) # 完成工做並獲取下一個工做,若是沒有工做,則等待10秒 if job is not None: try: # 流程工做 except: q.abort() # 標記失敗工做 q.finish() # 完成最後工做 q.close() # 關閉鏈接 kestrel狀態檢查 # kestrel支持memcache協議客戶端 #!/usr/local/bin/python # 10.13.81.125 22133 10000 import memcache import sys import traceback ip="%s:%s" % (sys.argv[1],sys.argv[2]) try: mc = memcache.Client([ip,]) st=mc.get_stats() except: print "kestrel connection exception" sys.exit(2) if st: for s in st[0][1].keys(): if s.startswith('queue_') and s.endswith('_mem_items'): num = int(st[0][1][s]) if num > int(sys.argv[3]): print "%s block to %s" %(s[6:-6],num) sys.exit(2) print "kestrel ok!" sys.exit(0) else: print "kestrel down" sys.exit(2) python使用tarantool # pip install tarantool-queue from tarantool_queue import Queue queue = Queue("localhost", 33013, 0) # 鏈接讀寫端口 空間0 tube = queue.tube("name_of_tube") # tube.put([1, 2, 3]) task = tube.take() task.data # take task and read data from it task.ack() # move this task into state DONE 7 web頁面操做 urllib2 [網絡資源訪問] import urllib2 response = urllib2.urlopen('http://baidu.com') print response.geturl() # url headers = response.info() print headers # web頁面頭部信息 print headers['date'] # 頭部信息中的時間 date = response.read() # 返回頁面全部信息[字符串] # date = response.readlines() # 返回頁面全部信息[列表] for i in urllib2.urlopen('http://qq.com'): # 可直接迭代 print i, 下載文件 #!/usr/bin/env python #encoding:utf8 import urllib2 url = 'http://www.01happy.com/wp-content/uploads/2012/09/bg.png' file("./pic/%04d.png" % i, "wb").write(urllib2.urlopen(url).read()) 抓取網頁解析指定內容 #!/usr/bin/env python #encoding:utf8 import urllib2 import urllib import random from bs4 import BeautifulSoup url='http://www.aaammm.com/aaa/' ua=["Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0)", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)", "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.2; .NET4.0C; .NET4.0E)", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0", "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"] browser = random.choice(ua) req_header = {'User-Agent':browser, 'Accept':'text/html;q=0.9,*/*;q=0.8', 'Cookie':'BAIDUID=4C8274B52CFB79DEB4FBA9A7EC76A1BC:FG=1; BDUSS=1dCdU1WNFdxUll0R09XcnBZTkRrVVVNbWVnSkRKSVRPeVljOUswclBoLUNzVEpVQVFBQUFBJCQAAAAAAAAAAAEAAADEuZ8BcXVhbnpob3U3MjIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIIIkC1SCJAtUY; BD_UPN=123143; BD_HOME=1', # 添真實登錄後的Cookie 'Accept-Charset':'ISO-8859-1,utf-8;q=0.7,*;q=0.3', 'Connection':'close', } #data = urllib.urlencode({'name':'xuesong','id':'30' }) # urllib 的處理參數的方法,能夠再urllib2中使用 data = urllib2.quote("pgv_ref=im.perinfo.perinfo.icon&rrr=pppp") req_timeout = 10 try: req = urllib2.Request(url,data=data,headers=req_header) # data爲None 則方法爲get,有date爲post方法 html = urllib2.urlopen(req,data=None,req_timeout).read() except urllib2.HTTPError as err: print str(err) except: print "timeout" print(html) # 百度帶Cookie後查看本身的用戶 #for i in html.split('\n'): # if 'bds.comm.user=' in i: # print i soup = BeautifulSoup(html) for i in soup.find_all(target="_blank",attrs={"class": "usr-pic"}): # 條件看狀況選擇 if i.img: print(i.get('href')) 模擬瀏覽器訪問web頁面 python3 #! /usr/bin/env python # -*- coding=utf-8 -*- import urllib.request url = "http://www.baidu.com" # AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11 headers = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1)', 'Accept':'text/html;q=0.9,*/*;q=0.8', 'Accept-Charset':'ISO-8859-1,utf-8;q=0.7,*;q=0.3', 'Connection':'close', 'Referer':None #注意若是依然不能抓取的話,這裏能夠設置抓取網站的host } opener = urllib.request.build_opener() opener.addheaders = [headers] data = opener.open(url).read() print(data) requests [替代urllib2] # Requests是一個Python的HTTP客戶端庫 # 官方中文文檔 http://cn.python-requests.org/zh_CN/latest/user/quickstart.html#id2 # 安裝: sudo pip install requests import requests # get方法提交表單 url = r'http://dict.youdao.com/search?le=eng&q={0}'.format(word.strip()) r = requests.get(url,timeout=2) # get方法帶參數 http://httpbin.org/get?key=val payload = {'key1': 'value1', 'key2': 'value2'} r = requests.get("http://httpbin.org/get", params=payload) # post方法提交表單 QueryAdd='http://www.anti-spam.org.cn/Rbl/Query/Result' r = requests.post(url=QueryAdd, data={'IP':'211.211.54.54'}) # 定製請求頭post請求 payload = {'some': 'data'} headers = {'content-type': 'application/json'} r = requests.post(url, data=json.dumps(payload), headers=headers) # https 需登陸加auth r = requests.get('https://baidu.com', auth=('user', 'pass')) if r.ok: # 判斷請求是否正常 print r.url # u'http://httpbin.org/get?key2=value2&key1=value1' print r.status_code # 狀態碼 print r.content # 獲取到的原始內容 可以使用 BeautifulSoup4 解析處理斷定結果 print r.text # 把原始內容轉unicode編碼 print r.headers # 響應頭 print r.headers['content-type'] # 網頁頭信息 不存在爲None print r.cookies['example_cookie_name'] # 查看cookie print r.history # 追蹤重定向 [<Response [301]>] 開啓重定向 allow_redirects=True 獲取JSON r = requests.get('https://github.com/timeline.json') r.json() 獲取圖片 from PIL import Image from StringIO import StringIO i = Image.open(StringIO(r.content)) 發送cookies到服務器 url = 'http://httpbin.org/cookies' cookies = dict(cookies_are='working') r = requests.get(url, cookies=cookies) r.text '{"cookies": {"cookies_are": "working"}}' 在同一個Session實例發出的全部請求之間保持cookies s = requests.Session() s.get('http://httpbin.org/cookies/set/sessioncookie/123456789') r = s.get("http://httpbin.org/cookies") print r.text 會話對象可以跨請求保持某些參數 s = requests.Session() s.auth = ('user', 'pass') s.headers.update({'x-test': 'true'}) s.get('http://httpbin.org/headers', headers={'x-test2': 'true'}) # both 'x-test' and 'x-test2' are sent ssl證書驗證 requests.get('https://github.com', verify=True) requests.get('https://kennethreitz.com', verify=False) # 忽略證書驗證 requests.get('https://kennethreitz.com', cert=('/path/server.crt', '/path/key')) # 本地指定一個證書 正確 <Response [200]> 錯誤 SSLError 流式上傳 with open('massive-body') as f: requests.post('http://some.url/streamed', data=f) 流式請求 import requests import json r = requests.post('https://stream.twitter.com/1/statuses/filter.json', data={'track': 'requests'}, auth=('username', 'password'), stream=True) for line in r.iter_lines(): if line: # filter out keep-alive new lines print json.loads(line) 自定義身份驗證 from requests.auth import AuthBase class PizzaAuth(AuthBase): """Attaches HTTP Pizza Authentication to the given Request object.""" def __init__(self, username): # setup any auth-related data here self.username = username def __call__(self, r): # modify and return the request r.headers['X-Pizza'] = self.username return r requests.get('http://pizzabin.org/admin', auth=PizzaAuth('kenneth')) 基自己份認證 from requests.auth import HTTPBasicAuth requests.get('https://api.github.com/user', auth=HTTPBasicAuth('user', 'pass')) 摘要式身份認證 from requests.auth import HTTPDigestAuth url = 'http://httpbin.org/digest-auth/auth/user/pass' requests.get(url, auth=HTTPDigestAuth('user', 'pass')) 代理 import requests proxies = { "http": "http://10.10.1.10:3128", # "http": "http://user:pass@10.10.1.10:3128/", # 用戶名密碼 "https": "http://10.10.1.10:1080", } requests.get("http://example.org", proxies=proxies) #也能夠設置環境變量之間訪問 export HTTP_PROXY="http://10.10.1.10:3128" export HTTPS_PROXY="http://10.10.1.10:1080" BeautifulSoup [html\xml解析器] # BeautifulSoup中文官方文檔 # http://www.crummy.com/software/BeautifulSoup/bs3/documentation.zh.html # http://www.crummy.com/software/BeautifulSoup/bs4/doc/index.zh.html # Beautiful Soup將複雜HTML文檔轉換成一個複雜的樹形結構,每一個節點都是Python對象,全部對象能夠概括爲4種: Tag , NavigableString , BeautifulSoup , Comment 導入模塊 from BeautifulSoup import BeautifulSoup # For processing HTML 版本3.0 已中止更新 from BeautifulSoup import BeautifulStoneSoup # For processing XML import BeautifulSoup # To get everything from bs4 import BeautifulSoup # 版本4.0 bs4 安裝: pip install BeautifulSoup4 from bs4 import BeautifulSoup soup = BeautifulSoup(html_doc) # 解析html文本 能夠是 requests 提交返回的頁面 results.content print(soup.prettify()) # 輸出解析後的結構 print(soup.title) # 指定標籤內容 print(soup.title.name) # 標籤名 print(soup.title.string) # 標籤內容 print(soup.title.parent.name) # 上層標籤名 print(soup.p) # <p class="title"><b>The Dormouse's story</b></p> print(soup.p['class']) # u'title' class屬性值 print(soup.a) # 找到第一個a標籤的標籤行 print(soup.find_all('a',limit=2)) # 找到a標籤的行,最多爲limit個 print(soup.find(id="link3")) # 標籤內id爲link3的標籤行 print(soup.get_text()) # 從文檔中獲取全部文字內容 soup.find_all("a", text="Elsie") # 從文檔中搜索關鍵字 soup.find(text=re.compile("sisters")) # 從文檔中正則搜索關鍵字 soup.find_all("a", class_="sister") # 按CSS搜索 soup.find_all(id='link2',"table",attrs={"class": "status"},href=re.compile("elsie")) # 搜索方法 for i in soup.find_all('a',attrs={"class": "usr-pic"}): # 循環全部a標籤的標籤行 if i.a.img: print(i.a.img.get("src")) # 取出當前a標籤中的鏈接 Tag # find_all 後循環的值是 Tag 不是字符串 不能直接截取 tag.text # 文本 tag.name tag.name = "blockquote" # 查找name爲 blockquote 的 tag['class'] tag.attrs # 按熟悉查找 tag['class'] = 'verybold' del tag['class'] # 刪除 print(tag.get('class')) # 打印屬性值 print(i.get('href')) # 打印鏈接 json #!/usr/bin/python import json #json file temp.json #{ "name":"00_sample_case1", "description":"an example."} f = file("temp.json"); s = json.load(f) # 直接讀取json文件 print s f.close d = {"a":1} j=json.dumps(d) # 字典轉json json.loads(j) # json轉字典 s = json.loads('{"name":"test", "type":{"name":"seq", "parameter":["1", "2"]}}') print type(s) # dic print s print s.keys() print s["type"]["parameter"][1] cookielib [保留cookie登陸頁面] ck = cookielib.CookieJar() # 經過 這個就能夠實現請求帶過去的COOKIE與發送回來的COOKIE值了。 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(ck)) # 獲取到COOKIE urllib2.install_opener(opener) # 此句設置urllib2的全局opener content = urllib2.urlopen(url).read() 登陸cacti取圖片 #encoding:utf8 import urllib2 import urllib import cookielib def renrenBrower(url,user,password): #查找form標籤中的action提交地址 login_page = "http://10.10.76.79:81/cacti/index.php" try: #得到一個cookieJar實例 cj = cookielib.CookieJar() #cookieJar做爲參數,得到一個opener的實例 opener=urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) #假裝成一個正常的瀏覽器,避免有些web服務器拒絕訪問 opener.addheaders = [('User-agent','Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)')] #生成Post數據,含有登錄用戶名密碼,全部表單內的input中name值 data = urllib.urlencode({"action":"login","login_username":user,"login_password":password}) #以post的方法訪問登錄頁面,訪問以後cookieJar會自定保存cookie opener.open(login_page,data) #以帶cookie的方式訪問頁面 op=opener.open(url) #讀取頁面源碼 data=op.read() #將圖片寫到本地 #file("1d.png" , "wb").write(data) return data except Exception,e: print str(e) print renrenBrower("http://10.10.76.79:81/cacti/graph_image.php?local_graph_id=1630&rra_id=0&view_type=tree&graph_start=1397525517&graph_end=1397611917","admin","admin") 例子2 import urllib, urllib2, cookielib import os, time headers = [] def login(): cj = cookielib.CookieJar() opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) login_url = r'http://zhixing.bjtu.edu.cn/member.php?mod=logging&action=login&loginsubmit=yes&infloat=yes&lssubmit=yes&inajax=1' login_data = urllib.urlencode({'cookietime': '2592000', 'handlekey': 'ls', 'password': 'xxx', 'quickforward': 'yes', 'username': 'GuoYuan'}) opener.addheaders = [('Host', 'zhixing.bjtu.edu.cn'), ('User-Agent', 'Mozilla/5.0 (Ubuntu; X11; Linux i686; rv:8.0) Gecko/20100101 Firefox/8.0'), ('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'), ('Accept-Language', 'en-us,en;q=0.5'), ('Accept-Encoding', 'gzip, deflate'), ('Accept-Charset', 'ISO-8859-1,utf-8;q=0.7,*;q=0.7'), ('Connection', 'keep-alive'), ('Referer', 'http://zhixing.bjtu.edu.cn/forum.php'),] opener.open(login_url, login_data) return opener if __name__ == '__main__': opener = login() url = r'http://zhixing.bjtu.edu.cn/forum.php?mod=topicadmin&action=moderate&optgroup=2&modsubmit=yes&infloat=yes&inajax=1' data = {'fid': '601', 'formhash': '0cdd1596', 'frommodcp': '', 'handlekey': 'mods', 'listextra': 'page%3D62', 'moderate[]': '496146', 'operations[]': 'type', 'reason': '...', 'redirect': r'http://zhixing.bjtu.edu.cn/thread-496146-1-1.html', 'typeid': '779'} data2 = [(k, v) for k,v in data.iteritems()] cnt = 0 for tid in range(493022, 496146 + 1): cnt += 1 if cnt % 20 == 0: print print tid, data2.append(('moderate[]', str(tid))) if cnt % 40 == 0 or cnt == 496146: request = urllib2.Request(url=url, data=urllib.urlencode(data2)) print opener.open(request).read() data2 = [(k, v) for k,v in data.iteritems()] httplib [http協議的客戶端] import httplib conn3 = httplib.HTTPConnection('www.baidu.com',80,True,10) 查看網頁圖片尺寸類型 #將圖片讀入內存 #!/usr/bin/env python #encoding=utf-8 import cStringIO, urllib2, Image url = 'http://www.01happy.com/wp-content/uploads/2012/09/bg.png' file = urllib2.urlopen(url) tmpIm = cStringIO.StringIO(file.read()) im = Image.open(tmpIm) print im.format, im.size, im.mode 爬蟲 #!/usr/bin/env python #encoding:utf-8 #sudo pip install BeautifulSoup import requests from BeautifulSoup import BeautifulSoup import re baseurl = 'http://blog.sina.com.cn/s/articlelist_1191258123_0_1.html' r = requests.get(baseurl) for url in re.findall('<a.*?</a>', r.content, re.S): if url.startswith('<a title='): with open(r'd:/final.txt', 'ab') as f: f.write(url + '\n') linkfile = open(r'd:/final.txt', 'rb') soup = BeautifulSoup(linkfile) for link in soup.findAll('a'): #print link.get('title') + ': ' + link.get('href') ss = requests.get(link.get('href')) for content in re.findall('<div id="sina_keyword_ad_area2" class="articalContent ">.*?</div>', ss.content, re.S): with open(r'd:/myftp/%s.txt'%link.get('title').strip('<>'), 'wb') as f: f.write(content) print '%s has been copied.' % link.get('title') 反垃圾郵件提交申訴 #很遺憾,反垃圾郵件聯盟改版後加了驗證碼 #!/usr/bin/env python #encoding:utf-8 import requests import re IpList=['113.212.91.25','113.212.91.23'] QueryAdd='http://www.anti-spam.org.cn/Rbl/Query/Result' ComplaintAdd='http://www.anti-spam.org.cn/Rbl/Getout/Submit' data = { 'CONTENT':'''咱們是一家正規的XXX。xxxxxxx。懇請將咱們的發送服務器IP移出黑名單。謝謝! 處理措施: 1.XXXX。 2.XXXX。''', 'CORP':'abc.com', 'WWW':'www.abc.cm', 'NAME':'def', 'MAIL':'def@163.com.cn', 'TEL':'010-50000000', 'LEVEL':'0', } for Ip in IpList: query = requests.post(url=QueryAdd, data={'IP':Ip}) # 黑名單查詢 if query.ok: if re.findall(u'\u7533\u8bc9\u8131\u79bb', query.text, re.S): # 查找關鍵字 申訴脫離 既代表在黑名單中 data['IP']=Ip complaint = requests.post(url=ComplaintAdd, data=data) # 提交申訴 if complaint.ok: if re.findall(u'\u60a8\u7684\u9ed1\u540d\u5355\u8131\u79bb\u7533\u8bf7\u5df2\u63d0\u4ea4', complaint.text, re.S): status='申請提交' elif re.findall(u'\u8131\u79bb\u7533\u8bf7\u5df2\u88ab\u4ed6\u4eba\u63d0\u4ea4', complaint.text, re.S): status='重複提交' elif re.findall(u'\u7533\u8bf7\u7531\u4e8e\u8fd1\u671f\u5185\u6709\u88ab\u62d2\u7edd\u7684\u8bb0\u5f55', complaint.text, re.S): status='近期拒絕' else: status='異常' else: status='正常' print '%s %s' %(Ip,status) 有道詞典 #!/usr/bin/env python import requests from bs4 import BeautifulSoup # bs4安裝: pip install BeautifulSoup4 def youdao(word): url = r'http://dict.youdao.com/search?le=eng&q={0}'.format(word.strip()) r = requests.get(url) if r.ok: soup = BeautifulSoup(r.content) div = soup.find_all('div', class_='trans-container')[:1] # find_all是bs4的方法 ul = BeautifulSoup(str(div[0])) li = ul.find_all('li') for mean in li: print mean.text def query(): print('Created by @littlepy, QQ:185635687') while True: word = raw_input('>>>') youdao(word) if __name__ == '__main__': query() python啓動http服務提供訪問或下載 python -m SimpleHTTPServer 9900 8 併發 #線程安全/競爭條件,鎖/死鎖檢測,同步/異步,阻塞/非阻塞,epoll非阻塞IO,信號量/事件,線程池,生產消費模型,僞併發,微線程,協程 #Stackless Python 是Python編程語言的一個加強版本,它使程序員從基於線程的編程方式中得到好處,並避免傳統線程所帶來的性能與複雜度問題。Stackless爲 Python帶來的微線程擴展,是一種低開銷、輕量級的便利工具 threading多線程 thread start_new_thread(function,args kwargs=None) # 產生一個新的線程 allocate_lock() # 分配一個LockType類型的鎖對象 exit() # 讓線程退出 acquire(wait=None) # 嘗試獲取鎖對象 locked() # 若是獲取了鎖對象返回True release() # 釋放鎖 thread例子 #!/usr/bin/env python #thread_test.py #不支持守護進程 import thread from time import sleep,ctime loops = [4,2] def loop(nloop,nsec,lock): print 'start loop %s at:%s' % (nloop,ctime()) sleep(nsec) print 'loop %s done at: %s' % (nloop, ctime()) lock.release() # 分配已得到的鎖,操做結束後釋放相應的鎖通知主線程 def main(): print 'starting at:',ctime() locks = [] nloops = range(len(loops)) for i in nloops: lock = thread.allocate_lock() # 建立一個鎖 lock.acquire() # 調用各個鎖的acquire()函數得到鎖 locks.append(lock) # 把鎖放到鎖列表locks中 for i in nloops: thread.start_new_thread(loop,(i,loops[i],locks[i])) # 建立線程 for i in nloops: while locks[i].locked():pass # 等待所有解鎖才繼續運行 print 'all DONE at:',ctime() if __name__ == '__main__': main() thread例子1 #coding=utf-8 import thread,time,os def f(name): i =3 while i: time.sleep(1) print name i -= 1 # os._exit() 會把整個進程關閉 os._exit(22) if __name__ == '__main__': thread.start_new_thread(f,("th1",)) while 1: pass os._exit(0) threading Thread # 表示一個線程的執行的對象 start() # 開始線程的執行 run() # 定義線程的功能的函數(通常會被子類重寫) join(timeout=None) # 容許主線程等待線程結束,程序掛起,直到線程結束;若是給了timeout,則最多等待timeout秒. getName() # 返回線程的名字 setName(name) # 設置線程的名字 isAlive() # 布爾標誌,表示這個線程是否還在運行中 isDaemon() # 返回線程的daemon標誌 setDaemon(daemonic) # 後臺線程,把線程的daemon標誌設置爲daemonic(必定要在調用start()函數前調用) # 默認主線程在退出時會等待全部子線程的結束。若是但願主線程不等待子線程,而是在退出時自動結束全部的子線程,就須要設置子線程爲後臺線程(daemon) Lock # 鎖原語對象 Rlock # 可重入鎖對象.使單線程能夠在此得到已得到了的鎖(遞歸鎖定) Condition # 條件變量對象能讓一個線程停下來,等待其餘線程知足了某個條件.如狀態改變或值的改變 Event # 通用的條件變量.多個線程能夠等待某個事件的發生,在事件發生後,全部的線程都會被激活 Semaphore # 爲等待鎖的線程提供一個相似等候室的結構 BoundedSemaphore # 與Semaphore相似,只是不容許超過初始值 Time # 與Thread類似,只是他要等待一段時間後纔開始運行 activeCount() # 當前活動的線程對象的數量 currentThread() # 返回當前線程對象 enumerate() # 返回當前活動線程的列表 settrace(func) # 爲全部線程設置一個跟蹤函數 setprofile(func) # 爲全部線程設置一個profile函數 threading例子1 #!/usr/bin/env python #encoding:utf8 import threading from Queue import Queue from time import sleep,ctime class ThreadFunc(object): def __init__(self,func,args,name=''): self.name=name self.func=func # loop self.args=args # (i,iplist[i],queue) def __call__(self): apply(self.func,self.args) # 函數apply() 執行loop函數並傳遞元組參數 def loop(nloop,ip,queue): print 'start',nloop,'at:',ctime() queue.put(ip) sleep(2) print 'loop',nloop,'done at:',ctime() if __name__ == '__main__': threads = [] queue = Queue() iplist = ['192.168.1.2','192.168.1.3','192.168.1.4','192.168.1.5','192.168.1.6','192.168.1.7','192.168.1.8'] nloops = range(len(iplist)) for i in nloops: t = threading.Thread(target=ThreadFunc(loop,(i,iplist[i],queue),loop.__name__)) threads.append(t) for i in nloops: threads[i].start() for i in nloops: threads[i].join() for i in nloops: print queue.get() threading例子2 #!/usr/bin/env python #encoding:utf8 from Queue import Queue import random,time,threading class Producer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): for i in range(5): print "%s: %s is producing %d to the queue!\n" %(time.ctime(), self.getName(), i) self.data.put(i) self.data.put(i*i) time.sleep(2) print "%s: %s finished!" %(time.ctime(), self.getName()) class Consumer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): for i in range(10): val = self.data.get() print "%s: %s is consuming. %d in the queue is consumed!\n" %(time.ctime(), self.getName(), val) print "%s: %s finished!" %(time.ctime(), self.getName()) if __name__ == '__main__': queue = Queue() producer = Producer('Pro.', queue) consumer = Consumer('Con.', queue) producer.start() consumer.start() producer.join() consumer.join() threading例子3 # 啓動線程後自動執行 run函數其餘不能夠 import threading import time class Th(threading.Thread): def __init__(self,name): threading.Thread.__init__(self) self.t_name=name self.daemon = True # 默認爲false,讓主線程等待處理完成 def run(self): time.sleep(1) print "this is " + self.t_name if __name__ == '__main__': thread1 = Th("Th_1") thread1.start() threading例子4 import threading import time class Th(threading.Thread): def __init__(self,thread_name): threading.Thread.__init__(self) self.setName(thread_name) def run(self): threadLock.acquire() print self.getName() for i in range(3): time.sleep(1) print str(i) print self.getName() + " is over" threadLock.release() if __name__ == '__main__': threadLock = threading.Lock() thread1 = Th("Th_1") thread2 = Th("Th_2") thread1.start() thread2.start() 後臺線程 import threading import time,random class MyThread(threading.Thread): def run(self): wait_time=random.randrange(1,10) print "%s will wait %d seconds" % (self.name, wait_time) time.sleep(wait_time) print "%s finished!" % self.name if __name__=="__main__": for i in range(5): t = MyThread() t.setDaemon(True) # 設置爲後臺線程,主線程完成時不等待子線程完成就結束 t.start() threading控制最大併發_查詢日誌中IP信息 #!/usr/bin/env python #coding:utf-8 import urllib2 import json import threading import time ''' by:某大牛 QQ:185635687 這個是多線程併發控制. 若是要改爲多進程,只需把threading 換成 mulitprocessing.Process , 對, 就是換個名字而已. ''' #獲取ip 及其出現次數 def ip_dic(file_obj, dic): for i in file_obj: if i: ip=i.split('-')[0].strip() if ip in dic.keys(): dic[ip]=dic[ip] + 1 else: dic[ip]=1 return dic.iteritems() #目標函數 def get_data(url, ipcounts): data=urllib2.urlopen(url).read() datadict=json.loads(data) fdata = u"ip:%s---%s,%s,%s,%s,%s" %(datadict["data"]["ip"],ipcounts,datadict["data"]["country"],datadict["data"]["region"],datadict["data"]["city"],datadict["data"]["isp"]) print fdata #多線程 def threads(iters): thread_pool = [] for k in iters: url = "http://ip.taobao.com/service/getIpInfo.php?ip=" ipcounts = k[1] url = (url + k[0]).strip() t = threading.Thread(target=get_data, args=(url, ipcounts)) thread_pool.append(t) return thread_pool #控制多線程 def startt(t_list, max,second): l = len(t_list) n = max while l > 0: if l > max: nl = t_list[:max] t_list = t_list[max:] for t in nl: t.start() time.sleep(second) for t in nl: t.join() print '*'*15, str(n)+ ' ip has been queried'+'*'*15 n += max l = len(t_list) continue elif l <= max: nl = t_list for t in nl: t.start() for t in nl: t.join() print '>>> Totally ' + str(n+l ) + ' ip has been queried' l = 0 if __name__ =="__main__": dic={} with open('access.log') as file_obj: it = ip_dic(file_obj, dic) t_list= threads(it) startt(t_list, 15, 1) 多線程取隊列 #!/usr/bin/python import Queue import threading import time exitFlag = 0 class myThread (threading.Thread): def __init__(self, threadID, name, q): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run(self): print "Starting " + self.name process_data(self.name, self.q) print "Exiting " + self.name def process_data(threadName, q): while not exitFlag: # 死循環等待 queueLock.acquire() if not q.empty(): # 判斷隊列是否爲空 data = q.get() print "%s processing %s" % (threadName, data) queueLock.release() time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() # 鎖與隊列並沒有任何關聯,其餘線程也進行取鎖操做的時候就會檢查是否有被佔用,有就阻塞等待解鎖爲止 workQueue = Queue.Queue(10) threads = [] threadID = 1 # Create new threads for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 # Fill the queue queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() # Wait for queue to empty while not workQueue.empty(): # 死循環判斷隊列被處理完畢 pass # Notify threads it's time to exit exitFlag = 1 # Wait for all threads to complete for t in threads: t.join() print "Exiting Main Thread" Queue通用隊列 q=Queue(size) # 建立大小size的Queue對象 qsize() # 返回隊列的大小(返回時候,可能被其餘進程修改,近似值) empty() # 若是隊列爲空返回True,不然Fales full() # 若是隊列已滿返回True,不然Fales put(item,block0) # 把item放到隊列中,若是給了block(不爲0),函數會一直阻塞到隊列中有空間爲止 get(block=0) # 從隊列中取一個對象,若是給了block(不爲0),函數會一直阻塞到隊列中有對象爲止 get_nowait # 默認get阻塞,這個不阻塞 multiprocessing [多進程併發] 多線程 import urllib2 from multiprocessing.dummy import Pool as ThreadPool urls=['http://www.baidu.com','http://www.sohu.com'] pool=ThreadPool(4) # 線程池 results=pool.map(urllib2.urlopen,urls) pool.close() pool.join() 多進程併發 #!/usr/bin/env python #encoding:utf8 from multiprocessing import Process import time,os def f(name): time.sleep(1) print 'hello ',name print os.getppid() # 取得父進程ID print os.getpid() # 取得進程ID process_list = [] for i in range(10): p = Process(target=f,args=(i,)) p.start() process_list.append(p) for j in process_list: j.join() Queue進程間通訊 from multiprocessing import Process,Queue import time def f(name): time.sleep(1) q.put(['hello'+str(name)]) process_list = [] q = Queue() if __name__ == '__main__': for i in range(10): p = Process(target=f,args=(i,)) p.start() process_list.append(p) for j in process_list: j.join() for i in range(10): print q.get() Pipe管道 from multiprocessing import Process,Pipe import time import os def f(conn,name): time.sleep(1) conn.send(['hello'+str(name)]) print os.getppid(),'-----------',os.getpid() process_list = [] parent_conn,child_conn = Pipe() if __name__ == '__main__': for i in range(10): p = Process(target=f,args=(child_conn,i)) p.start() process_list.append(p) for j in process_list: j.join() for p in range(10): print parent_conn.recv() 進程間同步 #加鎖,使某一時刻只有一個進程 print from multiprocessing import Process,Lock import time import os def f(name): lock.acquire() time.sleep(1) print 'hello--'+str(name) print os.getppid(),'-----------',os.getpid() lock.release() process_list = [] lock = Lock() if __name__ == '__main__': for i in range(10): p = Process(target=f,args=(i,)) p.start() process_list.append(p) for j in process_list: j.join() 共享內存 # 經過使用Value或者Array把數據存儲在一個共享的內存表中 # 'd'和'i'參數是num和arr用來設置類型,d表示一個雙精浮點類型,i表示一個帶符號的整型。 from multiprocessing import Process,Value,Array import time import os def f(n,a,name): time.sleep(1) n.value = name * name for i in range(len(a)): a[i] = -i process_list = [] if __name__ == '__main__': num = Value('d',0.0) arr = Array('i',range(10)) for i in range(10): p = Process(target=f,args=(num,arr,i)) p.start() process_list.append(p) for j in process_list: j.join() print num.value print arr[:] manager # 比共享內存靈活,但緩慢 # 支持list,dict,Namespace,Lock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value,Array from multiprocessing import Process,Manager import time import os def f(d,name): time.sleep(1) d[name] = name * name print d process_list = [] if __name__ == '__main__': manager = Manager() d = manager.dict() for i in range(10): p = Process(target=f,args=(d,i)) p.start() process_list.append(p) for j in process_list: j.join() print d 最大併發數 import multiprocessing import time,os result = [] def run(h): print 'threading:' ,h,os.getpid() p = multiprocessing.Pool(processes=20) for i in range(100): result.append(p.apply_async(run,(i,))) p.close() for res in result: res.get(timeout=5) 9 框架 flask [微型網絡開發框架] # http://dormousehole.readthedocs.org/en/latest/ # html放在 ./templates/ js放在 ./static/ request.args.get('page', 1) # 獲取參數 ?page=1 request.json # 獲取傳遞的整個json數據 request.form.get("host",'127') # 獲取表單值 簡單實例 # 接收數據和展現 import MySQLdb as mysql from flask import Flask, request app = Flask(__name__) db.autocommit(True) c = db.cursor() """ CREATE TABLE `statusinfo` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `hostname` varchar(32) NOT NULL, `load` float(10) NOT NULL DEFAULT 0.00, `time` int(15) NOT NULL, `memtotal` int(15) NOT NULL, `memusage` int(15) NOT NULL, `memfree` int(15) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=161 DEFAULT CHARSET=utf8; """ @app.route("/collect", methods=["GET", "POST"]) def collect(): sql = "" if request.method == "POST": data = request.json # 獲取傳遞的json hostname = data["Host"] load = data["LoadAvg"] time = data["Time"] memtotal = data["MemTotal"] memusage = data["MemUsage"] memfree = data["MemFree"] try: sql = "INSERT INTO `statusinfo` (`hostname`,`load`,`time`,`memtotal`,`memusage`,`memfree`) VALUES('%s', %s, %s, %s, %s, %s);" % (hostname, load,time,memtotal,memusage,memfree) ret = c.execute(sql) return 'ok' except mysql.IntegrityError: return 'errer' @app.route("/show", methods=["GET", "POST"]) def show(): try: hostname = request.form.get("hostname") # 獲取表單方式的變量值 sql = "SELECT `load` FROM `statusinfo` WHERE hostname = '%s';" % (hostname) c.execute(sql) ones = c.fetchall() return render_template("sysstatus.html", data=ones, sql = sql) except: print 'hostname null' from flask import render_template @app.route("/xxx/<name>") def hello_xx(name): return render_template("sysstatus.html", name='teach') if __name__ == "__main__": app.run(host="0.0.0.0", port=50000, debug=True) twisted [非阻塞異步服務器框架] # 用來進行網絡服務和應用程序的編程。雖然 Twisted Matrix 中有大量鬆散耦合的模塊化組件,但該框架的中心概念仍是非阻塞異步服務器這一思想。對於習慣於線程技術或分叉服務器的開發人員來講,這是一種新穎的編程風格,但它卻能在繁重負載的狀況下帶來極高的效率。 pip install twisted from twisted.internet import protocol, reactor, endpoints class Echo(protocol.Protocol): def dataReceived(self, data): self.transport.write(data) class EchoFactory(protocol.Factory): def buildProtocol(self, addr): return Echo() endpoints.serverFromString(reactor, "tcp:1234").listen(EchoFactory()) reactor.run() greenlet [微線程/協程框架] # 更加原始的微線程的概念,沒有調度,或者叫作協程。這在你須要控制你的代碼時頗有用。你能夠本身構造微線程的 調度器;也可使用"greenlet"實現高級的控制流。例如能夠從新建立構造器;不一樣於Python的構造器,咱們的構造器能夠嵌套的調用函數,而被嵌套的函數也能夠 yield 一個值。 pip install greenlet tornado [極輕量級Web服務器框架] # 高可伸縮性和epoll非阻塞IO,響應快速,可處理數千併發鏈接,特別適用用於實時的Web服務 # http://www.tornadoweb.cn/documentation pip install tornado import tornado.ioloop import tornado.web class MainHandler(tornado.web.RequestHandler): def get(self): self.write("Hello, world") application = tornado.web.Application([ (r"/", MainHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start() Scrapy [web抓取框架] # Python開發的一個快速,高層次的屏幕抓取和web抓取框架,用於抓取web站點並從頁面中提取結構化的數據。Scrapy用途普遍,能夠用於數據挖掘、監測和自動化測試。 pip install scrapy from scrapy import Spider, Item, Field class Post(Item): title = Field() class BlogSpider(Spider): name, start_urls = 'blogspider', ['http://blog.scrapinghub.com'] def parse(self, response): return [Post(title=e.extract()) for e in response.css("h2 a::text")] scrapy runspider myspider.py django [重量級web框架] bottle [輕量級的Web框架] 10 例子 小算法 斐波那契 #將函數結果做爲列表可用於循環 def fab(max): n, a, b = 0, 0, 1 while n < max: yield b a, b = b, a + b n = n + 1 for n in fab(5): print n 乘法口訣 #!/usr/bin/python for i in range(1,10): for j in range(1,i+1): print j,'*',i,'=',j*i, else: print '' 最小公倍數 # 1-70的最小公倍數 def c(m,n): a1=m b1=n r=n%m while r!=0: n=m m=r r=n%m return (a1*b1)/m d=1 for i in range(3,71,2): d = c(d,i) print d 排序算法 插入排序 def insertion_sort(sort_list): iter_len = len(sort_list) if iter_len < 2: return sort_list for i in range(1, iter_len): key = sort_list[i] j = i - 1 while j>=0 and sort_list[j]>key: sort_list[j+1] = sort_list[j] j -= 1 sort_list[j+1] = key return sort_list 選擇排序 def selection_sort(sort_list): iter_len = len(sort_list) if iter_len < 2: return sort_list for i in range(iter_len-1): smallest = sort_list[i] location = i for j in range(i, iter_len): if sort_list[j] < smallest: smallest = sort_list[j] location = j if i != location: sort_list[i], sort_list[location] = sort_list[location], sort_list[i] return sort_list 冒泡排序算法 def bubblesort(numbers): for j in range(len(numbers)-1,-1,-1): for i in range(j): if numbers[i]>numbers[i+1]: numbers[i],numbers[i+1] = numbers[i+1],numbers[i] print(i,j) print(numbers) 二分算法 #python 2f.py 123456789 4 # list('123456789') = ['1', '2', '3', '4', '5', '6', '7', '8', '9'] #!/usr/bin/env python import sys def search2(a,m): low = 0 high = len(a) - 1 while(low <= high): mid = (low + high)/2 midval = a[mid] if midval < m: low = mid + 1 elif midval > m: high = mid - 1 else: print mid return mid print -1 return -1 if __name__ == "__main__": a = [int(i) for i in list(sys.argv[1])] m = int(sys.argv[2]) search2(a,m) 將字典中全部time去掉 a={'version01': {'nba': {'timenba': 'valuesasdfasdf', 'nbanbac': 'vtimefasdf', 'userasdf': 'vtimasdf'}}} eval(str(a).replace("time","")) PIL圖像處理 import Image im = Image.open("j.jpg") # 打開圖片 print im.format, im.size, im.mode # 打印圖像格式、像素寬和高、模式 # JPEG (440, 330) RGB im.show() # 顯示最新加載圖像 box = (100, 100, 200, 200) region = im.crop(box) # 從圖像中提取出某個矩形大小的圖像 圖片等比縮小 # -*- coding: cp936 -*- import Image import glob, os #圖片批處理 def timage(): for files in glob.glob('D:\\1\\*.JPG'): filepath,filename = os.path.split(files) filterame,exts = os.path.splitext(filename) #輸出路徑 opfile = r'D:\\22\\' #判斷opfile是否存在,不存在則建立 if (os.path.isdir(opfile)==False): os.mkdir(opfile) im = Image.open(files) w,h = im.size #im_ss = im.resize((400,400)) #im_ss = im.convert('P') im_ss = im.resize((int(w*0.12), int(h*0.12))) im_ss.save(opfile+filterame+'.jpg') if __name__=='__main__': timage() 取系統返回值賦給序列 cmd = os.popen("df -Ph|awk 'NR!=1{print $5}'").readlines(); cmd = os.popen('df -h').read().split('\n') cmd = os.popen('lo 2>&1').read() #取磁盤使用空間 import commands df = commands.getoutput("df -hP") [ x.split()[4] for x in df.split("\n") ] [ (x.split()[0],x.split()[4]) for x in df.split("\n") if x.split()[4].endswith("%") ] 打印表格 map = [["a","b","c"], ["d","e","f"], ["g","h","i"]] def print_board(): for i in range(0,3): for j in range(0,3): print "|",map[i][j], #if j != 2: print '|' 生成html文件表格 log_file = file('check.html', 'w') log_file.write(""" <!DOCTYPE HTML> <html lang="utr-8"> <head> <meta charset="UTF-8"> <title></title> </head> <body> <table align='center' border='0' cellPadding='0' style='font-size:24px;'><tr ><td>狀態統計</td></tr></table> <style>.font{font-size:13px}</style> <table align='center' border='1' borderColor=gray cellPadding=3 width=1350 class='font'> <tr style='background-color:#666666'> <th width=65>IP</th> <th width=65>狀態</th> </tr> """) for i in list: log_file.write('<tr><td>%s</td><td>%s</td></tr>\n' %(i.split()[0],i.split()[1]) ) log_file.write(""" </table> </body> </html> """) log_file.flush() log_file.close() 井字遊戲 #!/usr/bin/python # http://www.admin10000.com/document/2506.html def print_board(): for i in range(0,3): for j in range(0,3): print map[2-i][j], if j != 2: print "|", print "" def check_done(): for i in range(0,3): if map[i][0] == map[i][1] == map[i][2] != " " \ or map[0][i] == map[1][i] == map[2][i] != " ": print turn, "won!!!" return True if map[0][0] == map[1][1] == map[2][2] != " " \ or map[0][2] == map[1][1] == map[2][0] != " ": print turn, "won!!!" return True if " " not in map[0] and " " not in map[1] and " " not in map[2]: print "Draw" return True return False turn = "X" map = [[" "," "," "], [" "," "," "], [" "," "," "]] done = False while done != True: print_board() print turn, "'s turn" print moved = False while moved != True: print "Please select position by typing in a number between 1 and 9, see below for which number that is which position..." print "7|8|9" print "4|5|6" print "1|2|3" print try: pos = input("Select: ") if pos <=9 and pos >=1: Y = pos/3 X = pos%3 if X != 0: X -=1 else: X = 2 Y -=1 if map[Y][X] == " ": map[Y][X] = turn moved = True done = check_done() if done == False: if turn == "X": turn = "O" else: turn = "X" except: print "You need to add a numeric value" 網段劃分 題目 192.168.1 192.168.3 192.168.2 172.16.3 192.16.1 192.16.2 192.16.3 10.0.4 輸出結果: 192.16.1-192.16.3 192.168.1-192.168.3 172.16.3 10.0.4 答案 #!/usr/bin/python f = file('a.txt') c = f.readlines() dic={} for i in c: a=i.strip().split('.') if a[0]+'.'+a[1] in dic.keys(): key=dic["%s.%s" %(a[0],a[1])] else: key=[] key.append(a[2]) dic[a[0]+'.'+a[1]]=sorted(key) for x,y in dic.items(): if y[0] == y[-1]: print '%s.%s' %(x,y[0]) else: print '%s.%s-%s.%s' %(x,y[0],x,y[-1]) 統計日誌IP # 打印出獨立IP,並統計獨立IP數 219.140.190.130 - - [23/May/2006:08:57:59 +0800] "GET /fg172.exe HTTP/1.1" 200 2350253 221.228.143.52 - - [23/May/2006:08:58:08 +0800] "GET /fg172.exe HTTP/1.1" 206 719996 221.228.143.52 - - [23/May/2006:08:58:08 +0800] "GET /fg172.exe HTTP/1.1" 206 713242 #!/usr/bin/python dic={} a=open("a").readlines() for i in a: ip=i.strip().split()[0] if ip in dic.keys(): dic[ip] = dic[ip] + 1 else: dic[ip] = 1 for x,y in dic.items(): print x," ",y 不按期更新,更新下載地址: http://hi.baidu.com/quanzhou722/item/cf4471f8e23d3149932af2a7 請勿刪除信息,植入廣告,抵制不道德行爲。