Python之socket(套接字)

Socket

1、概述

socket一般也稱做"套接字",用於描述IP地址和端口,是一個通訊鏈的句柄,應用程序一般經過"套接字"向網絡發出請求或者應答網絡請求。java

socket起源於Unix,而Unix/Linux基本哲學之一就是「一切皆文件」,對於文件用【打開】【讀寫】【關閉】模式來操做。socket就是該模式的一個實現,socket便是一種特殊的文件,一些socket函數就是對其進行的操做(讀/寫IO、打開、關閉)python

socket和file的區別:算法

  • file模塊是針對某個指定文件進行【打開】【讀寫】【關閉】
  • socket模塊是針對 服務器端 和 客戶端Socket 進行【打開】【讀寫】【關閉】

複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket

ip_port = ('127.0.0.1',9999)

sk = socket.socket()
sk.bind(ip_port)
sk.listen(5)

while True:
    print 'server waiting...'
    conn,addr = sk.accept()

    client_data = conn.recv(1024)
    print client_data
    conn.sendall('不要回答,不要回答,不要回答')

    conn.close()

socket server
複製代碼
複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
ip_port = ('127.0.0.1',9999)

sk = socket.socket()
sk.connect(ip_port)

sk.sendall('請求佔領地球')

server_reply = sk.recv(1024)
print server_reply

sk.close()

socket client
複製代碼

WEB服務應用:數組

複製代碼
#!/usr/bin/env python
#coding:utf-8
import socket

def handle_request(client):
    buf = client.recv(1024)
    client.send("HTTP/1.1 200 OK\r\n\r\n")
    client.send("Hello, World")

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('localhost',8080))
    sock.listen(5)

    while True:
        connection, address = sock.accept()
        handle_request(connection)
        connection.close()

if __name__ == '__main__':
  main()
複製代碼

2、解釋

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)服務器

複製代碼
參數一:地址簇

  socket.AF_INET IPv4(默認)
  socket.AF_INET6 IPv6

  socket.AF_UNIX 只可以用於單一的Unix系統進程間通訊

參數二:類型

  socket.SOCK_STREAM  流式socket , for TCP (默認)
  socket.SOCK_DGRAM   數據報式socket , for UDP

  socket.SOCK_RAW 原始套接字,普通的套接字沒法處理ICMP、IGMP等網絡報文,而SOCK_RAW能夠;其次,SOCK_RAW也能夠處理特殊的IPv4報文;此外,利用原始套接字,能夠經過IP_HDRINCL套接字選項由用戶構造IP頭。
  socket.SOCK_RDM 是一種可靠的UDP形式,即保證交付數據報但不保證順序。SOCK_RAM用來提供對原始協議的低級訪問,在須要執行某些特殊操做時使用,如發送ICMP報文。SOCK_RAM一般僅限於高級用戶或管理員運行的程序使用。
  socket.SOCK_SEQPACKET 可靠的連續數據包服務

參數三:協議

  0  (默認)與特定的地址家族相關的協議,若是是 0 ,則系統就會根據地址格式和套接類別,自動選擇一個合適的協議
複製代碼
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
sk.bind(ip_port)

while True:
    data = sk.recv(1024)
    print data




import socket
ip_port = ('127.0.0.1',9999)

sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
while True:
    inp = raw_input('數據:').strip()
    if inp == 'exit':
        break
    sk.sendto(inp,ip_port)

sk.close()
複製代碼
複製代碼

sk.bind(address)網絡

s.bind(address) 將套接字綁定到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。

sk.listen(backlog)數據結構

開始監聽傳入鏈接。backlog指定在拒絕鏈接以前,能夠掛起的最大鏈接數量。

      backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5
      這個值不能無限大,由於要在內核中維護鏈接隊列

sk.setblocking(bool)多線程

是否阻塞(默認True),若是設置False,那麼accept和recv時一旦無數據,則報錯。

sk.accept()app

