A companion video is available for these notes.
Python threads are real OS threads, but the CPython interpreter holds a Global Interpreter Lock (GIL): a thread must acquire the GIL before executing any bytecode, and the interpreter periodically releases it so that other threads get a chance to run (in CPython 2 this happened every 100 bytecode instructions; in CPython 3 it is a time-based switch interval). In effect the GIL serializes all Python bytecode execution, so multiple threads can only run alternately: even 100 threads on a 100-core CPU will use only one core. At any given moment, at most one thread is executing inside the (CPython) interpreter.
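As a small, hedged illustration on CPython 3 (standard-library calls only; the numbers are just the defaults):

import sys

# CPython 3 releases the GIL on a time-based switch interval rather than a
# bytecode count; the default is 5 ms and it can be tuned, but this does not
# change the fact that only one thread runs Python bytecode at a time.
print(sys.getswitchinterval())   # 0.005
sys.setswitchinterval(0.01)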
git push --set-upstream origin dev
git clean -d -fx
git stash
git pull
git stash pop

After running 'git stash' several times, the stash stack fills up with uncommitted work and it can be unclear which entry to restore. 'git stash list' prints the current stash stack; find the entry you want and restore it with e.g. 'git stash apply stash@{1}'. Once everything has been applied back, 'git stash clear' empties the stack.

git push origin --delete dev
git branch -d dev
54. What are the os and sys modules for?
The os module handles interaction with the operating system and exposes OS-level interfaces; the sys module handles interaction with the Python interpreter and provides functions and variables for manipulating the runtime environment.

Closures and LEGB

import logging
logging.basicConfig(level=logging.DEBUG)

def num():
    return [lambda x: i * x for i in range(4)]

if __name__ == '__main__':
    logging.debug([func(2) for func in num()])

# Answer: [6, 6, 6, 6]
# Explanation: this comes down to Python's name-lookup rule, LEGB
# (local, enclosing, global, builtin). Here i lives in the enclosing (closure)
# scope, and Python closures are late-binding: the value of a variable used in
# a closure is looked up when the inner function is *called*, not when it is
# defined. [lambda x: i * x for i in range(4)] therefore produces four lambdas
# that all refer to the same enclosing i. By the time the for loop has finished,
# i == 3, so each call func(2) computes 3 * 2 == 6, giving [6, 6, 6, 6].
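A common fix is to bind the loop variable at definition time with a default argument; a minimal sketch (the name num_fixed is illustrative):

def num_fixed():
    # i=i freezes the current value of i into each lambda's own default
    return [lambda x, i=i: i * x for i in range(4)]

assert [f(2) for f in num_fixed()] == [0, 2, 4, 6]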
# de-duplicate a list
li = [1, 1, 1, 23, 3, 4, 4]
li_set = {}.fromkeys(li).keys() or set(li)

assert list(map(lambda x: x**2, range(1, 11))) == [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]


def find_idx(target, li=None):
    """Two-pointer search on a sorted list for a pair of indexes whose values sum to target."""
    low, high = 0, len(li) - 1
    while low < high:
        s = li[low] + li[high]
        if s == target:
            return low, high
        elif s > target:
            high -= 1
        else:
            low += 1
    raise ValueError('not found error')

if __name__ == '__main__':
    li = [2, 7, 11, 15]
    low, high = find_idx(9, li)
    print(low, '--', high)


# decorator-based singleton
def singleton(cls):
    instance_dic = {}
    def wrapper(*args, **kwargs):
        if cls not in instance_dic:
            instance_dic[cls] = cls(*args, **kwargs)
        return instance_dic[cls]
    return wrapper

@singleton
class Utils(object):
    pass

if __name__ == '__main__':
    utils_1 = Utils()
    utils_2 = Utils()
    assert utils_1 is utils_2


# singleton based on __new__; like Java's lazy initialization it has to consider thread safety
import threading
import logging
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')

class Person:
    _instance_lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, '_instance'):
            with cls._instance_lock:
                # double-checked locking: re-check after acquiring the lock
                if not hasattr(cls, '_instance'):
                    cls._instance = object.__new__(cls)
        return cls._instance

if __name__ == '__main__':
    person_1 = Person()
    person_2 = Person()
    assert person_1 is person_2


import logging
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')

def bin_find(num, li=None):
    li.sort()  # binary search requires a sorted sequence
    low, high = 0, len(li) - 1
    indx = None
    while low <= high:
        mid = (low + high) // 2
        if li[mid] > num:
            high = mid - 1
        elif li[mid] < num:
            low = mid + 1
        else:
            indx = mid
            break
    return indx

if __name__ == '__main__':
    lis = [0, 1, 3, 4, 5, 6, 7, 9, 10, 11, 12, 16, 17]
    logging.debug(bin_find(12, lis))


# a stack modelled on a list
class Stack(object):
    def __init__(self):
        self._stack = []

    def push(self, element):
        self._stack.append(element)

    def pop(self):
        if not self._stack:
            raise ValueError('empty stack...')
        return self._stack.pop()

    def is_empty(self):
        return not self._stack

    def top(self):
        try:
            return self._stack[-1]
        except IndexError:
            raise ValueError('empty stack...')
import random
random.shuffle
random.choice
random.sample
random.random

requests, the student that surpassed the master: requests >> urllib
Pillow (current) vs PIL (the Python 2.7-era original)
psutil <== process and system utilities
import chardet
from contextlib import contextmanager, closing

reload(sys); sys.setdefaultencoding("utf-8") no longer works in Python 3.x
(it raises NameError: name 'reload' is not defined); reload was moved:
import importlib
importlib.reload(sys)

pylint, pyflakes, pysonar2
Fabric
import traceback
sys.argv vs optparse vs argparse vs getopt, plus Google's fire module
import dis  # disassemble and analyse functions
code counting: cloc
Excel read/write: pandas + xlrd, xlsxwriter
lxml
shutil
f-string
P=NP?

# Approach 2: walk a directory tree iteratively with a stack
import os
url = r'C:\Users\Mr.Wang\PycharmProjects\untitled\python基礎'
lis = [url]
while lis:
    url = lis.pop()
    ret_list = os.listdir(url)
    for name in ret_list:
        abs_path = os.path.join(url, name)
        if os.path.isdir(abs_path):
            lis.append(abs_path)
        else:
            print(name)

# generate a random 6-character verification code
def six_token():
    import string
    li = list(map(lambda x: x if isinstance(x, str) else str(x), range(10)))
    li.extend(string.ascii_letters)
    return ''.join(random.sample(li, 6))

81. Implement a random red-packet (money-splitting) function

import random
def red_packge(money, num):
    li = random.sample(range(1, money * 100), num - 1)
    li.extend([0, money * 100])
    li.sort()
    return [(li[index + 1] - li[index]) / 100 for index in range(num)]

ret = red_packge(100, 10)
print(ret)

-------------------------- generator version --------------------------

import random
def red_packge(money, num):
    li = random.sample(range(1, money * 100), num - 1)
    li.extend([0, money * 100])
    li.sort()
    for index in range(num):
        yield (li[index + 1] - li[index]) / 100

ret = red_packge(100, 10)
print(ret)  # note: this prints the generator object; iterate it to get the amounts

84. How does Python manage memory?
Three mechanisms: reference counting, garbage collection, and a memory pool.

1. Reference counting
Python tracks objects in memory with reference counts; every object has one.
The count increases when:
  1. an object is bound to a new name;
  2. it is placed into a container (list, tuple, dict, ...).
The count decreases when:
  1. a name is explicitly destroyed with del;
  2. a reference goes out of scope or is rebound.
sys.getrefcount() returns an object's current reference count.
The count is usually larger than you would guess: for immutable values such as numbers
and strings, the interpreter shares objects across different parts of the program to
save memory.

2. Garbage collection
  1. When an object's reference count drops to zero, it is reclaimed.
  2. If objects a and b reference each other, del removes the names bound to them, but
     each object still holds a reference to the other, so the counts never reach zero
     and the objects are never destroyed (a memory leak). To handle this, the interpreter
     periodically runs a cycle detector that finds unreachable reference cycles and
     deletes them.

3. Memory pool
Python reclaims memory through garbage collection, but unused small blocks go back to an
internal memory pool rather than to the operating system.
  1. pymalloc: to speed up execution, Python manages the allocation and release of small
     memory blocks through a pool.
  2. Objects smaller than 256 bytes use the pymalloc allocator; larger objects use the
     system malloc.
  3. Types such as int, float and list keep their own private free lists and do not share
     them: if you allocate and then release a large number of integers, the memory cached
     for them cannot be reused for floats.

28. Python's garbage collection mechanism?
Python relies mainly on reference counting, supplemented by mark-and-sweep and
generational collection.

Reference counting: the GC module primarily uses reference counts to track and reclaim
garbage. On top of that, mark-and-sweep resolves the reference cycles that container
objects can create, and generational collection trades space for time to make collection
more efficient.

Mark-and-sweep: it exists to break reference cycles, so it only looks at objects that can
participate in cycles. Drawback: its extra work is proportional to the amount of memory
to be reclaimed.

Generational collection: all memory blocks are grouped into "generations" according to how
long they have survived, and the collection frequency decreases as a generation's age
increases. The longer an object has lived, the less likely it is to be garbage, so it is
scanned less often. Survival time is measured in collection passes: the more collections
an object has lived through, the older it is considered.
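A minimal sketch of poking at these mechanisms from the standard library (sys.getrefcount and gc are real APIs; the exact counts and return values are CPython implementation details):

import sys, gc

a = []
b = a                      # a second name for the same list
print(sys.getrefcount(a))  # typically 3: a, b, and the temporary argument reference

class Node:
    def __init__(self):
        self.other = None

x, y = Node(), Node()
x.other, y.other = y, x    # reference cycle: the counts alone never reach zero
del x, y
print(gc.collect())        # the cycle detector reclaims the pair; returns the number of unreachable objects found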
import inspect

def a(a, b=0, *c, d, e=1, **f):
    pass

aa = inspect.signature(a)
print("inspect.signature(fn) is: %s" % aa)
print("type of inspect.signature(fn): %s" % (type(aa)))
print("\n")

bb = aa.parameters
print("signature.parameters is: %s" % bb)
print("type of signature.parameters: %s" % type(bb))
print("\n")

for cc, dd in bb.items():
    print("mappingproxy.items() yields: %s and %s" % (cc, dd))
    print("their types are: %s and %s" % (type(cc), type(dd)))
    print("\n")

ee = dd.kind
print("Parameter.kind is: %s" % ee)
print("type of Parameter.kind: %s" % type(ee))
print("\n")

gg = dd.default
print("Parameter.default is: %s" % gg)
print("type of Parameter.default: %s" % type(gg))
print("\n")

ff = inspect.Parameter.KEYWORD_ONLY
print("inspect.Parameter.KEYWORD_ONLY is: %s" % ff)
print("type of inspect.Parameter.KEYWORD_ONLY: %s" % type(ff))
import inspect

def func_a(arg_a, *args, arg_b='hello', **kwargs):
    print(arg_a, arg_b, args, kwargs)

class Fib:
    def __init__(self, n):
        a, b = 0, 1
        i = 0
        self.fib_list = []
        while i < n:
            self.fib_list.append(a)
            a, b = b, a + b
            i += 1

    def __getitem__(self, item):
        return self.fib_list[item]

if __name__ == '__main__':
    fib = Fib(5)
    print(fib[0:3])

    # get the function signature
    func_signature = inspect.signature(func_a)
    func_args = []
    # walk all parameters of the function
    for k, v in func_signature.parameters.items():
        # after getting a parameter we need to look at its kind:
        # POSITIONAL_OR_KEYWORD means no *args-like parameter precedes it, so the
        # argument can be passed positionally or by keyword; KEYWORD_ONLY is handled
        # the same way here
        if str(v.kind) in ('POSITIONAL_OR_KEYWORD', 'KEYWORD_ONLY'):
            # v.default gives the parameter's default value;
            # if there is no default, default is the class inspect._empty,
            # so checking __name__ == '_empty' tells us whether a default exists
            # (isinstance(v.default, type) because a class is itself an instance of type)
            if isinstance(v.default, type) and v.default.__name__ == '_empty':
                func_args.append({k: None})
            else:
                func_args.append({k: v.default})
        # VAR_POSITIONAL means a *args-style parameter
        elif str(v.kind) == 'VAR_POSITIONAL':
            args_list = []
            func_args.append(args_list)
        # VAR_KEYWORD means a **kwargs-style parameter
        elif str(v.kind) == 'VAR_KEYWORD':
            args_dict = {}
            func_args.append(args_dict)
    print(func_args)
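As a side note, inspect exposes the "no default" sentinel directly, so the default check above can also be written against inspect.Parameter.empty instead of inspecting the class name; a minimal sketch (demo is just an illustrative function):

import inspect

def demo(a, b=1, *args, **kwargs):
    pass

for name, param in inspect.signature(demo).parameters.items():
    has_default = param.default is not inspect.Parameter.empty
    print(name, param.kind, has_default)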
from collections import defaultdict
import logging
logging.basicConfig(level=logging.DEBUG)

def group_by_firstletter(words=None):
    word_dict = {}
    for word in words:
        first_letter = word[0]
        if first_letter in word_dict:
            word_dict[first_letter] += 1
        else:
            word_dict[first_letter] = 1
    return word_dict

def group_by_firstletter2(words=None):
    default_word_dict = defaultdict(int)
    for word in words:
        default_word_dict[word[0]] += 1
    return default_word_dict

def group_by_firstletter3(words=None):
    words_dict = {}
    for word in words:
        if word[0] in words_dict:
            words_dict[word[0]].append(word)
        else:
            words_dict[word[0]] = [word]
    return words_dict

def group_by_firstletter4(words=None):
    default_word_dict = defaultdict(list)
    for word in words:
        default_word_dict[word[0]].append(word)
    return default_word_dict

if __name__ == '__main__':
    words = ['apple', 'bat', 'bar', 'atom', 'book']
    logging.info(group_by_firstletter(words))
    logging.info(group_by_firstletter2(words))
    logging.info(group_by_firstletter3(words))
    logging.info(group_by_firstletter4(words))


from collections.abc import Iterator, Iterable
from collections import defaultdict
from collections import Counter, ChainMap, OrderedDict, namedtuple, deque
from itertools import islice        # slicing for iterators, but indices must be non-negative
from itertools import zip_longest   # like zip, but handles iterables of different lengths
from concurrent.futures import ThreadPoolExecutor as Pool
from collections import namedtuple, deque, defaultdict, OrderedDict, ChainMap, Counter

Point = namedtuple('Point', ['x', 'y', 'z'])
p = Point(1, 2, 3)
print(p.x, '--', p.y, '--', p.z)

# double-ended queue
dq = deque([1, 2, 3, 4])
dq.append(5)
dq.appendleft('a')
dq.popleft()

default_dict = defaultdict(lambda: 'N/A')  # dict with a default value
default_dict['name'] = 'frank'
default_dict['age']

od = OrderedDict([('b', 1), ('a', 2), ('c', 3)])  # keeps insertion order
od.get('a')

# an OrderedDict can implement a FIFO dict that evicts the oldest key once capacity is exceeded
from collections import OrderedDict

class LastUpdatedOrderedDict(OrderedDict):

    def __init__(self, capacity):
        super(LastUpdatedOrderedDict, self).__init__()
        self._capacity = capacity

    def __setitem__(self, key, value):
        containsKey = 1 if key in self else 0
        if len(self) - containsKey >= self._capacity:
            last = self.popitem(last=False)
            print('remove:', last)
        if containsKey:
            del self[key]
            print('set:', (key, value))
        else:
            print('add:', (key, value))
        OrderedDict.__setitem__(self, key, value)

# use case: parameter precedence with ChainMap
from collections import ChainMap
import os, argparse

# default parameters:
defaults = {
    'color': 'red',
    'user': 'guest'
}

# command-line parameters:
parser = argparse.ArgumentParser()
parser.add_argument('-u', '--user')
parser.add_argument('-c', '--color')
namespace = parser.parse_args()
command_line_args = {k: v for k, v in vars(namespace).items() if v}

# combine them into a ChainMap:
combined = ChainMap(command_line_args, os.environ, defaults)

# print the resolved parameters:
print('color=%s' % combined['color'])
print('user=%s' % combined['user'])
# itertools
from itertools import count, repeat, cycle, chain, takewhile, groupby

def times_count(base, n):
    for x in count(base):
        if n <= 0:
            break
        yield str(x)
        n -= 1

def times_repeat(s, n):
    return '-'.join(repeat(s, n))

def times_cycle(s, n):
    for v in cycle(s):
        if n <= 0:
            break
        yield v
        n -= 1

if __name__ == '__main__':
    print(times_repeat('*', 3))
    for s in times_cycle('ABC', 3):
        print(s)
    r = ','.join(chain('ABC', 'XYZ'))
    print(r)
    print(','.join(times_count(5, 3)))
    print(','.join(takewhile(lambda x: int(x) < 10, times_count(1, 30))))
    # groupby only groups *consecutive* items, so sort by the key first for a full grouping
    group_dict = {key: list(group)
                  for key, group in groupby(['abort', 'abandon', 'book', 'cook', 'bird'],
                                            lambda ch: ch[0].upper())}
    print(group_dict)


# -*- coding: utf-8 -*-
import itertools
from functools import reduce

def pi(N):
    ''' approximate pi with the Leibniz series '''
    # step 1: build the odd-number sequence 1, 3, 5, 7, 9, ...
    odd_iter = itertools.count(1, 2)
    # step 2: take its first N terms: 1, 3, 5, ..., 2*N-1
    odd_head = itertools.takewhile(lambda n: n <= 2 * N - 1, odd_iter)
    # step 3: alternate the sign and divide 4 by each term: 4/1, -4/3, 4/5, -4/7, ...
    odd_final = [4 / n * ((-1) ** i) for i, n in enumerate(odd_head)]
    # step 4: sum the series
    value = reduce(lambda x, y: x + y, odd_final)
    return value

# tests:
print(pi(10))
print(pi(100))
print(pi(1000))
print(pi(10000))
assert 3.04 < pi(10) < 3.05
assert 3.13 < pi(100) < 3.14
assert 3.140 < pi(1000) < 3.141
assert 3.1414 < pi(10000) < 3.1415
print('ok')
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(('127.0.0.1', 6666))
clients = set()
print('server bind 127.0.0.1:6666...')
while 1:
    try:
        data, addr = server.recvfrom(1024)
        clients.add(addr)
        if not data or data.decode('utf-8') == 'pong':
            continue
        print('%s:%s >>> %s' % (addr[0], addr[1], data.decode('utf-8')))
        for usr in clients:
            if usr != addr:
                server.sendto(('%s:%s >>> %s' % (addr[0], addr[1], data.decode('utf-8'))).encode('utf-8'), usr)
    except Exception as e:
        pass

########################################################
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import socket, threading, os

client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(b'pong', ('127.0.0.1', 6666))

def myinput():
    while 1:
        try:
            msg = input('>>>')
            yield msg
        except Exception as e:
            os._exit(0)

def getMsg(client):
    while 1:
        try:
            r = client.recv(1024)
            print('\n', r.decode('utf-8'), '\n>>>', end='')
        except Exception as e:
            pass

c = myinput()

def sendMsg(client):
    while 1:
        msg = next(c)
        client.sendto(msg.encode('utf-8'), ('127.0.0.1', 6666))

threading.Thread(target=sendMsg, args=(client,)).start()
threading.Thread(target=getMsg, args=(client,)).start()
def my_dict2obj(d=None):
    if not isinstance(d, dict):
        raise TypeError('only dict supported...')

    class obj:
        def __init__(self, d=None):
            self.d = d
            for key, value in d.items():
                if isinstance(value, (tuple, list)):
                    setattr(self, key, [obj(i) if isinstance(i, dict) else i for i in value])
                else:
                    setattr(self, key, obj(value) if isinstance(value, dict) else value)

        # def __str__(self):
        #     return '{}'.format(self.d)
        # __repr__ = __str__

    return obj(d)

if __name__ == '__main__':
    d = {'a': 1, 'b': {'c': 2}, 'd': ["hi", {'foo': "bar"}]}
    x = my_dict2obj(d)
    print(x.__dict__)

    # unpacking
    *p, q = d.items()
    print(p)
    print(q)
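For comparison, the standard library's types.SimpleNamespace already covers the flat case; a minimal sketch (nested dicts are not converted here unless you recurse yourself):

from types import SimpleNamespace

d = {'a': 1, 'b': {'c': 2}}
ns = SimpleNamespace(**d)
print(ns.a)        # 1
print(ns.b['c'])   # 2 -- the nested dict stays a plain dict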
from html.parser import HTMLParser
from html.entities import name2codepoint

class MyHTMLParser(HTMLParser):

    def handle_starttag(self, tag, attrs):
        print('<%s>' % tag)

    def handle_endtag(self, tag):
        print('</%s>' % tag)

    def handle_startendtag(self, tag, attrs):
        print('<%s/>' % tag)

    def handle_data(self, data):
        print(data)

    def handle_comment(self, data):
        print('<!--', data, '-->')

    def handle_entityref(self, name):
        print('&%s;' % name)

    def handle_charref(self, name):
        print('&#%s;' % name)

parser = MyHTMLParser()
parser.feed('''<html>
<head></head>
<body>
<!-- test html parser -->
    <p>Some <a href=\"#\">html</a> HTML tutorial...<br>END</p>
</body></html>''')
from xml.parsers.expat import ParserCreate

class DefaultSaxHandler(object):
    def start_element(self, name, attrs):
        print('sax:start_element: %s, attrs: %s' % (name, str(attrs)))

    def end_element(self, name):
        print('sax:end_element: %s' % name)

    def char_data(self, text):
        print('sax:char_data: %s' % text)

xml = r'''<?xml version="1.0"?>
<ol>
    <li><a href="/python">Python</a></li>
    <li><a href="/ruby">Ruby</a></li>
</ol>
'''

handler = DefaultSaxHandler()
parser = ParserCreate()
parser.StartElementHandler = handler.start_element
parser.EndElementHandler = handler.end_element
parser.CharacterDataHandler = handler.char_data
parser.Parse(xml)
# datetime
from datetime import datetime, timedelta

now = datetime.now()
# datetime -> timestamp
now_timestamp = now.timestamp()
# timestamp -> local datetime
dt_local = datetime.fromtimestamp(now_timestamp)
# timestamp -> UTC datetime
dt_utc = datetime.utcfromtimestamp(now_timestamp)
# a timestamp carries no time zone; the datetime does
print(dt_local.timestamp(), '<-->', dt_utc.timestamp())
print('{}\n{}\n{}\n{}'.format(now, now_timestamp, dt_local, dt_utc))

# build a datetime for a specific date and time
year = 2019
month = 3
day = 3
hour = 15
minute = 7
dt_specified = datetime(year, month, day, hour, minute)
print(dt_specified)

# str -> datetime: strptime (parse)
datetime_str = '2019-03-03 15:22:00'
datetime_parse_format = '%Y-%m-%d %H:%M:%S'
cday = datetime.strptime(datetime_str, datetime_parse_format)
print(cday)

# datetime -> str: strftime (format)
print(cday.strftime('%Y/%m/%d'))

# date arithmetic (deltas) uses timedelta
now = datetime.now()
now_next3_hours = now + timedelta(hours=3)
now_previous3_days = now + timedelta(days=-3)
print('next 3 hours: {}'.format(now_next3_hours))
print('now_previous3_days: {}'.format(now_previous3_days))

from datetime import timezone
tz_utc_8 = timezone(timedelta(hours=8))
now = datetime.now()
# now starts out with a tzinfo of None
print(now.tzinfo)
# force a time zone onto it
now = now.replace(tzinfo=tz_utc_8)
print(now)

utc_now = datetime.utcnow()
# this has no tzinfo at all either
print(utc_now.tzinfo)
# force the UTC time zone onto it
utc_now = utc_now.replace(tzinfo=timezone.utc)

# Beijing time, UTC+8
bj_dt = utc_now.astimezone(timezone(timedelta(hours=8)))
# UTC-8
pst_dt = utc_now.astimezone(timezone(timedelta(hours=-8)))
# UTC+9
tokyo_dt = utc_now.astimezone(timezone(timedelta(hours=9)))
print('bj_dt: ', bj_dt)
print('pst_dt: ', pst_dt)
print('tokyo_dt: ', tokyo_dt)

from datetime import datetime, timezone, timedelta
import re

def to_timestamp(dt_str, tz_str):
    re_dt_str_1 = r'\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}'
    re_tz_str = r'^UTC([+-])(\d{1,2}):\d{2}$'
    tz_grps = re.match(re_tz_str, tz_str).groups()
    sign = tz_grps[0]
    hours = int(tz_grps[1])
    if re.match(re_dt_str_1, dt_str):
        dt = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
        if sign == '+':
            tz_info_x = timezone(timedelta(hours=hours))
        else:
            tz_info_x = timezone(timedelta(hours=-hours))
        dt = dt.replace(tzinfo=tz_info_x)
    else:
        print('re is wrong!')
    return dt.timestamp()

# tests:
t1 = to_timestamp('2015-6-1 08:10:30', 'UTC+7:00')
assert t1 == 1433121030.0, t1
t2 = to_timestamp('2015-5-31 16:10:30', 'UTC-09:00')
assert t2 == 1433121030.0, t2
print('ok')
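On Python 3.9+ the standard library also ships named time zones via zoneinfo, which can replace the hand-built fixed offsets above; a small sketch (assumes Python 3.9+ and available tz data, e.g. the tzdata package on Windows):

from datetime import datetime
from zoneinfo import ZoneInfo

bj = datetime.now(ZoneInfo('Asia/Shanghai'))
tokyo = bj.astimezone(ZoneInfo('Asia/Tokyo'))
print(bj, tokyo)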
digital_dict = {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}
from functools import reduce

def str2int(s):
    return reduce(lambda x, y: x * 10 + y, map(lambda x: digital_dict.get(x), s))

str2int('13579')


def _odd_iter():
    n = 1
    while True:
        n = n + 2
        yield n

def _not_divisible(n):
    return lambda x: x % n > 0

def primes():
    yield 2
    it = _odd_iter()            # the initial sequence
    while True:
        n = next(it)            # take the first number of the current sequence
        yield n
        it = filter(_not_divisible(n), it)  # build the filtered sequence

# print the primes below 1000:
for n in primes():
    if n < 1000:
        print(n)
    else:
        break


def _odd_iter3():
    n = 3
    while True:
        yield n
        n += 2

def _not_divisible_3(n):
    return lambda x: x % n > 0

def prime_iter3():
    yield 2
    it = _odd_iter3()
    while True:
        base_num = next(it)
        yield base_num
        it = filter(lambda x, y=base_num: x % y > 0, it)

for i in prime_iter3():
    if i > 50:
        break
    else:
        print(i, end=',')


# -*- coding: utf-8 -*-
L = [('Bob', 75), ('Adam', 92), ('Bart', 66), ('Lisa', 88)]

def by_score(x):
    return x[1]

def by_name(x):
    return x[0]

sorted(L, key=by_score, reverse=True)
sorted(L, key=by_name, reverse=True)


def createCounter():
    count = 0
    def counter():
        nonlocal count
        count += 1
        return count
    return counter

def createCounter():
    def f():
        n = 1
        while True:
            yield n
            n += 1
    g = f()
    def counter():
        return next(g)
    return counter

# tests:
counterA = createCounter()
print(counterA(), counterA(), counterA(), counterA(), counterA())  # 1 2 3 4 5
counterB = createCounter()
if [counterB(), counterB(), counterB(), counterB()] == [1, 2, 3, 4]:
    print('test passed!')
else:
    print('test failed!')

def createCounter():
    x = 0
    def counter():
        nonlocal x
        x += 1
        return x
    return counter


from collections import Counter

Counter(s=3, c=2, e=1, u=1)
# Counter({'s': 3, 'c': 2, 'u': 1, 'e': 1})

some_data = ('c', '2', 2, 3, 5, 'c', 'd', 4, 5, 'd', 'd')
Counter(some_data).most_common(2)
# [('d', 3), ('c', 2)]

some_data = ['c', '2', 2, 3, 5, 'c', 'd', 4, 5, 'd', 'd']
Counter(some_data).most_common(2)
# [('d', 3), ('c', 2)]

some_data = {'c', '2', 2, 3, 5, 'c', 'd', 4, 5, 'd', 'd'}   # a set: duplicates collapse
Counter(some_data).most_common(2)
# [('c', 1), (3, 1)]
# In practice, sequence operations try their specific magic methods first and only
# fall back to __getitem__ when those are missing.
from collections.abc import Iterable, Iterator
from types import GeneratorType
from contextlib import contextmanager

class Company:
    def __init__(self, employee_list):
        self.employee_list = employee_list

    # sequence protocol
    def __getitem__(self, item):
        print('getitem executed...')
        cls = type(self)
        if isinstance(item, slice):
            return cls(self.employee_list[item])
        elif isinstance(item, int):
            return cls([self.employee_list[item]])

    def __setitem__(self, key, value):
        self.employee_list[key] = value

    def __delitem__(self, key):
        del self.employee_list[key]

    def __len__(self):
        print('len executed...')
        return len(self.employee_list)

    def __contains__(self, item):
        print('contains executed...')
        return item in self.employee_list

    # iteration protocol
    # implementing __iter__ alone makes this an Iterable
    def __iter__(self):
        print('iter executed...')
        return iter(self.employee_list)

    # implementing __next__ as well makes it an Iterator (still not a generator)
    def __next__(self):
        print('next executed...')
        pass

    # callable
    def __call__(self, *args, **kwargs):
        print('__call__ executed...')
        pass

    # context manager
    def __enter__(self):
        # self.fp = open('xxx')
        print('__enter__ executed...')
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('__exit__ executed...')
        pass  # release resources here, e.g. self.fp.close()

    @contextmanager
    def Resource(self):
        self.fp = open('./sample.csv')
        yield self.fp
        self.fp.close()

    def __repr__(self):
        return ','.join(self.employee_list)

    __str__ = __repr__

if __name__ == '__main__':
    company = Company(['Frank', 'Tom', 'May'])
    company()
    for employee in company:
        print(employee)
    print(company[1:])
    print(isinstance(company, Iterable))
    print(isinstance(company, Iterator))
    print(isinstance(company, GeneratorType))
    print(isinstance((employee for employee in company), GeneratorType))
    print(len(company))
    print('Jim' in company)


class MyVector(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __add__(self, other):
        cls = type(self)
        return cls(self.x + other.x, self.y + other.y)

    def __repr__(self):
        return '({},{})'.format(self.x, self.y)

    def __str__(self):
        return self.__repr__()

if __name__ == '__main__':
    vector1 = MyVector(1, 2)
    vector2 = MyVector(2, 3)
    assert str(vector1 + vector2) == '(3,5)'
    assert (vector1 + vector2).__repr__() == '(3,5)'


import abc

class CacheBase(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def set(self, key):
        pass

    @abc.abstractmethod
    def get(self, value):
        pass

class RedisCache(CacheBase):
    pass

# Abstract base classes are rarely used in practice; mixins and duck typing are far more
# common -- see the Django REST framework for examples.
if __name__ == '__main__':
    redis_cache = RedisCache()  # TypeError: Can't instantiate abstract class RedisCache with abstract methods get, set


from collections import namedtuple, defaultdict, deque, Counter, OrderedDict, ChainMap

# namedtuple
def test():
    User = namedtuple('User', ['name', 'age', 'height', 'edu'])
    user_tuple = ('Frank', 18, 180, 'master')
    user_dict = dict(name='Tom', age=20, height=175, edu='PHD')
    user = User._make(user_tuple)
    user = User._make(user_dict)   # note: _make iterates the dict's *keys*; User(**user_dict) passes the values
    print(','.join(map(lambda x: str(x) if not isinstance(x, str) else x, user)))
    ordered_user_dict = user._asdict()
    print(ordered_user_dict)

# defaultdict
def test2():
    user_dict = {}
    user_list = ['frank', 'tom', 'tom', 'jim', 'Tom']
    for user in user_list:
        u = user.lower()
        user_dict.setdefault(u, 0)
        user_dict[u] += 1
        # if not u in user_dict:
        #     user_dict[u] = 1
        # else:
        #     user_dict[u] += 1
    print(user_dict)

def gen_default_0():
    return 0

def test3():
    user_dict = defaultdict(int or gen_default_0 or (lambda: 0))
    user_list = ['frank', 'tom', 'Tom', 'jim']
    for user in user_list:
        u = user.lower()
        user_dict[u] += 1
    print(user_dict)

# deque is thread-safe
def test4():
    dq = deque(['a', 'b', 'c'])
    dq.appendleft('1')
    print(dq)
    dq.extendleft(['e', 'f', 'g'])
    print(dq)
    dq.popleft()
    print(dq)
    dq.insert(0, 'g')
    print(dq)

# Counter
def test5():
    user_list = ['frank', 'tom', 'tom', 'jim']
    user_counter = Counter(user_list)
    print(user_counter.most_common(2))
    alpha_counter = Counter('abccddadfaefedasdfwewefwfsfsfadadcdffghethethklkijl')
    alpha_counter.update('fsfjwefjoe9uefjsljdfljdsoufbadflfmdlmjjdsnvdljflasdj')
    print(alpha_counter.most_common(3))

# OrderedDict only guarantees insertion order!
def test6():
    ordered_dict = OrderedDict()
    ordered_dict['b'] = '2'
    ordered_dict['a'] = '1'
    ordered_dict['c'] = '3'
    # print(ordered_dict.popitem(last=False))  # last=True pops from the end, otherwise from the front
    # print(ordered_dict.pop('a'))             # returns the value of the popped key
    ordered_dict.move_to_end('b')              # move the given key to the last position
    print(ordered_dict)

# chain several dicts together
def test7():
    user_dict_1 = dict(a=1, b=2)
    user_dict_2 = dict(b=3, c=5)
    # when the same key appears in both, the first occurrence wins
    chain_map = ChainMap(user_dict_1, user_dict_2)
    new_chain_map = chain_map.new_child({'d': 6, 'e': 7, 'f': 8})
    for key, value in chain_map.items():
        print('{}--->{}'.format(key, value))
    print('*' * 100)
    for key, value in new_chain_map.items():
        print('{}--->{}'.format(key, value))

if __name__ == '__main__':
    test()
    test2()
    test3()
    test4()
    test5()
    test6()
    test7()
import random

def random_line(cols):
    alphabet_list = [chr(i) for i in range(65, 91, 1)] + [chr(i) for i in range(97, 123, 1)]
    # for i in range(cols):
    #     yield random.choice(alphabet_list)
    return (random.choice(alphabet_list) for i in range(cols))

def randome_generate_file(file_path='./sample.csv', lines=10000, cols=1000):
    with open(file_path, 'w') as fw:
        for i in range(lines):
            fw.write(','.join(random_line(cols)))
            fw.write('\n')
            fw.flush()

def load_list_data(file_path='./sample.csv', total_num=10000, target_num=1000):
    all_data = []
    target_data = []
    with open(file_path, 'r') as fr:
        for count, line in enumerate(fr):
            if count > total_num:
                break
            else:
                all_data.append(line)
    while len(target_data) <= target_num:
        index = random.randint(0, len(all_data) - 1)
        if all_data[index] not in target_data:
            target_data.append(all_data[index])
    return all_data, target_data

def load_dict_data(file_path='./sample.csv', total_num=10000, target_num=1000):
    all_data = {}
    target_data = []
    with open(file_path, encoding='utf8', mode='r') as fr:
        for idx, line in enumerate(fr):
            if idx > total_num:
                break
            all_data[line] = 0
    all_data_list = list(all_data)
    while len(target_data) <= target_num:
        random_index = random.randint(0, len(all_data_list) - 1)
        if all_data_list[random_index] not in target_data:
            target_data.append(all_data_list[random_index])
    return all_data, target_data

def find_test(all_data, target_data):
    test_times = 100
    total_times_cnt = 0
    import time
    for t in range(test_times):
        start = time.time()
        for item in target_data:
            if item in all_data:
                pass
        cost_once = time.time() - start
        total_times_cnt += cost_once
    return total_times_cnt / test_times

if __name__ == '__main__':
    # randome_generate_file()
    # all_data, target_data = load_list_data()
    all_data, target_data = load_dict_data()
    last_time = find_test(all_data, target_data)
    print(last_time)
# Chapter 1: everything is an object
from functools import wraps
import time

def time_decor(func):
    @wraps(func)
    def wrapper_func(*args, **kw):
        start = time.time()
        result = func(*args, **kw)
        end = time.time()
        print('{} cost {:.2f} s '.format(func.__name__, end - start))
        return result
    return wrapper_func

@time_decor
def ask(name):
    print(name)

class Person:
    def __init__(self, name):
        print('hi, ' + name)

my_ask = ask
my_ask('frank')
print(type(my_ask))
person = Person('frank')
print(person)
print('*' * 100)

class_list = []
class_list.append(my_ask)
class_list.append(Person)
for item in class_list:
    item('tom')
>>> type(type)
<class 'type'>
>>> object.__bases__
()
>>> type.__bases__
(<class 'object'>,)
>>> type(object)
<class 'type'>

type is its own metaclass: type produces every class, including object and built-in classes such as dict, so type is the origin of everything class-like (type(object) is type), and 'abc' is an instance of str just as str is an instance of type. object, in turn, is the base class of all objects, including type (type.__bases__ is (object,)), while object.__bases__ is empty: there is nothing above object.

Python is a protocol-based language. Its dynamic nature makes development very fast, but because everything is an object, classes included, many problems only surface at runtime; a statically typed language such as Java catches issues like type errors at compile time.

Chapter 3: magic functions

def my_hex(num):
    alpha_list = ['A', 'B', 'C', 'D', 'E', 'F']
    hex_list = []
    while True:
        mod_, num = num % 16, num // 16
        hex_list.append(alpha_list[mod_ - 10] if mod_ > 9 else mod_)
        if num == 0:
            break
    hex_list.append('0x')
    hex_list.reverse()
    return ''.join(map(lambda x: str(x) if not isinstance(x, str) else x, hex_list))

def my_octonary(num):
    octonary_list = []
    while True:
        mod_, num = num % 8, num // 8
        octonary_list.append(str(mod_))
        if num == 0:
            break
    octonary_list.append('0o')
    octonary_list.reverse()
    return ''.join(octonary_list)

print(hex(60))
print(my_hex(60))
print(oct(9))
print(my_octonary(9))

def fac(n, res):
    if n == 1:
        return res
    else:
        return fac(n - 1, n * res)

print(fac(6, 1))

d = {'a': 1, 'b': {'c': 2}, 'd': ["hi", {'foo': "bar"}]}

def my_dict2obj(args):
    class obj(object):
        def __init__(self, d):
            for key, value in d.items():
                if not isinstance(value, (list, tuple)):
                    setattr(self, key, obj(value) if isinstance(value, dict) else value)
                else:
                    setattr(self, key, [obj(i) if isinstance(i, dict) else i for i in value])
    return obj(args)

x = my_dict2obj(d)
print(x.__dict__)

words = ['apple', 'bat', 'bar', 'atom', 'book']
alpha_dict = {}
for word in words:
    word_list = []
    if word[0] not in alpha_dict:
        word_list.append(word)
        alpha_dict[word[0]] = word_list
    else:
        alpha_dict[word[0]].append(word)
print(alpha_dict)

from collections import namedtuple
stock_list = [['AAPL', '10.30', '11.90'], ['YAHO', '9.23', '8.19'], ['SINA', '22.80', '25.80']]
stock_info = namedtuple('stock_info', ['name', 'start', 'end'])
stock_list_2 = [stock_info(name, start, end) for name, start, end in stock_list]
print(stock_list_2)

from collections import namedtuple
Card = namedtuple('Card', ['suit', 'rank'])

class French_Deck():
    rank = [i for i in range(2, 11, 1)] + ['J', 'Q', 'K', 'A']
    suit = 'Spade,Club,Heart,Diamond'.split(r',')

    def __init__(self):
        self._card = [Card(s, r) for r in French_Deck.rank for s in French_Deck.suit]

    def __getitem__(self, item):
        if isinstance(item, (int, slice)):
            return self._card[item]

    def __len__(self):
        return len(self._card)

frenck_deck = French_Deck()
print(frenck_deck[1:3])
# -*- coding: utf-8 -*-
import numbers
import bisect

class Group(object):
    # supports slicing
    def __init__(self, group_name, company_name, staffs):
        self.group_name = group_name
        self.company_name = company_name
        self.staffs = staffs

    def __reversed__(self):
        self.staffs.reverse()

    def __getitem__(self, item):
        cls = type(self)
        if isinstance(item, slice):
            return cls(group_name=self.group_name, company_name=self.company_name, staffs=self.staffs[item])
        elif isinstance(item, numbers.Integral):
            return cls(group_name=self.group_name, company_name=self.company_name, staffs=[self.staffs[item]])

    def __len__(self):
        return len(self.staffs)

    def __iter__(self):
        return iter(self.staffs)

    def __contains__(self, item):
        return item in self.staffs

if __name__ == '__main__':
    group = Group(group_name='AI Team', company_name='Intel', staffs=['Frank', 'Tom', 'Jim'])
    print(len(group))
    print(group[2].staffs)
    reversed(group)  # reverse in place
    for item in group[1:]:
        print(item)
# -*- coding: utf-8 -*-
import bisect
from collections import deque

def test():
    insert_seq = deque()
    bisect.insort(insert_seq, 3)
    bisect.insort(insert_seq, 2)
    bisect.insort(insert_seq, 4)
    return insert_seq

if __name__ == '__main__':
    res = test()
    print(res)
    print(bisect.bisect(res, 7))  # bisect is an alias of bisect_right (backward compatibility)
    print(res)
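A small companion sketch of bisect on a plain sorted sequence, essentially the example from the bisect docs (the grade breakpoints are purely illustrative):

import bisect

def grade(score, breakpoints=(60, 70, 80, 90), grades='FDCBA'):
    # the insertion point returned by bisect doubles as an index into grades
    return grades[bisect.bisect(breakpoints, score)]

print([grade(s) for s in (33, 60, 75, 89, 95)])  # ['F', 'D', 'C', 'B', 'A']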
import array

my_array = array.array('i')
for i in range(10):
    my_array.append(i)
print(my_array)

my_list = ['person1', 'person2']
# note: fromkeys gives every key the *same* value object, so both keys share this one list
my_dict = dict.fromkeys(my_list, [{'name': 'frank'}, {'name': 'tom'}])
print(my_dict)
# -*- coding: utf-8 -*-
from pandas import DataFrame
import numpy as np

def test():
    df = DataFrame(np.arange(12).reshape(3, 4), columns=['col1', 'col2', 'col3', 'col4'])
    return df

if __name__ == '__main__':
    df = test()
    df.iloc[0:1, 0:1] = None
    print(df)
    df.dropna(axis=0, how='all', subset=['col1'], inplace=True)
    print(df)

    # replace 0 with None in the gene_* columns, column by column
    gene_cols_tmp = [col for col in df.columns if col.startswith('gene_')]
    for col in gene_cols_tmp:
        df[col] = df[col].apply(lambda x: None if x == 0 else x)
Descriptors come in two kinds: data descriptors and non-data descriptors. A descriptor that implements at least __set__() and __get__() is a data descriptor; one that implements the protocol without __set__() is a non-data descriptor. The distinction matters because descriptors obey a strict priority when they proxy class attributes: a data descriptor outranks the instance attribute, so once an instance attribute is proxied by a data descriptor, reading or writing that attribute actually operates on the descriptor. The lookup priority, from high to low, is:

class attribute > data descriptor > instance attribute > non-data descriptor > __getattr__() for attributes that cannot be found
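Before the fuller example below, a minimal sketch of that priority using the plain descriptor protocol (all class and attribute names here are illustrative):

class DataDesc:
    def __get__(self, obj, owner):
        return 'data descriptor'
    def __set__(self, obj, value):
        pass

class NonDataDesc:
    def __get__(self, obj, owner):
        return 'non-data descriptor'

class C:
    d = DataDesc()
    n = NonDataDesc()

c = C()
c.__dict__['d'] = 'instance attr'
c.__dict__['n'] = 'instance attr'
print(c.d)   # 'data descriptor'  -> the data descriptor wins over the instance __dict__
print(c.n)   # 'instance attr'    -> the instance __dict__ wins over a non-data descriptor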
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import random

class Field(object):
    def __init__(self, name, column_type, is_pk, default):
        self.name = name
        self.column_type = column_type
        self.is_pk = is_pk
        self.default = default

class IntField(Field):
    def __init__(self, name=None, column_type='bigint', is_pk=True, default=0):
        super(IntField, self).__init__(name, column_type, is_pk, default)

    def __get__(self, instance, owner):
        print('get in data descriptor...')

def gen_id():
    print('get in User class __dict__...')
    return random.randint(0, 10)

class User:
    id = IntField()
    # rand_id = gen_id()
    #
    # def __init__(self, name):
    #     print('get in user instance __dict__ ...')
    #     self.name = name

if __name__ == '__main__':
    user = User()
    user.id
    # user.rand_id
# (a long digression omitted here)
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import random

class Field(object):
    def __init__(self, name, column_type, is_pk, default):
        self.name = name
        self.column_type = column_type
        self.is_pk = is_pk
        self.default = default

class IntField(Field):
    def __init__(self, name=None, column_type='bigint', is_pk=True, default=0):
        super(IntField, self).__init__(name, column_type, is_pk, default)

    def __get__(self, instance, owner):
        print('get in data descriptor...')

def gen_id():
    print('get in User class __dict__...')
    return random.randint(0, 10)

class User:
    # id = IntField()
    rand_id = gen_id()
    #
    # def __init__(self, name):
    #     print('get in user instance __dict__ ...')
    #     self.name = name

if __name__ == '__main__':
    user = User()
    user.rand_id
    # user.rand_id
from collections import namedtuple

User = namedtuple('User', ['name', 'age', 'height', 'edu'])
user_tuple = ('Frank', 18, 180, 'master')
user_dict = dict(name='Tom', age=20, height=175, edu='PHD')
user = User._make(user_tuple)
print(','.join(map(lambda x: str(x) if not isinstance(x, str) else x, user)))
ordered_user_dict = user._asdict()
print(ordered_user_dict)
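One caveat worth noting: _make expects an iterable of field *values*, so passing a dict to it would turn the keys into the values; for a dict, keyword-unpacking is the usual route. A tiny sketch reusing the User namedtuple and user_dict defined above:

# keyword-unpack the dict so each value lands in the right field
user_from_dict = User(**user_dict)
print(user_from_dict)   # User(name='Tom', age=20, height=175, edu='PHD')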
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from numbers import Integral

class Field(object):
    pass

class IntField(Field):
    def __init__(self, db_column, min_value=None, max_value=None):
        self._value = None
        self.min_value = min_value
        self.max_value = max_value
        self.db_column = db_column
        if min_value:
            if not isinstance(min_value, Integral):
                raise ValueError('min_value must be int')
            elif min_value < 0:
                raise ValueError('min_value must be positive int')
        if max_value:
            if not isinstance(max_value, Integral):
                raise ValueError('max_value must be int')
            elif max_value < 0:
                raise ValueError('max_value should be positive int')
        if min_value and max_value:
            if min_value > max_value:
                raise ValueError('min_value must be smaller than max_value')

    def __get__(self, instance, owner):
        return self._value

    # __set__ is what makes this a data descriptor
    def __set__(self, instance, value):
        if not isinstance(value, Integral):
            raise ValueError('value must be int')
        if self.min_value and self.max_value:
            if not (self.min_value <= value <= self.max_value):
                raise ValueError('value should between min_value and max_value!')
        self._value = value

class CharField(Field):
    def __init__(self, db_column=None, max_length=None):
        self._value = None
        self.db_column = db_column
        if not max_length:
            raise ValueError('you must specify max_length for charfield ')
        self.max_length = max_length

    def __get__(self, instance, owner):
        return self._value

    def __set__(self, instance, value):
        if not isinstance(value, str):
            raise ValueError('value should be an instance of str')
        if len(value) > self.max_length:
            raise ValueError('value len exceeds max_length')
        self._value = value

class ModelMetaclass(type):
    def __new__(cls, name, bases, attrs):
        if name == 'BaseModel':
            return super().__new__(cls, name, bases, attrs)
        fields = {}
        for key, value in attrs.items():
            if isinstance(value, Field):
                fields[key] = value
        attrs_meta = attrs.get("Meta", None)
        _meta = {}
        db_table = name.lower()
        if attrs_meta:
            table = getattr(attrs_meta, 'db_table', None)
            if table:
                db_table = table
        _meta["db_table"] = db_table
        attrs["_meta"] = _meta
        attrs['fields'] = fields
        attrs.pop('Meta', None)
        return super().__new__(cls, name, bases, attrs)

class BaseModel(metaclass=ModelMetaclass):
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)
        super(BaseModel, self).__init__()

    def save(self):
        fields = []
        values = []
        for key, value in self.fields.items():
            db_column = value.db_column
            if not db_column:
                db_column = key.lower()
            fields.append(db_column)
            value = getattr(self, key)
            values.append(str(value) if not isinstance(value, str) else "'{}'".format(value))
        sql = 'insert into {db_table} ({field_list}) values({value_list})'.format(
            db_table=self._meta.get('db_table'),
            field_list=','.join(fields),
            value_list=','.join(values))
        print(sql)

class User(BaseModel):
    age = IntField(db_column='age', min_value=0, max_value=100)
    name = CharField(db_column='column', max_length=10)

    class Meta:
        db_table = 'user'

if __name__ == '__main__':
    user = User()
    user.name = 'frank'
    user.age = 18
    user.save()
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from collections.abc import Iterator

class Company:
    def __init__(self, employee_list=None):
        if not isinstance(employee_list, (tuple, list)):
            raise TypeError('employee_list should be an instance of tuple or list...')
        self.employee_list = employee_list

    def __iter__(self):
        return CompanyIterator(self.employee_list)  # or simply iter(self.employee_list)

class CompanyIterator(Iterator):
    # without inheriting from Iterator you would have to supply __iter__ yourself
    def __init__(self, employee_list):
        self.employee_list = employee_list
        self._index = 0

    def __iter__(self):  # optional when inheriting from Iterator
        return self

    def __next__(self):
        try:
            word = self.employee_list[self._index]
        except IndexError:
            raise StopIteration
        self._index += 1
        return word

if __name__ == '__main__':
    company = Company(['a', 'b', 'c'])
    for c in company:
        print(c)
def read_file_chunk(file_path, new_line='\n', chunk_size=4096):
    """Read a file in fixed-size chunks and yield one logical line at a time."""
    buf = ''
    with open(file_path) as f:
        while True:
            chunk = f.read(chunk_size)
            # flush every complete line currently sitting in the buffer
            while new_line in buf:
                pos = buf.index(new_line)
                yield buf[:pos]
                buf = buf[pos + len(new_line):]
            if not chunk:      # EOF: yield whatever is left and stop
                yield buf
                break
            buf += chunk
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import dis
import inspect

frame = None

def foo():
    bar()
    pass

def bar():
    global frame
    frame = inspect.currentframe()

if __name__ == '__main__':
    print(dis.dis(foo))
    foo()
    print('*' * 100)
    print(frame.f_code.co_name)
    caller_frame = frame.f_back
    print(caller_frame.f_code.co_name)
import dis

def gen_func():
    yield 1
    name = 'frank'
    yield 2
    age = 30
    yield age
    return "imooc"

if __name__ == '__main__':
    gen = gen_func()
    print(dis.dis(gen))
    print(gen.gi_frame.f_lasti)
    print(gen.gi_frame.f_locals)
    next(gen)
    print(gen.gi_frame.f_lasti)
    print(gen.gi_frame.f_locals)
    next(gen)
    print(gen.gi_frame.f_lasti)
    print(gen.gi_frame.f_locals)
    next(gen)
    print(gen.gi_frame.f_lasti)
    print(gen.gi_frame.f_locals)
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'

# handle a huge file whose data all sits on a single line
def read_file_chunk(file_path='./input.txt', chunksize=4096, delimeter='{|}'):
    with open(file_path) as f:
        buf = ''
        while True:
            block_buf = f.read(chunksize)
            while delimeter in buf:
                # locate the next delimiter
                idx = buf.index(delimeter)
                # hand back one record through the generator
                yield buf[:idx]
                # remember to skip over the delimiter itself
                buf = buf[idx + len(delimeter):]
            # no more data: emit any trailing piece without a delimiter, then stop
            if not block_buf:
                if buf:
                    yield buf
                break
            buf += block_buf

if __name__ == '__main__':
    for line in read_file_chunk():
        print(line)

    # l = list(zip(*[iter([chr(i) for i in range(65, 92, 1)])] * 3))
    # ss = [''.join(i) for i in l]
    # with open('input.txt', 'w') as fw:
    #     fw.write('{|}'.join(ss) * 10)
import abc

class CacheBase(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def get(self):
        pass

    @abc.abstractmethod
    def set(self):
        pass

class RedisCache(CacheBase):
    pass

if __name__ == '__main__':
    # raises TypeError: the abstract methods get/set are not implemented
    RedisCache()
from threading import Thread
import time
import logging
logging.basicConfig(level=logging.DEBUG)

class Get_html(Thread):
    def __init__(self, name):
        super(Get_html, self).__init__(name=name)

    def run(self):
        logging.info('thread {name} started...'.format(name=self.name))
        time.sleep(2)
        logging.info('thread {name} ended...'.format(name=self.name))

class Parse_html(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        logging.info('Thread {name} started...'.format(name=self.name))
        time.sleep(4)
        logging.info('Thread {name} ended...'.format(name=self.name))

if __name__ == '__main__':
    start = time.time()
    get_html_thread = Get_html('get_html_thread')
    parse_html_thread = Parse_html('parse_html_thread')
    get_html_thread.start()
    parse_html_thread.start()
    get_html_thread.join()
    parse_html_thread.join()
    logging.info('cost {} in total...'.format(time.time() - start))
>>> import chardet
>>> import requests
>>> response = requests.get('http://www.baidu.com')
>>> chardet.detect(response.content)
{'encoding': 'utf-8', 'confidence': 0.99, 'language': ''}
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from threading import (Thread, Lock)

lock = Lock()
total = 0

def ascend():
    global total
    global lock
    for i in range(10**6):
        with lock:
            total += 1

def descend():
    global total
    global lock
    for i in range(10**6):
        lock.acquire()
        total -= 1
        lock.release()

if __name__ == '__main__':
    ascend_thread = Thread(target=ascend)
    descend_thread = Thread(target=descend)
    ascend_thread.start()
    descend_thread.start()
    ascend_thread.join()
    descend_thread.join()
    print(total)
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from threading import (Thread, Lock, RLock)

# Thread synchronisation relies on locks, but deadlocks must be avoided; when a single
# thread needs to acquire the same lock again, use RLock (a re-entrant lock).
# lock = Lock()
lock = RLock()
total = 0

def ascend():
    global total
    global lock
    for i in range(10**6):
        with lock:
            total += 1

def descend():
    global total
    global lock
    for i in range(10**6):
        lock.acquire()
        lock.acquire()  # with a plain Lock this deadlocks; an RLock does not
        total -= 1
        lock.release()  # release once per acquire so other threads are not blocked forever
        lock.release()

if __name__ == '__main__':
    ascend_thread = Thread(target=ascend)
    descend_thread = Thread(target=descend)
    ascend_thread.start()
    descend_thread.start()
    ascend_thread.join()
    descend_thread.join()
    print(total)
from threading import (Thread, Condition)

class XiaoAI(Thread):
    def __init__(self, cond, name='XiaoAI'):
        super().__init__(name=name)
        self.cond = cond

    def run(self):
        with self.cond:
            self.cond.wait()
            print('{name}: here'.format(name=self.name))
            self.cond.notify()
            self.cond.wait()
            print('{name}: sure!'.format(name=self.name))
            self.cond.notify()

class TianMao(Thread):
    def __init__(self, cond, name='TianMao'):
        super().__init__(name=name)
        self.cond = cond

    def run(self):
        with self.cond:
            print('{name}: XiaoAI?'.format(name=self.name))
            self.cond.notify()
            self.cond.wait()
            print('{name}: let us recite some poems.'.format(name=self.name))
            self.cond.notify()
            self.cond.wait()

if __name__ == '__main__':
    cond = Condition()
    xiao = XiaoAI(cond)
    tian = TianMao(cond)
    xiao.start()
    tian.start()
    xiao.join()
    tian.join()


from threading import (Thread, Semaphore)
from urllib.parse import urlencode
import requests
import chardet
import logging
from os import path
import random
import re
logging.basicConfig(level=logging.DEBUG)

# https://tieba.baidu.com/f?kw=%E5%B8%83%E8%A2%8B%E6%88%8F&ie=utf-8&pn=100
class TieBaSpider(Thread):
    def __init__(self, url, sem, name='TieBaSpider'):
        super(TieBaSpider, self).__init__(name=name)
        self.url = url
        self.sem = sem

    def _save(self, text):
        parent_dir = r'D:\tieba'
        file_name = path.join(parent_dir, path.split(re.sub(r'[%|=|&|?]', '', self.url))[1]) + '.html'
        with open(file_name, 'w', encoding='utf-8') as fw:
            fw.write(text)
            fw.flush()
        return 1

    def run(self):
        # ua_list = ["Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv2.0.1) Gecko/20100101 Firefox/4.0.1",
        #            "Mozilla/5.0 (Windows NT 6.1; rv2.0.1) Gecko/20100101 Firefox/4.0.1",
        #            "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11",
        #            "Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11",
        #            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"]
        # header = {'User-Agent': random.choice(ua_list)}
        response = requests.get(self.url)  # headers=header
        content = response.content
        logging.info(response.encoding)
        # result = chardet.detect(content)
        # logging.info(result)
        # code = result.get('encoding', 'utf-8')
        self._save(content.decode(response.encoding))
        self.sem.release()

class UrlProducer(Thread):
    def __init__(self, tb_name, sem, pages_once=3, start_index=1, end_index=9):  # (end-start) % pages_once == 0
        super(UrlProducer, self).__init__(name='UrlProducer')
        self.tb_name = urlencode(tb_name)
        self.sem = sem
        logging.info(self.tb_name)
        self.pages_once = pages_once
        self.start_index = start_index
        self.end_index = end_index

    def run(self):
        for page_idx in range(self.start_index, self.end_index + 1):
            self.sem.acquire()
            url_prefix = r'https://tieba.baidu.com/f?'
            url_suffix = r'&fr=ala0&tpl='
            self.url = url_prefix + self.tb_name + url_suffix + str(page_idx)
            tb_spider = TieBaSpider(self.url, self.sem)
            tb_spider.start()

if __name__ == '__main__':
    kw_dict = dict(kw=r'國家地理')
    sem = Semaphore(3)  # allow at most 3 spider threads at once
    url_producer = UrlProducer(kw_dict, sem=sem)
    url_producer.start()
    url_producer.join()


from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from concurrent.futures import Future

def get_html(times):
    time.sleep(times)
    print('get page {} success'.format(times))
    return times

if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=2)
    task_2 = pool.submit(get_html, (2))
    task_3 = pool.submit(get_html, (3))
    # print(dir(task_2))  # Future
    # print(task_3.done())
    #
    # if task_3.done():
    #     print(task_3.result())
    #
    # time.sleep(5)
    # print(task_3.done())
    # if task_3.done():
    #     print(task_3.result())

    urls = [1, 2, 3, 4]
    all_tasks = [pool.submit(get_html, url) for url in urls]
    for future in as_completed(all_tasks):
        res = future.result()
        print('get result {}'.format(res))
    print('*' * 100)
    for res in pool.map(get_html, urls):
        print('get result {} using map'.format(res))
'''
CPU-bound work: multiple processes take less time than threads because of the GIL.
IO-bound work: threads should in theory also lose to processes because of the GIL,
but switching threads is cheaper than switching processes, and a host can run far more
threads than processes, so Python multithreading is by no means useless.
IO time is mostly spent waiting, so it can be simulated with time.sleep;
CPU time is mostly spent computing, so it can be simulated with a Fibonacci calculation.
'''
CPU-bound results first:

INFO:root:res: 75025
INFO:root:res: 121393
INFO:root:res: 196418
INFO:root:res: 317811
INFO:root:res: 514229
INFO:root:res: 832040
INFO:root:res: 1346269
INFO:root:res: 2178309
INFO:root:res: 3524578
INFO:root:res: 5702887
INFO:root:thread_cpu cost 4.97 s
INFO:root:****************************************************************************************************
INFO:root:res: 75025
(... more results omitted ...)
INFO:root:res: 196418
INFO:root:process_cpu cost 4.16 s

### the code:

from concurrent.futures import (ThreadPoolExecutor, ProcessPoolExecutor, as_completed)
from functools import wraps
import time
import logging
logging.basicConfig(level=logging.DEBUG)

'''
CPU-bound work: multiple processes take less time than threads because of the GIL.
IO-bound work: threads should in theory also lose to processes because of the GIL,
but thread switching is cheaper than process switching and a host can run far more
threads than processes, so Python multithreading is by no means useless.
IO time is mostly waiting (simulate with time.sleep); CPU time is mostly computing
(simulate with Fibonacci).
'''

def time_decor(func):
    @wraps(func)
    def wrapper_func(*args, **kw):
        start = time.time()
        result = func(*args, **kw)
        logging.info('{} cost {:.2f} s'.format(func.__name__, (time.time() - start)))
        return result
    return wrapper_func

def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

@time_decor
def thread_cpu(n):
    with ThreadPoolExecutor(n) as executor:
        all_tasks = [executor.submit(fib, (i)) for i in range(25, 35)]
        for feature in as_completed(all_tasks):
            res = feature.result()
            logging.info('res: {}'.format(res))

@time_decor
def process_cpu(n):
    with ProcessPoolExecutor(n) as executor:
        all_tasks = [executor.submit(fib, (i)) for i in range(25, 35)]
        # for res in executor.map(fib, range(25, 35)):
        #     logging.info('{}'.format(res))
        for future in as_completed(all_tasks):
            res = future.result()
            logging.info('res: {}'.format(res))

if __name__ == '__main__':
    thread_cpu(3)
    logging.info('*' * 100)
    process_cpu(3)
IO-bound results first:

INFO:root:res: 2
INFO:root:res: 2
(... more results omitted ...)
INFO:root:res: 2
INFO:root:thread_io cost 20.01 s
INFO:root:****************************************************************************************************
INFO:root:res: 2
INFO:root:res: 2
(... more results omitted ...)
INFO:root:res: 2
INFO:root:process_io cost 20.52 s

### the code:

from concurrent.futures import (ThreadPoolExecutor, ProcessPoolExecutor, as_completed)
from functools import wraps
import time
import logging
logging.basicConfig(level=logging.DEBUG)

def time_decor(func):
    @wraps(func)
    def wrapper_func(*args, **kw):
        start_time = time.time()
        result = func(*args, **kw)
        logging.info('{} cost {:.2f} s'.format(func.__name__, (time.time() - start_time)))
        return result
    return wrapper_func

def monitor_io(n):
    time.sleep(n)
    return n

@time_decor
def thread_io(n):
    with ThreadPoolExecutor(n) as executor:
        all_tasks = [executor.submit(monitor_io, i) for i in [2] * 30]
        for future in as_completed(all_tasks):
            res = future.result()
            logging.info('res: {}'.format(res))
    return n

@time_decor
def process_io(n):
    with ProcessPoolExecutor(n) as executor:
        all_task = [executor.submit(monitor_io, i) for i in [2] * 30]
        for future in as_completed(all_task):
            res = future.result()
            logging.info('res: {}'.format(res))

if __name__ == '__main__':
    thread_io(3)
    logging.info('*' * 100)
    process_io(3)
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from urllib.parse import urlparse
import socket
import logging
logging.basicConfig(level=logging.DEBUG)

def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Non-blocking IO: we still have to keep asking whether the connection is ready,
    # busy-looping on the status. This only pays off if we have other work to do
    # (computation or further connection requests) that does not depend on this connection.
    client.setblocking(False)
    try:
        client.connect((host, 80))
    except BlockingIOError:
        pass

    while True:
        try:
            client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
            break
        except OSError as os_err:
            continue

    d = b""
    while True:
        try:
            data = client.recv(1024)
            if not data:
                break
            d += data
        except BaseException as baseEx:
            continue

    logging.info('\n' + d.decode('utf-8'))
    client.close()

if __name__ == '__main__':
    get_url("http://www.baidu.com")
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from urllib.parse import urlparse
import re
import socket
import logging
logging.basicConfig(level=logging.DEBUG)
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()
urls = ["http://www.baidu.com"]
STOP = False

class Fetcher:
    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        if self.path == "":
            self.path = "/"
        self.data = b""
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Non-blocking IO: instead of polling the connection state ourselves in a loop,
        # we register interest with the selector and let the event loop call us back.
        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))
        except BlockingIOError:
            pass
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode('utf8'))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)  # register the read callback

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode('utf8')
            html_data = re.split(r'\r\n\r\n', data)[1]
            logging.info(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            global STOP
            if not urls:
                STOP = True

def loop():
    # event loop: keep asking the selector which sockets are ready and invoke their callbacks
    # 1. select itself has no registration mode; the selectors module provides register()
    # 2. the callback to run once a socket changes state is supplied by the programmer
    while not STOP:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)

# callbacks + event loop + select (poll/epoll)
if __name__ == '__main__':
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com")
    loop()
import socket
from selectors import (DefaultSelector, EVENT_WRITE, EVENT_READ)
from urllib.parse import urlparse
import logging
logging.basicConfig(level=logging.DEBUG)

selector = DefaultSelector()
urls = ["http://www.baidu.com"]
STOP = False

class Fetcher:
    """ event loop + select (readiness polling) + callbacks """

    def get_url(self, url):
        self._spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))
        except BlockingIOError:
            pass
        # register the callback to run once connected; send() is a write, hence EVENT_WRITE
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key):
        """
        :param key: key.data ==> callback
        """
        # unregister this callback first
        selector.unregister(key.fd)
        logging.info("GET {path} HTTP/1.1\r\nHost:{host}\r\nConnection:close\r\n\r\n".format(path=self.path, host=self.host))
        self.client.send("GET {path} HTTP/1.1\r\nHost:{host}\r\nConnection:close\r\n\r\n".format(path=self.path, host=self.host).encode('utf-8'))
        # now ready to receive data from the server, hence EVENT_READ
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        global STOP
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode('utf8')
            html_data = data.split('\r\n\r\n')[1]
            logging.info('\n' + html_data)
            self.client.close()
            urls.remove(self._spider_url)
            if not urls:
                STOP = True

def loop():
    while not STOP:
        ready = selector.select()
        for key, mask in ready:
            callback = key.data
            callback(key)

if __name__ == '__main__':
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com")
    loop()
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import requests
import json
import pprint

url_html = {}

def sub_gene(url):
    while True:
        url = yield
        if not url:
            break
        response = requests.get(url)
        code = response.encoding
        html = response.content.decode(code)
    return html[0:20]  # return only the first 20 characters to keep the output readable

def delegate_gene(url):
    while True:
        url_html[url] = yield from sub_gene(url)

def main(urls):
    for url in urls:
        print(url)
        dele_g = delegate_gene(url)
        dele_g.send(None)  # prime the delegating generator
        dele_g.send(url)   # pass the url straight through the channel to the sub-generator
        dele_g.send(None)  # send None to let the sub-generator finish and return its result

if __name__ == '__main__':
    urls = ['http://www.baidu.com', 'http://www.sina.com']
    main(urls)
    pprint.pprint(url_html)
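For the bare delegation pattern without the HTTP noise, a more minimal sketch (the classic running-average example; every name here is illustrative):

def averager():
    total, count, avg = 0.0, 0, None
    while True:
        value = yield avg
        if value is None:
            return count, avg
        total += value
        count += 1
        avg = total / count

def delegator(results, key):
    while True:
        results[key] = yield from averager()

results = {}
d = delegator(results, 'demo')
next(d)                 # prime the delegator (and, via yield from, the sub-generator)
for v in (10, 20, 30):
    d.send(v)
d.send(None)            # terminate the sub-generator; its return value lands in results
print(results)          # {'demo': (3, 20.0)}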
import asyncio
import logging
import time
from functools import wraps
logging.basicConfig(level=logging.DEBUG)

async def async_func(url):
    await asyncio.sleep(2)
    return 'url content: {}'.format(url)

async def get_url(url):
    logging.debug('start to fetch html from: {}'.format(url))
    result = await async_func(url)
    logging.debug('finished fetch html from: {}'.format(url))
    return result

def time_count(func):
    @wraps(func)
    def wrapper_func(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        logging.debug('{} cost {:.2f} s'.format(func.__name__, time.time() - start_time))
        return result
    return wrapper_func

@time_count
def main():
    # obtain the event loop
    event_loop = asyncio.get_event_loop()
    task1 = event_loop.create_task(get_url('https://www.baidu.com'))
    event_loop.run_until_complete(task1)
    logging.debug(task1.result())

if __name__ == '__main__':
    main()
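A small follow-on sketch using the same get_url and time_count defined above, showing several coroutines scheduled together with asyncio.gather (the URLs are placeholders):

@time_count
def main_many():
    event_loop = asyncio.get_event_loop()
    urls = ['https://www.baidu.com', 'https://www.sina.com.cn']
    # gather runs the coroutines concurrently and returns the results in order
    results = event_loop.run_until_complete(
        asyncio.gather(*(get_url(url) for url in urls)))
    for r in results:
        logging.debug(r)

# main_many()  # ~2 s in total: both coroutines sleep concurrently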
import asyncio
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse
import socket
import logging
logging.basicConfig(level=logging.DEBUG)

def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if not path:
        path = '/'
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, 80))
    client.send('GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection:close\r\n\r\n'.format(host=host, path=path).encode('utf-8'))
    d = b''
    while True:
        data = client.recv(1024)
        if not data:
            break
        d += data
    logging.debug('\n')
    logging.debug(d.decode())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    tasks = []
    for i in range(20):
        url = 'http://shop.projectsedu.com/goods/{}/'.format(i)
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
https://www.cnblogs.com/alan-babyblog/p/5260252.html
https://www.bilibili.com/video/av41733850
# count how many zeros each row contains
df['zero_count'] = df.apply(lambda x: x.value_counts().get(0, 0), axis=1)