# 進程/線程/協程 # IO:同步/異步/阻塞/非阻塞 # greenlet gevent # 事件驅動與異步IO # Select\Poll\Epoll異步IO 以及selectors模塊 # P

   1 # 進程/線程/協程
   2 # IO:同步/異步/阻塞/非阻塞
   3 #     greenlet gevent
   4 # 事件驅動與異步IO
   5 # Select\Poll\Epoll異步IO 以及selectors模塊
   6 # Python隊列/RabbitMQ隊列  
   7 
   8 ##############################################################################################
   9 1.什麼是進程?進程和程序之間有什麼區別?
  10     進程:一個程序的執行實例稱爲進程;
  11     每一個進程都提供執行程序所需的資源。
  12     進程有一個虛擬地址空間、可執行代碼、對系統對象的開放句柄、一個安全上下文、一個唯一的進程標識符、環境變量、一個優先級類、最小和最大工做集大小,以及至少一個執行線程;
  13     每一個進程都由一個線程啓動,這個線程一般被稱爲主線程,可是能夠從它的任何線程中建立額外的線程;
  14     程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程;
  15     程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。
  16     在多道編程中,咱們容許多個程序同時加載到內存中,在操做系統的調度下,能夠實現併發地執行,大大提升了CPU的利用率
  17 2.什麼是線程?
  18     進程的缺點有:
  19         進程只能在一個時間幹一件事,若是想同時幹兩件事或多件事,進程就無能爲力了。
  20         進程在執行的過程當中若是阻塞,例如等待輸入,整個進程就會掛起,即便進程中有些工做不依賴於輸入的數據,也將沒法執行。
  21     線程是操做系統可以進行運算調度的最小單位。
  22     它被包含在進程之中,是進程中的實際運做單位。
  23     一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務
  24     線程是一個執行上下文,它是一個CPU用來執行指令流的全部信息。
  25 3.進程和線程之間的關係?
  26     線程共享建立它的進程的地址空間;進程有本身的地址空間。(內存地址)
  27     線程能夠直接訪問其進程的數據段;進程有本身的父進程數據段的副本。
  28     線程能夠直接與進程的其餘線程通訊;進程必須使用進程間通訊來與兄弟進程通訊。
  29     新線程很容易建立;新進程須要複製父進程。
  30     線程能夠對同一進程的線程進行至關大的控制;進程只能對子進程執行控制。
  31     對主線程的更改(取消、優先級更改等)可能會影響流程的其餘線程的行爲;對父進程的更改不會影響子進程。
  32 4.python GIL全局解釋器鎖
  33     不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行
  34     http: // www.dabeaz.com / python / UnderstandingGIL.pdf
  35 5.Python threading模塊的使用
  36     基本調用方式1
  37         import threading
  38         import time
  39 
  40 
  41         def sayhi(num):  # 定義每一個線程要運行的函數
  42 
  43             print("running on number:%s" % num)
  44 
  45             time.sleep(3)
  46 
  47 
  48         if __name__ == '__main__':
  49             t1 = threading.Thread(target=sayhi, args=(1,))  # 生成一個線程實例
  50             t2 = threading.Thread(target=sayhi, args=(2,))  # 生成另外一個線程實例
  51 
  52             t1.start()  # 啓動線程
  53             t2.start()  # 啓動另外一個線程
  54 
  55             print(t1.getName())  # 獲取線程名
  56             print(t2.getName())
  57     基本調用方式2
  58         import threading
  59         import time
  60 
  61 
  62         class MyThread(threading.Thread):
  63             def __init__(self, num):
  64                 threading.Thread.__init__(self)
  65                 self.num = num
  66 
  67             def run(self):  # 定義每一個線程要運行的函數
  68 
  69                 print("running on number:%s" % self.num)
  70 
  71                 time.sleep(3)
  72 
  73 
  74         if __name__ == '__main__':
  75             t1 = MyThread(1)
  76             t2 = MyThread(2)
  77             t1.start()
  78             t2.start()
  79 6.守護線程Daemon:
  80     非守護進程線程退出,就能夠將守護線程殺死。
  81     # _*_coding:utf-8_*_
  82 
  83     import time
  84     import threading
  85 
  86 
  87     def run(n):
  88         print('[%s]------running----\n' % n)
  89         time.sleep(2)
  90         print('--done--')
  91 
  92 
  93     def main():
  94         for i in range(5):
  95             t = threading.Thread(target=run, args=[i, ])
  96             t.start()
  97             t.join(1)
  98             print('starting thread', t.getName())
  99 
 100 
 101     m = threading.Thread(target=main, args=[])
 102     m.setDaemon(True)  # 將main線程設置爲Daemon線程,它作爲程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啓動的其它子線程會同時退出,無論是否執行完任務
 103     m.start()
 104     m.join(timeout=2)
 105     print("---main thread done----")
 106 7.線程鎖(互斥鎖)
 107     一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,可能會致使數據被同時修改而使得計算結果不許確(重複賦值)
 108     import time
 109     import threading
 110 
 111 
 112     def addNum():
 113         global num  # 在每一個線程中都獲取這個全局變量
 114         print('--get num:', num)
 115         time.sleep(1)
 116         num -= 1  # 對此公共變量進行-1操做
 117 
 118 
 119     num = 100  # 設定一個共享變量
 120     thread_list = []
 121     for i in range(100):
 122         t = threading.Thread(target=addNum)
 123         t.start()
 124         thread_list.append(t)
 125 
 126     for t in thread_list:  # 等待全部線程執行完畢
 127         t.join()
 128 
 129     print('final num:', num)
 130     加上線程鎖
 131     import time
 132     import threading
 133 
 134 
 135     def addNum():
 136         global num  # 在每一個線程中都獲取這個全局變量
 137         print('--get num:', num)
 138         time.sleep(1)
 139         lock.acquire()  # 修改數據前加鎖
 140         num -= 1  # 對此公共變量進行-1操做
 141         lock.release()  # 修改後釋放
 142 
 143 
 144     num = 100  # 設定一個共享變量
 145     thread_list = []
 146     lock = threading.Lock()  # 生成全局鎖
 147     for i in range(100):
 148         t = threading.Thread(target=addNum)
 149         t.start()
 150         thread_list.append(t)
 151 
 152     for t in thread_list:  # 等待全部線程執行完畢
 153         t.join()
 154 
 155     print('final num:', num)
 156 8.線程鎖與GIL之間的關係?
 157     加入GIL主要的緣由是爲了下降程序的開發的複雜度,
 158     好比如今的你寫python不須要關心內存回收的問題,由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,
 159     每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序裏的線程和 py解釋器本身的線程是併發運行的,
 160     假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,
 161     可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,
 162     爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動
 163 9.遞歸鎖(Rlock 不用遞歸鎖而多重加lock鎖會致使被鎖住,程序卡死)
 164     import threading, time
 165 
 166 
 167     def run1():
 168         print("grab the first part data")
 169         lock.acquire()
 170         global num
 171         num += 1
 172         lock.release()
 173         return num
 174 
 175 
 176     def run2():
 177         print("grab the second part data")
 178         lock.acquire()
 179         global num2
 180         num2 += 1
 181         lock.release()
 182         return num2
 183 
 184 
 185     def run3():
 186         lock.acquire()
 187         res = run1()
 188         print('--------between run1 and run2-----')
 189         res2 = run2()
 190         lock.release()
 191         print(res, res2)
 192 
 193 
 194     if __name__ == '__main__':
 195 
 196         num, num2 = 0, 0
 197         lock = threading.RLock() #注意遞歸鎖是Rlock
 198         for i in range(10):
 199             t = threading.Thread(target=run3)
 200             t.start()
 201 
 202     while threading.active_count() != 1:
 203         print(threading.active_count())
 204     else:
 205         print('----all threads done---')
 206         print(num, num2)
 207 10.Semaphore(信號量):
 208     同時容許必定數量的線程更改數據
 209     import threading, time
 210 
 211     def run(n):
 212         semaphore.acquire()
 213         time.sleep(1)
 214         print("run the thread: %s\n" % n)
 215         semaphore.release()
 216 
 217     if __name__ == '__main__':
 218 
 219         num = 0
 220         semaphore = threading.BoundedSemaphore(5)  # 最多容許5個線程同時運行
 221         for i in range(20):
 222             t = threading.Thread(target=run, args=(i,))
 223             t.start()
 224 
 225     while threading.active_count() != 1:
 226         pass  # print threading.active_count()
 227     else:
 228         print('----all threads done---')
 229         print(num)
 230 11.Events事件:經過Event來實現兩個或多個線程間的交互
 231     event = threading.Event()
 232     # a client thread can wait for the flag to be set
 233     event.wait()
 234     # a server thread can set or reset it
 235     event.set()
 236     event.clear()
 237     一個紅綠燈的例子,即起動一個線程作交通指揮燈,生成幾個線程作車輛,車輛行駛按紅燈停,綠燈行的規則:
 238     import threading,time
 239     import random
 240     def light():
 241         if not event.isSet():
 242             event.set() #wait就不阻塞 #綠燈狀態
 243         count = 0
 244         while True:
 245             if count < 10:
 246                 print('\033[42;1m--green light on---\033[0m')
 247             elif count <13:
 248                 print('\033[43;1m--yellow light on---\033[0m')
 249             elif count <20:
 250                 if event.isSet():
 251                     event.clear()
 252                 print('\033[41;1m--red light on---\033[0m')
 253             else:
 254                 count = 0
 255                 event.set() #打開綠燈
 256             time.sleep(1)
 257             count +=1
 258     def car(n):
 259         while 1:
 260             time.sleep(random.randrange(10))
 261             if  event.isSet(): #綠燈
 262                 print("car [%s] is running.." % n)
 263             else:
 264                 print("car [%s] is waiting for the red light.." %n)
 265     if __name__ == '__main__':
 266         event = threading.Event()
 267         Light = threading.Thread(target=light)
 268         Light.start()
 269         for i in range(3):
 270             t = threading.Thread(target=car,args=(i,))
 271             t.start()
 272 12.python queue隊列(線程隊列)
 273     class queue.Queue(maxsize=0) #先入先出
 274     class queue.LifoQueue(maxsize=0) #last in fisrt out
 275     class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列
 276     Queue.qsize()
 277     Queue.empty() #return True if empty
 278     Queue.full() # return True if full
 279     Queue.put(item, block=True, timeout=None) #將項目放入隊列中。若是可選的args塊是true,則超時爲None(缺省值),若是須要則阻塞,直到空閒槽可用。若是超時是一個正數,它會在大多數超時秒中阻塞,若是在那個時間內沒有空閒槽,則會引起徹底的異常。不然(塊是false),若是一個空閒槽當即可用,則在隊列上放置一個項,不然就會拋出徹底異常(在這種狀況下會忽略超時)。
 280     Queue.put_nowait(item) #Equivalent to put(item, False).
 281     Queue.get(block=True, timeout=None) #從隊列中刪除並返回一個項目。若是可選的args塊是true,則超時爲None(缺省值),若是須要則阻塞,直到有可用的項。若是超時是一個正數,它會在大多數超時秒中阻塞,若是在那個時間內沒有可用的項,則會拋出空的異常。不然(塊是false),若是當即可用,返回一個項目,不然將拋出空異常(在這種狀況下忽略超時)。
 282     Queue.get_nowait() #Equivalent to get(False).
 283     兩種方法來支持跟蹤隊列的任務是否已經被守護進程的消費者線程徹底地處理。
 284     Queue.task_done()
 285     Queue.join() block直到queue被消費完畢
 286 13.生產者消費者模型
 287     生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。
 288     生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,
 289     因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,
 290     消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
 291     import threading
 292     import queue
 293 
 294 
 295     def producer():
 296         for i in range(10):
 297             q.put("骨頭 %s" % i)
 298 
 299         print("開始等待全部的骨頭被取走...")
 300         q.join()
 301         print("全部的骨頭被取完了...")
 302 
 303 
 304     def consumer(n):
 305         while q.qsize() > 0:
 306             print("%s 取到" % n, q.get())
 307             q.task_done()  # 告知這個任務執行完了
 308 
 309 
 310     q = queue.Queue()
 311 
 312     p = threading.Thread(target=producer, )
 313     p.start()
 314 
 315     c1 = consumer("李闖")
 316 
 317 
 318     import time,random
 319     import queue,threading
 320     q = queue.Queue()
 321     def Producer(name):
 322       count = 0
 323       while count <20:
 324         time.sleep(random.randrange(3))
 325         q.put(count)
 326         print('Producer %s has produced %s baozi..' %(name, count))
 327         count +=1
 328     def Consumer(name):
 329       count = 0
 330       while count <20:
 331         time.sleep(random.randrange(4))
 332         if not q.empty():
 333             data = q.get()
 334             print(data)
 335             print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
 336         else:
 337             print("-----no baozi anymore----")
 338         count +=1
 339     p1 = threading.Thread(target=Producer, args=('A',))
 340     c1 = threading.Thread(target=Consumer, args=('B',))
 341     p1.start()
 342     c1.start()
 343 14.多進程模塊multiprocessing
 344     from multiprocessing import Process
 345     import time
 346 
 347 
 348     def f(name):
 349         time.sleep(2)
 350         print('hello', name)
 351 
 352 
 353     if __name__ == '__main__':
 354         p = Process(target=f, args=('bob',))
 355         p.start()
 356         p.join()
 357     14.1展現進程號:
 358     from multiprocessing import Process
 359     import os
 360 
 361 
 362     def info(title):
 363         print(title)
 364         print('module name:', __name__)
 365         print('parent process:', os.getppid())
 366         print('process id:', os.getpid())
 367         print("\n\n")
 368 
 369 
 370     def f(name):
 371         info('\033[31;1mfunction f\033[0m')
 372         print('hello', name)
 373 
 374 
 375     if __name__ == '__main__':
 376         info('\033[32;1mmain process line\033[0m')
 377         p = Process(target=f, args=('bob',))
 378         p.start()
 379         p.join()
 380     14.2進程間通信
 381         14.2.1Queues方法
 382             from multiprocessing import Process, Queue
 383 
 384 
 385             def f(q):
 386                 q.put([42, None, 'hello'])
 387 
 388 
 389             if __name__ == '__main__':
 390                 q = Queue()
 391                 p = Process(target=f, args=(q,))
 392                 p.start()
 393                 print(q.get())  # prints "[42, None, 'hello']"
 394                 p.join()
 395         14.2.2Pipes方法
 396             from multiprocessing import Process, Pipe
 397 
 398 
 399             def f(conn):
 400                 conn.send([42, None, 'hello'])
 401                 conn.close()
 402 
 403 
 404             if __name__ == '__main__':
 405                 parent_conn, child_conn = Pipe()
 406                 p = Process(target=f, args=(child_conn,))
 407                 p.start()
 408                 print(parent_conn.recv())  # prints "[42, None, 'hello']"
 409                 p.join()
 410         14.2.3Managers方法
 411             from multiprocessing import Process, Manager
 412 
 413 
 414             def f(d, l):
 415                 d[1] = '1'
 416                 d['2'] = 2
 417                 d[0.25] = None
 418                 l.append(1)
 419                 print(l)
 420 
 421 
 422             if __name__ == '__main__':
 423                 with Manager() as manager:
 424                     d = manager.dict()
 425 
 426                     l = manager.list(range(5))
 427                     p_list = []
 428                     for i in range(10):
 429                         p = Process(target=f, args=(d, l))
 430                         p.start()
 431                         p_list.append(p)
 432                     for res in p_list:
 433                         res.join()
 434 
 435                     print(d)
 436                     print(l)
 437     14.3進程同步
 438         from multiprocessing import Process, Lock
 439 
 440 
 441         def f(l, i):
 442             l.acquire()
 443             try:
 444                 print('hello world', i)
 445             finally:
 446                 l.release()
 447 
 448 
 449         if __name__ == '__main__':
 450             lock = Lock()
 451 
 452             for num in range(10):
 453                 Process(target=f, args=(lock, num)).start()
 454 15.進程池
 455     進程池中有兩個方法:
 456     apply;
 457     apply_async;
 458     from  multiprocessing import Process, Pool
 459     import time
 460     def Foo(i):
 461         time.sleep(2)
 462         return i + 100
 463     def Bar(arg):
 464         print('-->exec done:', arg)
 465     pool = Pool(5)
 466     for i in range(10):
 467         pool.apply_async(func=Foo, args=(i,), callback=Bar)
 468     # pool.apply(func=Foo, args=(i,))
 469     print('end')
 470     pool.close()
 471     pool.join()  # 進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。
 472 
 473 
 474 16.協程:
 475     協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。
 476     協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:
 477     協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
 478     協程的好處:
 479         無需線程上下文切換的開銷
 480         無需原子操做鎖定及同步的開銷
 481         原子操做(atomic operation)是不須要同步的,所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另外一個線程)。
 482         原子操做能夠是一個步驟,也能夠是多個操做步驟,可是其順序是不能夠被打亂,或者切割掉只執行部分。視做總體是原子性的核心。
 483         方便切換控制流,簡化編程模型
 484         高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理
 485     缺點:
 486         沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
 487         進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序
 488     16.1利用yield實現僞協程
 489         import time
 490         import queue
 491 
 492 
 493         def consumer(name):
 494             print("--->starting eating baozi...")
 495             while True:
 496                 new_baozi = yield
 497                 print("[%s] is eating baozi %s" % (name, new_baozi))
 498             # time.sleep(1)
 499 
 500 
 501         def producer():
 502             r = con.__next__()
 503             r = con2.__next__()
 504             n = 0
 505             while n < 5:
 506                 n += 1
 507                 con.send(n)
 508                 con2.send(n)
 509                 print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
 510 
 511 
 512         if __name__ == '__main__':
 513             con = consumer("c1")
 514             con2 = consumer("c2")
 515             p = producer()
 516     16.2協程的特色
 517         必須在只有一個單線程裏實現併發
 518         修改共享數據不需加鎖
 519         用戶程序裏本身保存多個控制流的上下文棧
 520         一個協程遇到IO操做自動切換到其它協程
 521     16.3Greenlet實現協程(手動)
 522         # -*- coding:utf-8 -*-
 523         from greenlet import greenlet
 524         def test1():
 525             print(12)
 526             gr2.switch()
 527             print(34)
 528             gr2.switch()
 529         def test2():
 530             print(56)
 531             gr1.switch()
 532             print(78)
 533         gr1 = greenlet(test1)
 534         gr2 = greenlet(test2)
 535         gr1.switch()
 536     16.4Gevent 協程自動切換
 537         import gevent
 538 
 539 
 540         def func1():
 541             print('\033[31;1m李闖在跟海濤搞...\033[0m')
 542             gevent.sleep(2)
 543             print('\033[31;1m李闖又回去跟繼續跟海濤搞...\033[0m')
 544 
 545 
 546         def func2():
 547             print('\033[32;1m李闖切換到了跟海龍搞...\033[0m')
 548             gevent.sleep(1)
 549             print('\033[32;1m李闖搞完了海濤,回來繼續跟海龍搞...\033[0m')
 550 
 551 
 552         gevent.joinall([
 553             gevent.spawn(func1),
 554             gevent.spawn(func2),
 555             # gevent.spawn(func3),
 556         ])
 557     16.5 比較同步與異步的性能差異
 558         from gevent import monkey;
 559 
 560         monkey.patch_all()
 561         import gevent
 562         from  urllib.request import urlopen
 563 
 564 
 565         def f(url):
 566             print('GET: %s' % url)
 567             resp = urlopen(url)
 568             data = resp.read()
 569             print('%d bytes received from %s.' % (len(data), url))
 570 
 571 
 572         gevent.joinall([
 573             gevent.spawn(f, 'https://www.python.org/'),
 574             gevent.spawn(f, 'https://www.yahoo.com/'),
 575             gevent.spawn(f, 'https://github.com/'),
 576         ])
 577 17.經過gevent實現單線程下的多socket併發
 578     17.1server side :
 579         import sys
 580         import socket
 581         import time
 582         import gevent
 583 
 584         from gevent import socket, monkey
 585 
 586         monkey.patch_all()
 587 
 588 
 589         def server(port):
 590             s = socket.socket()
 591             s.bind(('0.0.0.0', port))
 592             s.listen(500)
 593             while True:
 594                 cli, addr = s.accept()
 595                 gevent.spawn(handle_request, cli)
 596 
 597 
 598         def handle_request(conn):
 599             try:
 600                 while True:
 601                     data = conn.recv(1024)
 602                     print("recv:", data)
 603                     conn.send(data)
 604                     if not data:
 605                         conn.shutdown(socket.SHUT_WR)
 606 
 607             except Exception as  ex:
 608                 print(ex)
 609             finally:
 610                 conn.close()
 611 
 612 
 613         if __name__ == '__main__':
 614             server(8001)
 615     17.2client side :
 616         import socket
 617 
 618         HOST = 'localhost'  # The remote host
 619         PORT = 8001  # The same port as used by the server
 620         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 621         s.connect((HOST, PORT))
 622         while True:
 623             msg = bytes(input(">>:"), encoding="utf8")
 624             s.sendall(msg)
 625             data = s.recv(1024)
 626             # print(data)
 627 
 628             print('Received', repr(data))
 629         s.close()
 630 18.事件驅動與異步IO
 631     方式一:建立一個線程,該線程一直循環檢測是否有鼠標點擊,那麼這個方式有如下幾個缺點:
 632     1. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
 633     2. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
 634     3. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
 635     因此,該方式是很是很差的。
 636 
 637     方式二:就是事件驅動模型
 638     目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
 639     1. 有一個事件(消息)隊列;
 640     2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
 641     3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
 642     4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;
 643 
 644     當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:
 645     1.程序中有許多任務,並且…
 646     2.任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…
 647     3.在等待事件到來時,某些任務會阻塞。
 648     4.當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。
 649     網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。
 650 19.select多併發socket
 651     #_*_coding:utf-8_*_
 652     __author__ = 'Alex Li'
 653 
 654     import select
 655     import socket
 656     import sys
 657     import queue
 658 
 659 
 660     server = socket.socket()
 661     server.setblocking(0)
 662 
 663     server_addr = ('localhost',10000)
 664 
 665     print('starting up on %s port %s' % server_addr)
 666     server.bind(server_addr)
 667 
 668     server.listen(5)
 669 
 670 
 671     inputs = [server, ] #本身也要監測呀,由於server自己也是個fd
 672     outputs = []
 673 
 674     message_queues = {}
 675 
 676     while True:
 677         print("waiting for next event...")
 678 
 679         readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏
 680 
 681         for s in readable: #每一個s就是一個socket
 682 
 683             if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了,
 684                 #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀
 685                 #新鏈接進來了,接受這個鏈接
 686                 conn, client_addr = s.accept()
 687                 print("new connection from",client_addr)
 688                 conn.setblocking(0)
 689                 inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接
 690                 #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到
 691                 #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的
 692 
 693                 message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送
 694 
 695             else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了
 696                 #客戶端的數據過來了,在這接收
 697                 data = s.recv(1024)
 698                 if data:
 699                     print("收到來自[%s]的數據:" % s.getpeername()[0], data)
 700                     message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端
 701                     if s not  in outputs:
 702                         outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端
 703 
 704 
 705                 else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀
 706                     print("客戶端斷開了",s)
 707 
 708                     if s in outputs:
 709                         outputs.remove(s) #清理已斷開的鏈接
 710 
 711                     inputs.remove(s) #清理已斷開的鏈接
 712 
 713                     del message_queues[s] ##清理已斷開的鏈接
 714 
 715 
 716         for s in writeable:
 717             try :
 718                 next_msg = message_queues[s].get_nowait()
 719 
 720             except queue.Empty:
 721                 print("client [%s]" %s.getpeername()[0], "queue is empty..")
 722                 outputs.remove(s)
 723 
 724             else:
 725                 print("sending msg to [%s]"%s.getpeername()[0], next_msg)
 726                 s.send(next_msg.upper())
 727 
 728 
 729         for s in exeptional:
 730             print("handling exception for ",s.getpeername())
 731             inputs.remove(s)
 732             if s in outputs:
 733                 outputs.remove(s)
 734             s.close()
 735 
 736             del message_queues[s]
 737 
 738     #_*_coding:utf-8_*_
 739     __author__ = 'Alex Li'
 740 
 741 
 742     import socket
 743     import sys
 744 
 745     messages = [ b'This is the message. ',
 746                  b'It will be sent ',
 747                  b'in parts.',
 748                  ]
 749     server_address = ('localhost', 10000)
 750 
 751     # Create a TCP/IP socket
 752     socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
 753               socket.socket(socket.AF_INET, socket.SOCK_STREAM),
 754               ]
 755 
 756     # Connect the socket to the port where the server is listening
 757     print('connecting to %s port %s' % server_address)
 758     for s in socks:
 759         s.connect(server_address)
 760 
 761     for message in messages:
 762 
 763         # Send messages on both sockets
 764         for s in socks:
 765             print('%s: sending "%s"' % (s.getsockname(), message) )
 766             s.send(message)
 767 
 768         # Read responses on both sockets
 769         for s in socks:
 770             data = s.recv(1024)
 771             print( '%s: received "%s"' % (s.getsockname(), data) )
 772             if not data:
 773                 print(sys.stderr, 'closing socket', s.getsockname() )
 774 
 775 20.selectors模塊
 776     import selectors
 777     import socket
 778 
 779     sel = selectors.DefaultSelector()
 780 
 781 
 782     def accept(sock, mask):
 783         conn, addr = sock.accept()  # Should be ready
 784         print('accepted', conn, 'from', addr)
 785         conn.setblocking(False)
 786         sel.register(conn, selectors.EVENT_READ, read)
 787 
 788 
 789     def read(conn, mask):
 790         data = conn.recv(1000)  # Should be ready
 791         if data:
 792             print('echoing', repr(data), 'to', conn)
 793             conn.send(data)  # Hope it won't block
 794         else:
 795             print('closing', conn)
 796             sel.unregister(conn)
 797             conn.close()
 798 
 799 
 800     sock = socket.socket()
 801     sock.bind(('localhost', 10000))
 802     sock.listen(100)
 803     sock.setblocking(False)
 804     sel.register(sock, selectors.EVENT_READ, accept)
 805 
 806     while True:
 807         events = sel.select()
 808         for key, mask in events:
 809             callback = key.data
 810             callback(key.fileobj, mask)
 811 
 812 21.RabbitMQ隊列
 813     安裝 http://www.rabbitmq.com/install-standalone-mac.html
 814     安裝python rabbitMQ module
 815     pip install pika
 816     or
 817     easy_install pika
 818     or
 819     源碼
 820     https://pypi.python.org/pypi/pika
 821 
 822     21.1send端
 823     # !/usr/bin/env python
 824     import pika
 825 
 826     connection = pika.BlockingConnection(pika.ConnectionParameters(
 827         'localhost'))
 828     channel = connection.channel()
 829 
 830     # 聲明queue
 831     channel.queue_declare(queue='hello')
 832 
 833     # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
 834     channel.basic_publish(exchange='',
 835                           routing_key='hello',
 836                           body='Hello World!')
 837     print(" [x] Sent 'Hello World!'")
 838     connection.close()
 839 
 840     21.2receive端
 841     # _*_coding:utf-8_*_
 842     __author__ = 'Alex Li'
 843     import pika
 844 
 845     connection = pika.BlockingConnection(pika.ConnectionParameters(
 846         'localhost'))
 847     channel = connection.channel()
 848 
 849     # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
 850     # We could avoid that if we were sure that the queue already exists. For example if send.py program
 851     # was run before. But we're not yet sure which program to run first. In such cases it's a good
 852     # practice to repeat declaring the queue in both programs.
 853     channel.queue_declare(queue='hello')
 854 
 855 
 856     def callback(ch, method, properties, body):
 857         print(" [x] Received %r" % body)
 858 
 859 
 860     channel.basic_consume(callback,
 861                           queue='hello',
 862                           no_ack=True)
 863 
 864     print(' [*] Waiting for messages. To exit press CTRL+C')
 865     channel.start_consuming()
 866 
 867     21.3 Work Queues
 868         21.3.1消息提供者代碼
 869             import pika
 870             import time
 871 
 872             connection = pika.BlockingConnection(pika.ConnectionParameters(
 873                 'localhost'))
 874             channel = connection.channel()
 875 
 876             # 聲明queue
 877             channel.queue_declare(queue='task_queue')
 878 
 879             # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
 880             import sys
 881 
 882             message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
 883             channel.basic_publish(exchange='',
 884                                   routing_key='task_queue',
 885                                   body=message,
 886                                   properties=pika.BasicProperties(
 887                                       delivery_mode=2,  # make message persistent
 888                                   )
 889                                   )
 890             print(" [x] Sent %r" % message)
 891             connection.close()
 892         21.3.2消費者代碼
 893             # _*_coding:utf-8_*_
 894 
 895             import pika, time
 896 
 897             connection = pika.BlockingConnection(pika.ConnectionParameters(
 898                 'localhost'))
 899             channel = connection.channel()
 900 
 901 
 902             def callback(ch, method, properties, body):
 903                 print(" [x] Received %r" % body)
 904                 time.sleep(20)
 905                 print(" [x] Done")
 906                 print("method.delivery_tag", method.delivery_tag)
 907                 ch.basic_ack(delivery_tag=method.delivery_tag)
 908 
 909 
 910             channel.basic_consume(callback,
 911                                   queue='task_queue',
 912                                   no_ack=True
 913                                   )
 914 
 915             print(' [*] Waiting for messages. To exit press CTRL+C')
 916             channel.start_consuming()
 917         21.3.3消息持久化 
 918             First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
 919             --channel.queue_declare(queue='hello', durable=True)
 920             Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for exampletask_queue:
 921             --channel.queue_declare(queue='task_queue', durable=True)
 922             This queue_declare change needs to be applied to both the producer and consumer code.
 923             At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.
 924             --channel.basic_publish(exchange='',
 925                                   routing_key="task_queue",
 926                                   body=message,
 927                                   properties=pika.BasicProperties(
 928                                       delivery_mode=2,  # make message persistent
 929                                   ))
 930         21.3.4消息公平分發
 931             channel.basic_qos(prefetch_count=1)
 932 
 933         21.3.5帶消息持久化+公平分發的完整代碼
 934             生產者端
 935             # !/usr/bin/env python
 936             import pika
 937             import sys
 938 
 939             connection = pika.BlockingConnection(pika.ConnectionParameters(
 940                 host='localhost'))
 941             channel = connection.channel()
 942 
 943             channel.queue_declare(queue='task_queue', durable=True)
 944 
 945             message = ' '.join(sys.argv[1:]) or "Hello World!"
 946             channel.basic_publish(exchange='',
 947                                   routing_key='task_queue',
 948                                   body=message,
 949                                   properties=pika.BasicProperties(
 950                                       delivery_mode=2,  # make message persistent
 951                                   ))
 952             print(" [x] Sent %r" % message)
 953             connection.close()
 954             消費者端
 955             # !/usr/bin/env python
 956             import pika
 957             import time
 958 
 959             connection = pika.BlockingConnection(pika.ConnectionParameters(
 960                 host='localhost'))
 961             channel = connection.channel()
 962 
 963             channel.queue_declare(queue='task_queue', durable=True)
 964             print(' [*] Waiting for messages. To exit press CTRL+C')
 965 
 966 
 967             def callback(ch, method, properties, body):
 968                 print(" [x] Received %r" % body)
 969                 time.sleep(body.count(b'.'))
 970                 print(" [x] Done")
 971                 ch.basic_ack(delivery_tag=method.delivery_tag)
 972 
 973 
 974             channel.basic_qos(prefetch_count=1)
 975             channel.basic_consume(callback,
 976                                   queue='task_queue')
 977 
 978             channel.start_consuming()
 979         21.3.5Publish\Subscribe(消息發佈\訂閱) 
 980             以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,
 981 
 982             An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
 983 
 984             Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息
 985 
 986 
 987             fanout: 全部bind到此exchange的queue均可以接收消息
 988             direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
 989             topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
 990 
 991                表達式符號說明:#表明一個或多個字符,*表明任何字符
 992                   例:#.a會匹配a.a,aa.a,aaa.a等
 993                       *.a會匹配a.a,b.a,c.a等
 994                  注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout 
 995 
 996             headers: 經過headers 來決定把消息發給哪些queue
 997 
 998         21.3.6消息publisher
 999 