接受鏈接並返回(conn,address),其中conn是新的套接字對象,能夠用來接收和發送數據。address是鏈接客戶端的地址。

  接收TCP 客戶的鏈接(阻塞式)等待鏈接的到來

sk.connect(address)socket

 鏈接到address處的套接字。通常,address的格式爲元組(hostname,port),若是鏈接出錯,返回socket.error錯誤。

sk.connect_ex(address)

同上,只不過會有返回值,鏈接成功時返回 0 ,鏈接失敗時候返回編碼,例如:10061

sk.close()

關閉套接字

sk.recv(bufsize[,flag])

接受套接字的數據。數據以字符串形式返回,bufsize指定最多能夠接收的數量。flag提供有關消息的其餘信息,一般能夠忽略。

sk.recvfrom(bufsize[.flag])

 與recv()相似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址。

sk.send(string[,flag])

將string中的數據發送到鏈接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。

sk.sendall(string[,flag])

將string中的數據發送到鏈接的套接字,但在返回以前會嘗試發送全部數據。成功返回None,失敗則拋出異常。

sk.sendto(string[,flag],address)

將數據發送到套接字,address是形式爲(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。該函數主要用於UDP協議。

sk.settimeout(timeout)

設置套接字操做的超時期,timeout是一個浮點數,單位是秒。值爲None表示沒有超時期。通常,超時期應該在剛建立套接字時設置,由於它們可能用於鏈接的操做(如 client 鏈接最多等待5s )

sk.getpeername()

返回鏈接套接字的遠程地址。返回值一般是元組(ipaddr,port)。

sk.getsockname()

返回套接字本身的地址。一般是一個元組(ipaddr,port)

sk.fileno()

套接字的文件描述符
複製代碼
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
sk.bind(ip_port)

while True:
    data = sk.recv(1024)
    print data




import socket
ip_port = ('127.0.0.1',9999)

sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
while True:
    inp = raw_input('數據:').strip()
    if inp == 'exit':
        break
    sk.sendto(inp,ip_port)

sk.close()

UDP Demo
複製代碼

3、實例

智能機器人

複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-


import socket

ip_port = ('127.0.0.1',8888)
sk = socket.socket()
sk.bind(ip_port)
sk.listen(5)

while True:
    conn,address =  sk.accept()
    conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
    Flag = True
    while Flag:
        data = conn.recv(1024)
        if data == 'exit':
            Flag = False
        elif data == '0':
            conn.sendall('經過可能會被錄音.balabala一大推')
        else:
            conn.sendall('請從新輸入.')
    conn.close()

服務端
複製代碼
複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8005)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()

客戶端
複製代碼

SocketServer模塊

1、使用以源碼剖析

對於默認Socket服務端處理客戶端請求時,按照阻塞方式依次處理請求,SocketServer實現同事處理多個請求。

複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('經過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請從新輸入.')


if __name__ == '__main__':
    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()
複製代碼
複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()

客戶端
複製代碼

從剖析上述源碼執行流程,對源碼精簡以下:

複製代碼
import socket
import threading
import select


def process(request, client_address):
    print request,client_address
    conn = request
    conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
    flag = True
    while flag:
        data = conn.recv(1024)
        if data == 'exit':
            flag = False
        elif data == '0':
            conn.sendall('經過可能會被錄音.balabala一大推')
        else:
            conn.sendall('請從新輸入.')

sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(('127.0.0.1',8002))
sk.listen(5)

while True:
    r, w, e = select.select([sk,],[],[],1)
    print 'looping'
    if sk in r:
        print 'get request'
        request, client_address = sk.accept()
        t = threading.Thread(target=process, args=(request, client_address))
        t.daemon = False
        t.start()

sk.close()
複製代碼

如精簡代碼能夠看出,SocketServer之因此能夠同時處理請求得益於 select 和 Threading 兩個東西,其實本質上就是在服務器端爲每個客戶端建立一個線程,當前線程用來處理對應客戶端的請求,因此,能夠支持同時n個客戶端連接(長鏈接)。

