#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Asynchronous Echo Server - Chapter 22 - echoserver.py
# Compare to echo server in Chapter 3
"""Single-process echo server multiplexed with select.poll().

Life cycle of one client:

1. poll() reports the listening ("master") socket readable; the server
   accepts the connection and newconn() queues a greeting and starts
   watching the client for both reading and writing.
2. poll() reports the client writable; the server sends as much of the
   pending buffer as the kernel accepts and stops watching for writes once
   the buffer is empty.
3. poll() reports the client readable; the received bytes are appended to
   the client's buffer and write-watching is switched back on so they are
   echoed.
4. The client disconnects (recv() returns no data, or poll() reports an
   error condition); the socket is closed and forgotten.

The code runs on Python 2.6+ and Python 3; all buffers hold byte strings.
"""
import socket
import traceback
import os
import sys
import select


def _tobytes(text):
    """Return *text* as a byte string (a no-op for Python 2 ``str``)."""
    if isinstance(text, bytes):
        return text
    return text.encode("utf-8")


class stateclass:
    """Bookkeeping for the poll object, the client sockets and their buffers.

    Attributes:
        p: the select.poll() object every descriptor is registered with.
        mastersock: the listening socket.
        buffers: maps client fd -> bytes still waiting to be sent.
        sockets: maps fd -> socket object (master socket included).
    """

    # Error conditions that are watched on every descriptor.
    stdmask = select.POLLERR | select.POLLHUP | select.POLLNVAL

    def __init__(self, mastersock):
        """Create the poll object and start watching *mastersock* for reads."""
        self.p = select.poll()
        self.mastersock = mastersock
        self.watchread(mastersock)
        self.buffers = {}
        self.sockets = {mastersock.fileno(): mastersock}

    def fd2socket(self, fd):
        """Return the socket object registered under file descriptor *fd*.

        Raises KeyError when *fd* is unknown.
        """
        return self.sockets[fd]

    def watchread(self, fd):
        """Watch *fd* (an int or an object with fileno()) for input only.

        Re-registering an already registered descriptor simply replaces
        its event mask.
        """
        self.p.register(fd, select.POLLIN | self.stdmask)

    def watchwrite(self, fd):
        """Watch *fd* for writability only."""
        self.p.register(fd, select.POLLOUT | self.stdmask)

    def watchboth(self, fd):
        """Watch *fd* for both input and writability."""
        self.p.register(fd, select.POLLIN | select.POLLOUT | self.stdmask)

    def dontwatch(self, fd):
        """Stop watching *fd*; raises KeyError if it was never registered."""
        self.p.unregister(fd)

    def newconn(self, sock):
        """Adopt a freshly accepted, non-blocking client socket.

        The greeting is built before anything is registered, so that a
        getpeername() failure (socket.error: the peer is already gone)
        leaves no half-registered descriptor behind.
        """
        fd = sock.fileno()
        greeting = _tobytes(
            "Welcome to the echoserver, %s\n" % str(sock.getpeername()))
        self.buffers[fd] = greeting
        self.sockets[fd] = sock
        # Watch both directions: there is a greeting waiting to go out.
        self.watchboth(fd)

    def readevent(self, fd):
        """Read available data from *fd* and queue it to be echoed back.

        A socket error, or an empty read (the peer closed the connection),
        closes the connection.
        """
        try:
            data = self.fd2socket(fd).recv(4096)
        except socket.error:
            self.closeout(fd)
            return
        if not data:
            # Orderly shutdown by the peer. Without this check the fd stays
            # readable forever and the server spins.
            self.closeout(fd)
            return
        self.buffers[fd] += data
        # There is now something to send, so writability matters again.
        self.watchboth(fd)

    def writeevent(self, fd):
        """Send as much of the pending buffer for *fd* as the kernel takes."""
        if not self.buffers[fd]:
            # Nothing to send: stop asking about writability.
            self.watchread(fd)
            return
        try:
            # send() may accept only part of the data; it returns the count.
            byteswritten = self.fd2socket(fd).send(self.buffers[fd])
        except socket.error:
            self.closeout(fd)
            return
        # Keep only what has not been transmitted yet.
        self.buffers[fd] = self.buffers[fd][byteswritten:]
        if not self.buffers[fd]:
            self.watchread(fd)

    def errorevent(self, fd):
        """Handle an error condition reported by poll(): drop the client."""
        self.closeout(fd)

    def closeout(self, fd):
        """Close the connection on *fd* and forget everything about it.

        Safe to call more than once for the same descriptor.
        """
        try:
            self.dontwatch(fd)
        except KeyError:
            pass  # already unregistered
        sock = self.sockets.pop(fd, None)
        self.buffers.pop(fd, None)
        if sock is not None:
            try:
                sock.close()
            except socket.error:
                pass  # nothing useful can be done about a failed close

    def _acceptconn(self):
        """Accept one pending connection on the master socket."""
        try:
            newsock, _addr = self.mastersock.accept()
        except socket.error:
            # The client gave up before we got here; nothing to accept.
            return
        try:
            newsock.setblocking(0)
            print("Got connection from %s" % (newsock.getpeername(),))
            self.newconn(newsock)
        except socket.error:
            # Peer vanished between accept() and setup: do not leak the fd.
            newsock.close()

    def _dispatch(self, fd, event):
        """Route one (fd, event) pair returned by poll().

        *event* is a bit mask, so several conditions can be reported at
        once (typically POLLIN | POLLOUT while both are being watched);
        each bit is tested individually instead of comparing for equality.
        """
        if fd == self.mastersock.fileno():
            if event & select.POLLIN:
                self._acceptconn()
                return
            raise socket.error(
                "listening socket failed (poll event 0x%x)" % event)
        if fd not in self.sockets:
            # Closed while handling an earlier entry of the same result.
            return
        if event & (select.POLLERR | select.POLLNVAL):
            self.errorevent(fd)
            return
        if event & select.POLLIN:
            # Also taken for POLLIN | POLLHUP, so that data sent just
            # before the hang-up is still read; EOF then closes the fd.
            self.readevent(fd)
        elif event & select.POLLHUP:
            self.errorevent(fd)
            return
        if event & select.POLLOUT and fd in self.sockets:
            self.writeevent(fd)

    def loop(self):
        """Run the server forever: wait for events and dispatch them."""
        while 1:
            # Blocks until at least one registered descriptor has an event.
            for fd, event in self.p.poll():
                self._dispatch(fd, event)


