python實例手冊 #encoding:utf8 # 設定編碼-支持中文 0說明 手冊製做: 雪松 更新日期: 2013-12-19 歡迎系統運維加入Q羣: 198173206 # 加羣請回答問題 請使用"notepad++"打開此文檔,"alt+0"將函數摺疊後方便查閱 請勿刪除信息,轉載請說明出處,抵制不道德行爲。 錯誤在所不免,還望指正! # python實例手冊下載地址: http://hi.baidu.com/quanzhou722/item/cf4471f8e23d3149932af2a7 # shell實例手冊最新下載地址: http://hi.baidu.com/quanzhou722/item/f4a4f3c9eb37f02d46d5c0d9 # LazyManage運維批量管理軟件下載(shell): http://hi.baidu.com/quanzhou722/item/4ccf7e88a877eaccef083d1a 查看幫助 import os for i in dir(os): print i # 模塊的方法 help(os.path) # 方法的幫助 變量 r=r'\n' # 輸出時原型打印 u=u'中文' # 定義爲unicode編碼 global x # 全局變量 a = 0 or 2 or 1 # 布爾運算賦值,a值爲True既不處理後面,a值爲2. None、字符串''、空元組()、空列表[],空字典{}、0、空字符串都是false name = raw_input("input:").strip() # 輸入字符串變量 num = int(raw_input("input:").strip()) # 輸入字符串str轉爲int型 locals() # 全部局部變量組成的字典 locals().values() # 全部局部變量值的列表 os.popen("date -d @{0} +'%Y-%m-%d %H:%M:%S'".format(12)).read() # 特殊狀況引用變量 {0} 表明第一個參數 打印 # 字符串 %s 整數 %d 浮點 %f 原樣打印 %r print '字符串: %s 整數: %d 浮點: %f 原樣打印: %r' % ('aa',2,1.0,'r') print 'abc', # 有逗號,表明不換行打印,在次打印會接着本行打印 列表 # 列表元素的個數最多 536870912 shoplist = ['apple', 'mango', 'carrot', 'banana'] shoplist[2] = 'aa' del shoplist[0] shoplist.insert('4','www') shoplist.append('aaa') shoplist[::-1] # 倒着打印 對字符翻轉串有效 元組 # 不可變 zoo = ('wolf', 'elephant', 'penguin') 字典 ab = { 'Swaroop' : 'swaroopch@byteofpython.info', 'Larry' : 'larry@wall.org', } ab['c'] = 80 # 添加字典元素 del ab['Larry'] # 刪除字典元素 ab.keys() # 查看全部鍵值 ab.values() # 打印全部值 ab.has_key('a') # 查看鍵值是否存在 ab.items() # 返回整個字典列表 流程結構 if判斷 # 布爾值操做符 and or not 實現多重判斷 if a == b: print '==' elif a < b: print b else: print a fi while循環 while True: if a == b: print "==" break print "!=" else: print 'over' count=0 while(count<9): print count count += 1 for循環 sorted() # 返回一個序列(列表) zip() # 返回一個序列(列表) enumerate() # 返回迭代器(相似序列) reversed() # 反序迭代器對象 dict.iterkeys() # 經過鍵迭代 dict.itervalues() # 經過值迭代 dict.iteritems() # 經過鍵-值對迭代 randline() # 文件迭代 iter(obj) # 獲得obj迭代器 檢查obj是否是一個序列 iter(a,b) # 重複調用a,直到迭代器的下一個值等於b for i in range(1, 5): print i else: print 'over' list = ['a','b','c','b'] for i in range(len(list)): print list[i] for x, Lee in enumerate(list): print "%d %s Lee" % (x+1,Lee) 流程結構簡寫 [ i * 2 for i in [8,-2,5]] [16,-4,10] [ i for i in range(8) if i %2 == 0 ] [0,2,4,6] tab補全 # vim /usr/lib/python2.7/dist-packages/tab.py # python startup file import sys import readline import rlcompleter import atexit import os # tab completion readline.parse_and_bind('tab: complete') # history file histfile = os.path.join(os.environ['HOME'], '.pythonhistory') 函數 def printMax(a, b = 1): if a > b: print a return a else: print b return b x = 5 y = 7 printMax(x, y) 模塊 # Filename: mymodule.py def sayhi(): print 'mymodule' version = '0.1' # 導入模塊 import mymodule from mymodule import sayhi, version # 使用模塊中函數 mymodule.sayhi() 取參數 import sys for i in sys.argv: print i 對象的方法 class Person: # 實例化初始化的方法 def __init__(self, name ,age): self.name = name self.age = age print self.name # 有self此函數爲方法 def sayHi(self): print 'Hello, my name is', self.name #對象消逝的時候被調用 def __del__(self): print 'over' # 實例化對象 p = Person('Swaroop') # 使用對象方法 p.sayHi() # 繼承 class Teacher(Person): def __init__(self, name, age, salary): Person.__init__(self, name, age) self.salary = salary print '(Initialized Teacher: %s)' % self.name def tell(self): Person.tell(self) print 'Salary: "%d"' % self.salary t = Teacher('Mrs. Shrividya', 40, 30000) 文件處理 # 模式: 讀'r' 寫'w' 追加'a' 讀寫'r+' 二進制文件'b' 'rb','wb','rb+' 寫文件 f = file('poem.txt', 'w') f.write("string") f.write(str(i)) f.flush() f.close() 讀文件 f = file('/etc/passwd','r') while True: line = f.readline() # 返回一行 if len(line) == 0: break x = line.split(":") # 冒號分割定義序列 #x = [ x for x in line.split(":") ] # 冒號分割定義序列 #x = [ x.split("/") for x in line.split(":") ] # 先冒號分割,在/分割 打印x[6][1] print x[6],"\n", f.close() 讀文件2 f = file('/etc/passwd') c = f.readlines() # 讀入因此文件內容,可反覆讀取,大文件時佔用內存較大 for line in c: print line.rstrip(), f.close() 讀文件3 for i in open('b.txt'): # 直接讀取也可迭代,並有利於大文件讀取,但不可反覆讀取 print i, 追加日誌 log = open('/home/peterli/xuesong','a') print >> log,'faaa' log.close() with讀文件 with open('a.txt') as f: # print f.read() # 打印全部內容爲字符串 print f.readlines() # 打印全部內容按行分割的列表 csv讀配置文件 192.168.1.5,web # 配置文件按逗號分割 list = csv.reader(file('a.txt')) for line in list: print line # ['192.168.1.5', 'web'] 觸發異常 class ShortInputException(Exception): '''A user-defined exception class.''' def __init__(self, length, atleast): Exception.__init__(self) self.length = length self.atleast = atleast try: s = raw_input('Enter something --> ') if len(s) < 3: raise ShortInputException(len(s), 3) except EOFError: print '\nWhy did you do an EOF on me?' except ShortInputException, x: print 'ShortInputException: The input was of length %d, \ was expecting at least %d' % (x.length, x.atleast) else: print 'No exception was raised.' 內建函數 dir(sys) # 顯示對象的屬性 help(sys) # 交互式幫助 int(obj) # 轉型爲整形 str(obj) # 轉爲字符串 len(obj) # 返回對象或序列長度 open(file,mode) # 打開文件 #mode (r 讀,w 寫, a追加) range(0,3) # 返回一個整形列表 raw_input("str:") # 等待用戶輸入 type(obj) # 返回對象類型 abs(-22) # 絕對值 random # 隨機數 random.randrange(9) # 9之內隨機數 choice() # 隨機返回給定序列的一個元素 divmod(x,y) # 函數完成除法運算,返回商和餘數。 isinstance(object,int) # 測試對象類型 int round(x[,n]) # 函數返回浮點數x的四捨五入值,如給出n值,則表明舍入到小數點後的位數 xrange() # 函數與range()相似,但xrnage()並不建立列表,而是返回一個xrange對象 xrange([lower,]stop[,step]) strip() # 是去掉字符串兩端多於空格,該句是去除序列中的全部字串兩端多餘的空格 del # 刪除列表裏面的數據 cmp(x,y) # 比較兩個對象 #根據比較結果返回一個整數,若是x<y,則返回-1;若是x>y,則返回1,若是x==y則返回0 max() # 字符串中最大的字符 min() # 字符串中最小的字符 sorted() # 對序列排序 reversed() # 對序列倒序 enumerate() # 返回索引位置和對應的值 sum() # 總和 list() # 變成列表可用於迭代 tuple() # 變成元組可用於迭代 #一旦初始化便不能更改的數據結構,速度比list快 zip() # 返回一個列表 >>> s = ['11','22','33'] >>> t = ['aa','bb','cc'] >>> zip(s,t) [('11', 'aa'), ('22', 'bb'), ('33', 'cc')] 字符串相關模塊 string # 字符串操做相關函數和工具 re # 正則表達式 struct # 字符串和二進制之間的轉換 c/StringIO # 字符串緩衝對象,操做方法相似於file對象 base64 # Base16\32\64數據編解碼 codecs # 解碼器註冊和基類 crypt # 進行單方面加密 difflib # 找出序列間的不一樣 hashlib # 多種不一樣安全哈希算法和信息摘要算法的API hma # HMAC信息鑑權算法的python實現 md5 # RSA的MD5信息摘要鑑權 rotor # 提供多平臺的加解密服務 sha # NIAT的安全哈希算法SHA stringprep # 提供用於IP協議的Unicode字符串 textwrap # 文本包裝和填充 unicodedate # unicode數據庫 列表類型內建函數 list.append(obj) # 向列表中添加一個對象obj list.count(obj) # 返回一個對象obj在列表中出現的次數 list.extend(seq) # 把序列seq的內容添加到列表中 list.index(obj,i=0,j=len(list)) # 返回list[k] == obj 的k值,而且k的範圍在i<=k<j;不然異常 list.insert(index.obj) # 在索引量爲index的位置插入對象obj list.pop(index=-1) # 刪除並返回指定位置的對象,默認是最後一個對象 list.remove(obj) # 從列表中刪除對象obj list.reverse() # 原地翻轉列表 list.sort(func=None,key=None,reverse=False) # 以指定的方式排序列表中成員,若是func和key參數指定,則按照指定的方式比較各個元素,若是reverse標誌被置爲True,則列表以反序排列 序列類型操做符 seq[ind] # 獲取下標爲ind的元素 seq[ind1:ind2] # 得到下標從ind1到ind2的元素集合 seq * expr # 序列重複expr次 seq1 + seq2 # 鏈接seq1和seq2 obj in seq # 判斷obj元素是否包含在seq中 obj not in seq # 判斷obj元素是否不包含在seq中 字符串類型內建方法 string.expandtabs(tabsize=8) # tab符號轉爲空格 #默認8個空格 string.endswith(obj,beg=0,end=len(staring)) # 檢測字符串是否已obj結束,若是是返回True #若是beg或end指定檢測範圍是否已obj結束 string.count(str,beg=0,end=len(string)) # 檢測str在string裏出現次數 string.find(str,beg=0,end=len(string)) # 檢測str是否包含在string中 string.index(str,beg=0,end=len(string)) # 檢測str不在string中,會報異常 string.isalnum() # 若是string至少有一個字符而且全部字符都是字母或數字則返回True string.isalpha() # 若是string至少有一個字符而且全部字符都是字母則返回True string.isnumeric() # 若是string只包含數字字符,則返回True string.isspace() # 若是string包含空格則返回True string.isupper() # 字符串都是大寫返回True string.islower() # 字符串都是小寫返回True string.lower() # 轉換字符串中全部大寫爲小寫 string.upper() # 轉換字符串中全部小寫爲大寫 string.lstrip() # 去掉string左邊的空格 string.rstrip() # 去掉string字符末尾的空格 string.replace(str1,str2,num=string.count(str1)) # 把string中的str1替換成str2,若是num指定,則替換不超過num次 string.startswith(obj,beg=0,end=len(string)) # 檢測字符串是否以obj開頭 string.zfill(width) # 返回字符長度爲width的字符,原字符串右對齊,前面填充0 string.isdigit() # 只包含數字返回True string.split("分隔符") # 把string切片成一個列表 ":".join(string.split()) # 以:做爲分隔符,將全部元素合併爲一個新的字符串 序列類型相關的模塊 array # 一種受限制的可變序列類型,元素必須相同類型 copy # 提供淺拷貝和深拷貝的能力 operator # 包含函數調用形式的序列操做符 operator.concat(m,n) re # perl風格的正則表達式查找 StringIO # 把長字符串做爲文件來操做 如: read() \ seek() cStringIO # 把長字符串做爲文件來操,做速度更快,但不能被繼承 textwrap # 用做包裝/填充文本的函數,也有一個類 types # 包含python支持的全部類型 collections # 高性能容器數據類型 字典相關函數 dict([container]) # 建立字典的工廠函數。提供容器類(container),就用其中的條目填充字典 len(mapping) # 返回映射的長度(鍵-值對的個數) hash(obj) # 返回obj哈希值,判斷某個對象是否可作一個字典的鍵值 字典內建方法 dict.clear() # 刪除字典中全部元素 dict copy() # 返回字典(淺複製)的一個副本 dict.fromkeys(seq,val=None) # 建立並返回一個新字典,以seq中的元素作該字典的鍵,val作該字典中全部鍵對的初始值 dict.get(key,default=None) # 對字典dict中的鍵key,返回它對應的值value,若是字典中不存在此鍵,則返回default值 dict.has_key(key) # 若是鍵在字典中存在,則返回True 用in和not in代替 dicr.items() # 返回一個包含字典中鍵、值對元組的列表 dict.keys() # 返回一個包含字典中鍵的列表 dict.iter() # 方法iteritems()、iterkeys()、itervalues()與它們對應的非迭代方法同樣,不一樣的是它們返回一個迭代子,而不是一個列表 dict.pop(key[,default]) # 和方法get()類似.若是字典中key鍵存在,刪除並返回dict[key] dict.setdefault(key,default=None) # 和set()類似,但若是字典中不存在key鍵,由dict[key]=default爲它賦值 dict.update(dict2) # 將字典dict2的鍵值對添加到字典dict dict.values() # 返回一個包含字典中全部值得列表 集合方法 s.issubset(t) # 若是s是t的子集,則返回True s <= t s.issuperset(t) # 若是t是s的超集,則返回True s >= t s.union(t) # 合併操做;返回一個新集合,該集合是s和t的並集 s | t s.intersection(t) # 交集操做;返回一個新集合,該集合是s和t的交集 s & t s.difference(t) # 返回一個新集合,改集合是s的成員,但不是t的成員 s - t s.symmetric_difference(t) # 返回一個新集合,該集合是s或t的成員,但不是s和t共有的成員 s ^ t s.copy() # 返回一個新集合,它是集合s的淺複製 obj in s # 成員測試;obj是s中的元素 返回True obj not in s # 非成員測試:obj不是s中元素 返回True s == t # 等價測試 是否具備相同元素 s != t # 不等價測試 s < t # 子集測試;s!=t且s中全部元素都是t的成員 s > t # 超集測試;s!=t且t中全部元素都是s的成員 可變集合方法 s.update(t) # 用t中的元素修改s,s如今包含s或t的成員 s |= t s.intersection_update(t) # s中的成員是共用屬於s和t的元素 s &= t s.difference_update(t) # s中的成員是屬於s但不包含在t中的元素 s -= t s.symmetric_difference_update(t) # s中的成員更新爲那些包含在s或t中,但不是s和t共有的元素 s ^= t s.add(obj) # 在集合s中添加對象obj s.remove(obj) # 從集合s中刪除對象obj;若是obj不是集合s中的元素(obj not in s),將引起KeyError錯誤 s.discard(obj) # 若是obj是集合s中的元素,從集合s中刪除對象obj s.pop() # 刪除集合s中的任意一個對象,並返回它 s.clear() # 刪除集合s中的全部元素 序列化 #!/usr/bin/python import cPickle obj = {'1':['4124','1241','124'],'2':['12412','142','1241']} pkl_file = open('account.pkl','wb') cPickle.down(obj,pkl_file) pkl_file.close() pkl_file = open('account.pkl','rb') account_list = cPickle.load(pkl_file) pkl_file.close() 文件對象方法 file.close() # 關閉文件 file.fileno() # 返回文件的描述符 file.flush() # 刷新文件的內部緩衝區 file.isatty() # 判斷file是不是一個類tty設備 file.next() # 返回文件的下一行,或在沒有其餘行時引起StopIteration異常 file.read(size=-1) # 從文件讀取size個字節,當未給定size或給定負值的時候,讀取剩餘的全部字節,而後做爲字符串返回 file.readline(size=-1) # 從文件中讀取並返回一行(包括行結束符),或返回最大size個字符 file.readlines(sizhint=0) # 讀取文件的全部行做爲一個列表返回 file.xreadlines() # 用於迭代,可替換readlines()的一個更高效的方法 file.seek(off, whence=0) # 在文件中移動文件指針,從whence(0表明文件起始,1表明當前位置,2表明文件末尾)偏移off字節 file.tell() # 返回當前在文件中的位置 file.truncate(size=file.tell()) # 截取文件到最大size字節,默認爲當前文件位置 file.write(str) # 向文件寫入字符串 file.writelines(seq) # 向文件寫入字符串序列seq;seq應該是一個返回字符串的可迭代對象 文件對象的屬性 file.closed # 表示文件已被關閉,不然爲False file.encoding # 文件所使用的編碼 當unicode字符串被寫入數據時,它將自動使用file.encoding轉換爲字節字符串;若file.encoding爲None時使用系統默認編碼 file.mode # Access文件打開時使用的訪問模式 file.name # 文件名 file.newlines # 未讀取到行分隔符時爲None,只有一種行分隔符時爲一個字符串,當文件有多種類型的行結束符時,則爲一個包含全部當前所遇到的行結束符的列表 file.softspace # 爲0表示在輸出一數據後,要加上一個空格符,1表示不加 os模塊 文件處理 mkfifo()/mknod() # 建立命名管道/建立文件系統節點 remove()/unlink() # 刪除文件 rename()/renames() # 重命名文件 *stat() # 返回文件信息 symlink() # 建立符號連接 utime() # 更新時間戳 tmpfile() # 建立並打開('w+b')一個新的臨時文件 walk() # 遍歷目錄樹下的全部文件名 目錄/文件夾 chdir()/fchdir() # 改變當前工做目錄/經過一個文件描述符改變當前工做目錄 chroot() # 改變當前進程的根目錄 listdir() # 列出指定目錄的文件 getcwd()/getcwdu() # 返回當前工做目錄/功能相同,但返回一個unicode對象 mkdir()/makedirs() # 建立目錄/建立多層目錄 rmdir()/removedirs() # 刪除目錄/刪除多層目錄 訪問/權限 saccess() # 檢驗權限模式 chmod() # 改變權限模式 chown()/lchown() # 改變owner和groupID功能相同,但不會跟蹤連接 umask() # 設置默認權限模式 文件描述符操做 open() # 底層的操做系統open(對於穩健,使用標準的內建open()函數) read()/write() # 根據文件描述符讀取/寫入數據 按大小讀取文件部份內容 dup()/dup2() # 複製文件描述符號/功能相同,可是複製到另外一個文件描述符 設備號 makedev() # 從major和minor設備號建立一個原始設備號 major()/minor() # 從原始設備號得到major/minor設備號 os.path模塊 os.path.expanduser('~/.ssh/key') # 家目錄下文件的全路徑 分隔 basename() # 去掉目錄路徑,返回文件名 dirname() # 去掉文件名,返回目錄路徑 join() # 將分離的各部分組合成一個路徑名 spllt() # 返回(dirname(),basename())元組 splitdrive() # 返回(drivename,pathname)元組 splitext() # 返回(filename,extension)元組 信息 getatime() # 返回最近訪問時間 getctime() # 返回文件建立時間 getmtime() # 返回最近文件修改時間 getsize() # 返回文件大小(字節) 查詢 exists() # 指定路徑(文件或目錄)是否存在 isabs() # 指定路徑是否爲絕對路徑 isdir() # 指定路徑是否存在且爲一個目錄 isfile() # 指定路徑是否存在且爲一個文件 islink() # 指定路徑是否存在且爲一個符號連接 ismount() # 指定路徑是否存在且爲一個掛載點 samefile() # 兩個路徑名是否指向同一個文件 相關模塊 base64 # 提供二進制字符串和文本字符串間的編碼/解碼操做 binascii # 提供二進制和ASCII編碼的二進制字符串間的編碼/解碼操做 bz2 # 訪問BZ2格式的壓縮文件 csv # 訪問csv文件(逗號分隔文件) csv.reader(open(file)) filecmp # 用於比較目錄和文件 fileinput # 提供多個文本文件的行迭代器 getopt/optparse # 提供了命令行參數的解析/處理 glob/fnmatch # 提供unix樣式的通配符匹配的功能 gzip/zlib # 讀寫GNU zip(gzip)文件(壓縮須要zlib模塊) shutil # 提供高級文件訪問功能 c/StringIO # 對字符串對象提供類文件接口 tarfile # 讀寫TAR歸檔文件,支持壓縮文件 tempfile # 建立一個臨時文件 uu # uu格式的編碼和解碼 zipfile # 用於讀取zip歸檔文件的工具 environ['HOME'] # 查看系統環境變量 子進程 os.fork() # 建立子進程,並複製父進程全部操做 經過判斷pid = os.fork() 的pid值,分別執行父進程與子進程操做,0爲子進程 os.wait() # 等待子進程結束 跨平臺os模塊屬性 linesep # 用於在文件中分隔行的字符串 sep # 用來分隔文件路徑名字的字符串 pathsep # 用於分割文件路徑的字符串 curdir # 當前工做目錄的字符串名稱 pardir # 父目錄字符串名稱 異常 NameError: # 嘗試訪問一個未申明的變量 ZeroDivisionError: # 除數爲零 SyntaxErrot: # 解釋器語法錯誤 IndexError: # 請求的索引元素超出序列範圍 KeyError: # 請求一個不存在的字典關鍵字 IOError: # 輸入/輸出錯誤 AttributeError: # 嘗試訪問未知的對象屬性 ImportError # 沒有模塊 IndentationError # 語法縮進錯誤 KeyboardInterrupt # ctrl+C SyntaxError # 代碼語法錯誤 ValueError # 值錯誤 TypeError # 傳入對象類型與要求不符合 觸發異常 raise exclass # 觸發異常,從exclass生成一個實例(不含任何異常參數) raise exclass() # 觸發異常,但如今不是類;經過函數調用操做符(function calloperator:"()")做用於類名生成一個新的exclass實例,一樣也沒有異常參數 raise exclass, args # 觸發異常,但同時提供的異常參數args,能夠是一個參數也能夠是元組 raise exclass(args) # 觸發異常,同上 raise exclass, args, tb # 觸發異常,但提供一個跟蹤記錄(traceback)對象tb供使用 raise exclass,instance # 經過實例觸發異常(一般是exclass的實例) raise instance # 經過實例觸發異常;異常類型是實例的類型:等價於raise instance.__class__, instance raise string # 觸發字符串異常 raise string, srgs # 觸發字符串異常,但觸發伴隨着args raise string,args,tb # 觸發字符串異常,但提供一個跟蹤記錄(traceback)對象tb供使用 raise # 從新觸發前一個異常,若是以前沒有異常,觸發TypeError 內建異常 BaseException # 全部異常的基類 SystemExit # python解釋器請求退出 KeyboardInterrupt # 用戶中斷執行 Exception # 常規錯誤的基類 StopIteration # 迭代器沒有更多的值 GeneratorExit # 生成器發生異常來通知退出 SystemExit # python解釋器請求退出 StandardError # 全部的內建標準異常的基類 ArithmeticError # 全部數值計算錯誤的基類 FloatingPointError # 浮點計算錯誤 OverflowError # 數值運算超出最大限制 AssertionError # 斷言語句失敗 AttributeError # 對象沒有這個屬性 EOFError # 沒有內建輸入,到達EOF標記 EnvironmentError # 操做系統錯誤的基類 IOError # 輸入/輸出操做失敗 OSError # 操做系統錯誤 WindowsError # windows系統調用失敗 ImportError # 導入模塊/對象失敗 KeyboardInterrupt # 用戶中斷執行(一般是ctrl+c) LookupError # 無效數據查詢的基類 IndexError # 序列中沒有此索引(index) KeyError # 映射中沒有這個鍵 MemoryError # 內存溢出錯誤(對於python解釋器不是致命的) NameError # 未聲明/初始化對象(沒有屬性) UnboundLocalError # 訪問未初始化的本地變量 ReferenceError # 若引用試圖訪問已經垃圾回收了的對象 RuntimeError # 通常的運行時錯誤 NotImplementedError # 還沒有實現的方法 SyntaxError # python語法錯誤 IndentationError # 縮進錯誤 TabError # tab和空格混用 SystemError # 通常的解釋器系統錯誤 TypeError # 對類型無效的操做 ValueError # 傳入無效的參數 UnicodeError # Unicode相關的錯誤 UnicodeDecodeError # Unicode解碼時的錯誤 UnicodeEncodeError # Unicode編碼時的錯誤 UnicodeTranslateError # Unicode轉換時錯誤 Warning # 警告的基類 DeprecationWarning # 關於被棄用的特徵的警告 FutureWarning # 關於構造未來語義會有改變的警告 OverflowWarning # 舊的關於自動提高爲長整形的警告 PendingDeprecationWarning # 關於特性將會被廢棄的警告 RuntimeWarning # 可疑的運行時行爲的警告 SyntaxWarning # 可疑的語法的警告 UserWarning # 用戶代碼生成的警告 class MyException(Exception): # 繼承Exception異常的類,定義本身的異常 pass try: # 監控這裏的異常 option=int(raw_input('my age:')) if option != 28: raise MyException,'a1' #觸發異常 except MyException,a: # 異常處理代碼 print 'MyExceptionaa' print a # 打印異常處內容 except: # 捕捉全部其它錯誤 print 'except' finally: # 不管什麼狀況都會執行 關閉文件或斷開鏈接等 print 'finally' else: # 無任何異常 沒法和finally同用 print 'no Exception' 函數式編程的內建函數 apply(func[,nkw][,kw]) # 用可選的參數來調用func,nkw爲非關鍵字參數,kw爲關鍵字參數;返回值是函數調用的返回值 filter(func,seq) # 調用一個布爾函數func來迭代遍歷每一個seq中的元素;返回一個使func返回值爲true的元素的序列 map(func,seq1[,seq2]) # 將函數func做用於給定序列(s)的每一個元素,並用一個列表來提供返回值;若是func爲None,func表現爲一個身份函數,返回一個含有每一個序列中元素集合的n個元組的列表 reduce(func,seq[,init]) # 將二元函數做用於seq序列的元素,每次攜帶一堆(先前的結果以及下一個序列元素),連續地將現有的結果和下一個值做用在得到的隨後的結果上,最後減小咱們的序列爲一個單一的返回值;若是初始值init給定,第一個比較會是init和第一個序列元素而不是序列的頭兩個元素 re正則 compile(pattern,flags=0) # 對正則表達式模式pattern進行編譯,flags是可選標識符,並返回一個regex對象 match(pattern,string,flags=0) # 嘗試用正則表達式模式pattern匹配字符串string,flags是可選標識符,若是匹配成功,則返回一個匹配對象;不然返回None search(pattern,string,flags=0) # 在字符串string中搜索正則表達式模式pattern的第一次出現,flags是可選標識符,若是匹配成功,則返回一個匹配對象;不然返回None findall(pattern,string[,flags]) # 在字符串string中搜索正則表達式模式pattern的全部(非重複)出現:返回一個匹配對象的列表 # pattern=u'\u4e2d\u6587' 表明UNICODE finditer(pattern,string[,flags]) # 和findall()相同,但返回的不是列表而是迭代器;對於每一個匹配,該迭代器返回一個匹配對象 split(pattern,string,max=0) # 根據正則表達式pattern中的分隔符把字符string分割爲一個列表,返回成功匹配的列表,最多分割max次(默認全部) sub(pattern,repl,string,max=0) # 把字符串string中全部匹配正則表達式pattern的地方替換成字符串repl,若是max的值沒有給出,則對全部匹配的地方進行替換(subn()會返回一個表示替換次數的數值) group(num=0) # 返回所有匹配對象(或指定編號是num的子組) groups() # 返回一個包含所有匹配的子組的元組(若是沒匹配成功,返回一個空元組) 例子 re.findall(r'a[be]c','123abc456eaec789') # 返回匹配對象列表 ['abc', 'aec'] re.match("^(1|2) *(.*) *abc$", str).group(2) # 取第二個標籤 re.match("^(1|2) *(.*) *abc$", str).groups() # 取全部標籤 re.sub('[abc]','A','alex') # 替換 for i in re.finditer(r'\d+',s): # 迭代 print i.group(),i.span() # 搜索網頁中UNICODE格式的中文 QueryAdd='http://www.anti-spam.org.cn/Rbl/Query/Result' Ip='222.129.184.52' s = requests.post(url=QueryAdd, data={'IP':Ip}) re.findall(u'\u4e2d\u56fd', s.text, re.S) 多線程 thread start_new_thread(function,args kwargs=None) # 產生一個新的線程 allocate_lock() # 分配一個LockType類型的鎖對象 exit() # 讓線程退出 acquire(wait=None) # 嘗試獲取鎖對象 locked() # 若是獲取了鎖對象返回True release() # 釋放鎖 thread例子 #!/usr/bin/env python #thread_test.py #不支持守護進程 import thread from time import sleep,ctime loops = [4,2] def loop(nloop,nsec,lock): print 'start loop %s at:%s' % (nloop,ctime()) sleep(nsec) print 'loop %s done at: %s' % (nloop, ctime()) lock.release() # 分配已得到的鎖,操做結束後釋放相應的鎖通知主線程 def main(): print 'starting at:',ctime() locks = [] nloops = range(len(loops)) for i in nloops: lock = thread.allocate_lock() # 建立一個鎖 lock.acquire() # 調用各個鎖的acquire()函數得到鎖 locks.append(lock) # 把鎖放到鎖列表locks中 for i in nloops: thread.start_new_thread(loop,(i,loops[i],locks[i])) # 建立線程 for i in nloops: while locks[i].locked():pass # 等待所有解鎖才繼續運行 print 'all DONE at:',ctime() if __name__ == '__main__': main() threading Thread # 表示一個線程的執行的對象 start() # 開始線程的執行 run() # 定義線程的功能的函數(通常會被子類重寫) join(timeout=None) # 容許主線程等待線程結束,程序掛起,直到線程結束;若是給了timeout,則最多等待timeout秒. getName() # 返回線程的名字 setName(name) # 設置線程的名字 isAlive() # 布爾標誌,表示這個線程是否還在運行中 isDaemon() # 返回線程的daemon標誌 setDaemon(daemonic) # 後臺線程,把線程的daemon標誌設置爲daemonic(必定要在調用start()函數前調用) # 默認主線程在退出時會等待全部子線程的結束。若是但願主線程不等待子線程,而是在退出時自動結束全部的子線程,就須要設置子線程爲後臺線程(daemon) Lock # 鎖原語對象 Rlock # 可重入鎖對象.使單線程能夠在此得到已得到了的鎖(遞歸鎖定) Condition # 條件變量對象能讓一個線程停下來,等待其餘線程知足了某個條件.如狀態改變或值的改變 Event # 通用的條件變量.多個線程能夠等待某個事件的發生,在事件發生後,全部的線程都會被激活 Semaphore # 爲等待鎖的線程提供一個相似等候室的結構 BoundedSemaphore # 與Semaphore相似,只是不容許超過初始值 Time # 與Thread類似,只是他要等待一段時間後纔開始運行 activeCount() # 當前活動的線程對象的數量 currentThread() # 返回當前線程對象 enumerate() # 返回當前活動線程的列表 settrace(func) # 爲全部線程設置一個跟蹤函數 setprofile(func) # 爲全部線程設置一個profile函數 threading例子1 #!/usr/bin/env python #encoding:utf8 import threading from Queue import Queue from time import sleep,ctime class ThreadFunc(object): def __init__(self,func,args,name=''): self.name=name self.func=func # loop self.args=args # (i,iplist[i],queue) def __call__(self): apply(self.func,self.args) # 函數apply() 執行loop函數並傳遞元組參數 def loop(nloop,ip,queue): print 'start',nloop,'at:',ctime() queue.put(ip) sleep(2) print 'loop',nloop,'done at:',ctime() if __name__ == '__main__': threads = [] queue = Queue() iplist = ['192.168.1.2','192.168.1.3','192.168.1.4','192.168.1.5','192.168.1.6','192.168.1.7','192.168.1.8'] nloops = range(len(iplist)) for i in nloops: t = threading.Thread(target=ThreadFunc(loop,(i,iplist[i],queue),loop.__name__)) threads.append(t) for i in nloops: threads[i].start() for i in nloops: threads[i].join() for i in nloops: print queue.get() threading例子2 #!/usr/bin/env python #encoding:utf8 from Queue import Queue import random,time,threading class Producer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): for i in range(5): print "%s: %s is producing %d to the queue!\n" %(time.ctime(), self.getName(), i) self.data.put(i) self.data.put(i*i) time.sleep(2) print "%s: %s finished!" %(time.ctime(), self.getName()) class Consumer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): for i in range(10): val = self.data.get() print "%s: %s is consuming. %d in the queue is consumed!\n" %(time.ctime(), self.getName(), val) print "%s: %s finished!" %(time.ctime(), self.getName()) if __name__ == '__main__': queue = Queue() producer = Producer('Pro.', queue) consumer = Consumer('Con.', queue) producer.start() consumer.start() producer.join() consumer.join() 後臺線程 import threading import time,random class MyThread(threading.Thread): def run(self): wait_time=random.randrange(1,10) print "%s will wait %d seconds" % (self.name, wait_time) time.sleep(wait_time) print "%s finished!" % self.name if __name__=="__main__": for i in range(5): t = MyThread() t.setDaemon(True) # 設置爲後臺線程,主線程完成時不等待子線程完成就結束 t.start() threading控制最大併發_查詢日誌中IP信息 #!/usr/bin/env python #coding:utf-8 import urllib2 import json import threading import time ''' by:某大牛 QQ:185635687 這個是多線程併發控制. 若是要改爲多進程,只需把threading 換成 mulitprocessing.Process , 對, 就是換個名字而已. ''' #獲取ip 及其出現次數 def ip_dic(file_obj, dic): for i in file_obj: if i: ip=i.split('-')[0].strip() if ip in dic.keys(): dic[ip]=dic[ip] + 1 else: dic[ip]=1 return dic.iteritems() #目標函數 def get_data(url, ipcounts): data=urllib2.urlopen(url).read() datadict=json.loads(data) fdata = u"ip:%s---%s,%s,%s,%s,%s" %(datadict["data"]["ip"],ipcounts,datadict["data"]["country"],datadict["data"]["region"],datadict["data"]["city"],datadict["data"]["isp"]) print fdata #多線程 def threads(iters): thread_pool = [] for k in iters: url = "http://ip.taobao.com/service/getIpInfo.php?ip=" ipcounts = k[1] url = (url + k[0]).strip() t = threading.Thread(target=get_data, args=(url, ipcounts)) thread_pool.append(t) return thread_pool #控制多線程 def startt(t_list, max,second): l = len(t_list) n = max while l > 0: if l > max: nl = t_list[:max] t_list = t_list[max:] for t in nl: t.start() time.sleep(second) for t in nl: t.join() print '*'*15, str(n)+ ' ip has been queried'+'*'*15 n += max l = len(t_list) continue elif l <= max: nl = t_list for t in nl: t.start() for t in nl: t.join() print '>>> Totally ' + str(n+l ) + ' ip has been queried' l = 0 if __name__ =="__main__": dic={} with open('access.log') as file_obj: it = ip_dic(file_obj, dic) t_list= threads(it) startt(t_list, 15, 1) Queue通用隊列 q=Queue(size) # 建立大小size的Queue對象 qsize() # 返回隊列的大小(返回時候,可能被其餘進程修改,近似值) empty() # 若是隊列爲空返回True,不然Fales full() # 若是隊列已滿返回True,不然Fales put(item,block0) # 把item放到隊列中,若是給了block(不爲0),函數會一直阻塞到隊列中有空間爲止 get(block=0) # 從隊列中取一個對象,若是給了block(不爲0),函數會一直阻塞到隊列中有對象爲止 get_nowait # 默認get阻塞,這個不阻塞 multiprocessing 多進程併發 #!/usr/bin/env python #encoding:utf8 from multiprocessing import Process import time,os def f(name): time.sleep(1) print 'hello ',name print os.getppid() # 取得父進程ID print os.getpid() # 取得進程ID process_list = [] for i in range(10): p = Process(target=f,args=(i,)) p.start() process_list.append(p) for j in process_list: j.join() Queue進程間通訊 from multiprocessing import Process,Queue import time def f(name): time.sleep(1) q.put(['hello'+str(name)]) process_list = [] q = Queue() if __name__ == '__main__': for i in range(10): p = Process(target=f,args=(i,)) p.start() process_list.append(p) for j in process_list: j.join() for i in range(10): print q.get() Pipe管道 from multiprocessing import Process,Pipe import time import os def f(conn,name): time.sleep(1) conn.send(['hello'+str(name)]) print os.getppid(),'-----------',os.getpid() process_list = [] parent_conn,child_conn = Pipe() if __name__ == '__main__': for i in range(10): p = Process(target=f,args=(child_conn,i)) p.start() process_list.append(p) for j in process_list: j.join() for p in range(10): print parent_conn.recv() 進程間同步 #加鎖,使某一時刻只有一個進程 print from multiprocessing import Process,Lock import time import os def f(name): lock.acquire() time.sleep(1) print 'hello--'+str(name) print os.getppid(),'-----------',os.getpid() lock.release() process_list = [] lock = Lock() if __name__ == '__main__': for i in range(10): p = Process(target=f,args=(i,)) p.start() process_list.append(p) for j in process_list: j.join() 共享內存 # 經過使用Value或者Array把數據存儲在一個共享的內存表中 # 'd'和'i'參數是num和arr用來設置類型,d表示一個雙精浮點類型,i表示一個帶符號的整型。 from multiprocessing import Process,Value,Array import time import os def f(n,a,name): time.sleep(1) n.value = name * name for i in range(len(a)): a[i] = -i process_list = [] if __name__ == '__main__': num = Value('d',0.0) arr = Array('i',range(10)) for i in range(10): p = Process(target=f,args=(num,arr,i)) p.start() process_list.append(p) for j in process_list: j.join() print num.value print arr[:] manager # 比共享內存靈活,但緩慢 # 支持list,dict,Namespace,Lock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value,Array from multiprocessing import Process,Manager import time import os def f(d,name): time.sleep(1) d[name] = name * name print d process_list = [] if __name__ == '__main__': manager = Manager() d = manager.dict() for i in range(10): p = Process(target=f,args=(d,i)) p.start() process_list.append(p) for j in process_list: j.join() print d 最大併發數 import multiprocessing import time,os result = [] def run(h): print 'threading:' ,h,os.getpid() p = multiprocessing.Pool(processes=20) for i in range(100): result.append(p.apply_async(run,(i,))) p.close() for res in result: res.get(timeout=5) socket通信 from socket import * # 避免 socket.socket() s.bind() # 綁定地址到套接字 s.listen() # 開始TCP監聽 s.accept() # 被動接受TCP客戶端鏈接,等待鏈接的到來 s.connect() # 主動初始化TCP服務器鏈接 s.connect_ex() # connect()函數的擴展版本,出錯時返回出錯碼,而不是跑出異常 s.recv() # 接收TCP數據 s.send() # 發送TCP數據 s.sendall() # 完整發送TCP數據 s.recvfrom() # 接收UDP數據 s.sendto() # 發送UDP數據 s.getpeername() # 鏈接到當前套接字的遠端的地址(TCP鏈接) s.getsockname() # 當前套接字的地址 s.getsockopt() # 返回指定套接字的參數 s.setsockopt() # 設置指定套接字的參數 s.close() # 關閉套接字 s.setblocking() # 設置套接字的阻塞與非阻塞模式 s.settimeout() # 設置阻塞套接字操做的超時時間 s.gettimeout() # 獲得阻塞套接字操做的超時時間 s.filen0() # 套接字的文件描述符 s.makefile() # 建立一個與該套接字關聯的文件對象 socket.AF_UNIX # 只可以用於單一的Unix系統進程間通訊 socket.AF_INET # 服務器之間網絡通訊 socket.AF_INET6 # IPv6 socket.SOCK_STREAM # 流式socket , for TCP socket.SOCK_DGRAM # 數據報式socket , for UDP socket.SOCK_RAW # 原始套接字,普通的套接字沒法處理ICMP、IGMP等網絡報文,而SOCK_RAW能夠;其次,SOCK_RAW也能夠處理特殊的IPv4報文;此外,利用原始套接字,能夠經過IP_HDRINCL套接字選項由用戶構造IP頭。 socket.SOCK_RDM # 是一種可靠的UDP形式,即保證交付數據報但不保證順序。SOCK_RAM用來提供對原始協議的低級訪問,在須要執行某些特殊操做時使用,如發送ICMP報文。SOCK_RAM一般僅限於高級用戶或管理員運行的程序使用。 socket.SOCK_SEQPACKET # 可靠的連續數據包服務 SocketServer #!/usr/bin/python #server.py import SocketServer import os class MyTCP(SocketServer.BaseRequestHandler): def handle(self): while True: self.data=self.request.recv(1024).strip() if self.data == 'quit' or not self.data:break cmd=os.popen(self.data).read() if cmd == '':cmd= self.data + ': Command not found' self.request.sendall(cmd) if __name__ == '__main__': HOST,PORT = '10.0.0.119',50007 server = SocketServer.ThreadingTCPServer((HOST,PORT),MyTCP) server.serve_forever() SocketClient #!/usr/bin/python #client.py import socket HOST='10.0.0.119' PORT=50007 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect((HOST,PORT)) while True: while True: cmd=raw_input('CMD:').strip() if cmd != '':break s.sendall(cmd) data=s.recv(1024).split('\n') print 'cmd:' for line in data:print line s.close() ftp ftpserver #!/usr/bin/python #ftpserver.py import SocketServer import os import cPickle import md5 from time import sleep def filer(file1): try: f = file(file1,'rb') return cPickle.load(f) except IOError: return {} except EOFError: return {} f.close() def filew(file1,content): f = file(file1,'wb') cPickle.dump(content,f) f.close() class MyTCP(SocketServer.BaseRequestHandler): def handle(self): i = 0 while i<3: user=self.request.recv(1024).strip() userinfo=filer('user.pkl') if userinfo.has_key(user.split()[0]): if md5.new(user.split()[1]).hexdigest() == userinfo[user.split()[0]]: results='login successful' self.request.sendall(results) login='successful' break else: i = i + 1 results='Error:password not correct' self.request.sendall(results) continue else: i = i + 1 results='Error:password not correct' self.request.sendall(results) continue break else: results = 'Error:Wrong password too many times' self.request.sendall(results) login='failure' home_path = os.popen('pwd').read().strip() + '/' + user.split()[0] current_path = '/' print home_path while True: if login == 'failure': break print 'home_path:%s=current_path:%s' %(home_path,current_path) cmd=self.request.recv(1024).strip() print cmd if cmd == 'quit': break elif cmd == 'dir': list=os.listdir('%s%s' %(home_path,current_path)) if list: dirlist,filelist = '','' for i in list: if os.path.isdir('%s%s%s' %(home_path,current_path,i)): dirlist = dirlist + '\033[32m' + i + '\033[m\t' else: filelist = filelist + i + '\t' results = dirlist + filelist else: results = '\033[31mnot find\033[m' self.request.sendall(results) elif cmd == 'pdir': self.request.sendall(current_path) elif cmd.split()[0] == 'mdir': if cmd.split()[1].isalnum(): tmppath='%s%s%s' %(home_path,current_path,cmd.split()[1]) os.makedirs(tmppath) self.request.sendall('\033[32mcreating successful\033[m') else: self.request.sendall('\033[31mcreate failure\033[m') elif cmd.split()[0] == 'cdir': if cmd.split()[1] == '/': tmppath='%s%s' %(home_path,cmd.split()[1]) if os.path.isdir(tmppath): current_path = cmd.split()[1] self.request.sendall(current_path) else: self.request.sendall('\033[31mnot_directory\033[m') elif cmd.split()[1].startswith('/'): tmppath='%s%s' %(home_path,cmd.split()[1]) if os.path.isdir(tmppath): current_path = cmd.split()[1] + '/' self.request.sendall(current_path) else: self.request.sendall('\033[31mnot_directory\033[m') else: tmppath='%s%s%s' %(home_path,current_path,cmd.split()[1]) if os.path.isdir(tmppath): current_path = current_path + cmd.split()[1] + '/' self.request.sendall(current_path) else: self.request.sendall('\033[31mnot_directory\033[m') elif cmd.split()[0] == 'get': if os.path.isfile('%s%s%s' %(home_path,current_path,cmd.split()[1])): f = file('%s%s%s' %(home_path,current_path,cmd.split()[1]),'rb') self.request.sendall('ready_file') sleep(0.5) self.request.send(f.read()) f.close() sleep(0.5) elif os.path.isdir('%s%s%s' %(home_path,current_path,cmd.split()[1])): self.request.sendall('ready_dir') sleep(0.5) for dirpath in os.walk('%s%s%s' %(home_path,current_path,cmd.split()[1])): dir=dirpath[0].replace('%s%s' %(home_path,current_path),'',1) self.request.sendall(dir) sleep(0.5) for filename in dirpath[2]: self.request.sendall(filename) sleep(0.5) f = file('%s/%s' %(dirpath[0],filename),'rb') self.request.send(f.read()) f.close() sleep(0.5) self.request.sendall('file_get_done') sleep(0.5) else: self.request.sendall('dir_get_done') sleep(0.5) else: self.request.sendall('get_failure') continue self.request.sendall('get_done') elif cmd.split()[0] == 'send': if os.path.exists('%s%s%s' %(home_path,current_path,cmd.split()[1])): self.request.sendall('existing') action=self.request.recv(1024) if action == 'cancel': continue self.request.sendall('ready') msg=self.request.recv(1024) if msg == 'ready_file': f = file('%s%s%s' %(home_path,current_path,cmd.split()[1]),'wb') while True: data=self.request.recv(1024) if data == 'file_send_done':break f.write(data) f.close() elif msg == 'ready_dir': os.system('mkdir -p %s%s%s' %(home_path,current_path,cmd.split()[1])) while True: dir=self.request.recv(1024) if dir == 'get_done':break os.system('mkdir -p %s%s%s' %(home_path,current_path,dir)) while True: filename=self.request.recv(1024) if filename == 'dir_send_done':break f = file('%s%s%s/%s' %(home_path,current_path,dir,filename),'wb') while True: data=self.request.recv(1024) if data == 'file_send_done':break f.write(data) f.close() self.request.sendall('%s/%s\t\033[32mfile_done\033[m' %(dir,filename)) self.request.sendall('%s\t\033[32mdir_done\033[m' %(dir)) elif msg == 'unknown_file': continue else: results = cmd.split()[0] + ': Command not found' self.request.sendall(results) if __name__ == '__main__': HOST,PORT = '10.152.14.85',50007 server = SocketServer.ThreadingTCPServer((HOST,PORT),MyTCP) server.serve_forever() ftpmanage #!/usr/bin/python #manage_ftp.py import cPickle import sys import md5 import os import getpass def filer(file1): try: f = file(file1,'rb') return cPickle.load(f) except IOError: return {} except EOFError: return {} f.close() def filew(file1,content): f = file(file1,'wb') cPickle.dump(content,f) f.close() while True: print ''' 1.add user 2.del user 3.change password 4.query user 0.exit ''' i = raw_input(':').strip() userinfo=filer('user.pkl') if i == '': continue elif i == '1': while True: user=raw_input('user name:').strip() if user.isalnum(): i = 0 while i<3: passwd=getpass.getpass('passwd:').strip() if passwd == '': continue else: passwd1=getpass.getpass('Confirm password:').strip() if passwd == passwd1: mpasswd = md5.new(passwd).hexdigest() userinfo[user] = mpasswd os.system('mkdir -p %s' %user) print '%s creating successful ' %user break else: print "Passwords don't match " i = i + 1 continue else: print 'Too many wrong' continue break else: print 'user not legal' continue elif i == '2': user=raw_input('user name:').strip() if userinfo.has_key(user): del userinfo[user] print 'Delete users successfully' else: print 'user not exist' continue elif i == '3': user=raw_input('user name:').strip() if userinfo.has_key(user): i = 0 while i<3: passwd=getpass.getpass('passwd:').strip() if passwd == '': continue else: passwd1=getpass.getpass('Confirm password:').strip() if passwd == passwd1: mpasswd = md5.new(passwd).hexdigest() userinfo[user] = mpasswd print '%s password is changed' %user break else: print "Passwords don't match " i = i + 1 continue else: print 'Too many wrong' continue else: print 'user not exist' continue elif i == '4': print userinfo.keys() elif i == '0': sys.exit() else: print 'select error' continue filew('user.pkl',content=userinfo) ftpclient #!/usr/bin/python #ftpclient.py import socket import os import getpass from time import sleep HOST='10.152.14.85' PORT=50007 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect((HOST,PORT)) while True: user = raw_input('user:').strip() if user.isalnum(): while True: passwd = getpass.getpass('passwd:').strip() s.sendall(user + ' ' + passwd) servercmd=s.recv(1024) if servercmd == 'login successful': print '\033[32m%s\033[m' %servercmd break else: print servercmd while True: cmd=raw_input('FTP>').strip() if cmd == '': continue if cmd.split()[0] == 'get': if cmd == 'get':continue for i in cmd.split()[1:]: if os.path.exists(i): confirm = raw_input("\033[31mPlease confirm whether the cover %s(Y/N):\033[m" %(i)).upper().startswith('Y') if not confirm: print '%s cancel' %i continue s.sendall('get ' + i) servercmd=s.recv(1024) if servercmd == 'inexistence': print '%s \t\033[32minexistence\033[m' %i continue elif servercmd == 'ready_file': f = file(i,'wb') while True: data=s.recv(1024) if data == 'get_done':break f.write(data) f.close() print '%s \t\033[32mfile_done\033[m' %(i) elif servercmd == 'ready_dir': try: os.makedirs(i) except: pass while True: serverdir=s.recv(1024) if serverdir == 'get_done':break os.system('mkdir -p %s' %serverdir) print '%s \t\033[32mdir_done\033[m' %(serverdir) while True: serverfile=s.recv(1024) if serverfile == 'dir_get_done':break f = file('%s/%s' %(serverdir,serverfile),'wb') while True: data=s.recv(1024) if data == 'file_get_done':break f.write(data) f.close() print '%s/%s \t\033[32mfile_done\033[m' %(serverdir,serverfile) elif cmd.split()[0] == 'send': if cmd == 'send':continue for i in cmd.split()[1:]: if not os.path.exists(i): print '%s\t\033[31minexistence\033[m' %i continue s.sendall('send ' + i) servercmd=s.recv(1024) if servercmd == 'existing': confirm = raw_input("\033[31mPlease confirm whether the cover %s(Y/N):\033[m" %(i)).upper().startswith('Y') if confirm: s.sendall('cover') servercmd=s.recv(1024) else: s.sendall('cancel') print '%s\tcancel' %i continue if os.path.isfile(i): s.sendall('ready_file') sleep(0.5) f = file(i,'rb') s.send(f.read()) sleep(0.5) s.sendall('file_send_done') print '%s\t\033[32mfile done\033[m' %(cmd.split()[1]) f.close() elif os.path.isdir(i): s.sendall('ready_dir') sleep(0.5) for dirpath in os.walk(i): dir=dirpath[0].replace('%s/' %os.popen('pwd').read().strip(),'',1) s.sendall(dir) sleep(0.5) for filename in dirpath[2]: s.sendall(filename) sleep(0.5) f = file('%s/%s' %(dirpath[0],filename),'rb') s.send(f.read()) f.close() sleep(0.5) s.sendall('file_send_done') msg=s.recv(1024) print msg else: s.sendall('dir_send_done') msg=s.recv(1024) print msg else: s.sendall('unknown_file') print '%s\t\033[31munknown type\033[m' %i continue sleep(0.5) s.sendall('get_done') elif cmd.split()[0] == 'cdir': if cmd == 'cdir':continue s.sendall(cmd) data=s.recv(1024) print data continue elif cmd == 'ls': list=os.popen(cmd).read().strip().split('\n') if list: dirlist,filelist = '','' for i in list: if os.path.isdir(i): dirlist = dirlist + '\033[32m' + i + '\033[m\t' else: filelist = filelist + i + '\t' results = dirlist + filelist else: results = '\033[31mnot find\033[m' print results continue elif cmd == 'pwd': os.system(cmd) elif cmd.split()[0] == 'cd': try: os.chdir(cmd.split()[1]) except: print '\033[31mcd failure\033[m' elif cmd == 'dir': s.sendall(cmd) data=s.recv(1024) print data continue elif cmd == 'pdir': s.sendall(cmd) data=s.recv(1024) print data continue elif cmd.split()[0] == 'mdir': if cmd == 'mdir':continue s.sendall(cmd) data=s.recv(1024) print data continue elif cmd.split()[0] == 'help': print ''' get [file] [dir] send [file] [dir] dir mdir cdir pdir pwd md cd ls help quit ''' continue elif cmd == 'quit': break else: print '\033[31m%s: Command not found,Please see the "help"\033[m' %cmd else: continue break s.close() 掃描主機開放端口 #!/usr/bin/env python import socket def check_server(address,port): s=socket.socket() try: s.connect((address,port)) return True except socket.error,e: return False if __name__=='__main__': from optparse import OptionParser parser=OptionParser() parser.add_option("-a","--address",dest="address",default='localhost',help="Address for server",metavar="ADDRESS") parser.add_option("-s","--start",dest="start_port",type="int",default=1,help="start port",metavar="SPORT") parser.add_option("-e","--end",dest="end_port",type="int",default=1,help="end port",metavar="EPORT") (options,args)=parser.parse_args() print 'options: %s, args: %s' % (options, args) port=options.start_port while(port<=options.end_port): check = check_server(options.address, port) if (check): print 'Port %s is on' % port port=port+1 mysql #apt-get install mysql-server #apt-get install python-MySQLdb help(MySQLdb.connections.Connection) # 查看連接參數 conn=MySQLdb.connect(host='localhost',user='root',passwd='123456',db='fortress',port=3306) # 定義鏈接 #conn=MySQLdb.connect(unix_socket='/var/run/mysqld/mysqld.sock',user='root',passwd='123456') # 使用socket文件連接 cur=conn.cursor() # 定義遊標 conn.select_db('fortress') # 選擇數據庫 sqlcmd = 'insert into user(name,age) value(%s,%s)' # 定義sql命令 cur.executemany(sqlcmd,[('aa',1),('bb',2),('cc',3)]) # 插入多條值 cur.execute('delete from user where id=20') # 刪除一條記錄 cur.execute("update user set name='a' where id=20") # 更細數據 sqlresult = cur.fetchall() # 接收所有返回結果 conn.commit() # 提交 cur.close() # 關閉遊標 conn.close() # 關閉鏈接 import MySQLdb def mydb(dbcmdlist): try: conn=MySQLdb.connect(host='localhost',user='root',passwd='123456',db='fortress',port=3306) cur=conn.cursor() cur.execute('create database if not exists fortress;') # 建立數據庫 conn.select_db('fortress') # 選擇數據庫 cur.execute('drop table if exists log;') # 刪除表 cur.execute('CREATE TABLE log ( id BIGINT(20) NOT NULL AUTO_INCREMENT, loginuser VARCHAR(50) DEFAULT NULL, remoteip VARCHAR(50) DEFAULT NULL, PRIMARY KEY (id) );') # 建立表 result=[] for dbcmd in dbcmdlist: cur.execute(dbcmd) # 執行sql sqlresult = cur.fetchall() # 接收所有返回結果 result.append(sqlresult) conn.commit() # 提交 cur.close() conn.close() return result except MySQLdb.Error,e: print 'mysql error msg: ',e sqlcmd=[] sqlcmd.append("insert into log (loginuser,remoteip)values('%s','%s');" %(loginuser,remoteip)) mydb(sqlcmd) sqlcmd=[] sqlcmd.append("select * from log;") result = mydb(sqlcmd) for i in result[0]: print i paramiko 安裝 sudo apt-get install python-setuptools easy_install sudo apt-get install python-all-dev sudo apt-get install build-essential paramiko實例(帳號密碼登陸執行命令) #!/usr/bin/python #ssh import paramiko import sys,os host = '10.152.15.200' user = 'peterli' password = '123456' s = paramiko.SSHClient() # 綁定實例 s.load_system_host_keys() # 加載本地HOST主機文件 s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 容許鏈接不在know_hosts文件中的主機 s.connect(host,22,user,password,timeout=5) # 鏈接遠程主機 while True: cmd=raw_input('cmd:') stdin,stdout,stderr = s.exec_command(cmd) # 執行命令 cmd_result = stdout.read(),stderr.read() # 讀取命令結果 for line in cmd_result: print line, s.close() paramiko實例(傳送文件) #!/usr/bin/evn python import os import paramiko host='127.0.0.1' port=22 username = 'peterli' password = '123456' ssh=paramiko.Transport((host,port)) privatekeyfile = os.path.expanduser('~/.ssh/id_rsa') mykey = paramiko.RSAKey.from_private_key_file( os.path.expanduser('~/.ssh/id_rsa')) # 加載key 不使用key可不加 ssh.connect(username=username,password=password) # 鏈接遠程主機 # 使用key把 password=password 換成 pkey=mykey sftp=paramiko.SFTPClient.from_transport(ssh) # SFTP使用Transport通道 sftp.get('/etc/passwd','pwd1') # 下載 兩端都要指定文件名 sftp.put('pwd','/tmp/pwd') # 上傳 sftp.close() ssh.close() paramiko實例(密鑰執行命令) #!/usr/bin/python #ssh import paramiko import sys,os host = '10.152.15.123' user = 'peterli' s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) privatekeyfile = os.path.expanduser('~/.ssh/id_rsa') # 定義key路徑 mykey = paramiko.RSAKey.from_private_key_file(privatekeyfile) # mykey=paramiko.DSSKey.from_private_key_file(privatekeyfile,password='061128') # DSSKey方式 password是key的密碼 s.connect(host,22,user,pkey=mykey,timeout=5) cmd=raw_input('cmd:') stdin,stdout,stderr = s.exec_command(cmd) cmd_result = stdout.read(),stderr.read() for line in cmd_result: print line, s.close() ssh併發(Pool控制最大併發) #!/usr/bin/env python #encoding:utf8 #ssh_concurrent.py import multiprocessing import sys,os,time import paramiko def ssh_cmd(host,port,user,passwd,cmd): msg = "-----------Result:%s----------" % host s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: s.connect(host,22,user,passwd,timeout=5) stdin,stdout,stderr = s.exec_command(cmd) cmd_result = stdout.read(),stderr.read() print msg for line in cmd_result: print line, s.close() except paramiko.AuthenticationException: print msg print 'AuthenticationException Failed' except paramiko.BadHostKeyException: print msg print "Bad host key" result = [] p = multiprocessing.Pool(processes=20) cmd=raw_input('CMD:') f=open('serverlist.conf') list = f.readlines() f.close() for IP in list: print IP host=IP.split()[0] port=int(IP.split()[1]) user=IP.split()[2] passwd=IP.split()[3] result.append(p.apply_async(ssh_cmd,(host,port,user,passwd,cmd))) p.close() for res in result: res.get(timeout=35) ssh併發(取文件狀態併發送郵件) #!/usr/bin/python #encoding:utf8 #config file: ip.list import paramiko import multiprocessing import smtplib import sys,os,time,datetime,socket,re from email.mime.text import MIMEText # 配置文件(IP列表) Conf = 'ip.list' user_name = 'peterli' user_pwd = 'passwd' port = 22 PATH = '/home/peterli/' # 設置服務器名稱、用戶名、密碼以及郵件後綴 mail_host = "smtp.163.com" mail_user = "xuesong" mail_pass = "mailpasswd" mail_postfix = "163.com" mailto_list = ["272121935@qq.com","quanzhou722@163.com"] title = 'file check' DATE1=(datetime.datetime.now() + datetime.timedelta(days=-1) ).strftime('%Y%m%d') file_path = '%s%s' %(PATH,DATE1) def Ssh_Cmd(file_path,host_ip,user_name,user_pwd,port=22): s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: s.connect(hostname=host_ip,port=port,username=user_name,password=user_pwd) stdin,stdout,stderr = s.exec_command('stat %s' %file_path) stat_result = '%s%s' %(stdout.read(),stderr.read()) if stat_result.find('No such file or directory') == -1: file_status = 'OK\t' stdin,stdout,stderr = s.exec_command('du -sh %s' %file_path) cmd1_result = '%s_%s' %(stat_result.split()[32],stat_result.split()[33].split('.')[0]) cmd2_result = ('%s%s' %(stdout.read(),stderr.read())).split()[0] else: file_status = '未生成\t' cmd1_result = 'null' cmd2_result = 'null' q.put(['Login successful']) s.close() except socket.error: file_status = '主機或端口錯誤' cmd1_result = '-' cmd2_result = '-' except paramiko.AuthenticationException: file_status = '用戶或密碼錯誤' cmd1_result = '-' cmd2_result = '-' except paramiko.BadHostKeyException: file_status = 'Bad host key' cmd1_result = '-' cmd2_result = '-' except: file_status = 'ssh異常' cmd1_result = '-' cmd2_result = '-' r.put('%s\t-\t%s\t%s\t%s\t%s\n' %(time.strftime('%Y-%m-%d_%H:%M'),host_ip,file_status,cmd2_result,cmd1_result)) def Concurrent(Conf,file_path,user_name,user_pwd,port): # 執行總計 total = 0 # 讀取配置文件 f=open(Conf) list = f.readlines() f.close() # 併發執行 process_list = [] log_file = file('file_check.log', 'w') log_file.write('檢查時間\t\t業務\tIP\t\t文件狀態\t大小\t生成時間\n') for host_info in list: # 判斷配置文件中註釋行跳過 if host_info.startswith('#'): continue # 取變量,其中任意變量未取到就跳過執行 try: host_ip=host_info.split()[0].strip() #user_name=host_info.split()[1] #user_pwd=host_info.split()[2] except: log_file.write('Profile error: %s\n' %(host_info)) continue #try: # port=int(host_info.split()[3]) #except: # port=22 total +=1 p = multiprocessing.Process(target=Ssh_Cmd,args=(file_path,host_ip,user_name,user_pwd,port)) p.start() process_list.append(p) for j in process_list: j.join() for j in process_list: log_file.write(r.get()) successful = q.qsize() log_file.write('執行完畢。 總執行:%s 登陸成功:%s 登陸失敗:%s\n' %(total,successful,total - successful)) log_file.flush() log_file.close() def send_mail(to_list, sub): me = mail_user + "<"+mail_user+"@"+mail_postfix+">" fp = open('file_check.log') msg = MIMEText(fp.read(),_charset="utf-8") fp.close() msg['Subject'] = sub msg['From'] = me msg['To'] = ";".join(to_list) try: send_smtp = smtplib.SMTP() send_smtp.connect(mail_host) send_smtp.login(mail_user, mail_pass) send_smtp.sendmail(me, to_list, msg.as_string()) send_smtp.close() return True except Exception, e: print str(e)[1] return False if __name__ == '__main__': q = multiprocessing.Queue() r = multiprocessing.Queue() Concurrent(Conf,file_path,user_name,user_pwd,port) if send_mail(mailto_list,title): print "發送成功" else: print "發送失敗" LazyManage併發批量操做(判斷非root交互到root操做) #!/usr/bin/python #encoding:utf8 # LzayManage.py # config file: serverlist.conf import paramiko import multiprocessing import sys,os,time,socket,re def Ssh_Cmd(host_ip,Cmd,user_name,user_pwd,port=22): s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname=host_ip,port=port,username=user_name,password=user_pwd) stdin,stdout,stderr = s.exec_command(Cmd) Result = '%s%s' %(stdout.read(),stderr.read()) q.put('successful') s.close() return Result.strip() def Ssh_Su_Cmd(host_ip,Cmd,user_name,user_pwd,root_name,root_pwd,port=22): s = paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname=host_ip,port=port,username=user_name,password=user_pwd) ssh = s.invoke_shell() time.sleep(0.1) ssh.send('su - %s\n' %(root_name)) buff = '' while not buff.endswith('Password: '): resp = ssh.recv(9999) buff +=resp ssh.send('%s\n' %(root_pwd)) buff = '' while True: resp = ssh.recv(9999) buff +=resp if ': incorrect password' in buff: su_correct='passwd_error' break elif buff.endswith('# '): su_correct='passwd_correct' break if su_correct == 'passwd_correct': ssh.send('%s\n' %(Cmd)) buff = '' while True: resp = ssh.recv(9999) if resp.endswith('# '): buff +=re.sub('\[.*@.*\]# $','',resp) break buff +=resp Result = buff.lstrip('%s' %(Cmd)) q.put('successful') elif su_correct == 'passwd_error': Result = "\033[31mroot密碼錯誤\033[m" s.close() return Result.strip() def Send_File(host_ip,PathList,user_name,user_pwd,Remote='/tmp',port=22): s=paramiko.Transport((host_ip,port)) s.connect(username=user_name,password=user_pwd) sftp=paramiko.SFTPClient.from_transport(s) for InputPath in PathList: LocalPath = re.sub('^\./','',InputPath.rstrip('/')) RemotePath = '%s/%s' %( Remote , os.path.basename( LocalPath )) try: sftp.rmdir(RemotePath) except: pass try: sftp.remove(RemotePath) except: pass if os.path.isdir(LocalPath): sftp.mkdir(RemotePath) for path,dirs,files in os.walk(LocalPath): for dir in dirs: dir_path = os.path.join(path,dir) sftp.mkdir('%s/%s' %(RemotePath,re.sub('^%s/' %LocalPath,'',dir_path))) for file in files: file_path = os.path.join(path,file) sftp.put( file_path,'%s/%s' %(RemotePath,re.sub('^%s/' %LocalPath,'',file_path))) else: sftp.put(LocalPath,RemotePath) q.put('successful') sftp.close() s.close() Result = '%s \033[32m傳送完成\033[m' % PathList return Result def Ssh(host_ip,Operation,user_name,user_pwd,root_name,root_pwd,Cmd=None,PathList=None,port=22): msg = "\033[32m-----------Result:%s----------\033[m" % host_ip try: if Operation == 'Ssh_Cmd': Result = Ssh_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,port=port) elif Operation == 'Ssh_Su_Cmd': Result = Ssh_Su_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,port=port) elif Operation == 'Ssh_Script': Send_File(host_ip=host_ip,PathList=PathList,user_name=user_name,user_pwd=user_pwd,port=port) Script_Head = open(PathList[0]).readline().strip() LocalPath = re.sub('^\./','',PathList[0].rstrip('/')) Cmd = '%s /tmp/%s' %( re.sub('^#!','',Script_Head), os.path.basename( LocalPath )) Result = Ssh_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,port=port) elif Operation == 'Ssh_Su_Script': Send_File(host_ip=host_ip,PathList=PathList,user_name=user_name,user_pwd=user_pwd,port=port) Script_Head = open(PathList[0]).readline().strip() LocalPath = re.sub('^\./','',PathList[0].rstrip('/')) Cmd = '%s /tmp/%s' %( re.sub('^#!','',Script_Head), os.path.basename( LocalPath )) Result = Ssh_Su_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,port=port) elif Operation == 'Send_File': Result = Send_File(host_ip=host_ip,PathList=PathList,user_name=user_name,user_pwd=user_pwd,port=port) else: Result = '操做不存在' except socket.error: Result = '\033[31m主機或端口錯誤\033[m' except paramiko.AuthenticationException: Result = '\033[31m用戶名或密碼錯誤\033[m' except paramiko.BadHostKeyException: Result = '\033[31mBad host key\033[m[' except IOError: Result = '\033[31m遠程主機已存在非空目錄或沒有寫權限\033[m' except: Result = '\033[31m未知錯誤\033[m' r.put('%s\n%s\n' %(msg,Result)) def Concurrent(Conf,Operation,user_name,user_pwd,root_name,root_pwd,Cmd=None,PathList=None,port=22): # 讀取配置文件 f=open(Conf) list = f.readlines() f.close() # 執行總計 total = 0 # 併發執行 for host_info in list: # 判斷配置文件中註釋行跳過 if host_info.startswith('#'): continue # 取變量,其中任意變量未取到就跳過執行 try: host_ip=host_info.split()[0] #user_name=host_info.split()[1] #user_pwd=host_info.split()[2] except: print('Profile error: %s' %(host_info) ) continue try: port=int(host_info.split()[3]) except: port=22 total +=1 p = multiprocessing.Process(target=Ssh,args=(host_ip,Operation,user_name,user_pwd,root_name,root_pwd,Cmd,PathList,port)) p.start() # 打印執行結果 for j in range(total): print(r.get() ) if Operation == 'Ssh_Script' or Operation == 'Ssh_Su_Script': successful = q.qsize() / 2 else: successful = q.qsize() print('\033[32m執行完畢[總執行:%s 成功:%s 失敗:%s]\033[m' %(total,successful,total - successful) ) q.close() r.close() def Help(): print(''' 1.執行命令 2.執行腳本 \033[32m[位置1腳本(必須帶腳本頭),後可帶執行腳本所須要的包\文件\文件夾路徑,空格分隔]\033[m 3.發送文件 \033[32m[傳送的包\文件\文件夾路徑,空格分隔]\033[m 退出: 0\exit\quit 幫助: help\h\? 注意: 發送文件默認爲/tmp下,如已存在同名文件會被強制覆蓋,非空目錄則中斷操做.執行腳本先將本地腳本及包發送遠程主機上,發送規則同發送文件 ''') if __name__=='__main__': # 定義root帳號信息 root_name = 'root' root_pwd = 'peterli' user_name='peterli' user_pwd='<++(3Ie' # 配置文件 Conf='serverlist.conf' if not os.path.isfile(Conf): print('\033[33m配置文件 %s 不存在\033[m' %(Conf) ) sys.exit() Help() while True: i = raw_input("\033[35m[請選擇操做]: \033[m").strip() q = multiprocessing.Queue() r = multiprocessing.Queue() if i == '1': if user_name == root_name: Operation = 'Ssh_Cmd' else: Operation = 'Ssh_Su_Cmd' Cmd = raw_input('CMD: ').strip() if len(Cmd) == 0: print('\033[33m命令爲空\033[m') continue Concurrent(Conf=Conf,Operation=Operation,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,Cmd=Cmd) elif i == '2': if user_name == root_name: Operation = 'Ssh_Script' else: Operation = 'Ssh_Su_Script' PathList = raw_input('\033[36m本地腳本路徑: \033[m').strip().split() if len(PathList) == 0: print('\033[33m路徑爲空\033[m') continue if not os.path.isfile(PathList[0]): print('\033[33m本地路徑 %s 不存在或不是文件\033[m' %(PathList[0]) ) continue for LocalPath in PathList[1:]: if not os.path.exists(LocalPath): print('\033[33m本地路徑 %s 不存在\033[m' %(LocalPath) ) break else: Concurrent(Conf=Conf,Operation=Operation,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,PathList=PathList) elif i == '3': Operation = 'Send_File' PathList = raw_input('\033[36m本地路徑: \033[m').strip().split() if len(PathList) == 0: print('\033[33m路徑爲空\033[m') continue for LocalPath in PathList: if not os.path.exists(LocalPath): print('\033[33m本地路徑 %s 不存在\033[m' %(LocalPath) ) break else: Concurrent(Conf=Conf,Operation=Operation,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,PathList=PathList) elif i == '0' or i == 'exit' or i == 'quit': print("\033[34m退出LazyManage腳本\033[m") sys.exit() elif i == 'help' or i == 'h' or i == '?': Help() web頁面操做 下載文件 #!/usr/bin/env python #encoding:utf8 import urllib2 url = 'http://www.01happy.com/wp-content/uploads/2012/09/bg.png' file("./pic/%04d.png" % i, "wb").write(urllib2.urlopen(url).read()) 讀取網頁指定內容 #讀取整個網頁,正則截取匹配信息 #!/usr/bin/env python #encoding=utf-8 import re, urllib,datetime def getPageCode(url, fromCharset, toCharset): fr = urllib.urlopen(url) pageCode = fr.read() fr.close() return pageCode def getImgUrl(pageCode): pattern = re.compile(r'(\d*\-\d*\-\d* \d*:\d*:\d*)') return re.findall(pattern, pageCode) if __name__ == '__main__': f = file('url.conf') c = f.readlines() f.close() for url in c: pageCode = getPageCode(url.rstrip(), 'gb2312', 'utf8') imgUrl = getImgUrl(pageCode) print imgUrl 讀取網頁圖片大小 # 根據http頭,獲得content-type的值 #!/usr/bin/env python #encoding=utf-8 import urllib2 url = 'http://www.01happy.com/wp-content/uploads/2012/09/bg.png' reqst = urllib2.Request(url) opener = urllib2.build_opener() con = opener.open(reqst) Type = con.headers.dict['content-type'][:5] Length = int(con.headers.dict['content-length']) if Length > 0: print(Length) print(Type) 查看網頁圖片尺寸類型 #將圖片讀入內存 #!/usr/bin/env python #encoding=utf-8 import cStringIO, urllib2, Image url = 'http://www.01happy.com/wp-content/uploads/2012/09/bg.png' file = urllib2.urlopen(url) tmpIm = cStringIO.StringIO(file.read()) im = Image.open(tmpIm) print im.format, im.size, im.mode requests讀取網頁信息 #Requests是一個Python的HTTP客戶端庫 #安裝: sudo pip install requests import requests r = requests.get('http://baidu.com') #r = requests.get('https://baidu.com', auth=('user', 'pass')) # https需登陸加auth r.status_code # 狀態碼 r.headers # 網頁頭信息 r.headers['content-type'] # 網頁頭信息 r.headers['content-length'] # 網頁頭信息 r.text # 網頁源碼 r.content # 網頁源碼 爬蟲 #!/usr/bin/env python #encoding:utf-8 #sudo pip install BeautifulSoup import requests from BeautifulSoup import BeautifulSoup import re baseurl = 'http://blog.sina.com.cn/s/articlelist_1191258123_0_1.html' r = requests.get(baseurl) for url in re.findall('<a.*?</a>', r.content, re.S): if url.startswith('<a title='): with open(r'd:/final.txt', 'ab') as f: f.write(url + '\n') linkfile = open(r'd:/final.txt', 'rb') soup = BeautifulSoup(linkfile) for link in soup.findAll('a'): #print link.get('title') + ': ' + link.get('href') ss = requests.get(link.get('href')) for content in re.findall('<div id="sina_keyword_ad_area2" class="articalContent ">.*?</div>', ss.content, re.S): with open(r'd:/myftp/%s.txt'%link.get('title').strip('<>'), 'wb') as f: f.write(content) print '%s has been copied.' % link.get('title') 反垃圾郵件提交申訴 #!/usr/bin/env python #encoding:utf-8 import requests import re IpList=['113.212.91.25','113.212.91.23'] QueryAdd='http://www.anti-spam.org.cn/Rbl/Query/Result' ComplaintAdd='http://www.anti-spam.org.cn/Rbl/Getout/Submit' data = { 'CONTENT':'''咱們是一家正規的XXX。xxxxxxx。懇請將咱們的發送服務器IP移出黑名單。謝謝! 處理措施: 1.XXXX。 2.XXXX。''', 'CORP':'abc.com', 'WWW':'www.abc.cm', 'NAME':'def', 'MAIL':'def@163.com.cn', 'TEL':'010-50000000', 'LEVEL':'0', } for Ip in IpList: query = requests.post(url=QueryAdd, data={'IP':Ip}) # 黑名單查詢 if query.ok: if re.findall(u'\u7533\u8bc9\u8131\u79bb', query.text, re.S): # 查找關鍵字 申訴脫離 既代表在黑名單中 data['IP']=Ip complaint = requests.post(url=ComplaintAdd, data=data) # 提交申訴 if complaint.ok: if re.findall(u'\u60a8\u7684\u9ed1\u540d\u5355\u8131\u79bb\u7533\u8bf7\u5df2\u63d0\u4ea4', complaint.text, re.S): status='申請提交' elif re.findall(u'\u8131\u79bb\u7533\u8bf7\u5df2\u88ab\u4ed6\u4eba\u63d0\u4ea4', complaint.text, re.S): status='重複提交' elif re.findall(u'\u7533\u8bf7\u7531\u4e8e\u8fd1\u671f\u5185\u6709\u88ab\u62d2\u7edd\u7684\u8bb0\u5f55', complaint.text, re.S): status='近期拒絕' else: status='異常' else: status='正常' print '%s %s' %(Ip,status) 發送郵件 發送郵件內容 #!/usr/bin/python #encoding:utf8 # 導入 smtplib 和 MIMEText import smtplib from email.mime.text import MIMEText # 定義發送列表 mailto_list=["272121935@qq.com","272121935@163.com"] # 設置服務器名稱、用戶名、密碼以及郵件後綴 mail_host = "smtp.163.com" mail_user = "mailuser" mail_pass = "password" mail_postfix="163.com" # 發送郵件函數 def send_mail(to_list, sub): me = mail_user + "<"+mail_user+"@"+mail_postfix+">" fp = open('context.txt') msg = MIMEText(fp.read(),_charset="utf-8") fp.close() msg['Subject'] = sub msg['From'] = me msg['To'] = ";".join(to_list) try: send_smtp = smtplib.SMTP() send_smtp.connect(mail_host) send_smtp.login(mail_user, mail_pass) send_smtp.sendmail(me, to_list, msg.as_string()) send_smtp.close() return True except Exception, e: print str(e) return False if send_mail(mailto_list,"標題"): print "測試成功" else: print "測試失敗" 發送附件 #!/usr/bin/python #encoding:utf8 import smtplib from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders def send_mail(to_list, sub, filename): me = mail_user + "<"+mail_user+"@"+mail_postfix+">" msg = MIMEMultipart() msg['Subject'] = sub msg['From'] = me msg['To'] = ";".join(to_list) submsg = MIMEBase('application', 'x-xz') submsg.set_payload(open(filename,'rb').read()) encoders.encode_base64(submsg) submsg.add_header('Content-Disposition', 'attachment', filename=filename) msg.attach(submsg) try: send_smtp = smtplib.SMTP() send_smtp.connect(mail_host) send_smtp.login(mail_user, mail_pass) send_smtp.sendmail(me, to_list, msg.as_string()) send_smtp.close() return True except Exception, e: print str(e)[1] return False # 設置服務器名稱、用戶名、密碼以及郵件後綴 mail_host = "smtp.163.com" mail_user = "xuesong" mail_pass = "mailpasswd" mail_postfix = "163.com" mailto_list = ["272121935@qq.com","quanzhou722@163.com"] title = 'check' filename = 'file_check.html' if send_mail(mailto_list,title,filename): print "發送成功" else: print "發送失敗" 解壓縮 gzip壓縮 import gzip f_in = open('file.log', 'rb') f_out = gzip.open('file.log.gz', 'wb') f_out.writelines(f_in) f_out.close() f_in.close() gzip壓縮1 File = 'xuesong_18.log' g = gzip.GzipFile(filename="", mode='wb', compresslevel=9, fileobj=open((r'%s.gz' %File),'wb')) g.write(open(r'%s' %File).read()) g.close() gzip解壓 g = gzip.GzipFile(mode='rb', fileobj=open((r'xuesong_18.log.gz'),'rb')) open((r'xuesong_18.log'),'wb').write(g.read()) 壓縮tar.gz import os import tarfile tar = tarfile.open("/tmp/tartest.tar.gz","w:gz") # 建立壓縮包名 for path,dir,files in os.walk("/tmp/tartest"): # 遞歸文件目錄 for file in files: fullpath = os.path.join(path,file) tar.add(fullpath) # 建立壓縮包 tar.close() 解壓tar.gz import tarfile tar = tarfile.open("/tmp/tartest.tar.gz") #tar.extract("/tmp") # 所有解壓到指定路徑 names = tar.getnames() # 包內文件名 for name in names: tar.extract(name,path="./") # 解壓指定文件 tar.close() zip壓縮 import zipfile,os f = zipfile.ZipFile('filename.zip', 'w' ,zipfile.ZIP_DEFLATED) # ZIP_STORE 爲默認表不壓縮. ZIP_DEFLATED 表壓縮 #f.write('file1.txt') # 將文件寫入壓縮包 for path,dir,files in os.walk("tartest"): # 遞歸壓縮目錄 for file in files: f.write(os.path.join(path,file)) # 將文件逐個寫入壓縮包 f.close() zip解壓 if zipfile.is_zipfile('filename.zip'): # 判斷一個文件是否是zip文件 f = zipfile.ZipFile('filename.zip') for file in f.namelist(): # 返回文件列表 f.extract(file, r'/tmp/') # 解壓指定文件 #f.extractall() # 解壓所有 f.close() 時間 import time time.strftime('%Y-%m-%d_%X',time.localtime( time.time() ) ) 格式化時間 tomorrow.strftime('%Y%m%d_%H%M') 上一個月最後一天 import datetime lastMonth=datetime.date(datetime.date.today().year,datetime.date.today().month,1)-datetime.timedelta(1) lastMonth.strftime("%Y/%m") 前一天 (datetime.datetime.now() + datetime.timedelta(days=-1) ).strftime('%Y%m%d') 上個月 time.localtime()[1] - 1 時間戳轉換可讀時間 a=time.time() time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(a)) 編碼轉換 a='中文' # 編碼未定義按輸入終端utf8或gbk u=u'中文' # 定義爲unicode編碼 u值爲 u'\u4e2d\u6587' u.encode('utf8') # 轉爲utf8格式 u值爲 '\xe4\xb8\xad\xe6\x96\x87' print u # 結果顯示 中文 print u.encode('utf8') # 轉爲utf8格式,當顯示終端編碼爲utf8 結果顯示 中文 編碼不一致則亂碼 print u.encode('gbk') # 當前終端爲utf8 故亂碼 ord('4') # 字符轉ASCII碼 chr(52) # ASCII碼轉字符 hash import md5 m = md5.new('123456').hexdigest() import hashlib m = hashlib.md5() m.update("Nobody inspects") # 使用update方法對字符串md5加密 m.digest() # 加密後二進制結果 m.hexdigest() # 加密後十進制結果 hashlib.new("md5", "string").hexdigest() # 對字符串加密 hashlib.new("md5", open("file").read()).hexdigest() # 查看文件MD5值 隱藏輸入密碼 import getpass passwd=getpass.getpass() 遍歷遞歸 [os.path.join(x[0],y) for x in os.walk('/root/python/5') for y in x[2]] for i in os.walk('/root/python/5/work/server'): print i Python處理信號 信號的概念 信號(signal): 進程之間通信的方式,是一種軟件中斷。一個進程一旦接收到信號就會打斷原來的程序執行流程來處理信號。 發送信號通常有兩種緣由: 1(被動式) 內核檢測到一個系統事件.例如子進程退出會像父進程發送SIGCHLD信號.鍵盤按下control+c會發送SIGINT信號 2(主動式) 經過系統調用kill來向指定進程發送信號 操做系統規定了進程收到信號之後的默認行爲,能夠經過綁定信號處理函數來修改進程收到信號之後的行爲,有兩個信號是不可更改的 SIGTOP 和 SIGKILL 若是一個進程收到一個SIGUSR1信號,而後執行信號綁定函數,第二個SIGUSR2信號又來了,第一個信號沒有被處理完畢的話,第二個信號就會丟棄。 進程結束信號 SIGTERM 和 SIGKILL 的區別: SIGTERM 比較友好,進程能捕捉這個信號,根據您的須要來關閉程序。在關閉程序以前,您能夠結束打開的記錄文件和完成正在作的任務。在某些狀況下,假如進程正在進行做業並且不能中斷,那麼進程能夠忽略這個SIGTERM信號。 常見信號 kill -l # 查看linux提供的信號 SIGHUP 1 A # 終端掛起或者控制進程終止 SIGINT 2 A # 鍵盤終端進程(如control+c) SIGQUIT 3 C # 鍵盤的退出鍵被按下 SIGILL 4 C # 非法指令 SIGABRT 6 C # 由abort(3)發出的退出指令 SIGFPE 8 C # 浮點異常 SIGKILL 9 AEF # Kill信號 馬上中止 SIGSEGV 11 C # 無效的內存引用 SIGPIPE 13 A # 管道破裂: 寫一個沒有讀端口的管道 SIGALRM 14 A # 鬧鐘信號 由alarm(2)發出的信號 SIGTERM 15 A # 終止信號,可以讓程序安全退出 kill -15 SIGUSR1 30,10,16 A # 用戶自定義信號1 SIGUSR2 31,12,17 A # 用戶自定義信號2 SIGCHLD 20,17,18 B # 子進程結束自動向父進程發送SIGCHLD信號 SIGCONT 19,18,25 # 進程繼續(曾被中止的進程) SIGSTOP 17,19,23 DEF # 終止進程 SIGTSTP 18,20,24 D # 控制終端(tty)上按下中止鍵 SIGTTIN 21,21,26 D # 後臺進程企圖從控制終端讀 SIGTTOU 22,22,27 D # 後臺進程企圖從控制終端寫 缺省處理動做一項中的字母含義以下: A 缺省的動做是終止進程 B 缺省的動做是忽略此信號,將該信號丟棄,不作處理 C 缺省的動做是終止進程並進行內核映像轉儲(dump core),內核映像轉儲是指將進程數據在內存的映像和進程在內核結構中的部份內容以必定格式轉儲到文件系統,而且進程退出執行,這樣作的好處是爲程序員提供了方便,使得他們能夠獲得進程當時執行時的數據值,容許他們肯定轉儲的緣由,而且能夠調試他們的程序。 D 缺省的動做是中止進程,進入中止情況之後還能從新進行下去,通常是在調試的過程當中(例如ptrace系統調用) E 信號不能被捕獲 F 信號不能被忽略 Python提供的信號 import signal dir(signal) ['NSIG', 'SIGABRT', 'SIGALRM', 'SIGBUS', 'SIGCHLD', 'SIGCLD', 'SIGCONT', 'SIGFPE', 'SIGHUP', 'SIGILL', 'SIGINT', 'SIGIO', 'SIGIOT', 'SIGKILL', 'SIGPIPE', 'SIGPOLL', 'SIGPROF', 'SIGPWR', 'SIGQUIT', 'SIGRTMAX', 'SIGRTMIN', 'SIGSEGV', 'SIGSTOP', 'SIGSYS', 'SIGTERM', 'SIGTRAP', 'SIGTSTP', 'SIGTTIN', 'SIGTTOU', 'SIGURG', 'SIGUSR1', 'SIGUSR2', 'SIGVTALRM', 'SIGWINCH', 'SIGXCPU', 'SIGXFSZ', 'SIG_DFL', 'SIG_IGN', '__doc__', '__name__', 'alarm', 'default_int_handler', 'getsignal', 'pause', 'signal'] 綁定信號處理函數 #encoding:utf8 import os,signal from time import sleep def onsignal_term(a,b): print 'SIGTERM' # kill -15 signal.signal(signal.SIGTERM,onsignal_term) # 接收信號,執行相應函數 def onsignal_usr1(a,b): print 'SIGUSR1' # kill -10 signal.signal(signal.SIGUSR1,onsignal_usr1) while 1: print 'ID',os.getpid() sleep(10) 經過另一個進程發送信號 import os,signal os.kill(16175,signal.SIGTERM) # 發送信號,16175是綁定信號處理函數的進程pid,須要自行修改 os.kill(16175,signal.SIGUSR1) 父進程接收子進程結束髮送的SIGCHLD信號 #encoding:utf8 import os,signal from time import sleep def onsigchld(a,b): print '收到子進程結束信號' signal.signal(signal.SIGCHLD,onsigchld) pid = os.fork() # 建立一個子進程,複製父進程全部資源操做 if pid == 0: # 經過判斷子進程os.fork()是否等於0,分別同時執行父進程與子進程操做 print '我是子進程,pid是',os.getpid() sleep(2) else: print '我是父進程,pid是',os.getpid() os.wait() # 等待子進程結束 接收信號的程序,另一端使用多線程向這個進程發送信號,會遺漏一些信號 #encoding:utf8 import os import signal from time import sleep import Queue QCOUNT = Queue.Queue() # 初始化隊列 def onsigchld(a,b): '''收到信號後向隊列中插入一個數字1''' print '收到SIGUSR1信號' sleep(1) QCOUNT.put(1) # 向隊列中寫入 signal.signal(signal.SIGUSR1,onsigchld) # 綁定信號處理函數 while 1: print '個人pid是',os.getpid() print '如今隊列中元素的個數是',QCOUNT.qsize() sleep(2) 多線程發信號端的程序 #encoding:utf8 import threading import os import signal def sendusr1(): print '發送信號' os.kill(17788, signal.SIGUSR1) # 這裏的進程id須要寫前一個程序實際運行的pid WORKER = [] for i in range(1, 7): # 開啓6個線程 threadinstance = threading.Thread(target = sendusr1) WORKER.append(threadinstance) for i in WORKER: i.start() for i in WORKER: i.join() print '主線程完成' python使用memcache easy_install python-memcached # 安裝(python2.7+) import memcache mc = memcache.Client(['10.152.14.85:12000'],debug=True) mc.set('name','luo',60) mc.get('name') mc.delete('name1') 保存數據 set(key,value,timeout) # 把key映射到value,timeout指的是何時這個映射失效 add(key,value,timeout) # 僅當存儲空間中不存在鍵相同的數據時才保存 replace(key,value,timeout) # 僅當存儲空間中存在鍵相同的數據時才保存 獲取數據 get(key) # 返回key所指向的value get_multi(key1,key2,key3) # 能夠非同步地同時取得多個鍵值, 比循環調用get快數十倍 python使用mongodb 原文: http://blog.nosqlfan.com/html/2989.html easy_install pymongo # 安裝(python2.7+) import pymongo connection=pymongo.Connection('localhost',27017) # 建立鏈接 db = connection.test_database # 切換數據庫 collection = db.test_collection # 獲取collection # db和collection都是延時建立的,在添加Document時才真正建立 文檔添加, _id自動建立 import datetime post = {"author": "Mike", "text": "My first blog post!", "tags": ["mongodb", "python", "pymongo"], "date": datetime.datetime.utcnow()} posts = db.posts posts.insert(post) ObjectId('...') 批量插入 new_posts = [{"author": "Mike", "text": "Another post!", "tags": ["bulk", "insert"], "date": datetime.datetime(2009, 11, 12, 11, 14)}, {"author": "Eliot", "title": "MongoDB is fun", "text": "and pretty easy too!", "date": datetime.datetime(2009, 11, 10, 10, 45)}] posts.insert(new_posts) [ObjectId('...'), ObjectId('...')] 獲取全部collection db.collection_names() # 至關於SQL的show tables 獲取單個文檔 posts.find_one() 查詢多個文檔 for post in posts.find(): post 加條件的查詢 posts.find_one({"author": "Mike"}) 高級查詢 posts.find({"date": {"$lt": "d"}}).sort("author") 統計數量 posts.count() 加索引 from pymongo import ASCENDING, DESCENDING posts.create_index([("date", DESCENDING), ("author", ASCENDING)]) 查看查詢語句的性能 posts.find({"date": {"$lt": "d"}}).sort("author").explain()["cursor"] posts.find({"date": {"$lt": "d"}}).sort("author").explain()["nscanned"] 斐波那契 #將函數結果做爲列表可用於循環 def fab(max): n, a, b = 0, 0, 1 while n < max: yield b a, b = b, a + b n = n + 1 for n in fab(5): print n 乘法口訣 #!/usr/bin/python for i in range(1,10): for j in range(1,i+1): print j,'*',i,'=',j*i, else: print '' 最小公倍數 # 1-70的最小公倍數 def c(m,n): a1=m b1=n r=n%m while r!=0: n=m m=r r=n%m return (a1*b1)/m d=1 for i in range(3,71,2): d = c(d,i) print d PIL圖像處理 import Image im = Image.open("j.jpg") # 打開圖片 print im.format, im.size, im.mode # 打印圖像格式、像素寬和高、模式 # JPEG (440, 330) RGB im.show() # 顯示最新加載圖像 box = (100, 100, 200, 200) region = im.crop(box) # 從圖像中提取出某個矩形大小的圖像 圖片等比縮小 # -*- coding: cp936 -*- import Image import glob, os #圖片批處理 def timage(): for files in glob.glob('D:\\1\\*.JPG'): filepath,filename = os.path.split(files) filterame,exts = os.path.splitext(filename) #輸出路徑 opfile = r'D:\\22\\' #判斷opfile是否存在,不存在則建立 if (os.path.isdir(opfile)==False): os.mkdir(opfile) im = Image.open(files) w,h = im.size #im_ss = im.resize((400,400)) #im_ss = im.convert('P') im_ss = im.resize((int(w*0.12), int(h*0.12))) im_ss.save(opfile+filterame+'.jpg') if __name__=='__main__': timage() 取系統返回值賦給序列 cmd = os.popen("df -Ph|awk 'NR!=1{print $5}'").readlines(); cmd = os.popen('df -h').read().split('\n') cmd = os.popen('lo 2>&1').read() #取磁盤使用空間 import commands df = commands.getoutput("df -hP") [ x.split()[4] for x in df.split("\n") ] [ (x.split()[0],x.split()[4]) for x in df.split("\n") if x.split()[4].endswith("%") ] 將列表去重複 list(set(['qwe', 'as', '123', '123'])) 將列表轉換成字符串 '\t'.join(li) curses框 import curses myscreen = curses.initscr() # 初始化一個圖形框 myscreen.border(0) # 定義邊框寬度 myscreen.addstr(12, 25, "Python curses in action!") # 打印的位置 myscreen.refresh() # 使框生效 myscreen.getch() # 等待鍵盤輸入 curses.endwin() # 關閉 curses菜單 #!/usr/bin/env python #menu.py from os import system import curses def get_param(prompt_string): screen.clear() screen.border(0) screen.addstr(2, 2, prompt_string) screen.refresh() input = screen.getstr(10, 10, 60) # 等待用戶輸入內容賦值變量 return input def execute_cmd(cmd_string): system("clear") a = system(cmd_string) print "" if a == 0: print "Command executed correctly" else: print "Command terminated with error" raw_input("Press enter") print "" x = 0 while x != ord('4'): # 字符轉ASCII碼 chr screen = curses.initscr() screen.clear() screen.border(0) screen.addstr(2, 2, "Please enter a number...") screen.addstr(4, 4, "1 - Add a user") screen.addstr(5, 4, "2 - Restart Apache") screen.addstr(6, 4, "3 - Show disk space") screen.addstr(7, 4, "4 - Exit") screen.refresh() x = screen.getch() if x == ord('1'): username = get_param("Enter the username") homedir = get_param("Enter the home directory, eg /home/nate") groups = get_param("Enter comma-separated groups, eg adm,dialout,cdrom") shell = get_param("Enter the shell, eg /bin/bash:") curses.endwin() execute_cmd("useradd -d " + homedir + " -g 1000 -G " + groups + " -m -s " + shell + " " + username) if x == ord('2'): curses.endwin() execute_cmd("apachectl restart") if x == ord('3'): curses.endwin() execute_cmd("df -h") curses.endwin() 打印表格 map = [["a","b","c"], ["d","e","f"], ["g","h","i"]] def print_board(): for i in range(0,3): for j in range(0,3): print "|",map[i][j], #if j != 2: print '|' 生成html文件表格 log_file = file('check.html', 'w') log_file.write(""" <!DOCTYPE HTML> <html lang="utr-8"> <head> <meta charset="UTF-8"> <title></title> </head> <body> <table align='center' border='0' cellPadding='0' style='font-size:24px;'><tr ><td>狀態統計</td></tr></table> <style>.font{font-size:13px}</style> <table align='center' border='1' borderColor=gray cellPadding=3 width=1350 class='font'> <tr style='background-color:#666666'> <th width=65>IP</th> <th width=65>狀態</th> </tr> """) for i in list: log_file.write('<tr><td>%s</td><td>%s</td></tr>\n' %(i.split()[0],i.split()[1]) ) log_file.write(""" </table> </body> </html> """) log_file.flush() log_file.close() 井字遊戲 #!/usr/bin/python # http://www.admin10000.com/document/2506.html def print_board(): for i in range(0,3): for j in range(0,3): print map[2-i][j], if j != 2: print "|", print "" def check_done(): for i in range(0,3): if map[i][0] == map[i][1] == map[i][2] != " " \ or map[0][i] == map[1][i] == map[2][i] != " ": print turn, "won!!!" return True if map[0][0] == map[1][1] == map[2][2] != " " \ or map[0][2] == map[1][1] == map[2][0] != " ": print turn, "won!!!" return True if " " not in map[0] and " " not in map[1] and " " not in map[2]: print "Draw" return True return False turn = "X" map = [[" "," "," "], [" "," "," "], [" "," "," "]] done = False while done != True: print_board() print turn, "'s turn" print moved = False while moved != True: print "Please select position by typing in a number between 1 and 9, see below for which number that is which position..." print "7|8|9" print "4|5|6" print "1|2|3" print try: pos = input("Select: ") if pos <=9 and pos >=1: Y = pos/3 X = pos%3 if X != 0: X -=1 else: X = 2 Y -=1 if map[Y][X] == " ": map[Y][X] = turn moved = True done = check_done() if done == False: if turn == "X": turn = "O" else: turn = "X" except: print "You need to add a numeric value" 網段劃分 題目 192.168.1 192.168.3 192.168.2 172.16.3 192.16.1 192.16.2 192.16.3 10.0.4 輸出結果: 192.16.1-192.16.3 192.168.1-192.168.3 172.16.3 10.0.4 答案 #!/usr/bin/python f = file('a.txt') c = f.readlines() dic={} for i in c: a=i.strip().split('.') if a[0]+'.'+a[1] in dic.keys(): key=dic["%s.%s" %(a[0],a[1])] else: key=[] key.append(a[2]) dic[a[0]+'.'+a[1]]=sorted(key) for x,y in dic.items(): if y[0] == y[-1]: print '%s.%s' %(x,y[0]) else: print '%s.%s-%s.%s' %(x,y[0],x,y[-1]) 統計日誌IP # 打印出獨立IP,並統計獨立IP數 219.140.190.130 - - [23/May/2006:08:57:59 +0800] "GET /fg172.exe HTTP/1.1" 200 2350253 221.228.143.52 - - [23/May/2006:08:58:08 +0800] "GET /fg172.exe HTTP/1.1" 206 719996 221.228.143.52 - - [23/May/2006:08:58:08 +0800] "GET /fg172.exe HTTP/1.1" 206 713242 #!/usr/bin/python dic={} a=open("a").readlines() for i in a: ip=i.strip().split()[0] if ip in dic.keys(): dic[ip] = dic[ip] + 1 else: dic[ip] = 1 for x,y in dic.items(): print x," ",y 打印a-z import string string.lowercase # a-z小寫 string.uppercase # A-Z大小