複製代碼
#!/usr/bin/env python
#coding:utf-8

import SocketServer
import os

class MyServer(SocketServer.BaseRequestHandler):
    def handle(self):
        base_path = 'G:/temp'
        conn = self.request
        print 'connected...'
        while True:
            pre_data = conn.recv(1024)
            #獲取請求方法、文件名、文件大小
            cmd,file_name,file_size = pre_data.split('|')
            # 防止粘包,給客戶端發送一個信號。
            conn.sendall('nothing')            
            #已經接收文件的大小
            recv_size = 0
            #上傳文件路徑拼接
            file_dir = os.path.join(base_path,file_name)
            f = file(file_dir,'wb')
            Flag = True
            while Flag:
                #未上傳完畢,
                if int(file_size)>recv_size:
                    #最多接收1024,可能接收的小於1024
                    data = conn.recv(1024) 
                    recv_size+=len(data)
                    #寫入文件
                    f.write(data)
                #上傳完畢,則退出循環
                else:
                    recv_size = 0
                    Flag = False
                
            print 'upload successed.'
            f.close()
    
instance = SocketServer.ThreadingTCPServer(('127.0.0.1',9999),MyServer)
instance.serve_forever()

FTP上傳文件(服務端)
複製代碼
複製代碼
#!/usr/bin/env python
#coding:utf-8


import socket
import sys
import os

ip_port = ('127.0.0.1',9999)
sk = socket.socket()
sk.connect(ip_port)

container = {'key':'','data':''}
while True:
    # 客戶端輸入要上傳文件的路徑
    input = raw_input('path:')
    # 根據路徑獲取文件名
    file_name = os.path.basename(path)
    # 獲取文件大小
    file_size=os.stat(path).st_size
    # 發送文件名 和 文件大小
    sk.send(file_name+'|'+str(file_size))
    # 爲了防止粘包,將文件名和大小發送過去以後,等待服務端收到,直到從服務端接受一個信號(說明服務端已經收到)
    sk.recv(1024)
    send_size = 0
    f= file(path,'rb')
    Flag = True
    while Flag:
        if send_size + 1024 >file_size:
            data = f.read(file_size-send_size)
            Flag = False
        else:
            data = f.read(1024)
            send_size+=1024
        sk.send(data)
    f.close()
    
sk.close()

FTP上傳文件(客戶端)
複製代碼

對於大文件處理:

send只會向緩衝區寫一次,傳入的內容不必定能發完,因此,返回值是實際發送的大小。

例如:

1023M = send(1g數據)   那麼實際是發送了 1023M,其餘 1M 就是漏發了

sendall,內部調用send會一直向緩衝區寫,直到文件所有寫完。

例如:

複製代碼
   sendall(1g數據)

    第一次:
        send(1023M)
    第二次:
        send(1M)

==========
發送大文件時候,不可能所有讀1G內存,須要open文件時,一點一點讀,而後再發。
複製代碼

# 大文件大小

file_size=os.stat(文件路徑).st_size

 

# 打開大文件

f =  file(文件路徑,'rb')

 

# 已經發送的數據

send_size = 0
複製代碼
while Flag:
    # 大文件只剩下 不到 1024 字節,其餘已經被髮送。
    if send_size + 1024 > file_size:
        # 從大文件中讀取小於 1024字節,多是 10字節...
        data = f.read(file_size-send_size)
        Flag = False
    else:
        # 從大文件中讀取 1024 字節
        data = f.read(1024)
        # 記錄已經發送了多少字節
        send_size += 1024
    # 將大文件中的數據,分批發送到緩衝區,每次最多發 1024 字節
    sk.sendall(data)
複製代碼

2、select

Linux中的 select,poll,epoll 都是IO多路複用的機制。

I/O多路複用指:經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。

複製代碼
select 
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。

select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。

select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。

另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。

poll 
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。

poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。

另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。

epoll 
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。

epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。

epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。 
複製代碼

Python select 用於監聽多個文件描述符:

複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import threading
import select