host = ''      # Bind to all interfaces
port = 51423

if __name__ == "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts while old connections linger in TIME_WAIT.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((host, port))
    s.listen(1)
    # Non-blocking, so accept() can never stall the event loop.
    s.setblocking(0)
    state = stateclass(s)
    state.loop()
2、高級的服務器端使用
很多異步服務器實際上針對每一個客戶端使用兩個緩存器——一個用於到來的指令,一個用於待發送的數據。這樣就可以使服務器把那些不在一個單獨信息包中的指令合併在一起。下面是一個非常簡單的聊天系統。服務器只有在收到文本SEND之後,才會把收到的數據發送給所有連接的客戶端。
這個程序的框架和前一個類似。但是,請注意新加入的讀緩存器以及處理它的代碼。代碼主要需要處理3種不同的輸入:
(1)沒有結束指令(SEND)的數據;
(2)一個或多個的結束指令;
(3)一個或多個結束指令外加一條未結束的指令。
使用異步I/O時,無法使用常規的指令,比如讀取一整行。因此,必須自己實現把輸入放入緩存器,並準備好同時接收部分行或多行。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Asynchronous Chat Server - Chapter 22 - chatserver.py
"""Single-process chat server multiplexed with select.poll().

Each client has two buffers: a read buffer that accumulates incoming
bytes until the literal command ``SEND`` is seen, and a write buffer with
bytes waiting to be transmitted. Everything a client typed before a
``SEND`` is broadcast, line by line and prefixed with the sender's
address, to every connected client (the sender included).

The code runs on Python 2.6+ and Python 3; all buffers hold byte strings.
"""
import socket
import traceback
import os
import sys
import select


def _tobytes(text):
    """Return *text* as a byte string (a no-op for Python 2 ``str``)."""
    if isinstance(text, bytes):
        return text
    return text.encode("utf-8")


