OpenStakc開發工程師面試必備技能

一、python垃圾回收機制
html

python也是帶有垃圾回收機制的,就像其它語言同樣,如java、ruby、go等,這裏介紹下python的垃圾回收機制是怎麼實現的?java

參考連接:http://jin-yang.github.io/blog/python-garbage-collection.htmlpython


二、python多進程、多線程git

這塊內容轉載來自:http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/0014319272686365ec7ceaeca33428c914edf8f70cca383000 github


先來介紹下併發和並行的概念。數據庫

併發:在同一時間作着不少事情,好比只有一個cpu核,那麼操做系統就會在各個程序之間快速切換。segmentfault

並行:確實在同一時間作着不少事情,若是有多個cpu核,確實可以同時執行多個程序。windows


整體來講,多任務的實現由3種方式:api

一、多進程模式ruby

二、多線程模式

三、多進程+多線程模式


一、多進程模式

python的os.fork能夠輕鬆建立子進程

import os    
if __name__ == '__main__':
    
    pid = os.fork()
    if pid == 0:
        print("child process:{0}, parent process:{1}".format(os.getpid(), os.getppid()))
    else:
        print("parent process:{0} is creating child process {1}".format(os.getpid(), pid))

因爲windows沒有fork調用,因此推薦使用能夠跨平臺的multiprocessing多進程模塊。


import os
from multiprocessing import Process     # 提供Process類來表示進程對象
def run_proc(name):
    print "child process %s is running, pid: %s" % (name, os.getpid()) 
    
if __name__ == '__main__':
    print "parent process: %s" % (os.getpid())
    
    p = Process(target=run_proc, args=('test_process',))   # 建立一個Process實例,能夠查看Process的__init__函數,只須要一個執行函數和函數參數
    print "child process will start"
    p.start()                            # 啓動子進程
    p.join(timeout=10)                   # 等待子進程執行結束後,再繼續執行
    print "child process end"
    
執行結果:    
parent process: 13119
child process will start
child process test_process is running, pid: 13120
child process end


以進程池方式批量建立子進程

import os, random, time
from multiprocessing import Pool   

def run_proc(name):
    print "child process %s is running, pid: %s" % (name, os.getpid()) 
    start = time.time()
    time.sleep(random.random()*5)
    end = time.time()
    print "child process %s runs %s seconds" % (name, end - start)
    
if __name__ == '__main__':
    print "parent process: %s" % (os.getpid())
    p = Pool()
    for i in xrange(5):
        p.apply_async(run_proc, args=(i,))      啓動方式跟Process有點區別
    print "wait all child process to finish"
    p.close()
    p.join()                         # 對Pool對象調用join方法以前,必須先調用close方法,即再也不繼續添加新的Process對象了
    print "all child process done"
    
運行結果:
parent process: 13149
wait all child process to finish
child process 0 is running, pid: 13151
child process 1 is running, pid: 13152
child process 2 is running, pid: 13153
child process 3 is running, pid: 13154
child process 0 runs 0.198132038116 seconds
child process 4 is running, pid: 13151
child process 3 runs 0.270474910736 seconds
child process 1 runs 4.15184187889 seconds
child process 2 runs 4.84887504578 seconds
child process 4 runs 4.76589512825 seconds
all child process done

這裏看到child process 4須要等待後才能執行,由於Pool的默認大小是4;能夠修改Pool(10),而後再運行

Python multiprocessing默認不支持instance method

http://nyeggen.com/post/2011-07-16-python-multiprocessing-with-instance-methods/



進程間通訊

multiprocessing模塊提供queue、pipes等多種方式來通訊

from multiprocessing import Process, Queue
import time, random

def read(q):
    while True:
        v = q.get(True)
        print("get %s from queue" % v)
        
def write(q):
    for i in ['a','b','c']:
        print("put %s into queue" % i)
        q.put(i)
        time.sleep(random.random())
        
if __name__ == "__main__":
    q = Queue()
    pr = Process(target=read, args=(q,))
    pw = Process(target=write, args=(q,))
    
    pw.start()
    pr.start()
    
    pw.join()
    pr.terminate()


二、多線程模式

