# Processes / Threads / Coroutines
# IO: synchronous / asynchronous / blocking / non-blocking
# greenlet and gevent
# Event-driven programming and asynchronous IO
# Select / Poll / Epoll asynchronous IO and the selectors module
# Python queues / RabbitMQ queues

##############################################################################################
1. What is a process? What is the difference between a process and a program?
A process is an executing instance of a program.
Each process provides the resources needed to run the program.
A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution.
Every process is started with a single thread, usually called the main thread, but additional threads can be created from any of its threads.
A program cannot run by itself; it only runs once it has been loaded into memory and the operating system has allocated resources to it, and this running program is what we call a process.
The difference between a program and a process: a program is a collection of instructions, a static description of what the process will run; a process is one execution of that program, a dynamic concept.
With multiprogramming we allow several programs to be loaded into memory at the same time and executed concurrently under the operating system's scheduling, which greatly improves CPU utilization.
2. What is a thread?
Shortcomings of processes:
A process can only do one thing at a time; if you want it to do two or more things at once, a single process cannot help you.
If a process blocks during execution, for example while waiting for input, the whole process hangs, and even the work that does not depend on that input cannot proceed.
A thread is the smallest unit of execution that the operating system can schedule.
It is contained inside a process and is the actual unit of work within the process.
A thread is a single sequential flow of control within a process; one process can run multiple threads concurrently, each performing a different task.
A thread is an execution context: all the information a CPU needs to execute a stream of instructions.
3. What is the relationship between processes and threads?
Threads share the address space of the process that created them; processes have their own address space.
Threads can directly access the data segment of their process; a process gets its own copy of its parent process's data segment.
Threads can communicate directly with other threads of the same process; processes must use inter-process communication to talk to sibling processes.
New threads are cheap to create; a new process requires duplicating the parent process.
Threads can exercise considerable control over other threads of the same process; a process can only control its child processes.
Changes to the main thread (cancellation, priority changes, etc.) may affect the behavior of the other threads of the process; changes to a parent process do not affect its child processes.
4. The Python GIL (Global Interpreter Lock)
No matter how many threads you start or how many CPUs you have, Python will calmly allow only one thread to execute at any given moment.
http://www.dabeaz.com/python/UnderstandingGIL.pdf
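To see this effect concretely, here is a minimal sketch (the count_down helper and the loop size are illustrative, not from the original notes): on CPython, running CPU-bound code in two threads takes roughly as long as running the same work twice sequentially, because the GIL lets only one thread execute bytecode at a time.
import threading
import time


def count_down(n):  # pure CPU-bound work
    while n > 0:
        n -= 1


N = 10000000

start = time.time()
count_down(N)
count_down(N)
print("sequential:  %.2fs" % (time.time() - start))

start = time.time()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
print("two threads: %.2fs" % (time.time() - start))  # on CPython this is not faster than the sequential run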
5. Using the Python threading module
Basic usage, style 1:
import threading
import time


def sayhi(num):  # the function each thread will run

    print("running on number:%s" % num)

    time.sleep(3)


if __name__ == '__main__':
    t1 = threading.Thread(target=sayhi, args=(1,))  # create a thread instance
    t2 = threading.Thread(target=sayhi, args=(2,))  # create another thread instance

    t1.start()  # start the thread
    t2.start()  # start the other thread

    print(t1.getName())  # get the thread name
    print(t2.getName())
Basic usage, style 2:
import threading
import time


class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # the function each thread will run

        print("running on number:%s" % self.num)

        time.sleep(3)


if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
6. Daemon threads:
Once the non-daemon threads have exited, the daemon threads are killed.
# _*_coding:utf-8_*_

import time
import threading


def run(n):
    print('[%s]------running----\n' % n)
    time.sleep(2)
    print('--done--')


def main():
    for i in range(5):
        t = threading.Thread(target=run, args=[i, ])
        t.start()
        t.join(1)
        print('starting thread', t.getName())


m = threading.Thread(target=main, args=[])
m.setDaemon(True)  # make m a daemon of the program's main thread: when the main thread exits, m exits too, and any child threads started by m exit with it, whether or not they have finished their work
m.start()
m.join(timeout=2)
print("---main thread done----")
7. Thread locks (mutexes)
A process can start multiple threads, and they all share the parent process's memory space, which means every thread can access the same data. If two threads modify the same piece of data at the same time, the result may be wrong because the updates overwrite each other.
import time
import threading


def addNum():
    global num  # every thread reads this global variable
    print('--get num:', num)
    time.sleep(1)
    num -= 1  # decrement the shared variable


num = 100  # a shared variable
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # wait for all threads to finish
    t.join()

print('final num:', num)
With a thread lock added:
import time
import threading


def addNum():
    global num  # every thread reads this global variable
    print('--get num:', num)
    time.sleep(1)
    lock.acquire()  # acquire the lock before modifying the data
    num -= 1  # decrement the shared variable
    lock.release()  # release it after modifying


num = 100  # a shared variable
thread_list = []
lock = threading.Lock()  # create a global lock
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # wait for all threads to finish
    t.join()

print('final num:', num)
8. What is the relationship between thread locks and the GIL?
The GIL was added mainly to lower the complexity of developing the interpreter.
For example, you do not have to worry about memory reclamation when writing Python, because the interpreter performs garbage collection for you automatically and periodically; you can think of it as a separate thread inside the interpreter that wakes up every so often and does a global sweep to see which memory can be freed. That interpreter thread runs concurrently with the threads of your own program.
Suppose one of your threads deletes a variable, and at the exact moment the interpreter's garbage-collection thread is clearing it, another thread assigns a new value to that not-yet-cleared memory: the newly assigned data could end up deleted.
To avoid this kind of problem, the Python interpreter took the simple, blunt approach of adding a lock: while one thread is running, no one else may touch the interpreter.
9. Recursive locks (RLock; nesting plain Lock acquisitions instead of using a recursive lock deadlocks and the program hangs)
import threading, time


def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num


def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':

    num, num2 = 0, 0
    lock = threading.RLock()  # note: the recursive lock is RLock
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print('----all threads done---')
        print(num, num2)
10. Semaphore:
Allows a fixed number of threads to modify data at the same time.
import threading, time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()

if __name__ == '__main__':

    num = 0
    semaphore = threading.BoundedSemaphore(5)  # allow at most 5 threads to run at once
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while threading.active_count() != 1:
        pass  # print threading.active_count()
    else:
        print('----all threads done---')
        print(num)
11. Events: use an Event to coordinate two or more threads
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
A traffic-light example: one thread acts as the traffic light and several threads act as cars; the cars obey the light, stopping on red and driving on green:
import threading,time
import random
def light():
    if not event.isSet():
        event.set()  # green light: wait() will not block
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count < 13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count < 20:
            if event.isSet():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set()  # switch back to green
        time.sleep(1)
        count += 1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if event.isSet():  # green light
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." % n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car, args=(i,))
        t.start()
12. The Python queue module (thread queues)
class queue.Queue(maxsize=0)          # first in, first out
class queue.LifoQueue(maxsize=0)      # last in, first out
class queue.PriorityQueue(maxsize=0)  # a queue whose items carry a priority
Queue.qsize()
Queue.empty()  # return True if empty
Queue.full()   # return True if full
Queue.put(item, block=True, timeout=None)  # Put item into the queue. If block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, block at most timeout seconds and raise the Full exception if no free slot becomes available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
Queue.put_nowait(item)  # Equivalent to put(item, False).
Queue.get(block=True, timeout=None)  # Remove and return an item from the queue. If block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, block at most timeout seconds and raise the Empty exception if no item becomes available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
Queue.get_nowait()  # Equivalent to get(False).
Two methods are provided to track whether the queued tasks have been fully processed by daemon consumer threads:
Queue.task_done()
Queue.join()  # block until the queue has been fully consumed
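The three queue classes above differ only in the order in which get() hands items back. A small sketch (values chosen arbitrarily, not from the original notes) showing FIFO vs LIFO vs priority ordering:
import queue

q = queue.Queue()           # FIFO
lq = queue.LifoQueue()      # LIFO
pq = queue.PriorityQueue()  # smallest item (or (priority, data) tuple) comes out first

for item in (3, 1, 2):
    q.put(item)
    lq.put(item)
    pq.put(item)

print([q.get() for _ in range(3)])   # [3, 1, 2]  insertion order
print([lq.get() for _ in range(3)])  # [2, 1, 3]  reverse insertion order
print([pq.get() for _ in range(3)])  # [1, 2, 3]  sorted by priority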
13. The producer-consumer model
The producer-consumer pattern uses a container to decouple producers from consumers.
Producers and consumers do not talk to each other directly; they communicate through a blocking queue.
So a producer does not wait for the consumer after producing a piece of data; it just drops the data into the blocking queue.
The consumer does not ask the producer for data either; it takes it straight from the blocking queue. The queue acts as a buffer that balances the processing capacity of producers and consumers.
import threading
import queue


def producer():
    for i in range(10):
        q.put("bone %s" % i)

    print("waiting for all the bones to be taken...")
    q.join()
    print("all the bones have been taken...")


def consumer(n):
    while q.qsize() > 0:
        print("%s took" % n, q.get())
        q.task_done()  # report that this task is done


q = queue.Queue()

p = threading.Thread(target=producer, )
p.start()

c1 = consumer("李闖")

Another version, with the consumer running in its own thread:
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1
def Consumer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            print(data)
            print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' % (name, data))
        else:
            print("-----no baozi anymore----")
        count += 1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()
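Both examples above decide when the consumer should stop by checking qsize()/empty() or counting to a fixed number, which is racy when producers and consumers overlap. A common variant (a sketch, not part of the original notes) is to have the producer push a sentinel value such as None to tell the consumer to exit:
import threading
import queue

q = queue.Queue()


def producer():
    for i in range(10):
        q.put(i)
    q.put(None)  # sentinel: tells the consumer nothing more is coming


def consumer():
    while True:
        data = q.get()
        if data is None:  # sentinel received, stop consuming
            break
        print("consumed", data)


p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()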
14. The multiprocessing module
from multiprocessing import Process
import time


def f(name):
    time.sleep(2)
    print('hello', name)


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
14.1 Showing process ids:
from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")


def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)


if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
14.2 Inter-process communication
14.2.1 Queues
from multiprocessing import Process, Queue


def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
14.2.2 Pipes
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
14.2.3 Managers
from multiprocessing import Process, Manager


def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(1)
    print(l)


if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()

        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)
14.3 Process synchronization
from multiprocessing import Process, Lock


def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
15. Process pools
A process pool offers two submission methods:
apply
apply_async
from multiprocessing import Process, Pool
import time
def Foo(i):
    time.sleep(2)
    return i + 100
def Bar(arg):
    print('-->exec done:', arg)
pool = Pool(5)
for i in range(10):
    pool.apply_async(func=Foo, args=(i,), callback=Bar)
    # pool.apply(func=Foo, args=(i,))
print('end')
pool.close()
pool.join()  # wait for the pool's worker processes to finish before exiting; if these two calls are removed, the program simply exits
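To make the difference between the two submission methods concrete: apply blocks until each task finishes, so the pool effectively runs tasks one at a time, while apply_async returns immediately and delivers the result later, either through a callback as above or through the AsyncResult it returns. A small sketch under those assumptions (the square helper and timings are illustrative, not from the original notes):
from multiprocessing import Pool
import time


def square(x):
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    pool = Pool(4)

    # apply: synchronous, each call blocks until its result is ready
    t = time.time()
    results = [pool.apply(square, (i,)) for i in range(4)]
    print(results, "apply took %.1fs" % (time.time() - t))  # roughly 4s

    # apply_async: submit everything first, then collect the results
    t = time.time()
    async_results = [pool.apply_async(square, (i,)) for i in range(4)]
    print([r.get() for r in async_results],
          "apply_async took %.1fs" % (time.time() - t))  # roughly 1s with 4 workers
    pool.close()
    pool.join()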
16. Coroutines
A coroutine, also called a micro-thread or fiber (English: coroutine). In one sentence: a coroutine is a lightweight, user-level thread.
A coroutine has its own register context and stack. When the scheduler switches away from a coroutine, the register context and stack are saved elsewhere; when it switches back, they are restored. Therefore:
A coroutine preserves the state of its previous invocation (a particular combination of all its local state); each time it is re-entered, it resumes from that state, in other words from the point in the logical flow where it last left off.
Advantages of coroutines:
No overhead for thread context switches.
No overhead for locking and synchronizing atomic operations.
An atomic operation needs no synchronization: it is an operation that cannot be interrupted by the thread scheduler; once it starts, it runs to completion without any context switch to another thread in between.
An atomic operation can be a single step or several steps, but their order may not be shuffled, nor may the operation be cut short and only partially executed. Being treated as an indivisible whole is the essence of atomicity.
Control flow can be switched conveniently, which simplifies the programming model.
High concurrency + high scalability + low cost: a single CPU can support tens of thousands of coroutines without trouble, which makes them well suited to high-concurrency workloads.
Drawbacks:
Coroutines cannot use multiple cores: a coroutine is essentially single-threaded, so by itself it cannot spread work over the cores of a CPU; coroutines must be combined with processes to run on multiple CPUs. For most everyday applications this is unnecessary, unless the application is CPU-bound.
A blocking operation (such as IO) blocks the whole program.
16.1 A pseudo-coroutine implemented with yield
import time
import queue


def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)


def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()
16.2 Defining characteristics of a coroutine
Concurrency must be achieved within a single thread.
No locks are needed to modify shared data.
The user program keeps its own context stack for each control flow.
When a coroutine hits an IO operation it automatically switches to another coroutine.
16.3 Coroutines with greenlet (manual switching)
# -*- coding:utf-8 -*-
from greenlet import greenlet
def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
def test2():
    print(56)
    gr1.switch()
    print(78)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
16.4 Gevent: automatic coroutine switching
import gevent


def func1():
    print('\033[31;1m李闖 is busy with 海濤...\033[0m')
    gevent.sleep(2)
    print('\033[31;1m李闖 comes back to 海濤 again...\033[0m')


def func2():
    print('\033[32;1m李闖 switched over to 海龍...\033[0m')
    gevent.sleep(1)
    print('\033[32;1m李闖 is done with 海濤 and returns to 海龍...\033[0m')


gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2),
    # gevent.spawn(func3),
])
16.5 Comparing the performance of synchronous and asynchronous IO
from gevent import monkey;

monkey.patch_all()
import gevent
from urllib.request import urlopen


def f(url):
    print('GET: %s' % url)
    resp = urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))


gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])
17. Multi-socket concurrency in a single thread with gevent
17.1 Server side:
import sys
import socket
import time
import gevent

from gevent import socket, monkey

monkey.patch_all()


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8001)
17.2 Client side:
import socket

HOST = 'localhost'  # The remote host
PORT = 8001         # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)

    print('Received', repr(data))
s.close()
18. Event-driven programming and asynchronous IO
Approach 1: create a thread that loops forever checking whether the mouse has been clicked. This approach has several drawbacks:
1. Wasted CPU: the mouse may be clicked very rarely, yet the polling thread keeps spinning and burns a lot of CPU for nothing. And what if the interface for checking mouse clicks is itself blocking?
2. If it is blocking, another problem appears: if we also want to watch the keyboard, then while the mouse check is blocked we may never get around to checking the keyboard.
3. If one loop has to poll a large number of devices, response time becomes a problem.
So this approach is a poor one.

Approach 2: the event-driven model
Most UI programming today is event-driven; for example, many UI frameworks provide an onClick() event that represents a mouse click. The event-driven model works roughly as follows (a minimal sketch follows at the end of this section):
1. There is an event (message) queue.
2. When the mouse is pressed, a click event (message) is appended to this queue.
3. A loop keeps taking events off the queue and, depending on the event, dispatches to different handler functions such as onClick(), onKeyDown(), and so on.
4. Each event (message) usually carries its own handler function pointer, so every message has an independent handler.

The event-driven model is usually a good choice when:
1. The program has many tasks, and...
2. The tasks are largely independent of one another (so they do not need to communicate with, or wait for, each other), and...
3. Some tasks block while waiting for events to arrive.
4. It is also a good choice when the application needs to share mutable data between tasks, because no synchronization is required.
Network applications usually have exactly these properties, which makes them a very good fit for the event-driven programming model.
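The model described above can be sketched in a few lines of Python (a minimal illustration, not part of the original notes; the handler registry and event names are made up): handlers are registered per event type, producers append events to a queue, and a single loop dispatches them.
import queue

event_queue = queue.Queue()
handlers = {}  # event type -> handler function


def register(event_type, handler):
    handlers[event_type] = handler


def post(event_type, data=None):
    event_queue.put((event_type, data))  # e.g. called by a mouse/keyboard driver


def event_loop():
    while True:
        event_type, data = event_queue.get()  # blocks until an event arrives
        if event_type == 'quit':
            break
        handlers[event_type](data)  # dispatch to the registered handler


register('click', lambda pos: print('onClick at', pos))
register('keydown', lambda key: print('onKeyDown', key))

post('click', (10, 20))
post('keydown', 'a')
post('quit')
event_loop()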
19. Concurrent sockets with select
#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import select
import socket
import sys
import queue


server = socket.socket()
server.setblocking(0)

server_addr = ('localhost', 10000)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr)

server.listen(5)


inputs = [server, ]  # the server socket itself must be monitored too, because it is also an fd
outputs = []

message_queues = {}

while True:
    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs, outputs, inputs)  # if no fd is ready, the program blocks here

    for s in readable:  # each s is a socket

        if s is server:  # remember that we put the server socket itself into the inputs list passed to select; if this s is the server, the server fd is ready,
            # meaning there is activity on it, and the only activity it can have is a new incoming connection
            # a new connection arrived, accept it
            conn, client_addr = s.accept()
            print("new connection from", client_addr)
            conn.setblocking(0)
            inputs.append(conn)  # to avoid blocking the whole program we do not start receiving from the client right here; we add the new connection to inputs so that
            # on the next loop iteration select will monitor it; if the client sends data, the connection's fd becomes ready on the server side and select returns it
            # in the readable list; then we loop over readable, pick this connection up and start receiving, which is exactly what happens below

            message_queues[conn] = queue.Queue()  # data received from the client is not sent back immediately; it is parked in a queue and sent later

        else:  # if s is not the server, it can only be a connection fd established with a client
            # the client's data has arrived, receive it here
            data = s.recv(1024)
            if data:
                print("received data from [%s]:" % s.getpeername()[0], data)
                message_queues[s].put(data)  # park the received data in the queue; it will be returned to the client shortly
                if s not in outputs:
                    outputs.append(s)  # so as not to hold up other clients' connections, we do not send the data back to this client right away


            else:  # what does receiving no data mean? it means the client has disconnected
                print("client disconnected", s)

                if s in outputs:
                    outputs.remove(s)  # clean up the broken connection

                inputs.remove(s)  # clean up the broken connection

                del message_queues[s]  # clean up the broken connection


    for s in writeable:
        try:
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" % s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]" % s.getpeername()[0], next_msg)
            s.send(next_msg.upper())


    for s in exeptional:
        print("handling exception for ", s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]

The matching client:
#_*_coding:utf-8_*_
__author__ = 'Alex Li'


import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message))
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print('%s: received "%s"' % (s.getsockname(), data))
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname())
20. The selectors module
import selectors
import socket

sel = selectors.DefaultSelector()


def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)


def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
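Only the server side is shown here. A minimal interactive client (a sketch, not from the original notes; any plain TCP client works) that can be used to exercise either this selectors-based echo server or the select-based one above, since both listen on localhost:10000:
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('localhost', 10000))
try:
    while True:
        msg = input(">>:").strip()
        if not msg:
            continue
        s.sendall(msg.encode('utf8'))
        print('Received', repr(s.recv(1024)))  # the echo sent back by the server
finally:
    s.close()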
21. RabbitMQ queues
Installation: http://www.rabbitmq.com/install-standalone-mac.html
Install the Python RabbitMQ module:
pip install pika
or
easy_install pika
or
from source:
https://pypi.python.org/pypi/pika

21.1 Send side
# !/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='hello')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

21.2 Receive side
# _*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# You may ask why we declare the queue again - we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if the send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

21.3 Work Queues
21.3.1 Producer code
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='task_queue')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import sys

message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent %r" % message)
connection.close()
21.3.2 Consumer code
# _*_coding:utf-8_*_

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue='task_queue'
                      )  # no no_ack here: the callback acknowledges manually with basic_ack

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
21.3.3 Message persistence
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
--channel.queue_declare(queue='hello', durable=True)
Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with a different name, for example task_queue:
--channel.queue_declare(queue='task_queue', durable=True)
This queue_declare change needs to be applied to both the producer and consumer code.
At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with the value 2.
--channel.basic_publish(exchange='',
                        routing_key="task_queue",
                        body=message,
                        properties=pika.BasicProperties(
                            delivery_mode=2,  # make message persistent
                        ))
21.3.4 Fair dispatch
channel.basic_qos(prefetch_count=1)
21.3.5 Complete code with message persistence + fair dispatch
Producer side:
# !/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
Consumer side:
# !/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
21.3.6 Publish/Subscribe (message broadcasting)
The examples so far were essentially one-to-one: a message is always delivered to one specific queue. Sometimes, however, you want a message to be received by all queues, like a broadcast, and that is where exchanges come in.

An exchange is a very simple thing. On one side it receives messages from producers and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.

An exchange is declared with a type, which determines which queues qualify to receive its messages:


fanout: every queue bound to this exchange receives the message
direct: only the queue determined by the routing key and the exchange receives the message
topic: every queue whose binding key (which may be a pattern) matches the message's routing key receives the message

Pattern symbols: # matches zero or more dot-separated words, * matches exactly one word
Example: #.a matches a.a, aa.a, aaa.a, etc.
         *.a matches a.a, b.a, c.a, etc.
Note: binding with the routing key # on an exchange of type topic is equivalent to fanout

headers: the destination queues are decided from the message headers
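The wildcard rules above can be sanity-checked with a few lines of plain Python (a sketch, no RabbitMQ involved; the matches() helper is illustrative and ignores edge cases such as a trailing # matching zero words):
import re


def matches(binding_key, routing_key):
    """Turn an AMQP-style binding key into a regex and test a routing key against it."""
    pattern = re.escape(binding_key)
    pattern = pattern.replace(r'\*', r'[^.]+')               # * -> exactly one word
    pattern = pattern.replace(r'\#', r'[^.]*(?:\.[^.]+)*')   # # -> zero or more words
    return re.fullmatch(pattern, routing_key) is not None


print(matches('*.a', 'b.a'))               # True
print(matches('*.a', 'b.c.a'))             # False, * covers only one word
print(matches('kern.*', 'kern.critical'))  # True
print(matches('#', 'kern.critical'))       # True, '#' behaves like fanout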
21.3.7 Message publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

21.3.8 Message subscriber
# _*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # without a queue name, RabbitMQ assigns a random one; exclusive=True makes the queue be deleted automatically once the consumer using it disconnects
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
21.3.9 Receiving messages selectively (exchange type=direct)
RabbitMQ can also route by keyword: a queue is bound to the exchange with a routing key, the sender publishes the message to the exchange with a routing key, and the exchange uses that key to decide which queue the message should be delivered to.
publisher:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

subscriber:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

21.3.10 Finer-grained message filtering
Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.
In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.
publisher:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

------------------------------------------------------------------
subscriber:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

---------------------------------------------------------------
To receive all the logs run:

python receive_logs_topic.py "#"
To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"
You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"