class stateclass:
    """Bookkeeping for the poll object, the client sockets and their buffers.

    Attributes:
        p: the select.poll() object every descriptor is registered with.
        mastersock: the listening socket.
        readbuffers: maps client fd -> bytes received but not yet
            terminated by ``SEND``.
        writebuffers: maps client fd -> bytes still waiting to be sent.
        sockets: maps fd -> socket object (master socket included).
    """

    # Error conditions that are watched on every descriptor.
    stdmask = select.POLLERR | select.POLLHUP | select.POLLNVAL

    def __init__(self, mastersock):
        """Create the poll object and start watching *mastersock* for reads."""
        self.p = select.poll()
        self.mastersock = mastersock
        self.watchread(mastersock)
        self.readbuffers = {}
        self.writebuffers = {}
        self.sockets = {mastersock.fileno(): mastersock}

    def fd2socket(self, fd):
        """Return the socket registered under *fd* (KeyError if unknown)."""
        return self.sockets[fd]

    def watchread(self, fd):
        """Watch *fd* (an int or an object with fileno()) for input only.

        Re-registering an already registered descriptor simply replaces
        its event mask.
        """
        self.p.register(fd, select.POLLIN | self.stdmask)

    def watchwrite(self, fd):
        """Watch *fd* for writability only."""
        self.p.register(fd, select.POLLOUT | self.stdmask)

    def watchboth(self, fd):
        """Watch *fd* for both input and writability."""
        self.p.register(fd, select.POLLIN | select.POLLOUT | self.stdmask)

    def dontwatch(self, fd):
        """Stop watching *fd*; raises KeyError if it was never registered."""
        self.p.unregister(fd)

    def sendtoall(self, text, originfd):
        """Queue *text* (bytes) for delivery to every connected client.

        Every line of *text* is stripped and sent as
        ``<sender address>: <line>\\n``. The sender's address is looked up
        once; socket.error propagates if that lookup fails.
        """
        prefix = _tobytes(str(self.fd2socket(originfd).getpeername()) + ": ")
        message = b"".join(
            prefix + line.strip() + b"\n" for line in text.split(b"\n"))
        for fd in list(self.writebuffers):
            self.writebuffers[fd] += message
            # There is now something to send to this client.
            self.watchboth(fd)

    def newconn(self, sock):
        """Adopt a freshly accepted, non-blocking client socket.

        The greeting is built before anything is registered, so that a
        getpeername() failure (socket.error: the peer is already gone)
        leaves no half-registered descriptor behind.
        """
        fd = sock.fileno()
        greeting = _tobytes(
            "Welcome to the chat server %s\n" % str(sock.getpeername()))
        self.writebuffers[fd] = greeting
        self.readbuffers[fd] = b""
        self.sockets[fd] = sock
        self.watchboth(fd)

    def readevent(self, fd):
        """Read from *fd* and broadcast every chunk completed by ``SEND``.

        A socket error, or an empty read (the peer closed the connection),
        closes the connection and nothing is broadcast.
        """
        try:
            data = self.fd2socket(fd).recv(4096)
        except socket.error:
            self.closeout(fd)
            return
        if not data:
            # Orderly shutdown by the peer.
            self.closeout(fd)
            return
        parts = (self.readbuffers[fd] + data).split(b"SEND")
        # The last element is whatever follows the final SEND (possibly
        # empty, or the whole input when no SEND was seen): keep it for
        # later. Everything before it is complete and can go out.
        self.readbuffers[fd] = parts.pop()
        for item in parts:
            self.sendtoall(item.strip(), fd)

    def writeevent(self, fd):
        """Send as much of the pending buffer for *fd* as the kernel takes."""
        if not self.writebuffers[fd]:
            # Nothing to send: stop asking about writability.
            self.watchread(fd)
            return
        try:
            # send() may accept only part of the data; it returns the count.
            byteswritten = self.fd2socket(fd).send(self.writebuffers[fd])
        except socket.error:
            self.closeout(fd)
            return
        self.writebuffers[fd] = self.writebuffers[fd][byteswritten:]
        if not self.writebuffers[fd]:
            self.watchread(fd)

    def errorevent(self, fd):
        """Handle an error condition reported by poll(): drop the client."""
        self.closeout(fd)

    def closeout(self, fd):
        """Close the connection on *fd* and forget everything about it.

        Removes the read buffer as well as the write buffer, and is safe
        to call more than once for the same descriptor.
        """
        try:
            self.dontwatch(fd)
        except KeyError:
            pass  # already unregistered
        sock = self.sockets.pop(fd, None)
        self.readbuffers.pop(fd, None)
        self.writebuffers.pop(fd, None)
        if sock is not None:
            try:
                sock.close()
            except socket.error:
                pass  # nothing useful can be done about a failed close

    def _acceptconn(self):
        """Accept one pending connection on the master socket."""
        try:
            newsock, _addr = self.mastersock.accept()
        except socket.error:
            # The client gave up before we got here; nothing to accept.
            return
        try:
            newsock.setblocking(0)
            print("Got connection from %s" % (newsock.getpeername(),))
            self.newconn(newsock)
        except socket.error:
            # Peer vanished between accept() and setup: do not leak the fd.
            newsock.close()

    def _dispatch(self, fd, event):
        """Route one (fd, event) pair returned by poll().

        *event* is a bit mask, so several conditions can be reported at
        once (typically POLLIN | POLLOUT while both are being watched);
        each bit is tested individually instead of comparing for equality.
        """
        if fd == self.mastersock.fileno():
            if event & select.POLLIN:
                self._acceptconn()
                return
            raise socket.error(
                "listening socket failed (poll event 0x%x)" % event)
        if fd not in self.sockets:
            # Closed while handling an earlier entry of the same result.
            return
        if event & (select.POLLERR | select.POLLNVAL):
            self.errorevent(fd)
            return
        if event & select.POLLIN:
            self.readevent(fd)
        elif event & select.POLLHUP:
            self.errorevent(fd)
            return
        if event & select.POLLOUT and fd in self.sockets:
            self.writeevent(fd)

    def loop(self):
        """Run the server forever: wait for events and dispatch them."""
        while 1:
            # Blocks until at least one registered descriptor has an event.
            for fd, event in self.p.poll():
                self._dispatch(fd, event)