多線程也能夠實現多任務的執行,python提供了thread和threading模塊,threading是對thread的進一步封裝;推薦使用threading


啓動一個線程

import time, threading
def loop():
    print "thread %s is running" % threading.current_thread().name   # current_thread() 返回當前線程的實例
    i = 0
    while i < 5:
        i+=1
        print "thread %s is implementing task %s" % (threading.currentThread().name, i)    
    print "thread %s end" % threading.currentThread().name
    
print "thread %s is running" % threading.current_thread().name
t = threading.Thread(target=loop, name="thread-1")
t.start()
t.join()
print "thread %s end" % threading.current_thread().name 

運行結果:
thread MainThread is running
thread thread-1 is running
thread thread-1 is implementing task 1
thread thread-1 is implementing task 2
thread thread-1 is implementing task 3
thread thread-1 is implementing task 4
thread thread-1 is implementing task 5
thread thread-1 end
thread MainThread end


線程lock

線程與進程最大的不一樣就是,全部線程共享全部變量,變量對每一個進程都是一個完整的拷貝。

import time, threading
n = 0
def change(m):
    global n
    n = n + m
    n = n - m
def loop(m):
    for i in xrange(10000):
        change(m)
t = threading.Thread(target=loop, name="thread-1", args=(5,))
t2 = threading.Thread(target=loop, name='thread-2', args=(9,))
t.start()
t2.start()
t2.join()
t.join()
print n

運行結果:0、-5 -9 十、1四、五、9都出現過,只要循環次數夠多


# 經過加鎖能夠解決多線程資源爭用的問題
lock = threading.Lock()   # 建立一個鎖

def change(m):
    global n
    n = n + m
    n = n - m

def loop(m):
    for i in xrange(10000):
        lock.acquire()       # 同一時刻,只有一個線程能夠得到鎖
        try:
            change(m)
        finally:
            lock.release()   # 爲了確保,可以釋放鎖,避免其它線程一直在等待鎖

包含鎖的代碼只能以單線程模式運行,利用不到多線程的優點。


因爲python GIL 全局鎖的存在,線程執行以前都要先獲取GIL鎖, 引用:

《GIL的設計簡化了CPython的實現,使得對象模型,包括關鍵的內建類型如字典,都是隱含能夠併發訪問的。鎖住全局解釋器使得比較容易的實現對多線程的支持,但也損失了多處理器主機的並行計算能力。

可是,不論標準的,仍是第三方的擴展模塊,都被設計成在進行密集計算任務是,釋放GIL。

還有,就是在作I/O操做時,GIL老是會被釋放。對全部面向I/O 的(會調用內建的操做系統C 代碼的)程序來講,GIL 會在這個I/O 調用以前被釋放,以容許其它的線程在這個線程等待I/O 的時候運行。若是是純計算的程序,沒有 I/O 操做,解釋器會每隔 100 次操做就釋放這把鎖,讓別的線程有機會執行(這個次數能夠經過 sys.setcheckinterval 來調整)若是某線程並未使用不少I/O 操做,它會在本身的時間片內一直佔用處理器(和GIL)。也就是說,I/O 密集型的Python 程序比計算密集型的程序更能充分利用多線程環境的好處。》

連接http://blog.csdn.net/jinguangliu/article/details/45422663

雖然python多線程很雞肋,但仍是能夠經過多進程來實現多任務並行工做,每一個進程都有各自獨立的GIL鎖。


threadlocal

多線程環境下,線程使用本身的局部變量比全局變量好,要想使用全局變量就得加鎖,可是使用局部變量函數之間調用傳參比較麻煩。

import time, threading
from warnings import catch_warnings
threadlocal = threading.local()     # 每一個threadlocal對象均可以對name屬性進行讀寫,並且互不影響
def print_name():
    print "hello %s, process name: %s" % (threadlocal.name, threading.current_thread().name)
def loop(m):
    threadlocal.name = m
    print_name()
    
t = threading.Thread(target=loop, name="thread-1", args=('yai',))
t2 = threading.Thread(target=loop, name='thread-2', args=('cai',))
t.start()
t2.start()
t2.join()
t.join()

