python網絡篇【第十篇】多路複用、多線程

1、小知識點(做用域)python

 

進入主題以前先分享一個有關某公司一道python自動化的面試題:linux

首先說一下做用域的關係,如如下案例:面試

if 1==1:
    name="tom"
print(name)

看以上代碼你以爲會打印出來"tom"嗎?shell

答案是:會的。c#

這個地方須要瞭解的是,在Java、c#是有塊級做用域的,不會打印出來。在python中無塊級做用域能夠會被執行windows

既然已經瞭解python中無塊級做用域,就要了解變量執行的優先級,先從本身自己做用域找,找不到往上一級找,以此類推。數組

那麼咱們在瞭解一個案例:服務器

name ="tom"
def f1():
    print(name)

def f2():
    name="jerry"
    f1()
f2()

你以爲name會是什麼? tom or jerry。網絡

答案是:tom    why?  一臉懵逼!!!數據結構

原理:在python中 做用域在執行以前已經肯定,這就好理解了

 

好了,下面就說一下某浪公司的面試題:

實例1:

line=[x+100 for x in range(10)] 
#意思是循環0-9 每一個數加100 生成一個新的列表
print(line)

#顯示結果
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

實例2:

ll=[x+100 for x in range(10) if x>6]
#意思是循環0-9 當x>6時 加100 
print(ll)
 
#顯示結果
[107, 108, 109]

 

按照上面的兩個列子,x+100 也能夠寫成是一個函數如:

line=[lambda :x for x in range(10)]
r= line[0]()
print(r)

結果會是什麼? 0 or None or lambda:x ??

答案是:9     why?  兩臉懵逼!!!

原理:函數在沒有執行以前內部代碼不執行  

那麼line列表中就有十個 lambda :x

那就找 x 是多少就好了 ,很明顯循環到最後一次時x被從新複製給 9,那麼lambda表達式直接return x 

這就是某浪的一道面試題。。。。

 

2、socketserver源碼剖析

 

繼續上一篇socketserver往下說,上一篇只是知道了具體的用法,可是不知道什麼意思,下面咱們來源碼剖析。。。

 1 import subprocess
 2 class MyServer(socketserver.BaseRequestHandler):
 3     def handle(self):
 4         self.request.sendall(bytes("歡迎光臨",encoding="utf-8"))
 5         while True:
 6                 data=self.request.recv(1024)
 7                 if len(data) ==0:break
 8                 print("[%s] says:%s" %(self.client_address,data.decode()))
 9                 cmd= subprocess.Popen(data.decode(),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
10                 cmd_res=cmd.stdout.read()
11                 if not cmd_res:
12                     cmd_data=bytes(cmd.stderr.read())
13                 if len(cmd_res)==0:
14                     cmd_data=bytes("err cmd",encoding="utf8")
15                 self.request.send(cmd_res)
16 
17 
18 if __name__=='__main__':
19     server=socketserver.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
20     server.serve_forever()

ThreadingTCPServer的類圖關係以下:

 

 

 

以上面的代碼爲例內部調用流程爲:

  • 啓動服務端程序
  • 執行 TCPServer.__init__ 方法,建立服務端Socket對象並綁定 IP 和 端口
  • 執行 BaseServer.__init__ 方法,將自定義的繼承自socketserver.BaseRequestHandler 的類MyServer賦值給 self.RequestHandlerClass
  • 執行 BaseServer.server_forever 方法,While 循環一直監聽是否有客戶端請求到達 ...
  • 執行 handle_request_noblock()
  • 當客戶端鏈接到達服務器
  • 執行 ThreadingMixIn.process_request 方法,建立一個 「線程」 用來處理請求
  • 執行 ThreadingMixIn.process_request_thread 方法
  • 執行 BaseServer.finish_request 方法,
  • 執行 self.RequestHandlerClass()  即:執行 自定義 MyServer 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyServer的handle方法)

 

class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # XXX: Consider using another file descriptor or
                # connecting to the socket to wake this up instead of
                # polling. Polling reduces our responsiveness to a
                # shutdown request and wastes cpu at all other times.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print '-'*40

BaseServer
BaseServer
class TCPServer(BaseServer):

    """Base class for various socket-based server classes.

    Defaults to synchronous IP stream (i.e., TCP).

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you don't use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - request_queue_size (only for stream sockets)
    - allow_reuse_address

    Instance variables:

    - server_address
    - RequestHandlerClass
    - socket

    """

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()

TCPServer
TCPServer源碼
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()

ThreadingMixIn
ThreadingMixIn源碼
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
ThreadingTCPServer源碼
class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variariables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

SocketServer.BaseRequestHandler
BaseRequestHandler源碼

 

3、I/O多路複用

I/O多路複用指:經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。注意(I/O是不佔用cpu的)

Linux中的 select,poll,epoll 都是IO多路複用的機制。    

 1 select
 2  
 3 select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。
 4 select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。
 5 select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。
 6 另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。
 7  
 8 poll
 9  
10 poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。
11 poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。
12 另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。
13  
14 epoll
15  
16 直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。
17 epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。
18 epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。
19 另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
介紹

 在Windows 和Mac 系統中 python只提供了select一種IO多路複用的機制

在linux中  select,poll,epoll 都是支持的

 注意:網絡操做、文件操做、終端操做等均屬於IO操做,對於windows只支持Socket操做,其餘系統支持其餘IO操做,可是沒法檢測 普通文件操做 自動上次讀取是否已經變化。