host = ''      # Bind to all interfaces
port = 51423

if __name__ == "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts while old connections linger in TIME_WAIT.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((host, port))
    s.listen(1)
    # Non-blocking, so accept() can never stall the event loop.
    s.setblocking(0)
    state = stateclass(s)
    state.loop()
3、監控多個master socket
使用一個單任務服務器來偵聽多個不同的端口。事實上,標準UNIX的"superserver"守護進程(inetd)就是這樣做的。
守護進程偵聽多個端口。當有連接到來的時候,它會啓動一個能夠處理這個連接的程序。通過這種方法,一個單獨的進程可以處理許多socket。在那些服務很多、但每個服務用得並不頻繁的系統上,這是一個優勢,因爲可以使用一個進程偵聽多個不同的socket。
實現類似守護進程服務器的一種方法是使用poll()函數來監視所有的master socket。當接收到連接的時候,它會把連接轉到一個已知的文件描述符上,並把它交給一個實際的處理程序。
在實際當中,類似守護進程的服務器將是一些東西的混合體。它使用poll()來監視master socket,但是也會使用fork()把連接傳遞給處理程序。
首先,它建立一個stateclass類實例,接着打開它的配置文件inetd.txt,並讀取它。每一行都給出了一個TCP端口號和一個當有客戶端連接到該端口的時候會運行的指令。所以,對於配置文件的每一行,一個新的socket對象被建立、綁定、配置以及被加入到stateclass中。最後,配置文件被全部處理之後,它被關閉,程序也進入主循環。守護進程的循環只需要處理一個事件——那就是客戶端的連接。當有連接的時候,客戶端socket被傳遞給self.newconn(),同時還有將要運行的指令。
newconn()是實際操作發生的地方。它先調用fork(),再通過檢查pid的值判斷所在的進程:如果正在父進程中,新的客戶端socket將被關閉,並且代碼返回循環。在子進程一端,它做的第一件事情是關閉每一個單獨的master socket。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Asynchronous Inetd-like Server - Chapter 22 - inetd.py
"""Minimal inetd-style "superserver".

The configuration file ``inetd.txt`` holds one ``port:command`` entry per
line. A listening ("master") socket is opened for every entry and all of
them are watched with select.poll(). When a client connects, the process
forks; the child wires the client socket to stdin/stdout/stderr and
exec()s the configured command, while the parent goes back to polling.

The code runs on Python 2.6+ and Python 3.
"""
import socket
import traceback
import os
import sys
import select