運行結果:
hello yai, process name: thread-1
hello cai, process name: thread-2

threadlocal最經常使用的地方就是爲每一個線程綁定一個數據庫鏈接,HTTP請求,用戶身份信息等,這樣一個線程的全部調用到的處理函數均可以很是方便地訪問這些資源。


協程(coroutine)

關於協程的介紹,廖老師的網站上介紹得很好:http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001432090171191d05dae6e129940518d1d6cf6eeaaa969000

一句話總結協程的特定:子程序(函數)就是協程的特例。


元類(metaclass)

平時雖然用得很少,有必要學習下,這裏有篇很好的教程:http://mp.weixin.qq.com/s?__biz=MzA4MjEyNTA5Mw==&mid=2652563643&idx=1&sn=f06beb600b41a6ec8f1d22b2b5912ed0&scene=23&srcid=0710rhyMwjbzJyechK8V3Yu6#rd



三、單元測試

unittest是經常使用的測試框架, 下面看個例子

# 下面程序實現的是像訪問class的attribute同樣訪問dict的value
class Dict(dict):
    def __init__(self, **kwargs):
        super(Dict, self).__init__(**kwargs)
        self.c = 123
        
    def __setattr__(self, key, value):
        self[key] = value
        
    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Dict' object has no attribute '%s'" % key)
            
if __name__ == '__main__':
    a = Dict(a=1,b=2)
    print a.a, a.c, a['b']


針對上面程序的測試用例

import unittest
from ut import Dict

class TestDict(unittest.TestCase):          #  從unittest.TestCase繼承,它提供了不少內置的測試方法
    def setUp(self):
        print "starting..."
    def test_init(self):
        d = Dict(a=1, b='123')
        self.assertEquals(d.a, 1)
        self.assertEquals(d.b, '123')
        self.assertTrue(isinstance(d, dict))
    def test_key(self):
        d = Dict()
        d['key'] = 'value'
        self.assertEquals(d.key, 'value')
    def test_value(self):
        d = Dict()
        d.key = 'value'
        self.assertTrue('key' in d)
        self.assertEquals(d['key'], 'value')
    def test_keyerror(self):
        d = Dict()
        with self.assertRaises(KeyError):      # assertRaises期待拋出指定類型的Error, 訪問不存在的key時,拋出KeyError
            value = d['empty']
    def test_attrerror(self):           
        d = Dict()
        with self.assertRaises(AttributeError):
            value = d.empty
    def tearDown(self):
        print "ending..."
if __name__ == '__main__':            
    unittest.main()                            # 這樣就能夠像運行腳本同樣,直接python ***.py; 或者不加這個也行,python -m unittest ***
    
注:以test開頭的方法就是測試方法,不以test開頭的方法不被認爲是測試方法,測試的時候不會被執行。
能夠在單元測試中編寫兩個特殊的setUp()和tearDown()方法。這兩個方法會分別在每調用一個測試方法的先後分別被執行。


# 運行結果
starting...
ending...
.starting...
ending...
.starting...
ending...
.starting...
ending...
.starting...
ending...
.
----------------------------------------------------------------------
Ran 5 tests in 0.001s

OK


mock實戰:https://www.toptal.com/python/an-introduction-to-mocking-in-python

這篇老外的教程至關不錯。


Mock和MagicMock的區別:

MagicMock是Mock的子類,其實是對Mock的擴展,容許模擬python的magic methods。下面看個例子:

>>> import mock
>>> mo = mock.Mock()
>>> mo.__str__.return_value = "1234"
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'method-wrapper' object has only read-only attributes (assign to .return_value)
>>> mo = mock.MagicMock()
>>> mo.__str__.return_value = "1234"
>>> str(mo)
'1234'
>>>


看到一個介紹OpenStack api的教程:

經過demo學習OpenStack開發所需的基礎知識 -- API服務(1)

經過demo學習OpenStack開發所需的基礎知識 -- API服務(2)

經過demo學習OpenStack開發所需的基礎知識 -- API服務(3)

經過demo學習OpenStack開發所需的基礎知識 -- API服務(4)

相關文章
相關標籤/搜索