1000             import pika
1001             import sys
1002 
1003             connection = pika.BlockingConnection(pika.ConnectionParameters(
1004                 host='localhost'))
1005             channel = connection.channel()
1006 
1007             channel.exchange_declare(exchange='logs',
1008                                      type='fanout')
1009 
1010             message = ' '.join(sys.argv[1:]) or "info: Hello World!"
1011             channel.basic_publish(exchange='logs',
1012                                   routing_key='',
1013                                   body=message)
1014             print(" [x] Sent %r" % message)
1015             connection.close()
1016 
1017         21.3.7消息subscriber
1018             # _*_coding:utf-8_*_
1019             __author__ = 'Alex Li'
1020             import pika
1021 
1022             connection = pika.BlockingConnection(pika.ConnectionParameters(
1023                 host='localhost'))
1024             channel = connection.channel()
1025 
1026             channel.exchange_declare(exchange='logs',
1027                                      type='fanout')
1028 
1029             result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
1030             queue_name = result.method.queue
1031 
1032             channel.queue_bind(exchange='logs',
1033                                queue=queue_name)
1034 
1035             print(' [*] Waiting for logs. To exit press CTRL+C')
1036 
1037 
1038             def callback(ch, method, properties, body):
1039                 print(" [x] %r" % body)
1040 
1041 
1042             channel.basic_consume(callback,
1043                                   queue=queue_name,
1044                                   no_ack=True)
1045 
1046             channel.start_consuming()
1047         21.3.8有選擇的接收消息(exchange type=direct) 
1048             RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
1049             publisher:
1050             import pika
1051             import sys
1052 
1053             connection = pika.BlockingConnection(pika.ConnectionParameters(
1054                 host='localhost'))
1055             channel = connection.channel()
1056 
1057             channel.exchange_declare(exchange='direct_logs',
1058                                      type='direct')
1059 
1060             severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
1061             message = ' '.join(sys.argv[2:]) or 'Hello World!'
1062             channel.basic_publish(exchange='direct_logs',
1063                                   routing_key=severity,
1064                                   body=message)
1065             print(" [x] Sent %r:%r" % (severity, message))
1066             connection.close()
1067 
1068             subscriber :
1069             import pika
1070             import sys
1071 
1072             connection = pika.BlockingConnection(pika.ConnectionParameters(
1073                 host='localhost'))
1074             channel = connection.channel()
1075 
1076             channel.exchange_declare(exchange='direct_logs',
1077                                      type='direct')
1078 
1079             result = channel.queue_declare(exclusive=True)
1080             queue_name = result.method.queue
1081 
1082             severities = sys.argv[1:]
1083             if not severities:
1084                 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
1085                 sys.exit(1)
1086 
1087             for severity in severities:
1088                 channel.queue_bind(exchange='direct_logs',
1089                                    queue=queue_name,
1090                                    routing_key=severity)
1091 
1092             print(' [*] Waiting for logs. To exit press CTRL+C')
1093 
1094 
1095             def callback(ch, method, properties, body):
1096                 print(" [x] %r:%r" % (method.routing_key, body))
1097 
1098 
1099             channel.basic_consume(callback,
1100                                   queue=queue_name,
1101                                   no_ack=True)
1102 
1103         21.3.9 更細緻的消息過濾
1104             Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.
1105             In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
1106             That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.
1107 
1108             publisher:
1109             import pika
1110             import sys
1111 
1112             connection = pika.BlockingConnection(pika.ConnectionParameters(
1113                 host='localhost'))
1114             channel = connection.channel()
1115 
1116             channel.exchange_declare(exchange='topic_logs',
1117                                      type='topic')
1118 
1119             routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
1120             message = ' '.join(sys.argv[2:]) or 'Hello World!'
1121             channel.basic_publish(exchange='topic_logs',
1122                                   routing_key=routing_key,
1123                                   body=message)
1124             print(" [x] Sent %r:%r" % (routing_key, message))
1125             connection.close()
1126 
1127             ------------------------------------------------------------------
1128             subscriber:
1129             import pika
1130             import sys
1131 
1132             connection = pika.BlockingConnection(pika.ConnectionParameters(
1133                 host='localhost'))
1134             channel = connection.channel()
1135 
1136             channel.exchange_declare(exchange='topic_logs',
1137                                      type='topic')
1138 
1139             result = channel.queue_declare(exclusive=True)
1140             queue_name = result.method.queue
1141 
1142             binding_keys = sys.argv[1:]
1143             if not binding_keys:
1144                 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
1145                 sys.exit(1)
1146 
1147             for binding_key in binding_keys:
1148                 channel.queue_bind(exchange='topic_logs',
1149                                    queue=queue_name,
1150                                    routing_key=binding_key)
1151 
1152             print(' [*] Waiting for logs. To exit press CTRL+C')
1153 
1154 
1155             def callback(ch, method, properties, body):
1156                 print(" [x] %r:%r" % (method.routing_key, body))
1157 
1158 
1159             channel.basic_consume(callback,
1160                                   queue=queue_name,
1161                                   no_ack=True)
1162 
1163             channel.start_consuming()
1164 
1165             ---------------------------------------------------------------
1166             To receive all the logs run:
1167 
1168             python receive_logs_topic.py "#"
1169             To receive all logs from the facility "kern":
1170 
1171             python receive_logs_topic.py "kern.*"
1172             Or if you want to hear only about "critical" logs:
1173 
1174             python receive_logs_topic.py "*.critical"
1175             You can create multiple bindings:
1176 
1177             python receive_logs_topic.py "kern.*" "*.critical"
1178             And to emit a log with a routing key "kern.critical" type:
1179 
1180             python emit_log_topic.py "kern.critical" "A critical kernel error"
相關文章
相關標籤/搜索