SELECTORS模塊實現併發簡單版FTP

環境:windows, python 3.5
功能:
使用SELECTORS模塊實現併發簡單版FTP
容許多用戶併發上傳下載文件

結構:
ftp_client ---|
bin ---|
start_client.py ......啓動客戶端
conf---|
config.py ......客戶端參數配置
system.ini ......客戶端參數配置文件
core---|
clients.py ......客戶端主程序
home ......默認下載路徑
ftp_server ---|
bin ---|
start_server.py ......啓動服務端
conf---|
config.py ......服務端參數配置
system.ini ......服務端參數配置文件
core---|
servers.py ......服務端主程序
db ---|
data.py ......存取用戶數據
home ......默認上傳路徑

功能實現:
客戶端輸入命令,根據命令經過映射進入相應方法,上傳或者下載;
服務端用SELECTORS模塊實現併發,接收數據,經過action用映射進入相應方法;


如何使用:
啓動start_server.py,啓動start_client.py;
在客戶端輸入get pathname/put pathname進行上傳下載文件,開啓多個客戶端可併發上傳下載。

core:
#!/usr/bin/env python
# -*-coding:utf-8-*-
# Author:zh
import socket
import os
import json
import random
import conf
PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep   # 默認下載存放路徑爲當前路徑的home目錄下


class FtpClient(object):
    def __init__(self):
        self.client = socket.socket()

    def connect(self, ip, port):
        self.client.connect((ip, port))

    def interactive(self):
        # 入口
        while True:
            cmd = input(">>").strip()
            if len(cmd) == 0:
                continue
            cmd_list = cmd.split()
            if len(cmd_list) == 2:
                cmd_str = cmd_list[0]
                if hasattr(self, cmd_str):
                    func = getattr(self, cmd_str)
                    func(cmd)
                else:
                    print("輸入格式錯誤")
                    self.help()
            else:
                print("輸入格式錯誤")
                self.help()

    @staticmethod
    def help():
        msg = '''
get filepath(下載文件)
put filepath(上傳文件)
        '''
        print(msg)

    def get(self, *args):
        # 下載
        data = args[0].split()
        action = data[0]
        path = data[1]
        send_msg = {
            "action": action,  # 第一次請求,須要服務端返回文件信息
            "file_path": path  # F:\oracle課程\(必讀)11gOCP考試流程與須知.rar
        }
        self.client.send(json.dumps(send_msg).encode())
        data = self.client.recv(1024)
        data = json.loads(data.decode())
        if data["sign"] == "0":
            file_length = data["length"]
            file_path = PATH+data["filename"]
            if os.path.isfile(file_path):
                file_path = file_path + str(random.randint(1,1000))
            file = open(file_path, "wb")
            send_msg_2 = {
                "action": "get_file_data",  # 第二次請求,請求客戶端給發送文件數據
                "file_path": path  # 第二次請求依然須要傳入文件路徑,也能夠在服務端利用全局變量保存
            }
            self.client.send(json.dumps(send_msg_2).encode())
            get_length = 0
            while get_length < int(file_length):
                data = self.client.recv(1024)
                file.write(data)
                get_length += len(data)
            else:
                file.close()
            if get_length == int(file_length):
                print("下載成功")
            else:
                print("文件傳輸失敗")
                os.remove(file_path)  # 若是傳輸過程出現問題,刪除文件
        else:
            print("文件不存在")

    def put(self, *args):
        # 上傳
        data = args[0].split()
        action = data[0]
        file_path = data[1]
        if os.path.isfile(file_path):
            msg = {
                "action": action,  # 上傳第一次發送,發送文件大小和姓名
                "length": os.stat(file_path).st_size,
                "filename": file_path[file_path.rfind("\\") + 1:]
            }
            self.client.send(json.dumps(msg).encode())
            self.client.recv(1024)  # 避免黏包
            file = open(file_path, "rb")
            for line in file:
                self.client.send(line)  # 上傳第二次發送,發送文件數據,沒有action,在服務端經過try,json報錯的進入接收數據的方法裏
            else:
                file.close()
            sign = self.client.recv(1024)
            if sign == b'0':
                print("上傳成功")
            else:
                print("上傳失敗")
        else:
            print("文件不存在")


