目錄前端
計算機的主要組成部分時主板、CPU、硬盤、內存及一些外設設備組成。java
操做系統(OS),是最接近物理硬件的系統軟件。主要用來協調、控制、分配計算機硬件資源,使計算機各組件能夠發揮最優性能。python
軟件是運行於操做系統之上的應用程序。mysql
計算機語言主要是有解釋器/編譯器/虛擬機,再加上語法規則構成用來開發其餘軟件的工具。解釋型語言,一般是由解釋器邊解釋邊執行的計算機語言,例如:python、ruby、PHP、perl。編譯型語言,一般是由編譯器編譯整個代碼文件,生成計算機能夠識別的文件,交由計算機處理。linux
首先,在官網下載py2與py3,2,3版本有不少不兼容全部會存在兩個版本共存的問題。目前,mac、ubuntu等一些系統已經內置了python2版本。git
爲了開發須要,咱們須要下載並安裝python3。用於開發的軟件,一般安裝版本通常是找次新版本進行安裝。安裝pycharm IDE 開發工具。程序員
環境變量的功能,及其配置。環境變量分爲,用戶環境變量(只對當前用戶生效)和系統環境變量(全部用戶均有效)。主要用於終端使用,不須要輸入python解釋器的完整路徑,只要輸入pythonx 就可以使用。web
PATH:方便用戶在終端執行程序。即,將可執行程序所在的目錄添加到環境變量,之後直接調用便可。正則表達式
mac的環境變量在~/.bash_profile文件中。一般在安裝的時候python會自動生成環境變量,無需手動配置。redis
當前,比較通用的編碼有ASCII、Unicode、UTF-八、GB23十二、GBK。因爲計算機最初使用的是ASCII編碼,因此其餘編碼必須兼容ASCII碼。
ASCII
ASCII碼,7bits表示一個字符,最高位用0表示,後來IBM進行擴展,造成8bit擴展的ASCII碼。包含全部英文字母和經常使用的字符,最多能夠表示127或256種。
Unicode
Unicode(萬國碼),隨着計算機的普及,計算機須要兼容多國語言,Unicode編碼應運而生。32bits表示一個字符,總共能夠表示2**32種不一樣的符號,遠遠超出目前全部文字及字符,迄今使用21bits。通用的特色換來的是存儲空間的浪費,通常只用於計算機內部處理計算。
utf-8
爲彌補unicode的不足,utf-8針對unicode進行壓縮和優化,去掉前面多餘的0,只保留有效部分。完整保留ASCII碼,歐洲文字通常用2bytes表示,中文通常用3bytes表示。
GBK
全稱是GB2312-80《信息交換用漢字編碼字符集 基本集》,1980年發佈,是中文信息處理的國家標準,在大陸及海外使用簡體中文的地區(如新加坡等)是強制使用的惟一中文編碼。
中文使用2bytes表示。GBK,是對GB2312的擴展,又稱GBK大字符集,簡而言之就是全部亞洲文字的雙字節字符。
# IDE:統一使用UTF-8, 全局和項目均如此
變量的主要做用是爲了屢次重複使用方便。
# 查看python關鍵字 import keyword keyword.kwlist
常量:不容許修改的值,python中只是約定
Python語句中通常以新行做爲語句的結束符。
可是咱們可使用斜槓( )將一行的語句分爲多行顯示,以下所示:
total = item_one + \ item_two + \ item_three
input("按下 enter 鍵退出,其餘任意鍵顯示...\n") # 不換行輸出 print(x, end='') print(y, end='')
a = b = c = 1 a, b, c = 1, 2, "john"
# 若是在指定的序列中找到值返回 True,不然返回 False in # 若是在指定的序列中找到值返回 False,不然返回 True not in
is # is 是判斷兩個標識符是否是引用自一個對象 x is y 相似 id(x) == id(y) , 若是引用的是同一個對象則返回 True,不然返回 False is not # is not 是判斷兩個標識符是否是引用自不一樣對象 x is not y 相似 id(a) != id(b)。若是引用的不是同一個對象則返回結果 True,不然返回 False。
執行Python代碼時,若是導入了其餘的 .py 文件,那麼,執行過程當中會自動生成一個與其同名的 .pyc 文件,該文件就是Python解釋器編譯以後產生的字節碼。
字節碼(Bytecode)是一種包含執行程序、由一序列 op 代碼/數據對 組成的二進制文件。字節碼是一種中間碼。它比機器碼更抽象,須要直譯器轉譯後才能成爲機器碼的中間代碼。
機器碼(machine code),學名機器語言指令,有時也被稱爲原生碼(Native Code),是電腦的CPU可直接解讀的數據。機器碼是電腦CPU直接讀取運行的機器指令。
ps:代碼通過編譯能夠產生字節碼;字節碼經過反編譯也能夠獲得代碼。
v = u'henry' print(v, type(v)) # unicode類型 # py2 py3 數據類型對應關係 unicode<class> <--> str eg.u'alex' <--> 'alex' str <--> bytes eg.'alex' <--> b'alex
Py2 | Py3 | ||
---|---|---|---|
1 | 字符串類型不一樣 | ||
2 | py2py3默認解釋器編碼 | ASCII | UTF-8 |
3 | 輸入輸出 | raw_input() ; print | input() ; print() |
4 | int / long | int 和 long,除法只保留整數 | 只用int,除法保留小數 |
5 | range/xrange | range/xrange | 只有range,至關於py2的xrange |
6 | info.keys,info.values,info .items | 數據類型是list | 數據類型是<class 'dict_keys'> |
7 | map/filter | 數據類型是list | 返回的是iterator,能夠list()查看<map object at 0x108bfc668> |
8 | reduce | 內置 | 移到functools |
9 | 模塊和包 | 須要__init__.py | — |
10 | 經典類和新式類 | 同時擁有 | 只有新式類 |
# None 無操做 # bytes 類
# 只有str 能夠強轉 s = '99' print(type(int(s)))
# bool 布爾,主要用於條件判斷。 # None, 0 , '', [], {}, set() # 以上都是False
str操做經常使用的 14 (9+5) 種 1.upper/lower 2.isdigit/isdecimal 3.strip 4.replace 5.split 6.startswith/endswith 7.encode 8.format 9.join
# 大小寫轉換 s = 'Henry' print(s.upper(), s.lower())
# 判斷是不是數字 s1 = '123' s2 = '12.3' print(s1.isdigit(), s2.isdigit()) # True Flase # isdecimal只判斷是不是整數
# 默認去除兩邊空格+ \n + \t s = ' asdfgh, ' print('-->', s.strip(), '<--') print('-->', s.strip().strip(','), '<--')
# repalce 中的 a 和 b 必須是str類型, n 必須是int類型 s = 'adsafga' print(s.replace('a', '666')) print(s.replace('a', '666', 1))
# str的分割 s = 'henry_echo_elaine_' li = s.split('_') print(li) # 分割後的元素永遠比分隔符號多一個
# 判斷開始/結束位置是不是指定元素 s = 'abghjkdc' print(s.startswith('ab'), s.endswith('cd')) # True Flase
# 若是使用格式化輸入,打印%須要使用 %% # %s,%d :表示佔位符 # way1 一般用於函數 "***{0}***{1}**".format(a, b) # % (a, ) :這裏表示tuple,建議加逗號 # way2 "***%s, ***%s***" % (a, b,)
# 示例 # way1 '{0} ***{1}'.format(a, b) a = 'abcd' b = '666' s = '{0} ***{1}'.format(a, b) print(s) # way2 s1 = '%s***%s' % (a, b,) print(s1)
擴展使用示例:
# %s ,只能是tuple msg = '我是%s, 年齡%s' % ('alex', 19) msg = '我是%(name)s, 年齡%(age)s' % {'name': 'alex', 'age': 19} # format格式化 v1 = '我是{name}, 年齡{age}'.format(name = 'alex', age = 18) v1 = '我是{name}, 年齡{age}'.format(** {'name': 'alex', 'age': 19}) v2 = '我是{0}, 年齡{1}'.format('alex', 18) v2 = '我是{0}, 年齡{1}'.format(*('alex', 18) )
# 指定編碼類型 s = '你好' print(s.encode('utf-8')) # 6個字節 print( s.encode('gbk')) # 4個字節
# 用於循環加入指定字符 # s 必須是iterable # s 但是str,list,tuple,dict,set(str + 容器類) # s 中元素值必須是str類型 '_'.join(s)
# 示例 # 指定元素循環鏈接str中的元素 s = 'henry' print('_'.join(s)) # 下劃線鏈接個字符
# 返回s長度 s = '1234567890' print(len(s)) # 10
# 索引取值 s = '123456789' print(s[3]) # 4
s = '123456789' print(s[3:5]) # 45
# 根據step進行切片 s = '123456789' print(s[3::2]) # 468
s = '123456789' for i in s: print(i)
list操做目前一共有15(8+7)種, 1.append 2.insert 3.remove 4.pop 5.clear 6.reverse 7.sort 8.extend
# 任意類型數據,li操做不能直接放在print()中 li = [1, 2, 3, 4, 5, 6] li.append('666') print(li) li.append(['henry']) print(li)
# 按照index位置插入指定內容 li = [1, 2, 3, 4, 5, 6] li.insert(3, 'henry') print(li)
# 刪除指定list中的元素 li = ['aa', 'a', 'aacde'] li.remove('aa') print(li) li.remove('bb') print(li) # 會報錯
# 按index刪除list中的元素 li = [1, 2, 3, 4, 5, 6] li.pop() print(li) li.pop(3) print(li)
# 清空list中的全部元素 li = [1, 2, 3, 4, 5, 6] li.clear() print(li)
# 反轉list中的元素 li = [1, 2, 3, 4, 5, 6] li.reverse() print(li)
# reverse = True 從大到小 # 只能是同一類型的元素 # dict,tuple不支持排序 li = [6, 2, 3, 1, 5, 4] li.sort() print(li) li = ['ba', 'ab', 'c', 'd'] li.sort(reverse=True) print(li) li = [[6], [2, 3], [1, 5, 4]] li.sort() print(li) li = [(6, 2, 3, 1, 5, 4)] li.sort() print(li)
# 把s中的元素,循環取出,逐個追加到list中 # s能夠是str, list, tuple, dict, set # dict只取keys 追加到list中 s = 'henry' li = [1, 2, 3, 4, 5, 6] li.extend(s) print(li) s = ['a', 'b', 'c', 'd'] li.extend(s) print(li)
li = [1, 2, 3, 4, 5, 6] print(len(li))
li = [1, 2, 3, 4, 5, 6] print(li[2])
li = [1, 2, 3, 4, 5, 6] print(li[2:5])
li = [1, 2, 3, 4, 5, 6] print(li[2::2])
li = [1, 2, 3, 4, 5, 6] for i in li: print(i)
# 使用index修改,若是隻是一個值,則正常修改 li = [1, 2, 3, 4, 5, 6] li[2] = 'henry' print(li) # 使用index修改,若是是多個值,認爲是一個tuple li[2] = 'a', 'b', 'c' print(li) # 使用切片[]修改,則循環取值加入 li[2:3] = 'a', 'b', 'c' print(li)
# del 也不能放在print()裏面 li = [1, 2, 3, 4, 5, 6] del li[2] print(li) del li[2:] print(li
# 沒有獨有操做,目前只有5種 # tuple裏,最後一個值最好加一個 逗號 ,以區別於運算符
t = (1, 2, 3,) print(len(t))
t = (1, 2, 3,) print(t[2])
t = (1, 2, 3,) print(t[1:])
t = (1, 2, 3, 4, 5, 6, 7, 8, 9, 0) print(t[1::2])
t = (1, 2, 3, 4, 5, 6, 7, 8, 9, 0) for i int t: print(i)
dict 目前一共有 14(9 + 5) 種操做, 1.keys 2.values 3.items 4.get 5.pop 6.update 7.setdefault 8.popitem 9.clear # {key1: value1, k2:value2} # key不能重複
# 取全部key info = {1: 'henry', 2: 'echo', 3: 'eliane'} for key i info: print(key)
# 取全部value info = {1: 'henry', 2: 'echo', 3: 'eliane'} for v in info.values(): print(v)
# 取全部key值對 # 取出的是 tuple 類型 info = {1: 'henry', 2: 'echo', 3: 'eliane'} for pair in info.items(): print(pair)
# 有key則取出, 沒有則返回指定 值 # 若是沒有指定值,則返回 None info = {1: 'henry', 2: 'echo', 3: 'eliane'} print(info.get(1, 666)) print(info.get(4, 666))
info = {1: 'henry', 2: 'echo', 3: 'eliane'} print(info.pop(1)) print(info.pop(4))
# 只能用dict類型更新 info = {} info1 = {1: 'henry', 2: 'echo', 3: 'eliane'} info.update(info1) print(info)
# 查詢key,有則取出,沒有則添加 info = {1: 'henry', 2: 'echo', 3: 'eliane'} info.setdefault(4, 'hello') print(info) # 取出須要賦值給其餘變量 val = info.setdefault(4, 'i hate you') print(val)
# 不能加參數,刪除最後一個key值對 info = {1: 'henry', 2: 'echo', 3: 'eliane'} v = info.popitem() print(v,info) # v是tuple
# 清空全部元素 info = {1: 'henry', 2: 'echo', 3: 'eliane'} info.clear() print(info)
li = [1, 2, 3, 4, 5] info = {'a': 1, 'b': 2} v = info.fromkeys(li, 'hello') print(v, info)
info = {1: 'henry', 2: 'echo', 3: 'eliane'} print(len(info))
info = {1: 'henry', 2: 'echo', 3: 'eliane'} print(info[1])
info = {1: 'henry', 2: 'echo', 3: 'eliane'} for i in info: print(i) for v in info.values(): print(v) for pair in info.items(): print(pair)
# key同樣則修改,不同則追加 info = {1: 'henry', 2: 'echo', 3: 'eliane'} info[1] = 'hello' print(info) info[4] = 'you are smart' print(info)
info = {1: 'henry', 2: 'echo', 3: 'eliane'} del info[1] print(info)
# __getitem__ set, del from collections import OrderdDict info = OrderedDict() info['k1'] = 123 info['k2'] = 456
set 目前一共有11(10 + 2)種操做,空集合用set()表示。 1.add 2.update 3.pop 4.discard 5.remove 6. clear 7.intersection 8.union 9.difference 10.symmetric_difference # 無序,不重複
s = {1, 'henry', 2, 'echo', 3, 'eliane'} s.add(5) print(s)
# 能夠用str , list, tuple, dict, set, 也能夠混合多種類型放入update中 s = {1, 'henry', 2, 'echo', 3, 'eliane'} s1 = {5, 6, 7, 8, 1, 2, 3} s2 = [5, 6, 7, 8, 1, 2, 3] s3 = (5, 6, 7, 8, 1, 2, 3) s4 = {5: 6, 7: 8, 1: 2} s.update(s1) print(s) s = {1, 'henry', 2, 'echo', 3, 'eliane'} s.update(s2) print(s) s = {1, 'henry', 2, 'echo', 3, 'eliane'} s.update(s3) print(s) s = {1, 'henry', 2, 'echo', 3, 'eliane'} s.update(s4) print(s)
# 隨機刪除,此時pop中不能有任何參數 s = {1, 'henry', 2, 'echo', 3, 'eliane'} s.pop() # 默認刪除第一個元素/隨機 print(s)
# 必須有一個參數,沒有不報錯, 不會返回值 s = {1, 'henry', 2, 'echo', 3, 'eliane'} val = s.discard(3) print(s) print(val)
# 必須有一個參數,沒有會報錯 s = {1, 'henry', 2, 'echo', 3, 'eliane'} s.remove(3) print(s)
s = {1, 'henry', 2, 'echo', 3, 'eliane'} s.clear() print(s)
# 取v1 和v2 的交集 v1 = {1, 'henry', 2, 'echo', 3, 'eliane'} v2 = {1, 3, 5, 7} v = v1.intersection(v2) print(v)
# 取並集 v1 = {1, 'henry', 2, 'echo', 3, 'eliane'} v2 = {1, 3, 5, 7} v = v1.union(v2) print(v)
v1 = {1, 'henry', 2, 'echo', 3, 'eliane'} v2 = {1, 3, 5, 7} v = v1.difference(v2) print(v)
v1 = {1, 'henry', 2, 'echo', 3, 'eliane'} v2 = {1, 3, 5, 7} v = v1.symmetric_difference(v2) print(v)
# 集合的運算 方法 運算法
v = {1, 'henry', 2, 'echo', 3, 'eliane'} print(len(v))
# 無序輸出 v = {1, 'henry', 2, 'echo', 3, 'eliane'} for i in v: print(i)
int | bool | str | list | tuple | dict | set | |
---|---|---|---|---|---|---|---|
len | — | — | ✓ | ✓ | ✓ | ✓ | ✓ |
index | — | — | ✓ | ✓ | ✓ | ✓ | — |
切片 | — | — | ✓ | ✓ | ✓ | — | — |
step | — | — | ✓ | ✓ | ✓ | — | — |
for循環/ iterable | — | — | ✓ | ✓ | ✓ | ✓ | ✓ |
修改 | — | — | — | ✓ | — | ✓ | ✓ |
刪除 | — | — | — | ✓ | — | ✓ | ✓ |
小數據池
緩存規則:
文件操做主要用來讀取、修改、和建立指定文件。
f = open('文件路徑',mode='r/w/a...',encoding='utf-8')
# mode= 'w' # 打開文件時,會先清空歷史文件,沒有則建立 f.write('a')
# mode= 'r' # way1 整個文件直接讀取到RAM f.read() # 若是指定編碼格式,會讀出 1 個字符 # 若是是 mode= rb 會讀出 1 個字節 f.read(1) # way2 按行讀取文件 # 通常用於for循環中,可用來讀取 GB 級別的文件 f.readline() 只讀取一行 f.readlines() # 一次性加載全部內容到內存,並根據行分割成字符串 # 讀取一行時也可使用 for line in v: line = line.strip('\n')
# 當對文件操做完成後必須關閉,不然不會存儲到本地磁盤 f.colse() # 刷新緩衝區裏任何還沒寫入的信息
mode常見的有r/w/a(只讀/寫/追加),r+/w+/a+(讀寫/寫讀/追加讀),rb/wb/ab(以二進制方式進行讀/寫/追加),r+b/w+b/a+b。
模式 | r | r+ | w | w+ | a | a+ |
---|---|---|---|---|---|---|
讀 | + | + | + | + | ||
寫 | + | + | + | + | + | |
建立 | + | + | + | + | ||
覆蓋 | + | + | ||||
指針在開始 | + | + | + | + | ||
指針在結尾 | + | + |
斷點續傳,經過終端與服務器之間的交互,找到文件斷點位置,從而實現文件的單次傳輸。此種操做可使用file.seek(n)實現,n表示字節數。
seek(offset [,from])方法改變當前文件的位置。Offset變量表示要移動的字節數。From變量指定開始移動字節的參考位置。若是from被設爲0,這意味着將文件的開頭做爲移動字節的參考位置。若是設爲1,則使用當前的位置做爲參考位置。若是它被設爲2,那麼該文件的末尾將做爲參考位置。
# 示例 #!/usr/bin/python # -*- coding: UTF-8 -*- import os # 重命名文件test1.txt到test2.txt。 os.rename( "test1.txt", "test2.txt" )
#!/usr/bin/python # -*- coding: UTF-8 -*- import os # 刪除一個已經存在的文件test2.txt os.remove("test2.txt")
v = open('a.txt',mode='a',encoding='utf-8') while True: val = input('請輸入:') v.write(val) # 強制把內存中的數據,刷到硬盤中 v.flush() v.close()
v = open('a.txt', mode='a', encoding='utf-8') v1 = v.read() v.close()
# 縮進中的代碼執行完畢後,自動關閉文件 with open('a.txt', mode='a', encoding='utf-8') as v: data = v.read()
Note:
# 示例1 # 文件的修改,須要先把內容讀到內存,修改後再存儲 with open('a.txt', mode='r', encoding='utf-8') as v: data = v.read() print(data) new_data = data.replace('a', 666) with open('a.txt', mode='w', encoding='utf-8') as v: data = v.wirte(new_data)
# 示例2 修改指定字符 # 大文件的修改 f1 = open('a.txt', mode='r', encoding='utf-8') f2 = open('b.txt', mode='r', encoding='utf-8') for line in f1: line = line.replace('a', 'b') f2.write(line) f1.close f2.close
# 一次性打開修改和關閉 with open('a.txt', mode='r', encoding='utf-8') as f1,open('b.txt', mode='r', encoding='utf-8') as f2: for line in f1.readlines(): # 或者以下寫法 for line in f1: # 這種寫法,也會一行行讀取,包括 \n 也會單獨一行讀出 line = line.replace('a', 'b') f2.write(line)
以雙下劃線開頭的 **__foo** 表明類的私有成員,以雙下劃線開頭和結尾的 **__foo__** 表明 Python 裏特殊方法專用的標識,如 init() 表明類的構造函數。
又稱爲三目運算
和預算符相關
val = v if v else 666 val = v or 666 # 源碼中會見到
Note:爲了賦值
# 簡單條件賦值時使用 v = 前面 if 條件 else 後面
# 用戶輸入,若是是整數,則轉換,不然賦值爲None data = input('>>>') value = int(data) if data.isdecimal() else Non
面向過程【可讀性差、可重用性差】—> 函數式編程—>面向對象
# 若是給其餘人發送郵件,能夠把發送程序進行封裝起來,可縮減代碼長度和提升重複利用性。
函數式編程
能夠定義一個由本身想要功能的函數,如下是簡單的規則:(5)
def functionname( parameters ): "函數_文檔字符串" function_suite # 函數體 return [expression] # 默認狀況下,參數值和參數名稱是按函數聲明中定義的順序匹配起來的。
# 函數定義 # way1 def 函數名(): # 函數體 pass # 函數的執行 函數名() # 會自動執行 # way2 # 形參能夠是多個 def 函數名(形參): # 函數體 pass 函數名(實參)
形參(形式參數)與實參(實際參數)的位置關係。
def 函數名(形參): # 函數體 pass 函數名(實參)
# 無形參示例 def get_sum_list() sum = 0 for i in li: sum += i print(get_sum_list())
# 有形參示例 # 請寫一個函數,函數計算列表 info = [11,22,33,44,55] 中全部元素的和。 info = [11, 22, 33, 44] def get_list_sum(li): sum = 0 for i in li: sum += i print(sum) get_list_sum(info)
def func(arg): return 9 # 返回值爲9,默認爲None,能夠返回任何類型的數據 val = def func(v)
# 示例2 # 讓用戶輸入一段字符串,計算字符串中有多少A,就在文件中寫入多少‘echo’ def get_char_count(arg): count = 0 for i in arg: count += 1 def write_file(data): open('a.txt', mode='w', encoding='utf-8') as v: if len(data) == 0:或者 if not bool(data): return '寫入失敗' v.write(data) return '寫入成功' print(count) content = input()
# way1 無形參,無return def fun1(): pass fun() # way2 有形參,無return def fun2(arg): pass fun2(v) # way3 無形參,有return(指定值) def fun3(): pass return 9 val = fun3(v) # way4 有形參,有return(變量) def fun4(arg1, arg2): pass return arg1 + arg2 val = fun4(v1 + v2)
# 1. 寫函數,計算一個list中有多少個數字,打印,有%s個數字 # 判斷數字:type(a) == int # 2. 寫函數,計算一個列表中偶數索引位置的數據構形成另一個列表,並返回。 # 3. 讀取文件,將文件的內容構形成指定格式的數據,並返回。 a.log文件 alex|123|18 eric|uiuf|19 ... 目標結構: a. ["alex|123|18","eric|uiuf|19"] 並返回。 b. [['alex','123','18'],['eric','uiuf','19']] c. [ {'name':'alex','pwd':'123','age':'18'}, {'name':'eric','pwd':'uiuf','age':'19'}, ]
參數傳遞方式分爲位置傳參、關鍵字傳參、函數做爲參數進行傳遞。
# 示例 def func(a1, a2): pass func(1, 3) func(1, [1, 2, 3])
# 示例 def func(a1, a2): pass func(a1 = 1, a2 = [1, 2, 3]) func(a1 = 1, 2 ) # 此時會報順序錯誤
函數定義中,def func() 括號中能夠省略、默認參數和 *args/**kwargs。
# 示例 def func(): pass
# 示例 def func(a1, a2=2): pass # 調用方法,有默認參數時,能夠不用省略,採起默認值,也能夠從新賦值 func(1) func(1, 3) # 默認形參時,若是默認是可變類型的須要謹慎使用
# 若是想要給values設置默認是空list def func(data, value=[]): pass # 推薦 def func(data, value=None): if not value: valu=[]
# 示例 : 能夠傳遞任意類型數據 def func(*args): pass # [1, 2, 3]會被當成總體變成tuple中的一個元素 func(1, 2, 3, [1, 2 ,3]) # 直接賦值, [1, 2, 3]也會循環取出追加到tuple中 func(4, 5 ,6 ,*[1, 2 ,3])
# 示例 :只能經過關鍵字傳參,或者dict賦值 def func(**kwargs): print(kwargs) # [1, 2, 3]會被當成總體變成dict中 'd': [1, 2, 3] func(a=1, b=2, c=3, d=[1, 2 ,3]) # 直接賦值, {'k1': 4, 'k2': 5}也會循環取出追加到形參的dict中 func(a=1, b=2, c=3, d=[1, 2, 3], **{'k1': 4, 'k2': 5})
變量做用域時是變量的有效做用範圍,在python中函數就是一個局部做用域。因爲做用域的不一樣,變量的有效範圍也不一樣,根據做用範圍能夠把變量分爲,全局變量和局部變量。
全局變量:可供任何函數進行使用,修改,在python文件第一層的變量。在python中通常把全局變量命名爲所有大寫(規範),例如:USRE_NAME = 'henry'。
局部變量:能夠把函數中的變量視爲局部變量。函數體中變量爲函數所私有(只能被其子函數進行使用)。
# 示例1 a = 'henry' def func(): print(a) a = 123 func() # 此時使用的是全局變量, 結果是 123 # 示例2 a = 'henry' def func1(): def func2(): a = 'echo' print(a) func2() print(a) a = 123 func1() print(a) # echo 123 123
# 示例 # 對於可變變量能夠進行修改 a = [1, 3, 5, 7] def fun1(): a.append('henry') fun1() print(a) # 此時a會被修改
# 兩種賦值的方法 # 可使用 global 關鍵字對全局變量進行從新賦值 global name name = 'alex' # 給全局變量從新賦值 # 可使用 nolocal 關鍵字對父籍變量進行從新賦值, 在父籍找不到時,會報錯 nonlocal name name = 'alex' # 給父籍變量從新賦值
# <class 'function'> def func (): pass print(type(func)) # 函數能夠認爲是一變量
def func(): print(123) v = fun # 指向相同的地址 v()
# 示例 def func(): print(123) v1 = [func, func, func] v2 = [func(), func(), func()] print(v1) print(v2)
def func(arg): print(arg) def show(): return 999 func(show) # 999 None
def func(): print(1,2,3) def bar(): return func v = bar() # func v()
# 10 個函數, 通常是創建字典 def func(): print('話費查詢') def bar(): print('***') def base(): print('***') info = { 'f1': func, 'f2': bar, 'f3': base } choice = input('please input your choice: ') name = info.get('choice') if not name: print('輸入不存在') else: name()
# 三目運算,爲了解決簡單的if...esle的狀況 # lambda,爲了解決簡單函數的狀況 eg: def func(a1, a2): return a1 + a2 # 能夠改寫爲,a1 + 100 即爲return 值 func = lambda a1, a2: a1 + 100
# way1 直接使用 func = lambda : 100 func = lambda a: a*10 func = lambda *args, **kwargs: len(args) + len(kwargs) # way2 使用全局變量 DATA = 100 func = lambda a: a + DATA func(1) # way3 使用父籍變量 DATA = 100 def func(): DATA = 1000 func1 = lambda a: a + DATA v = func1(1) print(v) func() # way4 使用條件判斷 ######## func = lambda n1, n2: n1 if n1 > n2 else n2
# 練習1 USER_LIST = [] func1 = lambda x: USER_LIST.append(x) v1 = func1('alex') print(v1) # None print(USER_LIST) # ['alex'] # 練習2 func1 = lambda x: x.strip() v1 = func1(' alex') print(v1) # 'alex' # 練習3 func_list = [lambda x: x.strip(), lambda y: y+100, lambda x,y: x+y] v1 = func_list[0](' alex') print(v1) # 'alex'
# divmod. 練習 USER_LIST = [] for i in range(1, 836): tem = {name':'hello-%s' % i, 'email':'XXXX%s@qq.com' %s} USER_LIST.append(tem) """ 要求: 每頁展現10條 根據用戶輸入的頁碼,查看 """
# base 默認爲 10 v1 = '0b1101' result = int(v1, base = 2) # 轉8進制 v1 = '0o1101' result = int(v1, base = 8) # 轉16進制 v1 = '0x1101' result = int(v1, base = 16)
# ip 點分二進制,將十進制轉爲二進制並經過 . 鏈接ip = '192.168.12.79' ip = '192.168.12.79' li = ip.split('.') l = [] for i in li: i = int(i) i = bin(i) i = str(i).replace('0b', '') i = i.rjust(8, '0') l.append(i) s = '.'.join(l) print(s)
chr() :把int型數據,轉換爲unicode編碼
# 生成驗證碼 import random # 導入一個模塊 def get_random_data(length=6): data = [] for i in range(length): v = random.randint(65,90) # 獲得一個隨機數 data.append(v) return ' '.join(data) code = get_random_data() print(code)
map / filter / reduce(py2)/zip
# map操做的 func 是一個函數 v 必須是可迭代, v = [11, 22, 33] def func(arg): return arg + 100 result = map(func, v) # 將函數的返回值添加到空list中[111, 122, 133] print(list(result)) # 使用lambda 改寫 result = map(lambda x: x+100, v) print(result) # py2直接返回 print(list(resutl)) # py3會返回一個object,可用list()查看
v = [1, 2, 3, 'welcome', 4, 'hello'] result = filter(lambda x: type(x) == int, v) # 生成新list print(list(result)
import functools v = [1, 2, 3, 4] result = functools.reduce(lambda x,y: x*y, v) print(result)
a = [1,2,3] b = [4,5,6] c = [4,5,6,7,8] zipped = zip(a,b) # 打包爲元組的列表 [(1, 4), (2, 5), (3, 6)] zip(a,c) # 與 zip 相反,*zipped 可理解爲解壓,返回二維矩陣式 [(1, 2, 3), (4, 5, 6)] # 兩組序列,轉字典 list1 = ['key1','key2','key3'] list2 = ['1','2','3'] info = dict(zip(list1,list2)) print(info)
def func(name): def inner(): print(name) return inner v1 = func('henry') v1() v2 = func('echo') v2()
# 不是閉包 def func(name): def inner(): return 123 return inner # 閉包:封裝值 + 內層函數須要使用 def func(name): def inner(): print(name) return 123 return inner
def func(i): print(i) func(i+1)
# 斐波那契數列 def func(a, b): print(b) func(a, a+b)
# 遞歸 def fun(a): if a == 5: return 100 result = func(a+1) + 10 return result v = func(1) # 注意 def fun(a): if a == 5: return 100 result = func(a+1) + 10 v = func(1)
def func(): def inner(): pass return inner v = func() print(v) # inner 函數 # ############################## def func(arg): def inner(): print(arg) return inner v1 = func(1) # 1 v2 = func(2) # 2 # ############################## def func(arg): def inner(): arg() return inner def f1(): print(123) v = fucn(f1) v() # 123 # ############################## def func(arg): def inner(): arg() return inner def f1(): print(123) return 666 v1 = func(f1) result = v1() # 執行inner函數 / f1含函數 -> 123 print(result) # None # ############################## def func(arg): def inner(): return arg() return inner def f1(): print(123) return 666 v1 = func(f1) result = v1() # 123 666
def func(arg): def inner(): print('before') v = arg() print('after') return v return inner def index(): print('123') return 666 # 示例 v1 = index() # 123 v2 = func(index) # before 123 after v3 = v2() v4 = func(index) # before 123 after index = v4 index() index = func(index) # before 123 after index()
# 第一步,執行func函數,並將下面的函數看成函數傳遞,至關於func(index) # 第二部,將func返回值,從新賦值爲下面的函數名,index = func(index) def func(arg): def inner(): return arg() return inner @func def index(): print(123) return 666 print(index) # <function func.<locals>.inner at 0x1054a16a8>
應用示例:
# 計算函數執行時間 def wrapper(func): def inner(): start_time = time.time() func() end_time = time.time() print(end_time - start_time) return func() return inner import time @ warpper def func(): time.sleep(2) print(123) @ warpper def func(): time.sleep(1.5) print(123) # 判斷用戶是否登錄
# 裝飾器的編寫(示例) def wrapper(func): # 必須有一個參數 def inner(): ret = func() return ret return inner # 應用 index = wrapper(index) @wapper def index(): pass @wapper def manage(): pass # 在執行函數,自動觸發裝飾器 v = index() print(v)
# 導入本目錄下的其餘py文件 import a a.f1() a.f2() a.f3()
def wrapper(func): def inner(*args, **kwargs): return func(*args, **kwargs) return inner
爲何要加*args,**kwargs?
# 讓參數統一的目的:爲裝飾的函數傳參 def x(func): def inner(a, b): return func() return inner @x def index(): pass index(1, 2)
# 裝飾器建議寫法 def wrapper(function): def inner(*args, **kwargs): v = funtion(*args, **kwargs) return v return inner @wrapper def func(): pass
# 第一步:v = wrapper(9) # 第二步:ret = v(index) # 第三步:index = ret def x(counter): def wrapper(function): def inner(*args, **kwargs): v = funtion(*args, **kwargs) return v return inner return wrapper @x(9) def index(): pass
# 示例: # 寫一個帶參數的裝飾器,實現,參數是多少,被裝飾器就要執行多少次,最終返回一個list def x(*args): def wrapper(): def inner(): li = [index() for i in range(args[0])] return li return inner return wrapper @x(9) def index(): return 8 v = index() print(v)
list推導式(生成式)
vals = [i for i in 'henry'] v = [i for i in 可迭代對象 if 條件] # 知足條件生成list v = [i if i > 5 else i+1 for i in 可迭代對象 if 條件] # 知足條件生成list
# 新浪 def num(): return [lambda x: x * i for i in range(4)] print([m(2) for m in num()])
set推導式
# 知足條件生成set,會去重,條件判斷能夠省略 v = {i for i in 可迭代對象 if 條件}
dict推導式
# 知足條件生成dict,但須要key值和冒號:,條件判斷能夠省略 v = { 'k' + str(i): i for i in 可迭代對象 if 條件}
類:int ,str, list…. / bytes(b'xxx'), datetime
對象:由類建立的數據
類和對象
展現list中全部數據
v = [1, 2, 3, 4] val = iter(v) value = val.__next__() print(value)
def func(): pass func()
# 生成器函數(內部是否包含yield) def func(arg): arg = arg + 1 yield 1 yield 2 yield 100 # 函數內部代碼不執行,返回一個生成器 val = func(100) # 生成器:能夠被for循環的,一旦開始循環,函數內部代碼就開始執行 for i in val: print(i) # 遇到第一個yield會把後面的值賦值給 i # 若是yield已經執行完畢,則意味着for循環結束
# 邊使用邊執行 def func(): count = 1 while True: yield count count += 1 # v 只取yield值,是一個生成器對象 v = func() for i in v: print(i) # 查看v中有哪些方法 dir(v)
class Foo(object): def __iter__(self): return iter([1, 2, 3]) yield 1 yield 2 obj = Foo(object)
def func(): print(123) n = yield('aaa') print('----->', n) yield 'bbb' data = func() next(data) v = data.send('太厲害了,直接傳進去了') print(v)
# 示例:讀取文件 def func(): curse = 0 while True: f = open('db','r','utf-8') f.seek(curse) data_list = [] for i in range(10): line = f.readline() if not line: return data_list.append(line) curse = f.tell() f.close for row in data_list: yield row
# redis 示例 import redis coon = redis.Redis(host='192.168.12.12')
# yield from (py3.3以後) def base(): yield 88 yield 99 def bar(): return 123 def func(): yield 1 yield from base() yield from bar() # 報錯,int 不可迭代,若是可迭代,則循環取出 yield 2 yield
v1 = [i for i in range(10)] # list推導式,當即產生數據 def func(): for i in range(10): yield i v2 = func() # 與下面v2相同 v2 = (i for i in range(10)) # 生成器推導式,不會當即產生數據
# 示例1 ret = filter(lambda n: n%3==0, range(10)) print(len(list(ret))) # 4 print(len(list(ret))) # 0
# 示例2 def add(n, i): return n + i def test(): for i in range(4): yield i g = test() for n in [1, 10]: g = (add(n, i) for i in g) print(list(g)) # [20 21 22 23 24]
# 示例3 def add(n, i): return n + i def test(): for i in range(4): yield i g = test() for n in [1, 10, 5]: g = (add(n, i) for i in g) print(list(g)) # [15, 16, 17, 18]
# 示例1 try: val = input('請輸入數字:') num = int(val) except Exception as e: print('操做異常')
# 示例2 import requests try: ret = requests.get('http://www.baidu.com') print(ret.text) except Exception as e: print('請求異常')
# 示例3 def func(a): try: return a.strip() except Exception as e: pass return False v = func([1, 2, 3]) print(v)
# 練習1,函數接收一個list將list中的元素每一個都加100 def func(arg): li = [] for items in arg: if items.isdecimal(): li.append(int(items) + 100) return li
# 寫函數,接收一個list, 中全是url 訪問地址,並獲取結果 import requests def func(url_list): li = [] try: for i in url_list: reponse = requests.get(i) li.append(reponse.text) except Exception as e: pass return li func(['http://www.baidu.com', 'http://www.google.com', 'http://www.bing.com'])
# 比較異常 try 的位置不一樣,效果也不一樣 import requests def func(url_list): li = [] for i in url_list: try: reponse = requests.get(i) li.append(reponse.text) except Exception as e: pass return li func(['http://www.baidu.com', 'http://www.google.com', 'http://www.bing.com']) # 獲得的結果是text格式的文本文檔 reponse = requests.get('url', useragent:xxxxxx)
try: pass except ValueError as e: pass except IndexErro as e: pass except Exception as e: print(e) finally: print('final') # 不管對錯都要執行的代碼 # e 表明異常信息,是Exception類的對象,有一個錯誤信息 try: int('asdf') except Exception as e: print(e) try: int('asdf') except ValueError as e: print(e) try: int('asdf') except IndexError as e: print(e) # 即便遇到return 也會執行finally def func(): try: int('1') return except Exception as e: print(e) finally: print('final')
try: int('123') raise Exception('錯誤信息') # 主動拋出異常 except Exception as e: print(1)
# 打開一個文件, def func(): resutl = True try: with open('x.log', mode='r', encoding='utf-8') as f: data = f.read() if 'henry' not in data: raise Exception() except Exception as e: result = False return result
# 示例1 class MyException(Exception): pass try: raise MyException('haha,錯了吧') except MyException as e: print(e)
class MyException(Exception): def __init__(self, message): self.message = message try: raise MyExceptoin('123') except MyException as e: print(e.message)
# test爲文件夾,在當前工做目錄中,jd爲py文件,f1爲jd中的函數 import test.jd test.jd.f1() # test爲文件夾,在當前工做目錄中,jd爲py文件,f1爲jd中的函數 from test import jd jd.f1()
# 導入(絕對導入、相對. /..導入:相對導入必須有父籍包 # import # from 模塊.模塊 import 模塊 # from 模塊.模塊.模塊 import 函數 # 調用:模塊.函數(),函數() # 主文件:運行的文件(print(__name__)). if __name__ == '__main__
# __file__ python命令行中獲取的參數 import os import sys BASE_DIR = os.path.dirname(os.path.dirname(__file__)) sys.path.append(BASE_DIR)
分類:
# pip 安裝模塊 pip install module_name # 安裝成功,若是導入不成功,須要重啓pycharm
# a.py def f1(): pass def f2(): pass
# 調用自定義模塊中的功能 import a a.f1()
內置模塊目前有random,hashlib, getpass ,sys相關,os相關,shutil ,json,time&datetime, import lib, logging等 10個。
# random.randint(a, b) import random def get_random_data(length=6): data = [] for i in range(length): v = chr(random.randint(65, 90)).lower() # 獲得一個隨機數 data.append(v) return ' '.join(data)
摘要算法模塊,密文驗證/校驗文件獨立性
# 將指定的**str**摘要,可使用sha1/md5 # md5經常使用來文件完整性校驗 # hashlib.md5()/ .update() /.hexdigest() import hashlib def get_md5(data): obj = hashlib.md5() obj.update(data.encode('utf-8')) return obj.hexdigest() val = get_md5('123') print(val)
加鹽:
import hashlib def get_md5(data): obj = hashlib.md5('adsfg12fsg'.encode('utf-8')) obj.update(data.encode('utf-8')) return obj.hexdigest() val = get_md5('123') print(val)
密碼不顯示:
import getpass pwd = getpass.getpass('please input pwd: ') print(pwd)
import time v = time.time() # 獲取從1970年開始到目前的時間,單位爲秒 time.sleep(2) # 休眠時間,2秒
# 引用計數器 import sys a = [1, 2, 3] print(sys.getrefcount(a)) # python默認支持的遞歸數量 v = sys.getrecrusionlimit() # 輸入輸出,默認換行 sys.stdout.write('hello') # \n \t # \r: 回到當前行的起始位置,通常於end=‘’連用 print('123\r', end='') print('hello', end='') # 在輸出的時候,回到123前,從新打印 # 應用:進度條
# sys.argv shutil # 刪除 目錄 的腳本, 只能是directory import sys import shutil path = sys.argv[1] shutil.rmtree(path) print('remove the %s' % path)
# sys包含python 和 工做目錄 # 當前py文件所在路徑會加載到 sys.path中 # pycharm也會 自動添加工做目錄 和 項目路徑加入 # python導入模塊時默認查找路徑 # 只能導入目錄下的第一層文件 sys.path.append('module_path')
import os 1. 獲取文件大小 fiel_size = os.stat('filename').st_size # 單位爲字節 2. 讀取文件 chunk_size = 1024 with open('filename', mode='rb') as f1: v = r'path' # r 表示轉義,包括全部 os.path.dirname(v)
轉義 v = 'al\\nex' v = r'al\nex' # 推薦
import os v = 'test.txt' path = 'user/henry/desktop' new_path = os.path.join(path, v)
# 當前目錄下第一層文件 import os result = os.listdir(r'path') print(result) # 當前目錄下的全部文件 import os result = os.walk(r'path') # 生成器 for a, b, c in result: for i in c: # a 是目錄;b 是目錄下的文件夾;c 是目錄下的文件 path = os.path.join(a, i) print(path)
import shutil shutil.rmtree(r'path')
import shutil # 沒有返回值 shutil.rmtree('dir_name') # 重命名,能夠是文件/目錄 shutil.move('file_name1', 'new_file_name') # 壓縮文件(c_file_name.zip), 若是隻給定文件名,壓縮到py腳本所在目錄 shutil.make_archive('c_file_name', 'zip', 'dir_name') # 解壓文件,默認是當前路徑, 指定目錄不存在會建立文件目錄 shutil.unpack_archive('c_file_name.zip', extra=r'dir_paths', format='zip', )
from datetime import datetime # 當前時間 ctime = datetim.now().strftime('%Y-%m-%d %H:%M:%S') # 1.壓縮test文件夾 # 2.放到code目錄(默認不存在) # 3.將文件解壓到/User/henry/Desktop/t中
序列化:將本來的字典、列表等內容轉換成一個字符串的過程就叫作序列化。
目的:
# 只能包含,int,bool,str,list,dict # 最外層必須是list/dict # json 中若是包含str,必須是 雙引號 # 若是是tuple類型數據,則會轉換爲list - 特殊的字符串(list和dict嵌套的string) - 不一樣語言間的數據交互 - 序列化/反序列化:把其語言的數據轉化成json格式/ 相反
import json v = [12, 3, 4, {'k1': 1}, True, 'adsf'] # 序列化 v = json.dumps(v) # 反序列化 json.loads(v)
# 可轉爲json的數據中包含中文,讓中文徹底顯示 v = {'k1': 'alex', 'k2': '你好'} val = json.dumps(v, ensure_ascii=False) print(val, type(val)) val = json.dumps(v) print(val, type(val))
# 使用pickle序列化後,結果是編碼後的二進制 import pickle v = {1, 2, 3} val = pickle.dumps(v) print(val, typ(val)) val = pickle.loads(v) print(val, typ(val)) # json dump 獲得的是str, pickle獲得的是bytes
UTC/GMT:世界協調時間
本地時間:本地時區的時間
# 獲取datetime格式時間 from datetime import datetime, timezone, timedelta v1 = datetime.now() v2 = datetime.utcnow() tz = timezone(timedelta(hours = 7)) # 東7區 v3 = datetime.now(tz) # 當前東7區時間 <class 'datetime.datetime'>
# 將datetime格式時間轉化爲str v1 = datetime.now() v1.strftime('%Y-%m-%d') # 鏈接不能使用漢字(Mac,linux沒問題),可使用.format()方法
# str轉datetime,時間加減 val = datetime.strptime('2019-04-18', '%Y-%m-%d') v = val +/- timedelta(days=40) # 當前時間加/減40天
# 時間戳和datetime關係 import time, datetime ctime = time.time() datetime.fromtimestamp(ctime,tz) # 當前時間,tz和上述相同
v = datetime.now() val = v.timestamp() print(val)
做用:根據字符串形式導入模塊
開放封閉原則:配置文件開放,代碼封閉
# 用字符串形式,去對象中找到其成員 import importlib redis = importlib.import_module('utils.redis') getattr(redis, 'func')()
import importlib path = 'utils.redis.func' module_path, func_name = path.rsplit('.', 1) getattr(module_path, func_name)()
# 導入模塊 import importlib middleware_classes = [ 'utils.redis.Redis', 'utils.mysql.MySQL', 'utils.mongo.Mongo' ] for path in middleware_classes: module_path,class_name = path.rsplit('.',maxsplit=1) module_object = importlib.import_module(module_path) # from utils import redis cls = getattr(module_object,class_name) obj = cls() obj.connect() # 用字符串的形式導入模塊。 # redis = importlib.import_module('utils.redis') # 用字符串的形式去對象(模塊)找到他的成員。 # getattr(redis,'func')()
日誌等級(level) | 描述 |
---|---|
DEBUG | 最詳細的日誌信息,典型應用場景是 問題診斷 |
INFO | 信息詳細程度僅次於DEBUG,一般只記錄關鍵節點信息,用於確認一切都是按照咱們預期的那樣進行工做 |
WARNING | 當某些不指望的事情發生時記錄的信息(如,磁盤可用空間較低),可是此時應用程序仍是正常運行的 |
ERROR | 因爲一個更嚴重的問題致使某些功能不能正常運行時記錄的信息 |
CRITICAL | 當發生嚴重錯誤,致使應用程序不能繼續運行時記錄的信息 |
# 方法1, # basicConfig 不能實現中文編碼,不能同時向文件和屏幕輸出 import logging # logging.Error 默認級別 logging.basicConfig(fielname='cmdb.log', format='%(asctime)s - %(name)s - %(levelname)s -%(module)s: %(message)s' datefmt = '%Y-%m-%d-%H-%M-%S' level=logging.WARNING) logging.log(10, '日誌內容') # 不寫 logging.debug('asdfgh') logging.log(30, 'asdfgh') # 寫 logging.warning('asdfgh')
應用場景:對於異常處理捕獲的內容,使用日誌模塊將其保存到日誌
try: requests.get('http://www.google.com') except Exception as e: msg = str(e) # 調用e.__str__方法 logging.error(msg, exc_info=True) # 線程安全,支持併發
# 方法2 import logging # 對象1:文件 + 格式 file_handler = logging.FileHandler('xxxxx', 'a', encoding='utf-8') fmt = logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(module)s: %(message)s') file_handler.setFormatter(fmt) # 對象2:寫(封裝了對象1 ) logger = logging.Logger('xxx(在log中會顯示)', level=logging.ERROR) logger.addHandler(file_handler) logger.error('你好')
# 推薦 import logging file_handler = logging.FileHandler(filename='x1.log', mode='a', encoding='utf-8',) logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s -%(module)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S %p', handlers=[file_handler,], level=logging.ERROR ) logging.error('你好')
logger對像
# warning和error寫入不一樣文件,須要建立不一樣對象 import logging # 須要加入name參數 logger = logging.getLogger() fh = logging.FileHandler('log.log') # 寫入文件 sh = logging.StreamHander() # 不須要參數,輸出到屏幕 logger.addHander(fh) logger.addHander(sh) # asctime:日誌寫入時間, name:logger對象名稱, levelname:日誌級別, module:模塊名稱 fmt=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(module)s: %(message)s') fh.Setformatter(fmt) logger.waring('message')
import time import logging from logging import handlers # file_handler = logging.FileHandler(filename='x1.log', mode='a', encoding='utf-8',) file_handler = handlers.TimedRotatingFileHandler(filename='x3.log', when='s', interval=5, encoding='utf-8') logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s -%(module)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S %p', handlers=[file_handler,], level=logging.ERROR ) for i in range(1,100000): time.sleep(1) logging.error(str(i)) # 在應用日誌時,若是想要保留異常的堆棧信息,exc_info=True msg = str(e) # 調用e.__str__方法 logging.error(msg,exc_info=True)
# dict建立過程 info = dict([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
from collections import namedtuple # 可命名tuple(time 結構化時間) # 建立了一個Course類,這個類沒有方法,全部屬性值不能修改 Course = namedtuple('Course', ['name', 'price', 'teacher']) python = Course('python', 999, 'alex') print(python) print(python.name) print(python.price)
# 把數據轉換爲四個字節 import struct a = struct.pack('i', 1000) b = struct.pack('i', 78) a1 = struct.unpack('i', a) b1 = struct.unpack('i', b)
面向對象編程(Object Oriented Programming,OOP,面向對象程序設計)
優勢和應用場景:
# 定義一個類,Account class Account: # 方法, 哪一個對象調用方法,其就是self def login(self,name): print(123) return 666 def logout(self): pass # 調用類中的方法 x = Account() # 實例化(建立)Account類的對象,開闢一塊內存 val = x.login('henry') # 使用對象調用class中的方法 print(val)
class File: def read(self): with open(self.path, mode='r', encoding='utf-8') as f: data = f.read() def write(self, content): with open(self.path, mode='a', encoding='utf-8') as f: data = f.write() obj = File() # 建立對象,並使用 obj.path = 'test.txt' # 往obj對象中寫入一個私有對象 obj.write(content) # 定義私有屬性,私有屬性在類外部沒法直接進行訪問 obj2 = File('info.txt') obj2.write(content)
class Person: # __init__初始化方法(構造方法),給對象內部作初始化 def __init__(self, name, age, gender): self.name = name self.age = age self.gender = gender def show(self): temp = 'i am %s, age:%s, gender:%s ' % (self.name, self.age, self.gender) print(temp) # 類(),會執行__init__ obj = Person('henry', 19, 'male') obj.show() obj2 = Person('echo', 19, 'female') obj2.show()
# 類有一個名爲 __init__() 的構造方法,該方法在類實例化時會自動調用,通常經過object類進行格式化 # 類的方法與普通的函數只有一個特別的區別——它們必須有一個額外的第一個參數名稱, 按照慣例它的名稱是 self。
# self.__class__:查看實例所在的類 class Test: def prt(self): print(self) print(self.__class__) t = Test() t.prt()
在類的內部,使用 def 關鍵字來定義一個方法,與通常函數定義不一樣,類方法必須包含參數 self,且爲第一個參數,self 表明的是類的實例。self 的名字並非規定死的,也可使用 this,可是最好仍是按照約定是用 self。
類的私有方法**__private_method:兩個下劃線開頭,聲明該方法爲私有方法,只能在類的內部調用 ,不能在類的外部調用。self.__private_methods**。
# 循環讓用戶輸入:用戶名,密碼,郵箱,輸入完成後在打印 class Person(): def __init__(self, user, pwd, email): self.username = user self.password = pwd self.email = email def info(self): return temp = 'i am %s, pwd:%s, email:%s ' % (self.username, self.password, self.email,) USER_LIST = [] while 1: user = input('please input user name: ') pwd = input('please input user pwd: ') email = input('please input user email: ') p = Person(user, pwd, email) USER_LIST.append(p) for i in USER_LIST: data = i.info() print(i)
場景:多個類中,若是有公共的方法能夠放到基類中,增長代碼的重用性。
繼承:能夠對基類中的方法進行覆寫
# 父類(基類) class Base: def f1(self): pass # 單繼承,子類,Foo類繼承Base類 (派生類) class Foo(Base): def f2(self): pass # 建立了一個子類對象 obj = Foo() # 執行對象.方法時,優先在本身類中找,沒有則找其父類 obj.f2() obj.f1() # 建立了一個父類對象 obj = Base() obj.f1() obj.f2() # 會報錯
繼承關係中的查找方法:
從字面上能夠看出一個老一個新,新的必然包含了跟多的功能,也是以後推薦的寫法,從寫法上區分的話,若是當前類或者父類繼承了object類,那麼該類即是新式類,不然即是經典類。
class D(object): def bar(self): print 'D.bar' class C(D): def bar(self): print 'C.bar' class B(D): def bar(self): print 'B.bar' class A(B, C): def bar(self): print 'A.bar' a = A() # 執行bar方法時 # 首先去A類中查找,若是A類中沒有,則繼續去B類中找,若是B類中麼有,則繼續去C類中找,若是C類中麼有,則繼續去D類中找,若是仍是未找到,則報錯 # 因此,查找順序:A --> B --> C --> D # 在上述查找bar方法的過程當中,一旦找到,則尋找過程當即中斷,便不會再繼續找了 a.bar()
多態:一個類變現出來的多種狀態—>多個類表現出類似的狀態。
Pyhon不支持Java和C#這一類強類型語言中多態的寫法,可是原生多態,Python崇尚「鴨子類型」。list,tuple,python的多態是經過鴨子類型實現的
# 多態,鴨子模型 def func(arg): # 多種類型,不少事物 v = arg[-1] # 必須具備此方法,呱呱叫 print(v)
# 對於一個函數,python對參數類型不會限制,傳入參數時能夠是各類類型,在函數中若是有例如:arg.append方法,就會對傳入類型進行限制。 # 這就是鴨子模型,相似於上述的函數,咱們認爲只要能呱呱叫的就是鴨子,只要有append方法,就是咱們想要的類型
Python一樣支持運算符重載,咱們能夠對類的專有方法進行重載
class Vector: def __init__(self, a, b): self.a = a self.b = b def __str__(self): return 'Vector (%d, %d)' % (self.a, self.b) def __add__(self,other): return Vector(self.a + other.a, self.b + other.b) v1 = Vector(2,10) v2 = Vector(5,-2) print (v1 + v2)
對象成員:實例變量(字段)
Note:屬於誰的只容許誰去取,python容許對象去其類中取變量
class Foo: def __init__(self): self.name = 123 def func(self, a, b): print(self.name, a, b) # python內部裝飾器 @staticmethod def f(): print(1,2) Foo.f() obj = Foo() obj.func(1, 2) obj.f()
class Foo: def __init__(self): self.name = 123 def func(self, a, b): print(self.name, a, b) # python內部裝飾器 @classmethod def f(cls, a, b): print(a, b) Foo.f(1, 2) obj.f(1, 2) # 不推薦
class Foo: @property def func(self): print(123) print(666) obj = Foo() ret = obj.func print(ret)
# 示例:屬性 class Page: def __init__(self, total_count, current_page, per_page = 10): self.total_count = total_count self.current_page = current_page self.per_page = per_page @proporty def start_index(self): return(self.current_page -1 ) * self.per_page @property def end_index(self): returno self.current_page * self.per_page_count USER_LIST = [] for i in range(321): USER_LIST.append('henry-%s' % (i,)) # 請實現分頁 current_page = int(input('請輸入要查看的頁碼:')) p = Page(321, current_page) data_list = USER_LIST[p.start_index:p.end_index] for i in data_list: print(i)
class Foo: def __init__(self, name): self.__name = name def func(self): print(self.name) obj = Foo('alex') print(obj.__name) # 會報錯 obj.func() # 能夠訪問
class Foo: __x = 1 @staticmethod def func(): print(Foo.__x) obj = Foo() print(Foo.__x) # 會報錯 print(obj._Foo__x) # 強制訪問私有成員
class School(object): def __init__(self,title): self.title = title def rename(self): pass class Course(object): def __init__(self, name, school_obj): self.name = name self.school = school_obj def reset_price(self): pass class Classes(object): def __init__(self,cname, course_obj): self.cname = cname self.course = course_obj def sk(self): pass s1 = School('北京') c1 = Course('Python', s1) cl1 = Classes('全棧1期', c1)
# 示例1 class StarkConfig(object): pass class AdminSite(object): def __init__(self): self.data_list = [] def register(self, arg): self.data_list.append(arg) site = AdminSite() obj = StarkConfig() site.regisetr(obj)
# 示例2 class StarkConfig(object): def __init__(self, name, age): self.name = name self.age = aeg class AdminSite(object): def __init__(self): self.data_list = [] self.sk = None def set_sk(self, arg=StarkConfig): self.sk =arg site = AdminSite() site.set_sk(StarkConfig) site.sk('henry', 19)
# 示例3 class StarkConfig(object): list_display = 'henry' def changelist(self): print(self.list_display) class UserConfig(StarkConfig): list_display = 'echo' class AdminSite(object): def __init__(self): self._register = {} def registry(self, key, arg=StarkConfig): self._register[key] = arg def run(self): for key, val in self._register.items(): obj = val() obj.changelist() site = AdminSite() site.registry(1) site.registry(2, StackConfig) site.registry(3, UserConfig) # 易錯點 echo site.run()
特殊成員:爲了可以給快速實現某些方法而生。
# 填充數據,通常稱爲初始化 class Foo: """ 此類的做用 """ def __init__(self): """ 初始化方法 """ pass
Note
# __new__ 建立一個空對象 # 經過 __init__ 初始化對像 class Foo(object): def __new__(cls, *args, **kwargs): # 在 __init__ 以前 return 'henry'/ object.__new__(cls) obj = Foo() print(obj)
# 對象() 會執行類中的 __call__ 方法 class Foo: def __init__(self): pass def __call__(self, *args, **kwargs): print('哈哈,你變成我了吧') Foo()() # 第三方模塊。寫一個網站,用戶只要來訪問,就自動找到第三個參數並執行 make_server('ip', port, Foo())
obj = dict() obj['k1'] = 123 class Foo(object): def __setitem__(self, key, values): print(key, value) def __getitem__(self, item): return item + 'uuu' def __delitem__(self, key): print(key) obj1 = Foo() obj1['k1'] = 123 # 內部會自動調用__setitem__方法 obj1['xxx'] # 內部會自動調用__getitem__方法 del obj1['ttt'] # 內部會自動調用__delitem__方法
# 只有在打印時,會自動調用此方法,並將返回值顯示出來 # type 查看 class Foo: def __str__(self): print('變樣是否是不認識我了') return 'henry' obj = Foo() print(obj)
做用: 查看對象中有哪些變量
class Foo(object): def __init__(self, name, age, email): self.name = name self.age = age self.email = email obj = Foo('henry', 19, '123@qq.com') val = obj.__dict__ # 去對象中找到全部變量並將其轉換爲字典 print(val)
做用:使用with語法時,須要
class Foo(object): def __enter__(self): self.x = open('a.txt', mode='a', encoding='utf-8') return self.x def __exit__(self, exe_type, exc_val, exc_tb): self.x.close() with Foo() as f: # 須要 __enter__ 和 __exit__ 方法 f.write('henry') f.write('echo')
class Foo(object): def __init__(self, v): self.v = v def __add__(self, other): return self.v + other.v obj1 = Foo() obj2 = Foo() val = obj1 + obj2 # obj1觸發,把obj1傳給self
# 可迭代對象 class Foo: def __iter__(self): return iter([1, 2, 3, 4]) obj = Foo() # 示例2 class Foo: def __iter__(self): yield 1 yield 2 ... obj = Foo()
class Foo(object): pass obj = Foo() print('obj是Foo的對象,開心吧') if type(obj) == Foo else print('哪涼快呆哪去')
# 能夠多級繼承 class Base(object): pass class Bar(Base): pass class Foo(Bar): pass print(issubclass(Foo, Base))
# 判斷某個對象是否時 某個類 或 基類 的實例(對象) class Base(object): pass class Foo(Base): pass obj = Foo() print(isinstance(obj, Foo)) print(isinstance(obj, Base))
# super().func(),根據 self所屬類的繼承關係進行查找,默認找到第一個就中止 class Bar(object): def func(self): print('bar.func') return 123 class Base(Bar): def func(self): super().func() print('bar.func') return 123 class Foo(Base): def func(self): v = super().func() print('foo.func', v) obj = Foo() obj.func()
# 會打印 hello # 類裏的成員會加載,代碼會執行 # 函數只有在調用時執行 class Foo(object): print('hello') def func(self): pass
# 類的嵌套 class Foo(object): x = 1 def func(self): pass class Meta(object): y = 123 print('hello') def show(self): print(y.self)
# 可迭代對象示例1 class Foo: def __iter__(self): return iter([1, 2, 3, 4]) obj = Foo() # 示例2 class Foo: def __iter__(self): yield 1 yield 2 ...'' obj = Foo()
# python的約束,易錯點 # 約束子類中必需要有send方法,若是沒有則會拋出:NotImplementedError class Interface(object): def send(self): raise NotImplementedError() class Foo(Interface): def send(self): pass class Base(Interface): def func(arg): arg.send(arg)
# 應用場景示例 class BaseMassage(object): def send(self): raise NotImplementedError('子類中必須有send方法') class Msg(BaseMassage): def send(self): print('發送短信') class Email(BaseMassage): def send(self): print('發送郵件') class Wechat(BaseMassage): def send(self): print('發送微信') class DingDing(BaseMassage): def send(self): pass obj = Email() obj.send()
反射的概念是由Smith在1982年首次提出的,主要是指程序能夠訪問、檢測和修改它自己狀態或行爲的一種能力(自省)。這一律唸的提出很快引起了計算機科學領域關於應用反射性的研究。它首先被程序語言的設計領域所採用,並在Lisp和麪向對象方面取得了成績。
反射:經過字符串的形式操做對象相關的屬性。python中的一切事物都是對象(均可以使用反射)
# getattr示例 class Foo(object): def __init__(self, name): self.name = name obj = Foo('alex') obj.name v1 = getattr(obj, 'name') # setattr示例 obj.name = 'eric' setattr(obj, 'name', 'eric')
# 反射當前文件內容 import sys getattr(sys.modules[__name__], 'ab') # 經過對象獲取、示例變量、綁定方法 # 經過類來獲取類變量、類方法、靜態方法 # 經過模塊名獲取模塊中的任意變量(普通變量、函數、類) # 經過本文件反射任意變量
# 應用示例 class Foo(object): def login(self): pass def regiseter(self): pass obj = Foo() func_name = input('please input method name: ') # 獲取方法 getattr(obj, func_name)()
# setattr 示例 class Foo(object): pass obj = Foo() setattr(obj, 'k1', 123) print(obj.k1)
# delattr 示例 class Foo(object): pass obj = Foo() obj.k1 = 999 delattr(obj, 'k1') print(obj.k1)
import x v = x.NUM # 等價於 v = getattr(x, 'NUM') print(v) v = getattr(x, 'func') v() v = getattr(x, 'Foo') val = v() val.x
示例:
# 瀏覽器兩類行爲 # way1: 輸入地址+回車 get.... # way2: 表單(輸入框+按鍵) post.... # 瀏覽器都會有get,post,dispatch方法 class View(object): def get(self): pass def Post(self): pass def Dispatch(self): # 請求第一步來這,在進行分發 pass
# 推薦使用性能較好 class Foo(object): def post(self): pass # 方式1 if hasattr(obj, 'get'): getattr(obj, 'get') # 方式2:推薦使用 v = getattr(obj, 'get', None) print(v)
場景:數據庫鏈接和數據庫鏈接池(數據一致時)
設計模式:23種設計模式
class Foo(object): pass # 每實例化一次,就建立一個新對象,內存地址 不同 obj1 = Foo() obj2 = Foo()
# 單例(Singleton)模式,不管是實例化多少次,都用第一次建立的那個對象,內存地址同樣 class Singleton(object): instance = None def __new__(cls, *args, **kwargs): if not cls.instance: cls.instance = object.__new__(cls) return cls.instance obj1 = Singleton() # 內存地址一致 obj2 = Singleton()
# 須要加鎖,多線程,併發
class FileHelper(object): instance = None def __init__(self, path): self.file_object = open(path, mode='r', encoding='utf-8') def __new__(cls, *args, **kwargs): if not cls.instance: cls.instance = object.__new__(cls) return cls.instance obj1 = FileHelper('x') # 內存地址一致 obj2 = FileHelper('x')
# 導入模塊,只是保留模塊內存 # 思考角度:函數名不能重複、內存溢出 from jd import n1 # 屢次導入,模塊只會加載一次,即便模塊中包含其餘模塊 import jd import jd print(456)
# 屢次導入,模塊只會加載一次,即便模塊中包含其餘模塊 import importlib import jd # 手動加載,會覆蓋第一次導入 importlib.reload(jd) print(456)
# jd.py class Foo(object): pass obj = Foo()
# app.py import jd # 加載jd.py,加載最後會實例化一個Foo對象並賦值給obj print(jd.obj)
import os import re import datetime import xlrd import requests
# app(程序入口)/src(業務相關)/lib(公共的類庫)/db(文件)/config(配置) app.py 越簡單越好,少於10行
# app(程序入口)/src(業務相關)/lib(公共的類庫)/db(文件)/config(配置) # bin(多個可執行文件例如:student.py,teacher.py,admin.py) # log (存儲日誌文件) # seetings(BASE_PATH,LOG_FILE_NAME...) path = sys.path.os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(path)
# 若果規則有重疊,須要長的在前面 www.(baidu|google).com # () 表示分組,給一部分正則規定爲一組,
1[3-9]\d{9} # 量詞前面一個重複次數,9次 1[3-9]\d{9,} # 量詞前面一個重複次數,9次以上 1[3-9]\d{n,m} # 量詞前面一個重複次數,n-m次 ? # ? 匹配到0次或1次,沒匹配上也算一次,匹配上算2次 #(無關緊要,只能有一個) + # + 匹配1次或屢次 * # * 匹配0次或屢次
# 匹配任意小數,保留兩位 \d+\.\d{2} # 匹配任意整數或小數 \d+\.?\d* # 有bug \d+(\.\d+)? # 分組實現
\d{7-12} # 默認是貪婪匹配,儘可能多匹配 # 回溯算法 # 非貪婪匹配,惰性匹配,老是匹配符合條件範圍內儘可能小的字符串 \d{2,3}? # 匹配兩位數 \d+?3 # 儘可能多取,遇到3結束 元字符 量詞 ?x # 按照元字符規則在量詞範圍內匹配,一旦遇到x中止 .*?x # 經常使用,先找x找到匹配結束
# 身份證號匹配(正則表達式,斷言) [1-9](\d{16}[\dx]|\d{14}) [1-9]\d{14}(\d{2}[\dx]) ^([1-9]\d{16}[0-9x]|[1-9]\d{14})$
. 是任意字符 * 是取 0 至 無限長度 ? 是非貪婪模式。 .*?x # 就是取前面任意長度的字符,直到一個x出現
# 匹配郵箱 \w[-\w.+]*@([A-Za-z0-9][-A-Za-z0-9]+\.)+[A-Za-z]{2,14} # url ^((https|http|ftp|rtsp|mms)?:\/\/)[^\s]+
標誌 | 含義 |
---|---|
re.S(DOTALL) | 使匹配包括換行在內的全部字符 |
re.I(IGNORECASE) | 使匹配對大小寫不敏感 |
re.L(LOCALE) | 作本地化識別(locale-aware)匹配,法語等 |
re.M (MULTILINE) | 多行匹配,影響^和$ |
re.X (VERBOSE) | 該標誌經過給予更靈活的格式以便將正則表達式寫得更易於理解 |
re.U | 根據Unicode字符集解析字符,這個標誌影響\w,\W,\b,\B |
import re tt = "Tina is a good girl, she is cool, clever, and so on..." rr = re.compile(r'\w*oo\w*') print(rr.findall(tt)) # 查找全部包含'oo'的單詞 執行結果以下: ['good', 'cool']
格式:re.match(pattern, string, flags=0)
# 從字符串開頭匹配,匹配上則返回一個match對像,有group()方法 import re ret = re.match('\d', '8alex83') print(ret)
格式:re.search(pattern, string, flags=0)
print(re.search('\dcom','www.4comrunoob.5com').group()) # 執行結果以下:4com
注:match和search一旦匹配成功,就是一個match object對象,而match object對象有如下方法:
a. group()返回re總體匹配的字符串,
b. group (n,m) 返回組號爲n,m所匹配的字符串,若是組號不存在,則返回indexError異常
c.groups()groups() 方法返回一個包含正則表達式中全部小組字符串的元組,從 1 到所含的小組號,一般groups()不須要參數,返回一個元組,元組中的元就是正則表達式中定義的組。
import re a = "123abc456" print(re.search("([0-9]*)([a-z]*)([0-9]*)",a).group(0)) #123abc456,返回總體 print(re.search("([0-9]*)([a-z]*)([0-9]*)",a).group(1)) #123 print(re.search("([0-9]*)([a-z]*)([0-9]*)",a).group(2)) #abc print(re.search("([0-9]*)([a-z]*)([0-9]*)",a).group(3)) #456 ###group(1) 列出第一個括號匹配部分,group(2) 列出第二個括號匹配部分,group(3) 列出第三個括號匹配部分。###
**格式**:re.findall(pattern, string, flags=0)
p = re.compile(r'\d+') print(p.findall('o1n2m3k4')) 執行結果以下: ['1', '2', '3', '4']
import re tt = "Tina is a good girl, she is cool, clever, and so on..." rr = re.compile(r'\w*oo\w*') print(rr.findall(tt)) print(re.findall(r'(\w)*oo(\w)',tt)) # ()表示子表達式 執行結果以下: ['good', 'cool'] [('g', 'd'), ('c', 'l')]
格式:re.finditer(pattern, string, flags=0)
# 匹配到結果爲 迭代器,每一項都是match對象,經過group取值 import re ret = re.finditer('\d', 'safh123ghakjdsfg234'*2000000) for i in ret: print(i.group())
**格式**:re.split(pattern, string[, maxsplit],maxsplit用於指定最大分割次數,不指定將所有分割。 - 按照可以匹配的子串將string分割後返回列表。 - 可使用re.split來分割字符串,如:re.split(r'\s+', text);將字符串按空格分割成一個單詞列表。 ```python import re ret = re.split('\d+', 'henry18') print(ret) # 保留分組中內容 ret = re.split('(\d+)', 'henry18') print(ret) ```
**格式**:re.sub(pattern, repl, string, count=0) **格式**:subn(pattern, repl, string, count=0, flags=0)
不返回/返回替換次數
import re text = "JGood is a handsome boy, he is cool, clever, and so on..." print(re.sub(r'\s+', lambda m:'['+m.group(0)+']', text,0)) # flags=0默認參數 執行結果以下: JGood[ ]is[ ]a[ ]handsome[ ]boy,[ ]he[ ]is[ ]cool,[ ]clever,[ ]and[ ]so[ ]on...
python # 替換 n 次 ret = re.sub('\d', 'G', 'henry18',n) print(ret) # 返回替換次數(tuple類型) ret = re.subn('\d', 'G', 'henry18') print(ret) # 返回值爲tuple類型
```
語法 | 含義 | 示例 | |
---|---|---|---|
(?P
|
分組,除了原有的編號外再指定一個額外的別名 | (?P
|
abcabc |
(?P=name) | 引用別名爲
|
(?P
|
1abc15abc5 |
<number> | 引用編號爲
|
(\d)abc\1 | 1abc15abc5 |
(?<=….) | 以…開頭,並不包括開頭 | ||
(?<!….) | 不以…結尾,並不包括開頭 |
s = '<h1>wahaha</h1>' ret = re.search('(\w+)>(.*?)</\w+>', s) print(ret.group()) print(ret.group(1)) print(ret.group(2))
ret = re.search('<(?P<tag>\w+)>(?P<content>.*?)</\w+>', s) print(ret.group('tag')) print(ret.group('content'))
s = '<h1>wahaha</h1>' ret = re.search('(?P<tag>\w+)>.*?</(?P=tag)>', s) print(ret.group())
s = '<h1>wahaha</h1>' # \1 在python中有特殊含義 ret = re.search(r'(\w+)>.*?</\1>', s) print(ret.group())
# findall 遇到正則中的分組 優先 顯示分組中的內容 import re ret = re.findall('\d(\d)', 'henry18') print(ret) # 取消分組優先(?:正則表達式) ret = re.findall('\d+(?:\.\d+)?', '1.234+2') print(ret)
# 保留分組中內容 ret = re.split('(\d+)', 'henry18') print(ret)
# 示例1:匹配單個數字,findall方法會有屏蔽全部其餘匹配項,只顯示分組中內容 import re ret = re.findall(r'\d+\.\d+|(\d)', '2+23*3.42/3.2') print(ret) while True: if '' not in ret:break ret.remove('') print(ret)
# 示例2:匹配以...開頭的數據,不包括開頭 import re m = re.findall('(?<=>)\w+', '\<a>wahaha\</a>\<b>banana\</b>\<h1>qqxing\</h1>') for i in m: print(i) # 匹配不以...開頭的數據,不包括結尾 m = re.findall('(?<!>)\w+', '\<a>wahaha\</a>\<b>banana\</b>\<h1>qqxing\</h1>') print(m)
# 示例3:以a開頭,由至少一個字母組成的字 ^a[a-zA-Z]+ ^a[a-zA-Z]*
# 以1開頭,中間3-5個數字,若是中間位置超過5個數字,則整個字符串不匹配 ^1\d{3,5}$
# 示例4:匹配用戶輸入的身份證號 import re content = input('用戶輸入:') ret = re.match('[1-9]\d{14}(\d{2}[\dx])?$', content)
# 示例5:第一個乘除法 import re ret = re.search('\d+(\.\d+)?[\*]-?\d+(\.\d+)?', s)
兩個運行中的程序傳遞信息?
網絡應用開發架構
OSI五層協議(簡化)
TCP/IP(arp在tcp/ip中屬於網絡層)
import socket socket.socket(family=AF_INET,type=SOCK_STREAM,proto=0,fileno=None) # 建立socket對象的參數說明:
參數 | 含義 |
---|---|
family | 地址系列應爲AF_INET(默認值),AF_INET6,AF_UNIX,AF_CAN或AF_RDS。 (AF_UNIX 域其實是使用本地 socket 文件來通訊) |
type | 套接字類型應爲SOCK_STREAM(默認值),SOCK_DGRAM,SOCK_RAW或其餘SOCK_常量之一。 SOCK_STREAM 是基於TCP的,有保障的(即能保證數據正確傳送到對方)面向鏈接的SOCKET,多用於資料傳送。 SOCK_DGRAM 是基於UDP的,無保障的面向消息的socket,多用於在網絡上發廣播信息。 |
proto | 協議號一般爲零,能夠省略,或者在地址族爲AF_CAN的狀況下,協議應爲CAN_RAW或CAN_BCM之一。 |
fileno | 若是指定了fileno,則其餘參數將被忽略,致使帶有指定文件描述符的套接字返回。 與socket.fromfd()不一樣,fileno將返回相同的套接字,而不是重複的。 這可能有助於使用socket.close()關閉一個獨立的插座。 |
type = socket.SOCK_STREAM # 表示tcp協議 # server 端 import socket sk = socket.socket() sk.bind(('127.0.0.1'), port號) sk.listen(n) # 監聽連接,n 表示容許多少個客戶端等待,3.7以後無限制可省略 con,cli_addr = sk.accept() # 接受客戶端連接,阻塞,服務端須要一直監聽,不能關閉 con.recv(size) # 接收字節數 con.send('content'.encode('utf-8')) # socket 發送接收都是字節流,即二進制 con.close() #關閉客戶端套接字 sk.close() #關閉服務器套接字(可選) # client 端 import socket sk = socket.socket() sk.connet(('ip', port號)) sk.send('content'.encode('utf-8')) sk.recv(size) sk.close()
# ip和端口占用解決方法,針對macos import socket from socket import SOL_SOCKET,SO_REUSEADDR # 加入一條socket配置,重用ip和端口 sk = socket.socket() sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # 就是它,在bind前加 sk.bind(('127.0.0.1',8898)) # 把地址綁定到套接字
# 自定義協議,解決黏包問題 # server端 import struct import socket sk = socket.socket() sk.bind(('ip', port)) sk.listen() con, cli_addr = sk.accept() size = con.recv(4) size = struct.unpack(size)[0] # unpack,爲一tuple類型 content = con.recv(size).decode('utf-8')# 接收文件內容 con.close() sk.close() # client端 import struct import socket sk = socket.socket() sk.connect(('ip', port)) content = '我是henry'.encode('utf-8') # 字節流 size = struct.pack('i', len(content)) # 發送內容長度進行struct sk.send(size) sk.send(content) sk.close()
# server import socket sk = socket.socket(type = socket.SOCK_DGRAM) sk.bind(('127.0.0.1', 9000)) msg, client_addr = sk.recvfrom(1024) print(msg) sk.sendto(b'received', client_addr) sk.close() # client import socket sk = socket.socket(type = socket.SOCK_DGRAM) sk.sendto(b'hello', ('127.0.0.1', 9000)) ret = sk.recv(1024) print(ret) sk.close()
import socket # 服務端套接字函數 s.bind() # 綁定(主機,端口號)到套接字 s.listen() # 開始TCP監聽 s.accept() # 被動接受TCP客戶的鏈接,(阻塞式)等待鏈接的到來 # 客戶端套接字函數 s.connect() # 主動初始化TCP服務器鏈接 s.connect_ex() # connect()函數的擴展版本,出錯時返回出錯碼,而不是拋出異常 # 公共用途的套接字函數 s.recv() # 接收TCP數據 s.send() # 發送TCP數據 s.sendall() # 發送TCP數據 s.recvfrom() # 接收UDP數據 s.sendto() # 發送UDP數據 s.getpeername() # 鏈接到當前套接字的遠端的地址 s.getsockname() # 當前套接字的地址 s.getsockopt() # 返回指定套接字的參數 s.setsockopt() # 設置指定套接字的參數 s.close() # 關閉套接字 # 面向鎖的套接字方法 s.setblocking() # 設置套接字的阻塞與非阻塞模式 s.settimeout() # 設置阻塞套接字操做的超時時間 s.gettimeout() # 獲得阻塞套接字操做的超時時間 # 面向文件的套接字的函數 s.fileno() # 套接字的文件描述符 s.makefile() # 建立一個與該套接字相關的文件
# 官方文檔對socket模塊下的socket.send()和socket.sendall()解釋以下: socket.send(string[, flags]) Send data to the socket. The socket must be connected to a remote socket. The optional flags argument has the same meaning as for recv() above. Returns the number of bytes sent. Applications are responsible for checking that all data has been sent; if only some of the data was transmitted, the application needs to attempt delivery of the remaining data. # send()的返回值是發送的字節數量,這個數量值可能小於要發送的string的字節數,也就是說可能沒法發送string中全部的數據。若是有錯誤則會拋出異常。 socket.sendall(string[, flags]) Send data to the socket. The socket must be connected to a remote socket. The optional flags argument has the same meaning as for recv() above. Unlike send(), this method continues to send data from string until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully sent. # 嘗試發送string的全部數據,成功則返回None,失敗則拋出異常。 # 故,下面兩段代碼是等價的: sock.sendall('Hello world\n') buffer = 'Hello world\n' while buffer: bytes = sock.send(buffer) buffer = buffer[bytes:]
# server端 import socket sk = socket.socket() sk.bind(('127.0.0.1', 9000)) sk.setblocking(False) # 設置爲非阻塞狀態 sk.listen() user = [] del_user = [] while True: try: con, addr = sk.accept() user.append(con) except BlockingIOError: for i in user: try: content = i.recv(1024).decode('utf-8') if not content: del_user.append(i) continue i.send(content.upper(). encode('utf-8')) # 發送的bytes類型能夠直接解釋出(ascii字符) except BlockingIOError:pass # 注意異常,會報錯 for i in del_user: user.remove(i) del_user.clear() sk.close()
# clinet端 import time import socket sk = socket.socket() sk.connect(('127.0.0.1', 9000)) while True: sk.send(b'hello') msg = sk.recv(1024) print(msg) time.sleep(0.2) sk.close()
# 客戶端使用對象是用戶,直接登錄驗證 # 能夠看到源碼,在服務端進行驗證登錄 # 客戶端使用對象是機器
import hmac secret_key = b'asdfgh' random_seq = os.urandom(32) hmac.new(secret_key, random_seq) ret = hmac.digest() # 結果是bytes類型數據
# 使用TCP協議發送數據爲空時,默認不會發送 # server端 import os import hmac import socket def chat(con): while True: msg = con.recv(1024).decode('utf-8') print('------>', msg) con.send(msg.upper().encode('utf-8')) # con.send(''.encode('utf-8')) # tcp不會發送 sk = socket.socket() sk.bind(('127.0.0.1', 9000)) sk.listen() com_key = b'henry' while True: con, addr = sk.accept() sec_key = os.urandom(32) con.send(sec_key) # 第一次發送 val = hmac.new(com_key, sec_key).digest() data = con.recv(32) # 第一次接收 if data == val: print('客戶端合法') chat(con) else: print('客戶端不合法') con.close() sk.close()
# client 端 import socket import hmac def chat(sk): while True: sk.send('hello'.encode('utf-8')) msg = sk.recv(1024).decode('utf-8') print('------>', [msg]) sk = socket.socket() sk.connect(('127.0.0.1', 9000)) sec_key = sk.recv(32) # 第一次接收 com_key = b'henry' val = hmac.new(com_key, sec_key).digest() sk.send(val) # 第一次發送 chat(sk) sk.close()
# 進階示例 from socket import * import hmac,os secret_key=b'henry bang bang bang' def conn_auth(conn): ''' 認證客戶端連接''' print('開始驗證新連接的合法性') msg=os.urandom(32) conn.sendall(msg) h=hmac.new(secret_key,msg) digest=h.digest() respone=conn.recv(len(digest)) return hmac.compare_digest(respone,digest) def data_handler(conn,bufsize=1024): if not conn_auth(conn): print('該連接不合法,關閉') conn.close() return print('連接合法,開始通訊') while True: data=conn.recv(bufsize) if not data:break conn.sendall(data.upper()) def server_handler(ip_port,bufsize,backlog=5): '''只處理連接''' tcp_socket_server=socket(AF_INET,SOCK_STREAM) tcp_socket_server.bind(ip_port) tcp_socket_server.listen(backlog) while True: conn,addr=tcp_socket_server.accept() print('新鏈接[%s:%s]' %(addr[0],addr[1])) data_handler(conn,bufsize) if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 server_handler(ip_port,bufsize)
# 客戶端 __author__ = 'Linhaifeng' from socket import * import hmac,os secret_key=b'linhaifeng bang bang bang' def conn_auth(conn): '''驗證客戶端到服務器的連接''' msg=conn.recv(32) h=hmac.new(secret_key,msg) digest=h.digest() conn.sendall(digest) def client_handler(ip_port,bufsize=1024): tcp_socket_client=socket(AF_INET,SOCK_STREAM) tcp_socket_client.connect(ip_port) conn_auth(tcp_socket_client) while True: data=input('>>: ').strip() if not data:continue # tcp協議不支持發送數據爲空 if data.lower() == 'q':break tcp_socket_client.sendall(data.encode('utf-8')) respone=tcp_socket_client.recv(bufsize) print(respone.decode('utf-8')) tcp_socket_client.close() if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 client_handler(ip_port,bufsize)
# server端 import socketserver # socket是socketserver的底層模塊和time,datetime同樣 class Myserver(socketserver.BaseRequestHandler): def handle(self): # 自動觸發handle方法,self.request == con print(self.request) # con server = socketsever.ThreadingTCPServer(('127.0.0.1', 9000), Myserver) server.server_forever() # client import socket sk = socket.socket() sk.connect(('127.0.0.1', 9000))
# 進階示例 import socketserver class Myserver(socketserver.BaseRequestHandler): def handle(self): self.data = self.request.recv(1024).strip() # self.client_address print("{} wrote:".format(self.client_address[0])) print(self.data) self.request.sendall(self.data.upper()) if __name__ == "__main__": HOST, PORT = "127.0.0.1", 9999 # 設置allow_reuse_address容許服務器重用地址 socketserver.TCPServer.allow_reuse_address = True # 建立一個server, 將服務地址綁定到127.0.0.1:9999 server = socketserver.TCPServer((HOST, PORT),Myserver) # 讓server永遠運行下去,除非強制中止程序 server.serve_forever() # client端 import socket HOST, PORT = "127.0.0.1", 9999 data = "hello" # 建立一個socket連接,SOCK_STREAM表明使用TCP協議 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect((HOST, PORT)) # 連接到客戶端 sock.sendall(bytes(data + "\n", "utf-8")) # 向服務端發送數據 received = str(sock.recv(1024), "utf-8") # 從服務端接收數據 print("Sent: {}".format(data)) print("Received: {}".format(received))
—>磁帶存儲+批處理(下降數據的讀取時間,提升cpu的利用率)
—>多道操做系統:數據隔離、時空複用(可以遇到I/O操做的時候主動把cpu讓出來,給其餘任務使用,切換須要時間,由OS完成)
—> 短做業優先算法、先來先服務算法
—>分時OS:時間分片,CPU輪轉,每個程序分配一個時間片,下降了cpu利用率,提升了用戶體驗
—>分時OS + 多道OS:多個程序一塊兒執行,遇到IO切換,時間片到了也要切換
Note:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣
才能保證下次切換回來時,能基於上次切走的位置繼續運行。
OS做用:將應用程序對硬件資源的競態請求變得有序化
進程狀態:運行(runing) 就緒(ready) 阻塞(blocking)
進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是OS結構的基礎。
在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。
顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。
進程的概念起源於操做系統,是操做系統最核心的概念,也是操做系統提供的最古老也是最重要的抽象概念之一。操做系統的其餘全部內容都是圍繞進程的概念展開的。
PS:即便能夠利用的cpu只有一個(早期的計算機確實如此),也能保證支持(僞)併發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路複用和空間多路複用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。
進程概念
特色
# 獲取進程的pid, 父進程的id及ppid import os import time print('start') time.sleep(20) print(os.getpid(),os.getppid(),'end')
# 把func函數交給子進程執行 import os import time from multiprocessing import Process def func(): print('start', os.getpid()) time.sleep(1) print('end', os.getpid()) if __name__ == '__main__': p = Process(target=func) # 建立一個即將要執行的進程對象 p.start() # 開啓一個進程,異步非阻塞 p.join() # 同步阻塞,直到子進程執行完畢 print('main', os.getpid()) # 異步的程序,調用開啓進程的方法,並不等待這個進程的開啓
ps:__name__ 只有兩種狀況,文件名或雙下劃線main字符串
# windows 經過(模塊導入)執行父進程文件中的代碼獲取父進程中的變量 只要是不但願被子進程執行的代碼,就寫在if __name__ == '__mian__'下 進入導入時,父進程文件中的 __name__ != '__mian__' # linux/macos 建立新的子進程是copy父進程內存空間,完成數據導入工做(fork),正常寫就能夠 公司開發環境都是linux,無需考慮win中的缺陷 # windows中至關於把主進程中的文件又從頭執行了一遍 # linux,macos不執行代碼,直接執行調用的函數在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process() 直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。
# 在多個子進程中使用join方法 from multiprocessing import Process def send_mail(i): print('郵件已發送', i) if __name__ == '__main__': li = [] for i in range(10): p = Process(target=send_mail, args=(i,)) # args必須是元組,給子進程中的函數傳參數 p.start() li.append(p) for p in li: p.join() # 阻塞,知道全部子進程執行完畢 print('100封郵件已發送') # 主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
import time from multiprocessing import Process def son1(): while True: print('is alive') time.sleep(0.5) def son2(): for i in range(5): print('in son2') time.sleep(1) if __name__ == '__main__': p = Process(target=son1) p.daemon = True # 把p子進程設置成了守護進程 p.start() p2 = Process(target=son2) p2.start() time.sleep(2) # 守護進程是隨着主進程‘代碼’結束而結束 # 全部子進程都必須在主進程結束以前結束 # 守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
import time from multiprocessing import Process def son1(): while True: print('is alive') time.sleep(0.5) if __name__ == '__main__': p = Process(target=son1) p.start() # 開啓了一個進程 print(p.is_alive) # 判斷子進程時候存活, True和False time.sleep(1) p.terminate() # 「異步非阻塞」,強制結束一個子進程 print(p.is_alive) # True,os還沒來得及關閉進程 time.sleep(0.01) print(p.is_alive) # False,OS已經響應了關閉進程的需求,再去檢測的時候,結果是進程已經結束
import os import time from multiprocessing import Process class MyProcess(Process): def __init__(self, x, y): # 子進程若是不須要參數,能夠省略 self.x = x self.y = y super().__init__() def run(self): while True: print(self.x, self.y, os.getpid()) print('in myprocess') if __name__ == '__main__': mp = MyProcess(1, 2) mp.daemon = True mp.start() # 開啓一個子進程,會調用run()方法 time.sleep(1) mp.terminate() # 結束進程,異步非阻塞 print(mp.is_alive()) # True time.sleep(0.01) print(mp.is_alive()) # False
# 數據操做時,不能同時進行修改 import json from multiprocessing import Process, Lock # 導入Lock def search_ticket(user): with open('tickets.txt') as f: dic = json.load(f) print('%s查詢結果:%s張餘票' %(user, dic['count'])) def buy_ticket(user, lock): # with lock: lock.acquire() # time.sleep(0.01) with open('tickets.txt') as f: dic = json.load(f) if dic["count"] > 0: print('%s已買到票' % user) dic["count"] -= 1 else: print('%s沒買到票' % user) with open('tickets.txt', 'w') as f: json.dump(dic, f) lock.release() if __name__ == '__main__': lock = Lock() # 實例化一個對象 for i in range(10): search_ticket('user%s '%(i+1),) p = Process(target=buy_ticket, args=('user%s '%(i+1), lock)) p.start()
from multiprocessing import Process n = 100 def func(): global n n -= 1 li = [] for i in range(10): p = Process(target=func) p.start() li.append(p) for p in li:p.join() print(n)
from multiprocessing import Process,Queue def func(exp,q): res = eval(exp) q.put(res) if __name__ == '__main__': q = Queue() p = Process(target=func, args=('1+2+3',q)) p.start() print(q.get())
from multiprocessing import Pipe pip = Pipe() pip.send() pip.recv()
# Process中的隊列 import queue from multiprocessing import Queue q = Queue(3) # 可設置隊列長度 q.put(1) q.put(2) # 對列爲滿時,繼續放數據會發生阻塞 q.put(3) print('----------') try: q.put_nowait(4) # 對列爲滿時,繼續放數據會報錯和丟失 except queue.Full:pass print('----------') q.get() q.get() q.get() # 對列爲空時,會發生阻塞 try: q.get_nowait() # 對列爲空時,會報錯,阻塞會取消 except queue.Empty:pass
q.empty() # 有缺陷 q.qsize() q.full()
# 生產者消費者模型示例 import time import random from multiprocessing import Process, Queue def producer(q, name, food): for i in range(10): time.sleep(random.random()) fd = '%s%s' % (food, i) q.put(fd) print('%s生產了一個%s' % (name, food)) def consumer(q, name): while True: food = q.get() if not food: q.put(None) break time.sleep(random.randint(1, 3)) print('%s吃了%s' % (name, food)) def cp(num1, num2): q = Queue(10) p_l = [] for i in range(num1): p = Process(target=producer, args=(q, 'henry', 'food')) p.start() p_l.append(p) for i in range(num2): c = Process(target=consumer, args=(q, 'echo%s' % (i+1,))) c.start() for i in p_l: i.join() q.put(None) if __name__ == '__main__': cp(1, 4)
# 生產者消費者模型示例之爬蟲 import re import requests from multiprocessing import Process, Queue def producer(q, url): response = requests.get(url) q.put(response.text) def consumer(q): while True: s = q.get() if not s: q.put(None) break com = re.compile( '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>\d+).*?<span class="title">(?P<title>.*?)</span>' '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)評價</span>', re.S) ret = com.finditer(s) for i in ret: print({ "id": i.group("id"), "title": i.group("title"), "rating_num": i.group("rating_num"), "comment_num": i.group("comment_num"), }) if __name__ == '__main__': count = 0 q = Queue() p_l = [] for i in range(10): count += 25 p = Process(target=producer, args=(q, 'https://movie.douban.com/top250?start=%s&filter=' % count)) p.start() p_l.append(p) for i in range(5): c = Process(target=consumer, args=(q,)) c.start() for i in p_l: i.join() q.put(None)
# joinable實現生產者、消費者模型 import time import random from multiprocessing import Process,JoinableQueue def producer(q, name, food): for i in range(10): time.sleep(random.random()) fd = '%s%s'%(food,i) print('%s生產了一個%s'%(name, food)) q.join() def consumer(q, name, food): while True: food = q.get() if not food: q.put(None) break time.sleep(random.randint(1, 3)) print('%s吃了%s'%(name, food)) q.task_done() if __name__ = '__main__': jq = JoinableQueue() p = Processor(target=producer, args=(jq, 'henry', 'food')) p.start()
# 進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的 # 雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
from mutliprocessing import Manager,Lock def func(dic, lock): with lock: dic['count'] -= 1 if __name__ = '__main__': # m = Manager() lock = Lock() with Manager() as m: l = Lock() dic = m.dict({'count': 100}) # 共享的dict,list li = m.list([1,2,3]) p_l = [] for i in range(100): p = Process(target=func, args=(dic,lock)) p.start() p_l.append(p) for p in p_l:p.join() print(dic)
GIL(global interpreter lock):全局解釋器鎖
# multiprocessing 是徹底仿照Threading類的 import os import time from threading imoprt Thread def func(): print('start son thread') time.sleep(1) print('end son thread', os.getpid()) Thread(target=func).start() # 開啓一個線程的速度很是快 print('start') time.sleep(0.5) print('end', os.getpid())
# 開啓多個線程 import os import time from threading imoprt Thread def func(): print('start son thread') time.sleep(1) print('end son thread', os.getpid()) for i in range(10): Thread(target=func).start() # 開啓一個線程的速度很是快 # 線程的調度由OS決定
import os import time from threading imoprt Thread def func(): print('start son thread') time.sleep(1) print('end son thread', os.getpid()) t_l = [] for i in range(10): t = Thread(target=func) t.start() t_l.append(t) for i in t_l:i.join() print('子線程執行結束')
from threading import Thread class MyThread(Thread): def __init__(self, i): self.i = i super().__init__() # 注意變量名避免與父類init中的變量重名 def run(self): print(self.i, self.ident) # 經過self.ident,查看子線程的id t_l = [] for i in range(100): t = MyThread(i) t_l.append(t) t.start() # start 方法封裝了開啓線程的方法和run方法 for t in t_l: t.join() print('主進程結束')
from threading import current_thread, enumerate, active_count def func(): print('start son thread', i , current_thread()) time.sleep(1) print('end son thread', os.getpid()) t = Thread(target=func) t.start() print(t.ident) print(current_thread().ident) # current_ident()在哪一個線程,就獲得這個線程id print(enumerate()) # 統計當前進程中多少線程活着,包含主線程 print(active_count()) # 統計當前進程中多少線程活着,個數,包含主線程 # 線程中不能從主線程結束一個子線程 current_thread().name # 當前線程名稱 current_thread().ident # 當前線程id current_thread().isalive() # 當前線程是否存活 current_thread().isdaemon() # 當前線程是不是守護線程
import time from threading import Thread from multiprocessing import Process def func(a, b): c = a + b if __name__ == '__main__': p_l = [] start = time.time() for i in range(100): p = Process(target=func, args=(i, i*2)) p.start() p_l.append(p) for i in P_l:i.join() print(time.time() - start) t_l = [] start = time.time() for i in range(100): t = Thread(target=func, args=(i, i*2)) t.start() t_l.append(t) for i in t_l:i.join() print(time.time() - start)
# 不要在子線程裏隨便修改全局變量 from threading import Thread n = 100 def son(): global n n -= 1 t_l = [] for i in range(100): t = Thread(target=son) t_l.append(t) t.start() for t in t_l:t.join() print(n)
import time from threading imoprt Thread def son(): while True: time.sleep(0.5) def son2(): for i in range(5): t = Thread(target=son) t.daemon = True t.start() time.sleep(3)
# 線程數據一樣不安全 import dis a = 0 def func(): global a a += 1 dis.dis(func) # 返回cpu指令
# 使用互斥鎖解決線程全局變量數據不安全問題 from threading import Thread, Lock a = 0 def add_f(lock): global a with lock: for i in range(2000000): a += 1 def sub_f(lock): global a with lock: for i in range(2000000): a -= 1 lock = Lock() t1 = Thread(target=add_f, args=(lock,)) t1.start() t2 = Thread(target=sub_f, args=(lock,)) t2.start() t1.join() t2.join() print(a)
互斥鎖:是鎖中的一種,在同一線程中,不能連續lock.acquire()屢次
from threading import Lock lock = Lock() lock.acquire() print('-------------') lock.acquite() print('-------------')
import time import random from threading import Thread class Singleton: from threading import Lock # 複用性考慮 __instance = None lock = Lock() def __new__(cls, *args, **kwargs): with cls.lock: if not cls.__instance: time.sleep(random.random()) # 切換GIL鎖 cls.__instance = super().__new__(cls) return cls.__instance def fun(): print(Singleton()) li = [] for i in range(10): t = Thread(target=fun) li.append(t) t.start() for t in li: t.join()
from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name,noddle_lock, fork_lock): noddle_lock.acquire() print('%s搶到面了'%name) fork_lock.acquire() print('%s搶到叉子了'%name) print('%s吃了一口面'%name) noddle_lock.release() print('%s放下面了'%name) fork_lock.release() print('%s放下叉子了'%name) def eat2(name,noddle_lock, fork_lock): fork_lock.acquire() print('%s搶到叉子了'%name) noddle_lock.acquire() print('%s搶到面了'%name) print('%s吃了一口面'%name) noddle_lock.release() print('%s放下面了'%name) fork_lock.release() print('%s放下叉子了'%name)
import time from threading import RLock, Thread noodle_lock = fork_lock = RLock() # 將多把互斥鎖變成了一把遞歸鎖 def eat1(name, noodle_lock, fork_lock): noodle_lock.acquire() print('%s搶到面了' % name) fork_lock.acquire() print('%s搶到叉子了' % name) print('%s吃了一口面' % name) time.sleep(0.1) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了' % name) def eat2(name, noodle_lock, fork_lock): fork_lock.acquire() print('%s搶到叉子了' % name) noodle_lock.acquire() print('%s搶到面了' % name) print('%s吃了一口面' % name) time.sleep(0.1) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) lst = ['henry', 'echo', 'dean', 'daniel'] Thread(target=eat1, args=(lst[0], noodle_lock, fork_lock)).start() Thread(target=eat2, args=(lst[1], noodle_lock, fork_lock)).start() Thread(target=eat1, args=(lst[2], noodle_lock, fork_lock)).start() Thread(target=eat2, args=(lst[3], noodle_lock, fork_lock)).start()
# 使用互斥鎖解決問題 import time from threading import Lock, Thread lock = Lock() def eat1(name, lock): lock.acquire() print('%s搶到面了' % name) print('%s搶到叉子了' % name) print('%s吃了一口面' % name) time.sleep(0.1) print('%s放下叉子了' % name) print('%s放下面了' % name) lock.release() def eat2(name, lock): lock.acquire() print('%s搶到叉子了' % name) print('%s搶到面了' % name) print('%s吃了一口面' % name) time.sleep(0.1) print('%s放下面了' % name) print('%s放下叉子了' % name) lock.release() lst = ['henry', 'echo', 'dean', 'daniel] Thread(target=eat1, args=(lst[0], lock)).start() Thread(target=eat2, args=(lst[1], lock)).start() Thread(target=eat1, args=(lst[2], lock)).start() Thread(target=eat2, args=(lst[3], lock)).start()
import queue # 先進先出隊列:服務 from queue import Queue q = Queue(5) q.put(1) q.get() # 後進先出隊列:算法 from queue import LifoQueue lfq = LifiQueue(4) lfq.put(1) lfq.put(2) lfq.get() lfq.get() # 優先級隊列:自動排序、vip用戶、告警級別 from queue import PriorityQueue pq = PriorityQueue() pq.put((10, 'henry')) pq.put((6, 'echo')) pq.put((10, 'dean')) pq.get() pq.get() pq.get()
# 進程池。 p.submit, p.shutdown import os,time, random from concurrent.futrures import ProcessPoolExecutor def func(i): print('start', os.getpid()) time.sleep(random.randint(1,3)) print('end', os.getpid()) return '%s * %s' %(i, os.getpid()) if __name__ == '__main__': p = ProcessPoolExecutor(5) # cpu核心數或多一 ret_l = [] for i in range(10): ret = p.submit(func, i) # 提交進程,參數直接放在其後 ret_l.append(ret) # ret爲future對象,ret.result()取值 # 關閉池,不能提交任務,阻塞,直到已經提交的任務執行完畢 p.shutdown() for ret in ret_l: # 會阻塞,至關於q.get() print('------>',ret.result()) # result,同步阻塞 print('main', os.getpid())
# 線程池。p.submit(), p.shutdown(), ret.result() from concurrent.futures import ThreadPoolExecutor def func(i): print('start', os.getpid()) time.sleep(random.randint(1,3)) print('end', os.getpid()) return '%s * %s' %(i, os.getpid()) tp = ThreadPoolExecutor(20) # 線程個數通常爲cpu核心數4-5倍 ret_l = [] for i in range(100): ret = tp.submit(func, 1) ret_l.append(ret) for ret in ret_l: print('------->', ret.result()) p.shutdown() print('main')
# 線程池。p.submit(), p.shutdown(), ret.result() from concurrent.futures import ThreadPoolExecutor def func(i): print('start', os.getpid()) time.sleep(random.randint(1,3)) print('end', os.getpid()) return '%s * %s' %(i, os.getpid()) tp = ThreadPoolExecutor(20) # 線程個數通常爲cpu核心數4-5倍 ret = tp.map(func, range(20)) # tp.map()方法 for i in ret:print(i) # ret 爲生成器對象
from concurrent.futures import ThreadPoolExecutor def get_page(url): content = requests.get(url) return {'url':url, 'content':content.text} def parserpage(dic): print(dic.result()['url']) for url in url_l: ret = get_page(url) ret.add_done_callback(parserpage) # 先執行完,先調用parserpage函數 # ret=add_done_callback(函數名)
from concurrent.futures import ProcessPoolExecutor import requests import os def get_page(url): print('<進程%s> get %s' % (os.getpid(), url)) respone = requests.get(url) if respone.status_code == 200: return {'url': url, 'text': respone.text} def parse_page(res): res = res.result() print('<進程%s> parse %s' % (os.getpid(), res['url'])) parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text'])) with open('db.txt', 'a') as f: f.write(parse_res) if __name__ == '__main__': urls = ['https://www.baidu.com', 'https://www.openstack.org', 'http://www.sina.com.cn/', 'https://www.tencent.com'] p = ProcessPoolExecutor(3) for url in urls: p.submit(get_page, url).add_done_callback(parse_page) # parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是協程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的,多個任務在一條線程上來回切換。
協程:用戶級別,本身寫的py代碼;控制切換是OS不可見的
在Cpython解釋器:協程和線程都不能利用多核
線程和協程對比
協程特色
對比OS控制線程的切換,用戶在單線程內控制協程的切換
優勢以下:
缺點以下:
# 切換,協程任務 def func1(): 1 + 2 2 + 3 yield 1 sleep(1) def func2(): g = func1() next(g) next(g)
# 完成切換 import time from greenlet import greenlet def func1(): print(123) time.sleep(0.5) g2.switch() print(123) def func2(): print(456) time.sleep(0.5) print(456) g1.switch() g1 = greenlet(func1) g2 = greenlet(func2) g1.switch()
def sleep(num): t = num + time.time() yield t print('sleep 結束') g1 = sleep(1) g2 = sleep(2) t1 = next(g1) t2 = next(g2) lst = [t1, t2] min_t = min(lst) time.sleep(min_t - time.time()) g1.next() min_t = min(lst) time.sleep(min_t - time.time()) g2.next()
# gevent不認識time.sleep import time import gevent from gevent import monkey monkey.patch_all() # 可能的阻塞方法 def func1(): print(123) gevent.sleep(1) print(123) def func2(): print(456) gevnet.sleep(1) print(456) g1 = gevent.spawn(fun1) g2 = gevent.spawn(fun2) # gevent.sleep(2) # 遇到阻塞纔會切換 # g1.join() # 阻塞直到g1任務完成爲止 # g2.join() gevent.joinall([g1, g2]) g_l = [] for i in range(10): g = gevent.spawn(func1) g_l.append(g) gevent.joinall(g_l)
# 示例大變身 import gevent from gevent import monkey,time monkey.patch_all() # 對io操做進行重現實現 def func1(): print(123) time.sleep(3) print(456) def func2(): print('---------') time.sleep(1) print('=========') g1 = gevent.spawn(func1) g2 = gevent.spawn(func2) gevent.joinall([g1, g2]) # 阻塞列表中的全部協程 print('main')
print(g1.value ) # value是屬性,若是沒有執行則爲None
import socket import gevent from gevent import monkey monkey.patch_all() sk = socket.socket() sk.bind(('127.0.0.1', 9000)) sk.listen() def chat(con): while True: msg = con.recv(1024).decode('utf-8') con.send(msg.upper().encode('utf-8')) while True: con, _ = sk.accept() gevent.spawn(chat, con)
import asyncio # 只識別自身的方法 # 起一個任務 async def demo(): # 必需要async修飾,協程方法 print('start') await asyncio.sleep(1) # 阻塞,await關鍵字 + 協程函數 print('end') loop = asyncio.get_event_loop() # 建立一個事件循環 loop.run_until_complete(demo()) # 阻塞,直到協程執行完畢 # 把demo任務丟到事件循環中執行
import asyncio # 只識別自身的方法 async def demo(): # 必需要async修飾,協程方法 print('start') await asyncio.sleep(1) # 阻塞,await關鍵字 + 協程函數 print('end') loop = asyncio.get_event_loop() # 建立一個事件循環 wait_obj = asyncio.wait([demo(), demo(), demo()]) loop.run_until_complete(wait_obj) # 沒有返回值
# 同步取返回值 import asyncio async def demo(): print('start') await asyncio.sleep(1) print('end') return 123 loop = asyncio.get_event_loop() # 建立一個事件循環 t1 = loop.create_task(demo()) t2 = loop.create_task(demo()) tasks = [t1, t2] # 任務列表 wait_obj = asyncio.wait([t1, t2]) loop.run_until_complete(wait_obj) for task in tasks: print(t.result()) # 獲取返回值
import asyncio async def demo(i): print('start') await asyncio.sleep(10-i) print('end') return i, 123 async def main(): task_l = [] for i in range(10): task = asyncio.ensure_future(demo(i)) task_l.append(task) for ret in asyncio.as_compeleted(task_l): res = await ret print(res) loop = asyncio.get_event_loop() loop.run_until_compeleted(main())