環境:windows, python 3.5
功能:
使用SELECTORS模塊實現併發簡單版FTP
容許多用戶併發上傳下載文件
結構:
ftp_client ---|
bin ---|
start_client.py ......啓動客戶端
conf---|
config.py ......客戶端參數配置
system.ini ......客戶端參數配置文件
core---|
clients.py ......客戶端主程序
home ......默認下載路徑
ftp_server ---|
bin ---|
start_server.py ......啓動服務端
conf---|
config.py ......服務端參數配置
system.ini ......服務端參數配置文件
core---|
servers.py ......服務端主程序
db ---|
data.py ......存取用戶數據
home ......默認上傳路徑
功能實現:
客戶端輸入命令,根據命令經過映射進入相應方法,上傳或者下載;
服務端用SELECTORS模塊實現併發,接收數據,經過action用映射進入相應方法;
如何使用:
啓動start_server.py,啓動start_client.py;
在客戶端輸入get pathname/put pathname進行上傳下載文件,開啓多個客戶端可併發上傳下載。
core:
#!/usr/bin/env python # -*-coding:utf-8-*- # Author:zh import socket import os import json import random import conf PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep # 默認下載存放路徑爲當前路徑的home目錄下 class FtpClient(object): def __init__(self): self.client = socket.socket() def connect(self, ip, port): self.client.connect((ip, port)) def interactive(self): # 入口 while True: cmd = input(">>").strip() if len(cmd) == 0: continue cmd_list = cmd.split() if len(cmd_list) == 2: cmd_str = cmd_list[0] if hasattr(self, cmd_str): func = getattr(self, cmd_str) func(cmd) else: print("輸入格式錯誤") self.help() else: print("輸入格式錯誤") self.help() @staticmethod def help(): msg = ''' get filepath(下載文件) put filepath(上傳文件) ''' print(msg) def get(self, *args): # 下載 data = args[0].split() action = data[0] path = data[1] send_msg = { "action": action, # 第一次請求,須要服務端返回文件信息 "file_path": path # F:\oracle課程\(必讀)11gOCP考試流程與須知.rar } self.client.send(json.dumps(send_msg).encode()) data = self.client.recv(1024) data = json.loads(data.decode()) if data["sign"] == "0": file_length = data["length"] file_path = PATH+data["filename"] if os.path.isfile(file_path): file_path = file_path + str(random.randint(1,1000)) file = open(file_path, "wb") send_msg_2 = { "action": "get_file_data", # 第二次請求,請求客戶端給發送文件數據 "file_path": path # 第二次請求依然須要傳入文件路徑,也能夠在服務端利用全局變量保存 } self.client.send(json.dumps(send_msg_2).encode()) get_length = 0 while get_length < int(file_length): data = self.client.recv(1024) file.write(data) get_length += len(data) else: file.close() if get_length == int(file_length): print("下載成功") else: print("文件傳輸失敗") os.remove(file_path) # 若是傳輸過程出現問題,刪除文件 else: print("文件不存在") def put(self, *args): # 上傳 data = args[0].split() action = data[0] file_path = data[1] if os.path.isfile(file_path): msg = { "action": action, # 上傳第一次發送,發送文件大小和姓名 "length": os.stat(file_path).st_size, "filename": file_path[file_path.rfind("\\") + 1:] } self.client.send(json.dumps(msg).encode()) self.client.recv(1024) # 避免黏包 file = open(file_path, "rb") for line in file: self.client.send(line) # 上傳第二次發送,發送文件數據,沒有action,在服務端經過try,json報錯的進入接收數據的方法裏 else: file.close() sign = self.client.recv(1024) if sign == b'0': print("上傳成功") else: print("上傳失敗") else: print("文件不存在") def run(): data = conf.config.Configuration() conf_data = data.get_config() obj = FtpClient() obj.connect(conf_data[0][1], int(conf_data[1][1])) obj.interactive()
core:python
#!/usr/bin/env python # -*-coding:utf-8-*- # Author:zh import selectors import socket import json import os import random import conf PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep # 默認上傳存放路徑爲當前路徑的home目錄下 class SelectorFtp(object): def __init__(self): self.sel = selectors.DefaultSelector() def connect(self, ip, port): sock = socket.socket() sock.bind((ip, port)) sock.listen(100) sock.setblocking(False) self.sel.register(sock, selectors.EVENT_READ, self.accept) while True: events = self.sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) def accept(self, sock, mask): conn, addr = sock.accept() # Should be ready conn.setblocking(False) self.sel.register(conn, selectors.EVENT_READ, self.interactive) def interactive(self, conn, mask): # 回調函數,每次接收消息都判斷action,根據action分別調用不一樣函數信息交互 try: data = conn.recv(1024) if data: try: cmd_dict = json.loads(data.decode()) action = cmd_dict['action'] if hasattr(self, action): func = getattr(self, action) func(cmd_dict, conn) except (UnicodeDecodeError, json.decoder.JSONDecodeError, TypeError) as e: # put發送文件數據,沒法經過json打包,採起json報錯後進入方法循環接收 self.put_file_data(data, conn) else: self.sel.unregister(conn) conn.close() except ConnectionResetError as e: print(e) # 客戶端意外斷開時註銷連接 self.sel.unregister(conn) conn.close() def get(self, *args): # 下載第一步:判斷文件是否存在並返回文件大小和文件名 conn = args[1] file_path = args[0]["file_path"] if os.path.isfile(file_path): sign = "0" length = os.stat(file_path).st_size else: sign = "1" length = None msg = { "sign": sign, "length": length, "filename": file_path[file_path.rfind("\\")+1:] } conn.send(json.dumps(msg).encode()) def get_file_data(self, *args): # 下載第二步,傳送文件數據 conn = args[1] file_path = args[0]["file_path"] file = open(file_path, "rb") for line in file: conn.send(line) else: file.close() def put(self, *args): # 上傳第一步,接收文件大小和文件名 conn = args[1] self.length = args[0]["length"] self.filename = PATH + args[0]["filename"] if os.path.isfile(self.filename): self.filename = self.filename + str(random.randint(1, 1000)) self.file = open(self.filename, "wb") self.recv_size = 0 conn.send(b"0") # 避免客戶端黏包 def put_file_data(self, *args): # json報錯後進入此循環接收數據 conn = args[1] self.recv_size += len(args[0]) self.file.write(args[0]) if int(self.length) == self.recv_size: self.file.close() conn.send(b"0") def run(): data = conf.config.Configuration() conf_data = data.get_config() obj = SelectorFtp() obj.connect(conf_data[0][1], int(conf_data[1][1]))
config:json
#!/usr/bin/env python # -*-coding:utf-8-*- # _author_=zh import os import configparser PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) class Configuration(object): def __init__(self): self.config = configparser.ConfigParser() self.name = PATH+os.sep+"conf"+os.sep+"system.ini" def init_config(self): # 初始化配置文件,ip :客戶端IP,port:客戶端端口 if not os.path.exists(self.name): self.config["config"] = {"ip": "localhost", "port": 1234} self.config.write(open(self.name, "w", encoding="utf-8", )) def get_config(self, head="config"): ''' 獲取配置文件數據 :param head: 配置文件的section,默認取初始化文件config的數據 :return:返回head中的全部數據(列表) ''' self.init_config() # 取文件數據以前生成配置文件 self.config.read(self.name, encoding="utf-8") if self.config.has_section(head): section = self.config.sections() return self.config.items(section[0])