def run():
    data = conf.config.Configuration()
    conf_data = data.get_config()
    obj = FtpClient()
    obj.connect(conf_data[0][1], int(conf_data[1][1]))
    obj.interactive()
clients

 

core:python

#!/usr/bin/env python
# -*-coding:utf-8-*-
# Author:zh
import selectors
import socket
import json
import os
import random
import conf
PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep  # 默認上傳存放路徑爲當前路徑的home目錄下


class SelectorFtp(object):

    def __init__(self):
        self.sel = selectors.DefaultSelector()

    def connect(self, ip, port):
        sock = socket.socket()
        sock.bind((ip, port))
        sock.listen(100)
        sock.setblocking(False)
        self.sel.register(sock, selectors.EVENT_READ, self.accept)
        while True:
            events = self.sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)

    def accept(self, sock, mask):
        conn, addr = sock.accept()  # Should be ready
        conn.setblocking(False)
        self.sel.register(conn, selectors.EVENT_READ, self.interactive)

    def interactive(self, conn, mask):
        # 回調函數,每次接收消息都判斷action,根據action分別調用不一樣函數信息交互
        try:
            data = conn.recv(1024)
            if data:
                try:
                    cmd_dict = json.loads(data.decode())
                    action = cmd_dict['action']
                    if hasattr(self, action):
                        func = getattr(self, action)
                        func(cmd_dict, conn)
                except (UnicodeDecodeError, json.decoder.JSONDecodeError, TypeError) as e:
                    # put發送文件數據,沒法經過json打包,採起json報錯後進入方法循環接收
                    self.put_file_data(data, conn)
            else:
                self.sel.unregister(conn)
                conn.close()
        except ConnectionResetError as e:
            print(e)
            # 客戶端意外斷開時註銷連接
            self.sel.unregister(conn)
            conn.close()

    def get(self, *args):
        # 下載第一步:判斷文件是否存在並返回文件大小和文件名
        conn = args[1]
        file_path = args[0]["file_path"]
        if os.path.isfile(file_path):
            sign = "0"
            length = os.stat(file_path).st_size
        else:
            sign = "1"
            length = None
        msg = {
            "sign": sign,
            "length": length,
            "filename": file_path[file_path.rfind("\\")+1:]
        }
        conn.send(json.dumps(msg).encode())

    def get_file_data(self, *args):
        # 下載第二步,傳送文件數據
        conn = args[1]
        file_path = args[0]["file_path"]
        file = open(file_path, "rb")
        for line in file:
            conn.send(line)
        else:
            file.close()

    def put(self, *args):
        # 上傳第一步,接收文件大小和文件名
        conn = args[1]
        self.length = args[0]["length"]
        self.filename = PATH + args[0]["filename"]
        if os.path.isfile(self.filename):
            self.filename = self.filename + str(random.randint(1, 1000))
        self.file = open(self.filename, "wb")
        self.recv_size = 0
        conn.send(b"0")  # 避免客戶端黏包

    def put_file_data(self, *args):
        # json報錯後進入此循環接收數據
        conn = args[1]
        self.recv_size += len(args[0])
        self.file.write(args[0])
        if int(self.length) == self.recv_size:
            self.file.close()
            conn.send(b"0")


def run():
    data = conf.config.Configuration()
    conf_data = data.get_config()
    obj = SelectorFtp()
    obj.connect(conf_data[0][1], int(conf_data[1][1]))
servers

 

config:json

#!/usr/bin/env python
# -*-coding:utf-8-*-
# _author_=zh
import os
import configparser
PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


class Configuration(object):
    def __init__(self):
        self.config = configparser.ConfigParser()
        self.name = PATH+os.sep+"conf"+os.sep+"system.ini"

    def init_config(self):
        # 初始化配置文件,ip :客戶端IP,port:客戶端端口
        if not os.path.exists(self.name):
            self.config["config"] = {"ip": "localhost", "port": 1234}
            self.config.write(open(self.name, "w", encoding="utf-8", ))

    def get_config(self, head="config"):
        '''
        獲取配置文件數據
        :param head: 配置文件的section,默認取初始化文件config的數據
        :return:返回head中的全部數據(列表)
        '''
        self.init_config()  # 取文件數據以前生成配置文件
        self.config.read(self.name, encoding="utf-8")
        if self.config.has_section(head):
            section = self.config.sections()
            return self.config.items(section[0])
config
相關文章
相關標籤/搜索