def parseconfigline(line):
    """Parse one line of the configuration file.

    Returns a ``(port, command)`` tuple, or None for a blank line or a
    line starting with ``#``. Raises ValueError when the line is not of
    the form ``port:command`` or the port is not an integer.
    """
    line = line.strip()
    if not line or line.startswith("#"):
        return None
    if ":" not in line:
        raise ValueError("expected 'port:command', got %r" % line)
    port, command = line.split(":", 1)
    return int(port), command.strip()


class stateclass:
    """Bookkeeping for the poll object and the listening sockets.

    Attributes:
        p: the select.poll() object every master socket is registered with.
        mastersocks: maps fd -> listening socket object.
        commands: maps fd -> command line to run for connections on that fd.
    """

    def __init__(self):
        """Create the poll object; no sockets are watched yet."""
        self.p = select.poll()
        self.mastersocks = {}
        self.commands = {}

    def fd2socket(self, fd):
        """Return the listening socket for *fd* (KeyError if unknown)."""
        return self.mastersocks[fd]

    def addmastersock(self, sockobj, command):
        """Start serving *command* for connections arriving on *sockobj*."""
        self.mastersocks[sockobj.fileno()] = sockobj
        self.commands[sockobj.fileno()] = command
        self.watchread(sockobj)

    def watchread(self, fd):
        """Watch *fd* (an int or an object with fileno()) for input."""
        self.p.register(fd, select.POLLIN)

    def dontwatch(self, fd):
        """Stop watching *fd*; raises KeyError if it was never registered."""
        self.p.unregister(fd)

    def newconn(self, newsock, command):
        """Fork a child that runs *command* with *newsock* as its stdio.

        In the parent (and when fork() fails) the client socket is closed
        and the method returns. The child never returns from this method:
        it either becomes *command* through exec, or terminates with exit
        status 1.
        """
        try:
            pid = os.fork()
        except OSError:
            # Out of processes or memory: drop this client, keep serving.
            try:
                newsock.close()
            except socket.error:
                pass
            return

        if pid:
            # Parent: the child owns the connection now.
            newsock.close()
            return

        # Child process from here on.
        try:
            # The child must not keep the listening sockets open.
            for sock in self.mastersocks.values():
                sock.close()
            # Wire the client socket to stdin (0), stdout (1), stderr (2).
            fd = newsock.fileno()
            os.dup2(fd, 0)
            os.dup2(fd, 1)
            os.dup2(fd, 2)
            # Whitespace split: tolerates repeated blanks and yields an
            # empty list (nothing to run) for an empty command.
            argv = command.split()
            if argv:
                os.execvp(argv[0], argv)
        finally:
            # Reached only if exec did not happen. os._exit() rather than
            # sys.exit(): a SystemExit could be intercepted further up the
            # stack, leaving a second copy of the server loop running.
            os._exit(1)

    def _reapchildren(self):
        """Collect exit statuses of finished children (avoids zombies)."""
        while True:
            try:
                pid, _status = os.waitpid(-1, os.WNOHANG)
            except OSError:
                return  # no child processes at all
            if pid == 0:
                return  # children exist, but none has finished yet

    def loop(self):
        """Run the server forever: accept connections and hand them off."""
        while 1:
            # Blocks until at least one master socket has a connection.
            result = self.p.poll()
            self._reapchildren()
            for fd, event in result:
                print("Received a child connection")
                try:
                    newsock, _addr = self.fd2socket(fd).accept()
                except socket.error:
                    # The client gave up before we could accept it.
                    continue
                self.newconn(newsock, self.commands[fd])


host = ''  # Bind to all interfaces

if __name__ == "__main__":
    state = stateclass()
    with open("inetd.txt") as config:
        for line in config:
            entry = parseconfigline(line)
            if entry is None:
                continue
            port, command = entry
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Allow quick restarts while old connections linger.
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((host, port))
            s.listen(1)
            # Non-blocking, so accept() can never stall the event loop.
            s.setblocking(0)
            state.addmastersock(s, command)
    state.loop()