Python: advanced topics

A companion video is available in the HTML version.

Python threads are real OS threads, but the interpreter guards bytecode execution with the GIL (Global Interpreter Lock): a thread must acquire the GIL before it can run, and the interpreter periodically releases it (after every 100 bytecode instructions in old CPython; modern CPython uses a time-based switch interval) so that other threads get a turn. The GIL effectively serializes all Python bytecode, so threads only ever run one at a time: even 100 threads on a 100-core CPU will use a single core for pure-Python work.
At any given moment, at most one thread is executing inside the (CPython) interpreter.
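A minimal sketch (not from the original notes) that makes this measurable: a CPU-bound countdown split across two threads is no faster than running it in one thread, because only one thread holds the GIL at a time.

import threading
import time

def count_down(n):
    while n > 0:
        n -= 1

N = 10_000_000

start = time.time()
count_down(N)
print('single thread: %.2fs' % (time.time() - start))

start = time.time()
t1 = threading.Thread(target=count_down, args=(N // 2,))
t2 = threading.Thread(target=count_down, args=(N // 2,))
t1.start(); t2.start()
t1.join(); t2.join()
print('two threads:   %.2fs' % (time.time() - start))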
git push --set-upstream origin dev

git clean -d -fx

git stash

git pull

git stash pop

After you have run git stash several times, the stash stack fills up with uncommitted work and it gets hard to tell which entry you want back.

git stash list prints the current stash stack. Find the entry you need and apply it by id, e.g. git stash apply stash@{1} restores the work saved as stash@{1}; once everything you want has been applied, git stash clear empties the stack.


git push origin --delete dev
git branch -d dev
54. What are the os and sys modules for?

The os module handles interaction between the program and the operating system, exposing interfaces to the underlying OS;
the sys module handles interaction between the program and the Python interpreter, providing functions and variables for manipulating the interpreter's runtime environment.
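A minimal sketch (the calls are just illustrative, not from the original notes) of where the boundary lies: os talks to the operating system, sys talks to the interpreter itself.

import os
import sys

print(os.getcwd())         # OS side: current working directory
print(os.sep)              # OS side: the platform's path separator
print(sys.version_info)    # interpreter side: Python version
print(sys.path[:3])        # interpreter side: module search path
print(sys.argv)            # interpreter side: command-line arguments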

Closures
LEGB

import logging
logging.basicConfig(level=logging.DEBUG)

def num():
    return [lambda x:i*x for i in range(4)]

if __name__ == '__main__':
    logging.debug([func(2) for func in num()])
    # Answer: [6, 6, 6, 6]
    # Explanation: this comes down to Python's name-lookup rule, LEGB
    # (local, enclosing, global, builtin). Here i lives in the enclosing
    # (closure) scope, and Python closures are late-binding:
    # the value of a free variable is looked up when the inner function is
    # *called*, not when it is defined.
    # So [lambda x: i * x for i in range(4)] builds a list of four lambdas,
    # none of which defines i itself; they all read i from the closure scope.
    # Once the for loop has finished, i == 3, so when we later run
    # [m(2) for m in num()] every lambda computes 3 * 2 = 6,
    # which gives [6, 6, 6, 6].
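The usual fix, as a minimal sketch: bind i at definition time with a default argument, so each lambda keeps its own value and the result becomes [0, 2, 4, 6].

def num_fixed():
    return [lambda x, i=i: i * x for i in range(4)]

print([func(2) for func in num_fixed()])  # [0, 2, 4, 6]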
# De-duplicate a list

li = [1, 1, 1, 23, 3, 4, 4]
li_set = {}.fromkeys(li).keys() or set(li)


assert  list(map(lambda x:x**2,range(1,11))) ==  [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]


def find_idx(target, li=None):
    # brute-force two-sum: return indices (low, high) with li[low] + li[high] == target
    low = 0
    while low < len(li) - 1:
        high = len(li) - 1
        while low < high:
            if li[low] + li[high] == target:
                return low, high
            high -= 1
        low += 1

    raise ValueError('not found error')


if __name__ == '__main__':
    li = [2, 7, 11, 15]
    low, high = find_idx(9,li)
    print(low,'--',high)

# Singleton via a decorator (a closure caching one instance per class)
def singleton(cls):
    instance_dic = {}
    def wrapper(*args,**kwargs):
        if cls not in instance_dic:
            instance_dic[cls] = cls(*args, **kwargs)
        return instance_dic[cls]
    return wrapper

@singleton
class Utils(object):
    pass

if __name__ == '__main__':
    utils_1 = Utils()
    utils_2 = Utils()
    assert  utils_1 is utils_2

# Singleton based on __new__; like Java's lazy initialization, it has to worry about thread safety

import threading
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')

class Person:
    _instance_lock = threading.Lock()
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, '_instance'):
            with cls._instance_lock:
                # double-checked locking: re-check inside the lock so two threads
                # racing past the first check still create only one instance
                if not hasattr(cls, '_instance'):
                    cls._instance = object.__new__(cls)
        return cls._instance

if __name__ == '__main__':
    person_1 = Person()
    person_2 = Person()
    assert  person_1 is person_2

import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')

def bin_find(num, li=None):
    li.sort()  # binary search requires a sorted list
    low, high = 0, len(li) - 1
    indx = None
    while low <= high:
        mid = (low + high) // 2
        if li[mid] > num:
            high = mid - 1
        elif li[mid] < num:
            low = mid + 1
        else:
            indx = mid
            break
    return indx


if __name__ == '__main__':
    lis = [0, 1, 3, 4, 5, 6, 7, 9, 10, 11, 12, 16, 17]
    logging.debug(bin_find(12,lis))


# Simulating stack operations

class Stack(object):

    def __init__(self):
        self._stack = []

    def push(self, element):
        self._stack.append(element)

    def pop(self):
        return self._stack.pop()

    def is_empty(self):
        return not self._stack

    def top(self):
        try:
            return self._stack[-1]
        except IndexError:
            raise ValueError('empty stack...')
import random
random.shuffle
random.choice
random.sample
random.random

requests has long since surpassed its ancestor urllib
Pillow (the modern fork) vs PIL (the Python 2.7-era original)
psutil  <== process and system utilities
import chardet
from contextlib import contextmanager,closing

reload(sys)
sys.setdefaultencoding("utf-8")

In Python 3.x this no longer works: it raises NameError: name 'reload' is not defined.

reload was removed from the builtins in 3.x and replaced by importlib.reload (and sys.setdefaultencoding itself is gone, since str is Unicode by default):

import importlib
importlib.reload(sys)
pylint
pyflakes
pysonar2
Fabric
import traceback

sys.argv vs optparse vs argparse vs getopt
Google's fire module
import dis — disassemble functions, inspect bytecode, etc.
code statistics: cloc
Excel read/write: pandas + xlrd, xlsxwriter
lxml
shutil
f-string

P=NP?



Method 2: treat it as a stack (iterative directory walk)
import os
url = r'C:\Users\Mr.Wang\PycharmProjects\untitled\python基礎'

lis = [url]
while lis:
    url = lis.pop()
    ret_list = os.listdir(url)
    for name in ret_list:
        abs_path = os.path.join(url,name)
        if os.path.isdir(abs_path):
            lis.append(abs_path)
        else:print(name)

# Generate a random 6-character verification code:
def six_token():
    import random
    import string
    li = list(map(lambda x:x if isinstance(x,str) else str(x),range(10)))
    li.extend(string.ascii_letters)
    return ''.join(random.sample(li,6))



81. Implement a random red-packet (lucky money) split

import random
def red_packge(money,num):
    li = random.sample(range(1,money*100),num-1)
    li.extend([0,money*100])
    li.sort()
    return [(li[index+1]-li[index])/100 for index in range(num)]

ret = red_packge(100,10)
print(ret)

-------------------------- generator version -------------------------------------------
import random
def red_packge(money,num):
    li = random.sample(range(1,money*100),num-1)
    li.extend([0,money*100])
    li.sort()
    for index in range(num):
        yield (li[index+1]-li[index])/100

ret = red_packge(100,10)
print(list(ret))  # consume the generator to see the amounts

84. How does Python manage memory?

It comes down to three mechanisms: (1) per-object reference counting, (2) garbage collection, (3) a memory pool.

1. Reference counting

Python tracks every object in memory with a reference count; every object carries one.
The count increases when:
  1. the object is bound to a new name;
  2. the object is put into a container (a list, tuple, dict, ...).
The count decreases when:
  1. a name referring to the object is explicitly destroyed with del;
  2. a reference goes out of scope or is rebound to something else.
sys.getrefcount() returns an object's current reference count (see the sketch below).
In most cases the count is higher than you would guess: for immutable data such as small integers and strings, the interpreter shares objects across different parts of a program to save memory.
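A minimal sketch of watching the count with sys.getrefcount(); note that the call itself holds a temporary reference, so the reported number is one higher than the "real" count.

import sys

a = [1, 2, 3]
print(sys.getrefcount(a))   # e.g. 2: the name a plus the temporary argument

b = a                       # a second name for the same object
print(sys.getrefcount(a))   # 3

container = [a]             # putting it into a container adds another reference
print(sys.getrefcount(a))   # 4

del b
container.clear()
print(sys.getrefcount(a))   # back to 2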

2. Garbage collection

1. When an object's reference count drops to zero, the garbage collector reclaims it.
2. When two objects a and b reference each other, del removes the names a and b and decrements both counts, but because each object still holds a reference to the other, neither count reaches zero and neither object is destroyed (a memory leak). To handle this, the interpreter periodically runs a cycle detector that searches for unreachable reference cycles and deletes them.

3. The memory pool

Python collects garbage, but it keeps the freed memory in an internal pool instead of returning it to the operating system.
1. The pymalloc mechanism: to speed up execution, Python manages allocation and release of small memory blocks through a pool.
2. Objects smaller than 256 bytes (512 bytes in recent CPython) are served by the pymalloc allocator; larger objects use the system malloc.
3. Built-in objects such as ints, floats and lists keep their own private free lists, which are not shared across types: memory freed by a burst of integers cannot be reused for floats.


28. How does Python's garbage collection work?

Python relies mainly on reference counting, with two auxiliary strategies: mark-and-sweep and generational collection.
Reference counting:
The GC module primarily uses reference counts to track and reclaim garbage. On top of that, "mark-and-sweep"
handles the cyclic references that container objects can create, and generational collection trades space for time to make collection more efficient.
Mark-and-sweep:
Mark-and-sweep exists to break reference cycles; it only examines objects that could take part in a cycle.
Drawback: the extra work it performs is proportional to the amount of memory that has to be reclaimed.
Generational collection:
Principle: all tracked memory blocks are grouped by how long they have survived; each group is a "generation",
and the collection frequency drops as a generation's age grows. In other words, the longer an object has lived, the less likely it is to be garbage,
so it is scanned less often. Survival time is measured in collection passes:
the more collections an object has survived, the longer it has lived (the gc sketch below shows the generation counters).
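A minimal sketch using the gc module (illustrative only): the generation thresholds, the current per-generation counters, and a forced collection that reclaims a reference cycle which pure reference counting could never free.

import gc

print(gc.get_threshold())   # default generation thresholds, e.g. (700, 10, 10)
print(gc.get_count())       # objects currently tracked in generations 0/1/2

class Node:
    pass

a, b = Node(), Node()
a.other, b.other = b, a     # build a reference cycle
del a, b                    # the refcounts never reach zero on their own
print(gc.collect())         # the cycle detector frees them; returns the number collected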


import inspect

def a(a, b=0, *c, d, e=1, **f):
    pass

aa = inspect.signature(a)
print("inspect.signature(fn)是:%s" % aa)
print("inspect.signature(fn)的類型:%s" % (type(aa)))
print("\n")

bb = aa.parameters
print("signature.paramerters屬性是:%s" % bb)
print("ignature.paramerters屬性的類型是%s" % type(bb))
print("\n")

for cc, dd in bb.items():
    print("mappingproxy.items()返回的兩個值分別是:%s和%s" % (cc, dd))
    print("mappingproxy.items()返回的兩個值的類型分別是:%s和%s" % (type(cc), type(dd)))
    print("\n")
    ee = dd.kind
    print("Parameter.kind屬性是:%s" % ee)
    print("Parameter.kind屬性的類型是:%s" % type(ee))
    print("\n")
    gg = dd.default
    print("Parameter.default的值是: %s" % gg)
    print("Parameter.default的屬性是: %s" % type(gg))
    print("\n")


ff = inspect.Parameter.KEYWORD_ONLY
print("inspect.Parameter.KEYWORD_ONLY的值是:%s" % ff)
print("inspect.Parameter.KEYWORD_ONLY的類型是:%s" % type(ff))
import inspect

def func_a(arg_a, *args, arg_b='hello', **kwargs):
    print(arg_a, arg_b, args, kwargs)

class Fib:
    def __init__(self,n):
        a, b = 0, 1
        i = 0
        self.fib_list = []
        while i<n:
            self.fib_list.append(a)
            a, b = b, a+b
            i+=1
    def __getitem__(self, item):
        return self.fib_list[item]

if __name__ == '__main__':
    fib = Fib(5)
    print(fib[0:3])


    # get the function's signature
    func_signature = inspect.signature(func_a)
    func_args = []
    # walk over all parameters of the function
    for k, v in func_signature.parameters.items():
        # after getting a parameter, look at its kind:
        # POSITIONAL_OR_KEYWORD means no *args-like parameter precedes it, so it can be
        # passed either by position or by keyword; KEYWORD_ONLY must be passed by keyword.
        # Both kinds are handled the same way here.
        if str(v.kind) in ('POSITIONAL_OR_KEYWORD', 'KEYWORD_ONLY'):
            # v.default gives the parameter's default value.
            # If the parameter has no default, default is the class inspect._empty,
            # so we check v.default.__name__ for '_empty';
            # since classes are instances of type, isinstance(v.default, type) tells us
            # whether default is a class at all.
            if isinstance(v.default, type) and v.default.__name__ == '_empty':
                func_args.append({k: None})
            else:
                func_args.append({k: v.default})
        # VAR_POSITIONAL means the parameter is a *args-style parameter
        elif str(v.kind) == 'VAR_POSITIONAL':
            args_list = []
            func_args.append(args_list)
        # VAR_KEYWORD means the parameter is a **kwargs-style parameter
        elif str(v.kind) == 'VAR_KEYWORD':
            args_dict = {}
            func_args.append(args_dict)

    print(func_args)
from collections import defaultdict
import logging
logging.basicConfig(level=logging.DEBUG)
def group_by_firstletter(words=None):
    word_dict = {}
    for word in words:
        first_letter = word[0]
        if first_letter in word_dict:
            word_dict[first_letter] += 1
        else:
            word_dict[first_letter] = 1
    return word_dict

def group_by_firstletter2(words=None):
    default_word_dict = defaultdict(int)
    for word in words:
        default_word_dict[word[0]]+=1
    return default_word_dict

def group_by_firstletter3(words=None):
    words_dict = {}
    for word in words:
        if word[0] in words_dict:
            words_dict[word[0]].append(word)
        else:
            words_dict[word[0]] = [word]
    return words_dict

def group_by_firstletter4(words=None):
    default_word_dict = defaultdict(list)
    for word in words:
        default_word_dict[word[0]].append(word)
    return default_word_dict

if __name__ == '__main__':
    words = ['apple', 'bat', 'bar', 'atom', 'book']
    logging.info(group_by_firstletter(words))
    logging.info(group_by_firstletter2(words))
    logging.info(group_by_firstletter3(words))
    logging.info(group_by_firstletter4(words))
    
from collections.abc import Iterator, Iterable  # importing these from collections directly is deprecated
from collections import defaultdict
from collections import Counter, ChainMap, OrderedDict, namedtuple, deque
from itertools import islice  # slicing for iterators, but only non-negative indices
from itertools import zip_longest  # like zip, but keeps going until the longest iterable is exhausted

from concurrent.futures import ThreadPoolExecutor as Pool


from collections import namedtuple, deque, defaultdict, OrderedDict, ChainMap, Counter

Point = namedtuple('Poing',['x','y','z'])
p = Point(1,2,3)
print(p.x,'--',p.y,'--',p.z)

# deque: a double-ended queue
dq = deque([1,2,3,4])
dq.append(5)
dq.appendleft('a')
dq.popleft()

default_dict = defaultdict(lambda:'N/A') # like a dict, but with a default-value factory
default_dict['name']='frank'
default_dict['age']

od = OrderedDict([('b',1),('a',2),('c',3)]) # keeps keys in insertion order
od.get('a')


# A FIFO dict: when capacity is exceeded, the earliest-added key is evicted first
from collections import OrderedDict

class LastUpdatedOrderedDict(OrderedDict):

    def __init__(self, capacity):
        super(LastUpdatedOrderedDict, self).__init__()
        self._capacity = capacity

    def __setitem__(self, key, value):
        containsKey = 1 if key in self else 0
        if len(self) - containsKey >= self._capacity:
            last = self.popitem(last=False)
            print('remove:', last)
        if containsKey:
            del self[key]
            print('set:', (key, value))
        else:
            print('add:', (key, value))
        OrderedDict.__setitem__(self, key, value)


# Use case: parameter priority (command line > environment > defaults)
from collections import ChainMap
import os, argparse

# default parameters:
defaults = {
    'color': 'red',
    'user': 'guest'
}

# command-line parameters:
parser = argparse.ArgumentParser()
parser.add_argument('-u', '--user')
parser.add_argument('-c', '--color')
namespace = parser.parse_args()
command_line_args = { k: v for k, v in vars(namespace).items() if v }

# combine them into a ChainMap:
combined = ChainMap(command_line_args, os.environ, defaults)

# print the effective parameters:
print('color=%s' % combined['color'])
print('user=%s' % combined['user'])
# itertools 
from itertools import count, repeat, cycle, chain, takewhile, groupby

def times_count(base,n):
    for x in count(base):
        if n<=0:
            break
        yield str(x)
        n-=1

def times_repeat(s,n):
    return '-'.join(repeat(s,n))

def times_cycle(s,n):
    for v in cycle(s):
        if n<= 0:
            break
        yield v
        n-=1

if __name__ == '__main__':
    print(times_repeat('*',3))
    for s in times_cycle('ABC',3):
        print(s)
    r = ','.join(chain('ABC', 'XYZ'))
    print(r)
    print(','.join(times_count(5,3)))
    print(','.join( takewhile(lambda x:int(x)<10, times_count(1,30))))
    group_dict = {key:list(group) for key, group in groupby(['abort','abandon','book','cook','bird'], lambda ch: ch[0].upper())}
    print(group_dict)

# -*- coding: utf-8 -*-
import itertools
from functools import reduce


def pi(N):
    ' compute an approximation of pi '
    # step 1: an infinite sequence of odd numbers: 1, 3, 5, 7, 9, ...
    odd_iter = itertools.count(1, 2)

    # step 2: take the first N terms: 1, 3, 5, 7, 9, ..., 2*N-1.
    odd_head = itertools.takewhile(lambda n: n <= 2 * N - 1, odd_iter)
    #     print(list(odd_head),end=',')
    # step 3: alternate the sign and divide 4 by each term: 4/1, -4/3, 4/5, -4/7, 4/9, ...
    odd_final = [4 / n * ((-1) ** i) for i, n in enumerate(odd_head)]
    # step 4: sum the series:
    value = reduce(lambda x, y: x + y, odd_final)
    return value


# tests:
print(pi(10))
print(pi(100))
print(pi(1000))
print(pi(10000))
assert 3.04 < pi(10) < 3.05
assert 3.13 < pi(100) < 3.14
assert 3.140 < pi(1000) < 3.141
assert 3.1414 < pi(10000) < 3.1415
print('ok')
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import socket
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(('127.0.0.1',6666))
clients = set()
print('server bind 127.0.0.1:6666...')

while 1:
    try:
        data,addr = server.recvfrom(1024)
        clients.add(addr)
        if not data or data.decode('utf-8')=='pong':
            continue
        print('%s:%s >>> %s' % (addr[0],addr[1],data.decode('utf-8')))
        for usr in clients:
            if usr!=addr:
                server.sendto(('%s:%s >>> %s' % (addr[0],addr[1],data.decode('utf-8'))).encode('utf-8'),usr)
    except Exception as e:
        pass
        
########################################################
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'

import socket,threading,os

client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
client.sendto(b'pong',('127.0.0.1',6666))

def myinput():
    while 1:
        try:
            msg = input('>>>')
            yield msg
        except Exception as e:
            os._exit(0)

def getMsg(client):
    while 1:
        try:
            r = client.recv(1024)
            print('\n',r.decode('utf-8'),'\n>>>',end='')
        except Exception as e:
            pass

c = myinput()
def sendMsg(client):
    while 1:
        msg = next(c)
        client.sendto(msg.encode('utf-8'),('127.0.0.1',6666))

threading.Thread(target=sendMsg,args=(client,)).start()
threading.Thread(target=getMsg,args=(client,)).start()
def my_dict2obj(d=None):
    if not isinstance(d,dict):
        raise TypeError('only dict supported...')
    class obj:
        def __init__(self,d=None):
            self.d = d
            for key, value in d.items():
                if isinstance(value,(tuple,list)):
                    setattr(self,key,[obj(i) if isinstance(i,dict) else i for i in value])
                else:
                    setattr(self,key,obj(value)  if isinstance(value, dict) else value)
        # def __str__(self):
        #     return '{}'.format(self.d)
        # __repr__ = __str__
    return obj(d)

if __name__ == '__main__':
    d = {'a': 1, 'b': {'c': 2}, 'd': ["hi", {'foo': "bar"}]}
    x = my_dict2obj(d)
    print(x.__dict__)
    # unpacking
    *p, q = d.items()
    print(p)
    print(q)
from html.parser import HTMLParser
from html.entities import name2codepoint

class MyHTMLParser(HTMLParser):

    def handle_starttag(self, tag, attrs):
        print('<%s>' % tag)

    def handle_endtag(self, tag):
        print('</%s>' % tag)

    def handle_startendtag(self, tag, attrs):
        print('<%s/>' % tag)

    def handle_data(self, data):
        print(data)

    def handle_comment(self, data):
        print('<!--', data, '-->')

    def handle_entityref(self, name):
        print('&%s;' % name)

    def handle_charref(self, name):
        print('&#%s;' % name)

parser = MyHTMLParser()
parser.feed('''<html>
<head></head>
<body>
<!-- test html parser -->
    <p>Some <a href=\"#\">html</a> HTML&nbsp;tutorial...<br>END</p>
</body></html>''')
import lxml
from xml.parsers.expat import ParserCreate

class DefaultSaxHandler(object):
    def start_element(self, name, attrs):
        print('sax:start_element: %s, attrs: %s' % (name, str(attrs)))

    def end_element(self, name):
        print('sax:end_element: %s' % name)

    def char_data(self, text):
        print('sax:char_data: %s' % text)

xml = r'''<?xml version="1.0"?>
<ol>
    <li><a href="/python">Python</a></li>
    <li><a href="/ruby">Ruby</a></li>
</ol>
'''

handler = DefaultSaxHandler()
parser = ParserCreate()
parser.StartElementHandler = handler.start_element
parser.EndElementHandler = handler.end_element
parser.CharacterDataHandler = handler.char_data
parser.Parse(xml)
# datetime
from datetime import datetime,timedelta

now = datetime.now()

# datetime -> timestamp
now_timestamp = now.timestamp()

# timestamp -> local datetime
dt_local = datetime.fromtimestamp(now_timestamp)
# timestamp -> UTC datetime
dt_utc = datetime.utcfromtimestamp(now_timestamp)

# a timestamp carries no timezone; timezone info lives on the datetime object
print(dt_local.timestamp(),'<-->',dt_utc.timestamp())

print('{}\n{}\n{}\n{}'.format(now,now_timestamp,dt_local,dt_utc))
# build a datetime for a specific date and time
year = 2019
month =3
day =3
hour = 15
minute = 7
dt_specified = datetime(year,month,day,hour,minute)
print(dt_specified)

# str -> datetime (parsing)
datetime_str = '2019-03-03 15:22:00'
datetime_parse_format = '%Y-%m-%d %H:%M:%S'
cday = datetime.strptime(datetime_str,datetime_parse_format)
print(cday)

# datetime -> str (formatting)
print(cday.strftime('%Y/%m/%d'))

# date arithmetic (deltas) uses timedelta
now = datetime.now()
now_next3_hours =  now+timedelta(hours=3)
now_previous3_days = now+timedelta(days=-3)
print('next 3 hours: {}'.format(now_next3_hours))

print('now_previous3_days: {}'.format(now_previous3_days))

from datetime import timezone

tz_utc_8 = timezone(timedelta(hours=8))
now = datetime.now()
# initially now carries no tzinfo (None)
print(now.tzinfo)
# forcibly attach a timezone; replace() returns a new datetime, so rebind it
now = now.replace(tzinfo=tz_utc_8)
print(now)

utc_now = datetime.utcnow()
# again, this starts out with no tzinfo at all
print(utc_now.tzinfo)
# forcibly attach UTC as its timezone
utc_now = utc_now.replace(tzinfo=timezone.utc)

# Beijing date/time, UTC+8
bj_dt = utc_now.astimezone(timezone(timedelta(hours=8)))
# UTC-8
pst_dt = utc_now.astimezone(timezone(timedelta(hours=-8)))
# UTC+9 (Tokyo)
tokyo_dt = utc_now.astimezone(timezone(timedelta(hours=9)))
tokyo_dt = utc_now.astimezone(timezone(timedelta(hours=9)))

print('bj_dt: ',bj_dt)
print('pst_dt: ',pst_dt)
print('tokyo_dt: ',tokyo_dt)



from datetime import datetime, timezone,timedelta
import re

def to_timestamp(dt_str,tz_str):
    re_dt_str_1 = r'\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}'
    
    re_tz_str = r'^UTC([+-])(\d{1,2}):\d{2}$'
    
    tz_grps = re.match(re_tz_str,tz_str).groups()
    
    sign = tz_grps[0]
    hours = int(tz_grps[1])
    
    if re.match(re_dt_str_1,dt_str):
        dt = datetime.strptime(dt_str,'%Y-%m-%d %H:%M:%S')
        if sign=='+':
            tz_info_x = timezone(timedelta(hours=hours))
        else:
            tz_info_x = timezone(timedelta(hours=-hours))
        dt = dt.replace(tzinfo=tz_info_x)
    else:
        print('re is wrong!')
        
    return dt.timestamp()

# tests:
t1 = to_timestamp('2015-6-1 08:10:30', 'UTC+7:00')

assert t1 == 1433121030.0, t1

t2 = to_timestamp('2015-5-31 16:10:30', 'UTC-09:00')
assert t2 == 1433121030.0, t2

print('ok')
digital_dict = {'0':0,'1':1,'2':2,'3':3,'4':4,'5':5,'6':6,'7':7,'8':8,'9':9}
from functools import reduce

def str2int(s):
    return reduce(lambda x,y:x*10+y,map(lambda x:digital_dict.get(x),s))
str2int('13579')


def _odd_iter():
    n = 1
    while True:
        n = n + 2
        yield n
        
def _not_divisible(n):
    return lambda x: x % n > 0

def primes():
    yield 2
    it = _odd_iter() # the initial sequence of odd numbers
    while True:
        n = next(it) # take the first number of the current sequence
        yield n
        it = filter(_not_divisible(n), it) # build a new, filtered sequence

# print the primes below 1000:
for n in primes():
    if n < 1000:
        print(n)
    else:
        break


def _odd_iter3():
    n = 3
    while True:
        yield n
        n+=2
     
def _not_divisible_3(n):
    return lambda x:x%n>0

def prime_iter3():
    yield 2
    it = _odd_iter()
    
    while True:
        base_num = next(it)
        yield base_num
        it = filter(lambda x,y=base_num:x%y>0,it)
        
for i in prime_iter3():
    if i>50:
        break
    else:
        print(i,end=',')


# -*- coding: utf-8 -*-

L = [('Bob', 75), ('Adam', 92), ('Bart', 66), ('Lisa', 88)]

def by_score(x):
    return x[1]

def by_name(x):
    return x[0]

sorted(L,key=by_score,reverse=True)
sorted(L,key=by_name,reverse=True)


def createCounter():
    count = 0
    def counter():
        nonlocal count 
        count += 1
        return count
    return counter

def createCounter():
    def f():
        n=1 
        while True:
            yield n
            n +=1
    g=f()
    def counter():
        return next(g)
    return counter

# tests:
counterA = createCounter()
print(counterA(), counterA(), counterA(), counterA(), counterA()) # 1 2 3 4 5
counterB = createCounter()
if [counterB(), counterB(), counterB(), counterB()] == [1, 2, 3, 4]:
    print('Test passed!')
else:
    print('Test failed!')

def createCounter():
    x = 0
    def counter():
        nonlocal x
        x += 1
        return x
    return counter


>>> from collections import Counter
>>> Counter(s=3, c=2, e=1, u=1)
Counter({'s': 3, 'c': 2, 'u': 1, 'e': 1})
>>> some_data = ('c', '2', 2, 3, 5, 'c', 'd', 4, 5, 'd', 'd')
>>> Counter(some_data).most_common(2)
[('d', 3), ('c', 2)]
>>> some_data = ['c', '2', 2, 3, 5, 'c', 'd', 4, 5, 'd', 'd']
>>> Counter(some_data).most_common(2)
[('d', 3), ('c', 2)]
>>> some_data = {'c', '2', 2, 3, 5, 'c', 'd', 4, 5, 'd', 'd'}
>>> Counter(some_data).most_common(2)
[('c', 1), (3, 1)]
# In practice, sequence operations try the dedicated magic method first and only fall back to __getitem__ when it is missing
from collections.abc import Iterable, Iterator
from types import GeneratorType
from contextlib import contextmanager
class Company:
    def __init__(self,employee_list):
        self.employee_list = employee_list

    # sequence protocol
    def __getitem__(self, item):
        print('getitem executed...')
        cls = type(self)
        if isinstance(item,slice):
            return cls(self.employee_list[item])
        elif isinstance(item,int):
            return cls([self.employee_list[item]])

    def __setitem__(self, key, value):
        self.employee_list[key] = value

    def __delitem__(self, key):
        del self.employee_list[key]

    def __len__(self):
        print('len executed...')
        return len(self.employee_list)

    def __contains__(self, item):
        print('contains executed...')
        return item in self.employee_list


    # iteration protocol
    # implementing __iter__ only makes the object an Iterable
    def __iter__(self):
        print('iter executed...')
        return iter(self.employee_list)

    # implementing __next__ as well makes it an Iterator, not a generator
    def __next__(self):
        print('next executed...')
        pass

    # callable protocol
    def __call__(self, *args, **kwargs):
        print('__call__ executed...')
        pass

    # context-manager protocol
    def __enter__(self):
        # self.fp = open('xxx')
        print('__enter__ executed...')
        pass
    def __exit__(self, exc_type, exc_val, exc_tb):
        print('__exit__ executed...')
        pass
        # release resources here, e.g. self.fp.close()

    @contextmanager
    def Resource(self):
        self.fp = open('./sample.csv')
        yield self.fp
        self.fp.close()

    def __repr__(self):
        return ','.join(self.employee_list)
    __str__ = __repr__

if __name__ == '__main__':
    company = Company(['Frank','Tom','May'])
    company()
    for employee in company:
        print(employee)
    print(company[1:])
    print(isinstance(company,Iterable))
    print(isinstance(company,Iterator))
    print(isinstance(company,GeneratorType))
    print(isinstance((employee for employee in company),GeneratorType))
    print(len(company))
    print('Jim' in company)

class MyVector(object):
    def __init__(self,x,y):
        self.x = x
        self.y = y

    def __add__(self, other):
        cls = type(self)
        return cls(self.x+other.x, self.y+other.y)

    def __repr__(self):
        return '({},{})'.format(self.x,self.y)
    def __str__(self):
        return self.__repr__()
if __name__ == '__main__':
    vector1 = MyVector(1,2)
    vector2 = MyVector(2,3)
    assert str(vector1+vector2) == '(3,5)'
    assert (vector1+vector2).__repr__() == '(3,5)'



import abc

class CacheBase(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def set(self,key):
        pass
    @abc.abstractmethod
    def get(self,value):
        pass

class RedisCache(CacheBase):
    pass

# In practice abstract base classes are used sparingly; mixins and duck typing are far more common, see e.g. Django REST framework
if __name__ == '__main__':
    redis_cache = RedisCache() # TypeError: Can't instantiate abstract class RedisCache with abstract methods get, set




from collections import namedtuple,defaultdict,deque,Counter,OrderedDict,ChainMap

# named_tuple
def test():
    User = namedtuple('User',['name','age','height','edu'])
    user_tuple = ('Frank',18,180,'master')
    user_dict = dict(name='Tom',age=20,height=175,edu='PHD')
    user = User._make(user_tuple)
    user = User._make(user_dict)
    print(','.join(map(lambda x:str(x) if not isinstance(x,str) else x,user)))
    ordered_user_dict = user._asdict()
    print(ordered_user_dict)

# default dict
def test2():
    user_dict = {}
    user_list = ['frank','tom','tom','jim','Tom']
    for user in user_list:
        u = user.lower()
        user_dict.setdefault(u,0)
        user_dict[u]+=1

        # if not u in user_dict:
        #     user_dict[u] = 1
        # else:
        #     user_dict[u]+=1
    print(user_dict)

def gen_default_0():
    return 0

def test3():
    user_dict = defaultdict(int or gen_default_0 or (lambda :0))
    user_list = ['frank','tom','Tom','jim']
    for user in user_list:
        u = user.lower()
        user_dict[u]+=1

    print(user_dict)


# deque: appends and pops are thread-safe
def test4():
    dq = deque(['a','b','c'])
    dq.appendleft('1')
    print(dq)
    dq.extendleft(['e','f','g'])
    print(dq)
    dq.popleft()
    print(dq)
    dq.insert(0,'g')
    print(dq)

# Counter
def test5():
    user_list = ['frank','tom','tom','jim']
    user_counter = Counter(user_list)
    print(user_counter.most_common(2))
    alpha_counter = Counter('abccddadfaefedasdfwewefwfsfsfadadcdffghethethklkijl')
    alpha_counter.update('fsfjwefjoe9uefjsljdfljdsoufbadflfmdlmjjdsnvdljflasdj')
    print(alpha_counter.most_common(3))

# OrderedDict only guarantees insertion order, nothing more!
def test6():
    ordered_dict = OrderedDict()
    ordered_dict['b'] = '2'
    ordered_dict['a'] = '1'
    ordered_dict['c'] = '3'

    # print(ordered_dict.popitem(last=False)) # last=True pops from the end, otherwise from the front
    # print(ordered_dict.pop('a'))  # returns the value that was popped
    ordered_dict.move_to_end('b') # move the given key's entry to the last position
    print(ordered_dict)

# chain several dicts together
def test7():
    user_dict_1 = dict(a=1,b=2)
    user_dict_2 = dict(b=3,c=5) # when a key appears in both dicts, the first occurrence wins
    chain_map = ChainMap(user_dict_1,user_dict_2)
    new_chain_map = chain_map.new_child({'d': 6, 'e': 7, 'f': 8})
    for key, value in chain_map.items():
        print('{}--->{}'.format(key,value))
    print('*'*100)
    for key, value in new_chain_map.items():
        print('{}--->{}'.format(key,value))

if __name__ == '__main__':
    test()
    test2()
    test3()
    test4()
    test5()
    test6()
    test7()


import random

def random_line(cols):
    alphabet_list = [chr(i) for i in range(65, 91, 1)] + [chr(i) for i in range(97, 123, 1)]
    # for i in range(cols):
    #     yield random.choice(alphabet_list)
    return (random.choice(alphabet_list) for i in range(cols))

def randome_generate_file(file_path='./sample.csv',lines=10000,cols=1000):
    with open(file_path,'w') as fw:
        for i in range(lines):
            fw.write(','.join(random_line(cols)))
            fw.write('\n')
        fw.flush()


def load_list_data(file_path='./sample.csv',total_num=10000,target_num=1000):
    all_data = []
    target_data = []
    with open(file_path,'r') as fr:
        for count, line in enumerate(fr):
            if count > total_num:
                break
            else:
                all_data.append(line)

    while len(target_data)<=target_num:
        index = random.randint(0,total_num)
        if all_data[index] not in target_data:
            target_data.append(all_data[index])
    return all_data, target_data

def load_dict_data(file_path='./sample.csv',total_num=10000,target_num=1000):
    all_data = {}
    target_data = []
    with open(file_path,encoding='utf8',mode='r') as fr:
        for idx, line in enumerate(fr):
            if idx>total_num:
                break
            all_data[line]=0
    all_data_list = list(all_data)
    while len(target_data)<=target_num:
        random_index = random.randint(0,total_num)
        if all_data_list[random_index] not in target_data:
            target_data.append(all_data_list[random_index])

    return all_data, target_data

def find_test(all_data,target_data):
    test_times = 100
    total_times_cnt = 0

    import time
    for t in range(test_times):
        start = time.time()
        for item in target_data:
            if item in all_data:
                pass
        cost_once = time.time() - start
        total_times_cnt+= cost_once
    return total_times_cnt / test_times

if __name__ == '__main__':
    # randome_generate_file()
    # all_data, target_data = load_list_data()
    all_data, target_data = load_dict_data()
    last_time = find_test(all_data,target_data)
    print(last_time)
# Chapter 1: everything is an object
from functools import wraps
import time
def time_decor(func):
    @wraps(func)
    def wrapper_func(*args,**kw):
        start = time.time()
        result = func(*args,**kw)
        end = time.time()
        print('{} cost {:.2f} s '.format(func.__name__,end-start))
        return result
    return wrapper_func

@time_decor
def ask(name):
    print(name)

class Person:
    def __init__(self,name):
        print('hi, '+name)

my_ask = ask
my_ask('frank')
print(type(my_ask))
person = Person('frank')
print(person)
print('*'*100)

class_list = []
class_list.append(my_ask)
class_list.append(Person)
for item in class_list:
    item('tom')
>>> type(type)
<class 'type'>
>>> object.__bases__
()
>>> type.__bases__
(<class 'object'>,)
>>> type(object)
<class 'type'>

type creates classes: type is an instance of itself, and it is what produces object, dict and the other built-in classes as well as every user-defined class (the str class, in turn, produces 'abc').
object is the root base class of everything, including type; type.__bases__ is (<class 'object'>,), while object.__bases__ is empty, so there is nothing above object.


Python is a protocol-based language. Its dynamic nature makes development extremely fast, but it also invites problems: because everything, including classes themselves, is an object, many errors only surface at runtime,
whereas a static language like Java catches things such as type mismatches at compile time.
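A minimal sketch of the point above (the names are my own): a class is itself an object, an instance of type, and can even be created at runtime by calling type(name, bases, namespace) directly.

Foo = type('Foo', (object,), {'x': 1, 'hello': lambda self: 'hi'})  # dynamically created class

f = Foo()
print(type(Foo))                                          # <class 'type'>
print(isinstance(Foo, object), isinstance(type, object))  # True True
print(f.x, f.hello())                                     # 1 hi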


Chapter 3: magic methods

def my_hex(num):
    alpha_list = ['A', 'B', 'C', 'D', 'E', 'F']
    hex_list = []
    while True:
        mod_, num = num%16, num//16
        hex_list.append(alpha_list[mod_-10] if mod_>9 else mod_)
        if num==0:
            break
    hex_list.append('0x')
    hex_list.reverse()
    return ''.join(map(lambda x:str(x) if not isinstance(x,str) else x,hex_list))

def my_octonary(num):
    octonary_list = []
    while True:
        mod_, num = num%8, num//8
        octonary_list.append(str(mod_))
        if num==0:
            break
    octonary_list.append('0o')
    octonary_list.reverse()
    return ''.join(octonary_list)
print(hex(60))
print(my_hex(60))
print(oct(9))
print(my_octonary(9))

def fac(n,res):
    if n==1:
        return res
    else:
        return fac(n-1,n*res)
print(fac(6,1))



d = {'a': 1, 'b': {'c': 2}, 'd': ["hi", {'foo': "bar"}]}

def my_dict2obj(args):
    class obj(object):
        def __init__(self,d):
            for key,value in d.items():
                if not isinstance(value,(list,tuple)):
                    setattr(self,key,obj(value) if isinstance(value,dict) else value)
                else:
                    setattr(self,key,[obj(i) if isinstance(i,dict) else i for i in value])
    return obj(args)

x = my_dict2obj(d)

print(x.__dict__)


words = ['apple','bat','bar','atom','book']
alpha_dict = {}

for word in words:
    word_list = []
    if word[0] not in alpha_dict:
        word_list.append(word)
        alpha_dict[word[0]] = word_list
    else:
        alpha_dict[word[0]].append(word)
print(alpha_dict)

from collections import namedtuple
stock_list = [['AAPL','10.30','11.90'],['YAHO','9.23','8.19'],['SINA','22.80','25.80']]
stock_info = namedtuple('stock_info',['name','start','end'])
stock_list_2 = [stock_info(name,start,end) for name,start,end in stock_list ]
print(stock_list_2)


from collections import namedtuple

Card = namedtuple('Card',['suit','rank'])

class French_Deck():
    rank = [i for i in range(2,11,1)]+['J','Q','K','A']
    suit = 'Spade,Club,Heart,Diamond'.split(r',')
    def __init__(self):
        self._card = [Card(s,r) for r in French_Deck.rank for s in French_Deck.suit]

    def __getitem__(self, item):
        if isinstance(item,int):
            return self._card[item]
        elif isinstance(item,slice):
            return self._card[item]

    def __len__(self):
        return len(self._card)
frenck_deck = French_Deck()
print(frenck_deck[1:3])


A custom sequence class that supports slicing

# -*- coding: utf-8 -*-
import numbers
import bisect
class Group(object):
    # supports slicing
    def __init__(self,group_name,company_name,staffs):
        self.group_name = group_name
        self.company_name = company_name
        self.staffs = staffs

    def __reversed__(self):
        self.staffs.reverse()

    def __getitem__(self, item):
        cls = type(self)
        if isinstance(item,slice):
            return cls(group_name=self.group_name,company_name=self.company_name,staffs=self.staffs[item])
        elif isinstance(item,numbers.Integral):
            return cls(group_name=self.group_name,company_name=self.company_name,staffs=[self.staffs[item]])

    def __len__(self):
        return len(self.staffs)

    def __iter__(self):
        return iter(self.staffs)

    def __contains__(self, item):
        return item in self.staffs



if __name__ == '__main__':
    group = Group(group_name='AI Team',company_name='Intel',staffs=['Frank','Tom','Jim'])
    print(len(group))
    print(group[2].staffs)
    reversed(group)  # reverse in place (this __reversed__ mutates staffs and returns None)
    for item in group[1:]:
        print(item)

Using bisect to maintain an already-sorted sequence

# -*- coding: utf-8 -*-
import bisect
from collections import deque

def test():
    insert_seq = deque()
    bisect.insort(insert_seq,3)
    bisect.insort(insert_seq,2)
    bisect.insort(insert_seq,4)
    return insert_seq

if __name__ == '__main__':
    res = test()
    print(res)
    # bisect.bisect is an alias of bisect_right (kept for backward compatibility)
    print(bisect.bisect(res,7))
    print(res)

If every element of an array has the same type, consider array.array: it stores the raw values compactly, which saves memory and makes scans faster.

import array
my_array = array.array('i')
for i in range(10):
    my_array.append(i)
print(my_array)

my_list = ['person1','person2']
my_dict = dict.fromkeys(my_list,[{'name':'frank'},{'name':'tom'}])  # note: both keys share the SAME list object
print(my_dict)

The powerful DataFrame

# -*- coding: utf-8 -*-
from pandas import DataFrame
import numpy as np

def test():
    df = DataFrame(np.arange(12).reshape(3,4),columns=['col1','col2','col3','col4'])
    return df

if __name__ == '__main__':
    df = test()
    df.iloc[0:1,0:1] = None
    print(df)
    df.dropna(axis=0,how='all',subset=['col1'],inplace=True)  # col for col in df.columns if col.startswith('col')
    print(df)



gene_cols_tmp = [col for col in df.columns if col.startswith('gene_')]

for col in gene_cols_tmp:
    df[col] = df[col].apply(lambda x: None if x == 0 else x)  # element-wise on the column, not row-wise

The video author's answer

Descriptors come in two kinds. A descriptor that implements at least the __set__() and __get__() methods is a data descriptor; one that implements the protocol without __set__() is a non-data descriptor. The distinction matters because descriptors obey a strict priority when proxying class attributes. For example, a data descriptor outranks an instance attribute of the same name, so once we instantiate the class and touch that attribute, we are really operating on the descriptor, not on the instance attribute. The priority order, from high to low, is:

  class attribute > data descriptor > instance attribute > non-data descriptor > a missing attribute triggers __getattr__()

Attribute lookup order: the data descriptor's __get__ is consulted first (a minimal sketch follows).
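A minimal sketch with two toy descriptors (my own names, not from the course code) showing that priority rule: a data descriptor shadows the instance __dict__, while a non-data descriptor is shadowed by it.

class DataDesc:                        # __get__ + __set__  => data descriptor
    def __get__(self, obj, owner):
        return 'data descriptor'
    def __set__(self, obj, value):
        pass                           # swallow writes

class NonDataDesc:                     # only __get__       => non-data descriptor
    def __get__(self, obj, owner):
        return 'non-data descriptor'

class C:
    d = DataDesc()
    n = NonDataDesc()

c = C()
c.__dict__['d'] = 'instance value'     # write straight into the instance dict
c.__dict__['n'] = 'instance value'
print(c.d)   # 'data descriptor'  -> the data descriptor wins over the instance dict
print(c.n)   # 'instance value'   -> the instance dict wins over the non-data descriptor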

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import random
class Field(object):
    def __init__(self,name,column_type,is_pk,default):
        self.name = name
        self.column_type = column_type
        self.is_pk = is_pk
        self.default = default

class IntField(Field):
    def __init__(self,name=None,column_type='bigint',is_pk=True,default=0):
        super(IntField,self).__init__(name,column_type,is_pk,default)

    def __get__(self, instance, owner):
        print('get in data descriptor...')

def gen_id():
    print('get in User class __dict__...')
    return random.randint(0,10)

class User:
    id = IntField()

    # rand_id = gen_id()
    #
    # def __init__(self,name):
    #     print('get in user instance __dict__ ...')
    #     self.name = name

if __name__ == '__main__':
    user = User()
    user.id
    # user.rand_id

Skipping past the lookup in the instance's own self.__dict__

# ... (a long aside omitted)

Finally it falls back to User.__dict__, or to the non-data descriptor's __get__().

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import random
class Field(object):
    def __init__(self,name,column_type,is_pk,default):
        self.name = name
        self.column_type = column_type
        self.is_pk = is_pk
        self.default = default

class IntField(Field):
    def __init__(self,name=None,column_type='bigint',is_pk=True,default=0):
        super(IntField,self).__init__(name,column_type,is_pk,default)

    def __get__(self, instance, owner):
        print('get in data descriptor...')

def gen_id():
    print('get in User class __dict__...')
    return random.randint(0,10)

class User:
    # id = IntField()

    rand_id = gen_id()
    #
    # def __init__(self,name):
    #     print('get in user instance __dict__ ...')
    #     self.name = name

if __name__ == '__main__':
    user = User()
    user.rand_id
    # user.rand_id

If id were a NonDataDescriptor() (or a plain value such as 1), the lookup would go to the instance's user.__dict__ first.

If it is still not found there, the NonDataDescriptor's __get__ is called, or the value is looked up directly in User.__dict__.

In short: the data descriptor's __get__() is tried first > then the instance's __dict__ > then the non-data descriptor's __get__() or User.__dict__.

__getattribute__() > class attribute > data descriptor > instance attribute > non-data descriptor > a missing attribute triggers __getattr__() > finally AttributeError is raised?

namedtuple from collections

from collections import namedtuple

User = namedtuple('User',['name','age','height','edu'])
user_tuple = ('Frank',18,180,'master')
user_dict = dict(name='Tom',age=20,height=175,edu='PHD')
user = User._make(user_tuple)
print(','.join(map(lambda x:str(x) if not isinstance(x,str) else x,user)))
ordered_user_dict = user._asdict()
print(ordered_user_dict)

A new ORM

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from collections import UserDict
from numbers import Integral
class Field(object):
    pass

class IntField(Field):
    def __init__(self,db_column,min_value=None,max_value=None):
        self._value = None
        self.min_value = min_value
        self.max_value = max_value
        self.db_column = db_column
        if min_value:
            if not isinstance(min_value,Integral):
                raise ValueError('min_value must be int')
            elif min_value < 0:
                raise ValueError('min_value must be positive int')
        if max_value:
            if not isinstance(max_value,Integral):
                raise ValueError('max_value must be int')
            elif max_value < 0:
                raise ValueError('max_value should be positive int')
        if min_value and max_value:
            if min_value > max_value:
                raise ValueError('min_value must be smaller than max_value')

    def __get__(self, instance, owner):
        return self._value
    # defining __set__ is what makes this a data descriptor
    def __set__(self, instance, value):
        if not isinstance(value,Integral):
            raise ValueError('value must be int')
        if self.min_value is not None and self.max_value is not None:
            if not (self.min_value <= value <= self.max_value):
                raise ValueError('value should be between min_value and max_value!')
        self._value = value

class CharField(Field):
    def __init__(self,db_column=None,max_length=None):
        self._value = None
        self.db_column = db_column
        if not max_length:
            raise ValueError('you must specify max_length for CharField')
        self.max_length = max_length

    def __get__(self, instance, owner):
        return self._value
    def __set__(self, instance, value):
        if not isinstance(value,str):
            raise ValueError('value should be an instance of str')
        if len(value) > self.max_length:
            raise ValueError('value length exceeds max_length')
        self._value = value



class ModelMetaclass(type):
    def __new__(cls, name,bases,attrs):
        if name == 'BaseModel':
            return super().__new__(cls,name,bases,attrs)
        fields = {}
        for key, value in attrs.items():
            if isinstance(value,Field):
                fields[key] = value
        attrs_meta = attrs.get("Meta", None)
        _meta = {}
        db_table = name.lower()
        if attrs_meta:
            table = getattr(attrs_meta,'db_table',None)
            if table:
                db_table = table
        _meta["db_table"] = db_table
        attrs["_meta"] = _meta
        attrs['fields'] = fields
        attrs.pop('Meta', None)  # tolerate models that define no inner Meta class
        return super().__new__(cls,name,bases,attrs)


class BaseModel(metaclass=ModelMetaclass):
    def __init__(self,**kwargs):
        for key, value in kwargs.items():
            setattr(self,key,value)
        super(BaseModel,self).__init__()
    def save(self):
        fields = []
        values = []
        for key, value in self.fields.items():
            db_column = value.db_column
            if not db_column:
                db_column = key.lower()
            fields.append(db_column)
            value = getattr(self,key)
            values.append(str(value) if not isinstance(value,str) else "'{}'".format(value))
        sql = 'insert into {db_table} ({field_list}) values({value_list})'.format(db_table=self._meta.get('db_table'),field_list=','.join(fields),value_list=','.join(values))
        print(sql)
        pass

class User(BaseModel):
    age = IntField(db_column='age',min_value=0,max_value=100)
    name = CharField(db_column='column',max_length=10)

    class Meta:
        db_table = 'user'

if __name__ == '__main__':
    user = User()
    user.name = 'frank'
    user.age = 18
    user.save()

The iterator pattern: iter() ==> Iterator

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from collections.abc import Iterator  # importing Iterator from collections directly is deprecated and removed in newer Pythons
class Company:
    def __init__(self,employee_list=None):
        if not isinstance(employee_list,(tuple,list)):
            raise TypeError('employee_list should be a instance of tuple or list...')
        self.employee_list = employee_list

    def __iter__(self):
        return  CompanyIterator(self.employee_list)  #iter(self.employee_list)

class CompanyIterator(Iterator): # without inheriting from Iterator we would have to supply __iter__ ourselves
    def __init__(self,employee_list):
        self.employee_list = employee_list
        self._index = 0
   
    def __iter__(self): # inherited from Iterator, so this could be omitted
        return self

    def __next__(self):
        try:
            word = self.employee_list[self._index]

        except IndexError:
            raise StopIteration
        self._index+=1
        return word


if __name__ == '__main__':
    company = Company(['a','b','c'])
    for c in company:
        print(c)
def read_file_chunk(file_path,new_line='\n',chunk_size=4096):
    buf = ''
    with open(file_path) as f:
        while True:
            chunk = f.read(chunk_size)
            while new_line in buf:
                pos = buf.index(new_line)
                yield buf[:pos]
                buf = buf[pos+len(new_line):]
            if not chunk:
                yield buf
                break
            buf+=chunk

Understanding exactly what happens to a function's stack frame (in CPython, frame objects live on the heap)

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'

import dis
import inspect

frame = None
def foo():
    bar()
    pass

def bar():
    global frame
    frame = inspect.currentframe()


if __name__ == '__main__':
    print(dis.dis(foo))
    foo()
    print('*'*100)
    print(frame.f_code.co_name)
    caller_frame = frame.f_back
    print(caller_frame.f_code.co_name)

How generators work, and the lowest-level machinery behind coroutines

import dis
def gen_func():
    yield 1
    name = 'frank'
    yield 2
    age = 30
    yield age
    return "imooc"

if __name__ == '__main__':
    # print(dis.dis(foo))
    # foo()
    # print('*'*100)
    # print(frame.f_code.co_name)
    # caller_frame = frame.f_back
    # print(caller_frame.f_code.co_name)
    gen = gen_func()
    print(dis.dis(gen))
    print(gen.gi_frame.f_lasti)
    print(gen.gi_frame.f_locals)
    next(gen)
    print(gen.gi_frame.f_lasti)
    print(gen.gi_frame.f_locals)
    next(gen)
    print(gen.gi_frame.f_lasti)
    print(gen.gi_frame.f_locals)
    next(gen)
    print(gen.gi_frame.f_lasti)
    print(gen.gi_frame.f_locals)

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'

# Handle a file that is too large to load at once and has all of its data on a single line
def read_file_chunk(file_path='./input.txt',chunksize=4096,delimeter='{|}'):
    with open(file_path) as f:
        buf = ''
        while True:
            block_buf = f.read(chunksize)
            while delimeter in buf:
                # locate the next delimiter
                idx = buf.index(delimeter)

                # yield one record at a time via the generator
                yield buf[:idx]
                # remember to skip over the delimiter itself
                buf = buf[idx+len(delimeter):]

            # no more data: yield whatever is left and leave the loop
            if not block_buf:
                if buf:
                    yield buf
                break
            # buf may still hold a tail without a delimiter; append the new block to it
            buf += block_buf

if __name__ == '__main__':
    for line in read_file_chunk():
        print(line)



# l = list(zip(*[iter([chr(i) for i in range(65,92,1)])]*3))
# ss = [''.join(i) for i in l]
# with open('input.txt','w') as fw:
#     fw.write('{|}'.join(ss)*10)

Single inheritance or MixIn?

import abc
class CacheBase(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def get(self):
        pass
    @abc.abstractmethod
    def set(self):
        pass

class RedisCache(CacheBase):
    pass
if __name__ == '__main__':
    RedisCache()

Multithreading, approach 2: subclass threading.Thread and override run(), just as in Java; the other approach is t = Thread(target=func_name, args=(arg1, arg2,)) (a minimal sketch of that form follows).
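A minimal sketch of the target= form mentioned above (the function and URL are just placeholders):

import threading
import time

def get_html(url):
    time.sleep(1)          # pretend to fetch the page
    print('fetched', url)

t = threading.Thread(target=get_html, args=('http://example.com',))
t.start()
t.join()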

from threading import Thread
import time
import logging
logging.basicConfig(level=logging.DEBUG)

class Get_html(Thread):
    def __init__(self, name):
        super(Get_html,self).__init__(name=name)

    def run(self):
        logging.info('thread {name} started...'.format(name=self.name))
        time.sleep(2)
        logging.info('thread {name} ended...'.format(name=self.name))

class Parse_html(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        logging.info('Thread {name} started...'.format(name=self.name))
        time.sleep(4)
        logging.info('Thread {name} ended...'.format(name=self.name))

if __name__ == '__main__':
    start = time.time()
    get_html_thread = Get_html('get_html_thread')
    parse_html_thread = Parse_html('parse_html_thread')
    get_html_thread.start()
    parse_html_thread.start()

    get_html_thread.join()
    parse_html_thread.join()

    logging.info('cost {} in total...'.format(time.time()-start))
>>> import chardet
>>> import requests
>>> response = requests.get('http://www.baidu.com')
>>> chardet.detect(response.content)
{'encoding': 'utf-8', 'confidence': 0.99, 'language': ''}
# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from threading import (Thread,Lock)

lock = Lock()
total=0

def ascend():
    global total
    global lock
    for i in range(10**6):
        with lock:
            total+=1

def descend():
    global total
    global lock
    for i in range(10**6):
        lock.acquire()
        total-=1
        lock.release()

if __name__ == '__main__':
    ascend_thread = Thread(target=ascend)
    descend_thread = Thread(target=descend)
    ascend_thread.start()
    descend_thread.start()

    ascend_thread.join()
    descend_thread.join()
    print(total)

Reentrant locks

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from threading import (Thread,Lock,RLock)

### Inter-thread synchronisation relies on locks, but deadlock must be avoided; when a single thread may acquire the same lock more than once, use RLock (a reentrant lock)
# lock = Lock()
lock = RLock()
total=0

def ascend():
    global total
    global lock
    for i in range(10**6):
        with lock:
            total+=1

def descend():
    global total
    global lock
    for i in range(10**6):
        lock.acquire()
        lock.acquire()  # with a plain Lock this second acquire would deadlock; an RLock does not
        total-=1
        lock.release()  # an RLock must be released as many times as it was acquired
        lock.release()

if __name__ == '__main__':
    ascend_thread = Thread(target=ascend)
    descend_thread = Thread(target=descend)
    ascend_thread.start()
    descend_thread.start()

    ascend_thread.join()
    descend_thread.join()
    print(total)

In the threading module, Condition handles inter-thread synchronisation via wait()/notify()/notify_all(), and Semaphore caps how many threads run at once; the most convenient tools, though, are the thread and process pools in concurrent.futures.

from threading import (Thread,Condition)

class XiaoAI(Thread):
    def __init__(self,cond,name='小愛'):
        super().__init__(name=name)
        self.cond = cond

    def run(self):
        with self.cond:
            self.cond.wait()
            print('{name}: Here'.format(name=self.name))
            self.cond.notify()

            self.cond.wait()
            print('{name}: Sure!'.format(name=self.name))
            self.cond.notify()
class TianMao(Thread):
    def __init__(self,cond,name='天貓'):
        super().__init__(name=name)
        self.cond = cond

    def run(self):
        with self.cond:
            print('{name}: Hey, XiaoAI'.format(name=self.name))
            self.cond.notify()
            self.cond.wait()
            print("{name}: Let's trade lines of classical poetry.".format(name=self.name))
            self.cond.notify()
            self.cond.wait()


if __name__ == '__main__':
    cond = Condition()
    xiao = XiaoAI(cond)
    tian = TianMao(cond)

    xiao.start()
    tian.start()
    xiao.join()
    tian.join()


from threading import (Thread,Semaphore)
from urllib.parse import urlencode
import requests
import chardet
import logging
from os import path
import random
import re
logging.basicConfig(level=logging.DEBUG)
# https://tieba.baidu.com/f?kw=%E5%B8%83%E8%A2%8B%E6%88%8F&ie=utf-8&pn=100

class TieBaSpider(Thread):
    def __init__(self,url,sem,name='TieBaSpider'):
        super(TieBaSpider,self).__init__(name=name)
        self.url = url
        self.sem = sem

    def _save(self,text):
        parent_dir = r'D:\tieba'
        file_name = path.join(parent_dir,path.split(re.sub(r'[%|=|&|?]','',self.url))[1])+'.html'
        with open(file_name,'w',encoding='utf-8') as fw:
            fw.write(text)
            fw.flush()
        return 1


    def run(self):
        # ua_list = ["Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv2.0.1) Gecko/20100101 Firefox/4.0.1",
        #            "Mozilla/5.0 (Windows NT 6.1; rv2.0.1) Gecko/20100101 Firefox/4.0.1",
        #            "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11",
        #            "Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11",
        #            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"]
        # header = {'User-Agent':random.choice(ua_list)}
        response = requests.get(self.url)#header=header)
        content = response.content
        logging.info(response.encoding)
        # result = chardet.detect(content)
        # logging.info(result)
        # code = result.get('encoding','utf-8')
        self._save(content.decode(response.encoding))
        self.sem.release()

class UrlProducer(Thread):
    def __init__(self,tb_name,sem,pages_once=3,start_index=1,end_index=9):# end-start % pages_once == 0
        super(UrlProducer,self).__init__(name=tb_name)
        self.tb_name = urlencode(tb_name)
        self.sem = sem
        logging.info(self.tb_name)
        self.pages_once = pages_once
        self.start_index = start_index
        self.end_index = end_index

    def run(self):
        for page_idx in range(self.start_index,self.end_index+1):
            self.sem.acquire()
            url_prefix = r'https://tieba.baidu.com/f?'
            url_suffix = r'&fr=ala0&tpl='
            self.url = url_prefix+self.tb_name+url_suffix+str(page_idx)
            tb_spider = TieBaSpider(self.url,self.sem)
            tb_spider.start()


if __name__ == '__main__':
    kw_dict = dict(kw=r'國家地理')
    sem = Semaphore(3) # allow at most 3 spider threads at a time
    url_producer = UrlProducer(kw_dict,sem=sem)
    url_producer.start()

    url_producer.join()




from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from concurrent.futures import Future
def get_html(times):
    time.sleep(times)
    print('get page {} success'.format(times))
    return times

if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=2)
    task_2 = pool.submit(get_html, 2)
    task_3 = pool.submit(get_html, 3)

    # print(dir(task_2))  #Future
    # print(task_3.done())
    #
    # if task_3.done():
    #     print(task_3.result())
    #
    # time.sleep(5)
    # print(task_3.done())
    # if task_3.done():
    #     print(task_3.result())
    urls = [1,2,3,4]
    all_tasks = [pool.submit(get_html,url) for url in urls]

    for future in as_completed(all_tasks):
        res = future.result()
        print('get result {}'.format(res))

    print('*'*100)
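    # note: as_completed yields futures in the order they finish, while pool.map below returns results in the order of urls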

    for res in pool.map(get_html,urls):
        print('get result {} using map'.format(res))

Thread pool vs. process pool: a summary of concurrency tests that simulate CPU-bound work and IO waits

'''
CPU-bound work: multiple processes take less time than multiple threads because of the GIL.
IO-bound work: in theory threads should also lose to processes because of the GIL, but switching between threads is cheaper than switching between processes,
and a single host can run far more threads than processes, so Python multithreading is not worthless.

IO time is mostly spent waiting, so it can be simulated with time.sleep; CPU time is mostly spent computing, so it can be simulated with a recursive Fibonacci function.
'''

CPU-bound: the process pool comes out slightly ahead of the thread pool. Because of time constraints the Fibonacci arguments were kept small; with larger arguments the gap should be much more obvious.

CPU-bound results first:

INFO:root:res: 75025
INFO:root:res: 121393
INFO:root:res: 196418
INFO:root:res: 317811
INFO:root:res: 514229
INFO:root:res: 832040
INFO:root:res: 1346269
INFO:root:res: 2178309
INFO:root:res: 3524578
INFO:root:res: 5702887
INFO:root:thread_cpu cost 4.97 s
INFO:root:****************************************************************************************************
INFO:root:res: 75025
... (further results omitted) ...
INFO:root:res: 196418
INFO:root:process_cpu cost 4.16 s

### The code in detail
from concurrent.futures import (ThreadPoolExecutor,
                                ProcessPoolExecutor,
                                as_completed)
from functools import wraps
import time
import logging
logging.basicConfig(level=logging.DEBUG)

'''
CPU-bound: processes beat threads because of the GIL; IO-bound: threads hold up because thread switches are cheap and far more threads can be started.
time.sleep simulates IO waits, recursive Fibonacci simulates CPU work.
'''

def time_decor(func):
    @wraps(func)
    def wrapper_func(*args,**kw):
        start = time.time()
        result = func(*args,**kw)
        logging.info('{} cost {:.2f} s'.format(func.__name__,(time.time()-start)))
        return result
    return wrapper_func

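# naive recursive Fibonacci: pure CPU work with no IO, so threads spend their time contending for the GIL while separate processes do not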
def fib(n):
    if n<=2:
        return 1
    else:
        return fib(n-1) + fib(n-2)

@time_decor
def thread_cpu(n):
    with ThreadPoolExecutor(n) as executor:
        all_tasks = [executor.submit(fib, i) for i in range(25,35)]
        for future in as_completed(all_tasks):
            res = future.result()
            logging.info('res: {}'.format(res))

@time_decor
def process_cpu(n):
    with ProcessPoolExecutor(n) as executor:
        all_tasks = [executor.submit(fib, i) for i in range(25,35)]
        # for res in executor.map(fib,range(25,35)):
        #     logging.info(''.format(res))
        for future in as_completed(all_tasks):
            res = future.result()
            logging.info('res: {}'.format(res))

if __name__ == '__main__':
    thread_cpu(3)
    logging.info('*'*100)
    process_cpu(3)

For IO-bound work the thread pool is slightly faster than the process pool.

IO-bound results first:

INFO:root:res: 2
INFO:root:res: 2
... (intermediate results omitted) ...
INFO:root:res: 2
INFO:root:thread_io cost 20.01 s
INFO:root:****************************************************************************************************
INFO:root:res: 2
INFO:root:res: 2
... (intermediate results omitted) ...
INFO:root:res: 2
INFO:root:process_io cost 20.52 s

### The code
from concurrent.futures import (ThreadPoolExecutor,ProcessPoolExecutor,as_completed)
from functools import wraps
import time
import logging
logging.basicConfig(level=logging.DEBUG)

def time_decor(func):
    @wraps(func)
    def wrapper_func(*args,**kw):
        start_time = time.time()
        result = func(*args,**kw)
        logging.info('{} cost {:.2f} s'.format(func.__name__,(time.time()-start_time)))
        return result
    return wrapper_func


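# time.sleep releases the GIL while it waits, which makes it a reasonable stand-in for network or disk IO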
def monitor_io(n):
    time.sleep(n)
    return n

@time_decor
def thread_io(n):
    with ThreadPoolExecutor(n) as executor:
        all_tasks = [executor.submit(monitor_io,i) for i in [2]*30]
        for future in as_completed(all_tasks):
            res = future.result()
            logging.info('res: {}'.format(res))
    return n

@time_decor
def process_io(n):
    with ProcessPoolExecutor(n) as executor:
        all_task = [executor.submit(monitor_io,i) for i in [2]*30]
        for future in as_completed(all_task):
            res = future.result()
            logging.info('res: {}'.format(res))

if __name__ == '__main__':
    thread_io(3)
    logging.info('*'*100)
    process_io(3)

Version 2: a non-blocking socket, polling the connection state in a loop

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from urllib.parse import urlparse
import socket
import logging
logging.basicConfig(level=logging.DEBUG)

def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # non-blocking IO: connect/send/recv return immediately instead of blocking, but we still have
    # to poll in a while loop to check whether the connection is ready; this pays off when the work
    # done in between does not depend on that connection being established
    client.setblocking(False)

    try:
        client.connect((host, 80))
    except BlockingIOError:
        pass

    while True:
        try:
            client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path,host).encode('utf8'))
            break
        except OSError as os_err:
            continue

    d = b""
    while True:
        try:
            data = client.recv(1024)
            if not data:
                break
            d += data
            break
        except BaseException as baseEx:
            continue
    logging.info('\n'+d.decode('utf-8'))
    client.close()

if __name__ == '__main__':
    get_url("http://www.baidu.com")

Non-blocking socket, reading through select / poll / epoll (via the selectors module)

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from urllib.parse import urlparse
import re
import socket
import logging
logging.basicConfig(level=logging.DEBUG)
from selectors import DefaultSelector, EVENT_WRITE,EVENT_READ

selector = DefaultSelector()
urls = ["http://www.baidu.com"]
STOP = False

class Fetcher:
    def get_url(self,url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        if self.path == "":
            self.path = "/"
        self.data = b""
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # set the socket to non-blocking; on its own that would still require polling the connection
        # state in a while loop, which is exactly what registering callbacks with the selector avoids
        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))
        except BlockingIOError:
            pass
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

    def connected(self,key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path,self.host).encode('utf8'))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)
        # register the read callback so readable() runs once response data arrives

    def readable(self,key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode('utf8')
            html_data = re.split(r'\r\n\r\n',data)[1]
            logging.info(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            global STOP
            if not urls:
                STOP = True


def loop():
    # event loop: keep asking the selector which sockets are ready and invoke the registered callbacks
    # 1. select itself has no registration mechanism; the selectors module provides register()
    # 2. reacting to socket state changes (the callbacks) is still written by the programmer
    while not STOP:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)
    # callbacks + event loop + select (poll/epoll)

if __name__ == '__main__':
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com")
    loop()

A second pass over the same selector-based Fetcher (event loop + select + callbacks):

import socket
from selectors import (DefaultSelector
                       ,EVENT_WRITE
                       ,EVENT_READ)
from urllib.parse import urlparse
import logging
logging.basicConfig(level=logging.DEBUG)
selector = DefaultSelector()
urls = ["http:/www.baidu.com"]
STOP = False
class Fetcher:
    """
    event loop + select (readiness polling) + callbacks
    """
    def get_url(self, url):
        self._spider_url = url

        url = urlparse(url)

        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path=="":
            self.path = "/"

        self.client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))
        except BlockingIOError:
            pass

        # 註冊 連上後的 回調操做 , 由於 Send 是 寫操做說以是 EVENT_WRITE
        selector.register(self.client.fileno(),EVENT_WRITE,self.connected)

    def connected(self,key):
        """
        :param key:  key.data ==> callback
        :return:
        """
        # unregister the write callback first
        selector.unregister(key.fd)
        logging.info("GET {path} HTTP/1.1\r\nHost:{host}\r\nConnection:close\r\n\r\n".format(path=self.path,host=self.host))
        self.client.send("GET {path} HTTP/1.1\r\nHost:{host}\r\nConnection:close\r\n\r\n".format(path=self.path,host=self.host).encode('utf-8'))

        # about to receive data from the server, hence the EVENT_READ event
        selector.register(self.client.fileno(),EVENT_READ,self.readable)

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode('utf8')
            html_data = data.split('\r\n\r\n')[1]  # split on the real CRLF blank line, not a raw-string literal
            logging.info('\n'+html_data)
            self.client.close()
            urls.remove(self._spider_url)
            global STOP
            if not urls:
                STOP = True

def loop():
    while not STOP:
        ready = selector.select()
        for key, mask in ready:
            callback = key.data
            callback(key)

if __name__ == '__main__':
    fetcher = Fetcher()
    fetcher.get_url("http:/www.baidu.com")
    loop()

Caller, delegating generator, sub-generator (yield from). At this point I really feel that Python suits me.

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import requests
import json
import pprint
url_html = {}
def sub_gene(url):
    while True:
        url = yield  # the url parameter of sub_gene is immediately replaced by whatever the caller sends in
        if not url:
            break
        response = requests.get(url)
        code = response.encoding
        html = response.content.decode(code)
    return html[0:20]  # return only the first 20 characters to keep the output readable

def delegate_gene(url):
    while True:
        url_html[url] = yield from sub_gene(url)

def main(urls):
    for url in urls:
        print(url)
        dele_g = delegate_gene(url)
        dele_g.send(None)  # prime the delegating generator (and, via yield from, the sub-generator)
        dele_g.send(url)   # pass the url straight through the established channel to the sub-generator
        dele_g.send(None)  # send None so the sub-generator ends; its return value is stored in url_html


if __name__ == '__main__':
    urls = ['http://www.baidu.com','http://www.sina.com']
    main(urls)
    pprint.pprint(url_html)

Getting started with asynchronous IO (asyncio)

import asyncio
import logging
import time
from functools import wraps
logging.basicConfig(level=logging.DEBUG)

async def async_func(url):
    await asyncio.sleep(2)
    return 'url content: {}'.format(url)

async def get_url(url):
    logging.debug('start to fetch html from: {}'.format(url))
    result = await async_func(url)
    logging.debug('finished fetch html from: {}'.format(url))
    return result

def time_count(func):
    @wraps(func)
    def wrapper_func(*args,**kwargs):
        start_time = time.time()
        result = func(*args,**kwargs)
        logging.debug('{} cost {:.2f} s'.format(func.__name__,time.time()-start_time))
        return result
    return wrapper_func

@time_count
def main():
    # get the event loop
    event_loop = asyncio.get_event_loop()
    task1 = event_loop.create_task(get_url('https://www.baidu.com'))
    event_loop.run_until_complete(task1)
    logging.debug(task1.result())

if __name__ == '__main__':
    main()
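
The single-task example above can also drive several coroutines at once; a minimal sketch using asyncio.gather (the extra URLs are placeholders): the total time stays around one sleep instead of three.

import asyncio
import logging
import time
logging.basicConfig(level=logging.DEBUG)

async def fetch(url):
    await asyncio.sleep(2)  # stands in for the network round trip
    return 'url content: {}'.format(url)

if __name__ == '__main__':
    urls = ['https://www.baidu.com', 'https://www.sina.com.cn', 'https://www.qq.com']
    loop = asyncio.get_event_loop()
    start = time.time()
    # gather schedules all the coroutines concurrently on the same event loop
    results = loop.run_until_complete(asyncio.gather(*[fetch(url) for url in urls]))
    logging.debug(results)
    logging.debug('total cost {:.2f} s'.format(time.time() - start))  # roughly 2 s, not 6 s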

If a task involves blocking, time-consuming IO but still has to run under the event loop, it can be wrapped into an awaitable task with run_in_executor, as shown below.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse
import socket
import logging
logging.basicConfig(level=logging.DEBUG)

def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if not path:
        path = '/'
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, 80))
    client.send('GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection:close\r\n\r\n'.format(host=host,path=path).encode('utf-8'))

    d = b''
    while True:
        data = client.recv(1024)
        if not data:
            break
        d += data

    logging.debug('\n')
    logging.debug(d.decode())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    executor = ThreadPoolExecutor(3)
    tasks = []
    for i in range(20):
        url = 'http://shop.projectsedu.com/goods/{}/'.format(i)
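        # run_in_executor submits the blocking get_url call to the thread pool and wraps it in a future the event loop can await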
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
Related links:
https://www.cnblogs.com/alan-babyblog/p/5260252.html
https://www.bilibili.com/video/av41733850

# count how many zeros each row of a DataFrame contains
df['zero_count'] = df.apply(lambda x:x.value_counts().get(0,0),axis=1)
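
A quick check of what that one-liner does, on a tiny made-up DataFrame (the column names are arbitrary):

import pandas as pd

df = pd.DataFrame({'a': [0, 1, 5], 'b': [0, 2, 0], 'c': [7, 0, 0]})
# per row (axis=1), value_counts() counts each value; .get(0, 0) falls back to 0 when the row contains no zeros
df['zero_count'] = df.apply(lambda x: x.value_counts().get(0, 0), axis=1)
print(df['zero_count'].tolist())  # [2, 1, 2]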