用select實現socket多線路鏈接

書寫格式以下例:

server端:

import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8888))
sk.listen(5)
while True:
    rlist,wlist,elist=select.select([sk,],[],[],1)
    print(rlist)   #rlist中是socket 對象列表 【sk】
    for i in rlist:
        conn,addr=i.accept()
        conn.sendall(bytes("hello",encoding="utf8"))

client端:

import socket
sk=socket.socket()
sk.connect(("127.0.0.1",8888))
data=sk.recv(1024)
print(data.decode())
while True:
    input(">>>>")
sk.close()

server端詳解:

句柄列表rlist, 句柄列表wlist, 句柄列表elist = select.select([sk], 句柄序列2, 句柄序列3, 1是超時時間)

 
參數: 可接受四個參數(前三個必須)
返回值:三個列表
 
select方法用來監視文件句柄,若是句柄發生變化,則獲取該句柄。
1、當 參數sk序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中
2、當 參數2 序列中含有句柄時,則將該序列中全部的句柄添加到 返回值2 序列中
3、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中
4、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化
   當 超時時間 = 1時,那麼若是監聽的句柄均無任何變化,則select會阻塞 1 秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。

 

實例二:

server端

import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8888))
sk.listen(5)

inputs=[sk,]

outputs=[]
while True:
    rlist,wlist,e=select.select(inputs,outputs,[],1)
    print(len(inputs),len(rlist),len(outputs),len(wlist))   #rlist中是socket 對象列表 【sk】
    #打印的第一個參數是 公有多少個鏈接,2,變化的rlist個數, 3,有操做變化個數,4,也是有操做變化個數
    for i in rlist:   #循環這個句柄,只要inputs,有變化,rlist就能取到
        if i == sk:    #判斷是否相等sk,相等話證實就會有新的鏈接
            conn,addr=i.accept()   #而後創建鏈接
            inputs.append(conn)    #把這次鏈接的線路也進行監聽
            conn.sendall(bytes("hello",encoding="utf8"))    #發送給客戶端一個信息驗證
        else:     #若是不等,就有多是線路發生了變化
            try:   #程序運行正確
                data=i.recv(1024)   #接收客戶端信息
                print(data.decode())
                if not data:        #
                    raise Exception("斷開鏈接")
                else:           #一些運行正常 ,把這次線路變化加到outputs列表中
                    outputs.append(i)
            except Exception as e:  #若是有程序退出, 也須要把這次線路變化從監聽中刪除
                print(e)
                inputs.remove(i)
    for w in wlist:    #wlist 有變化證實接收到信息,
        w.sendall(bytes("response",encoding="utf8"))    # 給客戶端回個信息
        outputs.remove(w)  #而後刪除這次監聽,實現讀寫分離做用

client端:

import socket
sk=socket.socket()
sk.connect(("127.0.0.1",8888))
data=sk.recv(1024)
print(data.decode())
while True:
    intt=input(">>>>")
    sk.sendall(bytes(intt,encoding="utf8"))
    print(sk.recv(1024))
sk.close()

 

其實用select,並非真正實現併發,一直循環着在監聽數據是否有變化,並把數據處理完畢以後纔會去處理新的請求數據。若是每一個請求的耗時比較長時,select版本的服務器端也沒法完成同時操做,這種模式稱之爲僞併發。

 

4、多線程、多進程

1. 一個應用程序能夠有多進程、多線程
2. 默認是單進程、單線程
3. 單進程,多線程,在Python中不會性能提高,在Java和C#中能夠提高。
提升併發:
多線程: IO操做,不會用到CPU,效率提高是能夠的

多進程:計算型操做, 須要佔用CPU,所以性能不會有提高

在程序猿的世界中,線程和進程是一個很重要的概念,不少人常常弄不清線程和進程究竟是什麼,有什麼區別,本文試圖來解釋一下線程和進程。首先來看一下概念:

進程(英語:process):

是計算機中已運行程序的實體。進程爲曾經是分時系統的基本運做單位。在面向進程設計的系統(如早期的UNIX,Linux 2.4及更早的版本)中,進程是程序的基本執行實體;在面向線程設計的系統(如當代多數操做系統、Linux 2.6及更新的版本)中,進程自己不是基本運行單位,而是線程的容器。程序自己只是指令、數據及其組織形式的描述,進程纔是程序(那些指令和數據)的真正運行實例。

 

線程(英語:thread):

是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務

概念太嚇人了,先來看一下進程,這個相對於線程來講仍是稍微好理解一點的。進程,是程序運行的實體,這句話的意思是,程序是存放在硬盤中的,當這個程序運行時,就會產生若干個進程,而且這個進程是可見的

那麼什麼是 線程呢,線程是一個任務流,它被包含在進程之中

 

下面來看一下python中線程:

 1 import threading
 2 import time
 3 def f1(arg):
 4     time.sleep(5)  #等待5秒鐘
 5     print(arg)     #執行
 6 t = threading.Thread(target=f1,args=(123,))
 7 t.setDaemon(True)   #True 表示主線程不等此子線程
 8 t.start()  #不表明當前線程會被當即執行
 9 t.join(6)  #表示主線程到此,等待。。直到子線程執行完畢
10             # 參數6,表示主線程在此最多等待6秒
11 
12 print(33)
13 print(33)

 

未完待續。。。。。。

相關文章
相關標籤/搜索