def process(request, client_address):
    print request,client_address
    conn = request
    conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
    flag = True
    while flag:
        data = conn.recv(1024)
        if data == 'exit':
            flag = False
        elif data == '0':
            conn.sendall('經過可能會被錄音.balabala一大推')
        else:
            conn.sendall('請從新輸入.')

s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s1.bind(('127.0.0.1',8020))
s1.listen(5)
s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s2.bind(('127.0.0.1',8021))
s2.listen(5)

while True:
    r, w, e = select.select([s1,s2,],[],[],1)
    print 'looping'
    for s in r:
        print 'get request'
        request, client_address = s.accept()
        t = threading.Thread(target=process, args=(request, client_address))
        t.daemon = False
        t.start()

s1.close()
s2.close()

服務端
複製代碼
複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8020)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()

客戶端:8020
複製代碼
複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8021)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()

客戶端:8021
複製代碼

3、threading

問答:

  • 應用程序、進程、線程關係?
  • 爲何要使用多個CPU ?
  • 爲何要使用多線程?
  • 爲何要使用多進程?
  • java和C#中的多線程和python多線程的區別?
  • Python GIL?
  • 線程和進程的選擇:計算密集型和IO密集型程序。(IO操做不佔用CPU)

一、Python線程

Threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。

複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

def show(arg):
    time.sleep(1)
    print 'thread'+str(arg)

for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()

print 'main thread stop'
複製代碼

上述代碼建立了10個「前臺」線程,而後控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。

更多方法:

  • start            線程準備就緒,等待CPU調度
  • setName      爲線程設置名稱
  • getName      獲取線程名稱
  • setDaemon   設置爲後臺線程或前臺線程(默認)
                       若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止
                        若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止
  • join               逐個執行每一個線程,執行完畢後繼續往下執行...
  • run              線程被cpu調度後執行此方法

二、線程鎖

因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,CPU接着執行其餘線程。因此,可能出現以下問題:

複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

gl_num = 0

def show(arg):
    global gl_num
    time.sleep(1)
    gl_num +=1
    print gl_num

for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()

print 'main thread stop'

未使用線程鎖
複製代碼
複製代碼
#!/usr/bin/env python
#coding:utf-8
 
import threading
import time
 
gl_num = 0
 
lock = threading.RLock()
 
def Func():
    lock.acquire()
    global gl_num
    gl_num +=1
    time.sleep(1)
    print gl_num
    lock.release()
     
for i in range(10):
    t = threading.Thread(target=Func)
    t.start()
複製代碼

擴展:進程

一、建立多進程程序

複製代碼
from multiprocessing import Process
import threading
import time

def foo(i):
    print 'say hi',i

for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()
複製代碼

注意:因爲進程之間的數據須要各自持有一份,因此建立進程須要的很是大的開銷。

二、進程共享數據

進程各自持有一份數據,默認沒法共享數據

複製代碼
#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process
from multiprocessing import Manager

import time

li = []

def foo(i):
    li.append(i)
    print 'say hi',li
 
for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()
    
print 'ending',li
複製代碼
複製代碼
#方法一,Array
from multiprocessing import Process,Array
temp = Array('i', [11,22,33,44])

def Foo(i):
    temp[i] = 100+i
    for item in temp:
        print i,'----->',item

for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
    p.join()

#方法二:manage.dict()共享數據
from multiprocessing import Process,Manager

manage = Manager()
dic = manage.dict()

def Foo(i):
    dic[i] = 100+i
    print dic.values()

for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
    p.join()

進程間共享數據
複製代碼

三、進程池

複製代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from  multiprocessing import Process,Pool
import time

def Foo(i):
    time.sleep(2)
    return i+100

def Bar(arg):
    print arg

pool = Pool(5)
#print pool.apply(Foo,(1,))
#print pool.apply_async(func =Foo, args=(1,)).get()

for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar)

print 'end'
pool.close()
pool.join()
複製代碼
相關文章
相關標籤/搜索