The current tooling doesn't work well. I use PyCharm's automatic sync, but new files pulled down with git are not uploaded to Linux automatically; only files I have edited myself or saved manually with Ctrl+S get synced. To avoid missing files I often had to re-upload everything, which is very slow.

Because I often need to use a Linux interpreter directly from PyCharm on Windows and test quickly, frequent git push/pull between the local machine and Linux is inconvenient. The test environment does use git, but during development a directly mapped, auto-synced folder is more convenient than git.

Compared with uploading lots of tiny fragment files one by one over a single connection in a single thread, the connection-pool approach improved upload speed by tens of times.
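Before the two full scripts (the original single-connection version first, then the pooled one), here is a minimal sketch of the pooling idea: open several SFTP channels up front and let a thread pool borrow them from a queue, so many small files upload concurrently. The host, credentials, and file list below are placeholders, not values from this article.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

import paramiko

HOST, PORT, USER, PASSWORD = '192.168.1.10', 22, 'root', 'secret'  # placeholders


def make_sftp():
    transport = paramiko.Transport((HOST, PORT))
    transport.connect(username=USER, password=PASSWORD)
    return paramiko.SFTPClient.from_transport(transport)


pool = queue.Queue()
for _ in range(8):            # 8 concurrent SFTP channels opened up front
    pool.put(make_sftp())


def upload(local_path, remote_path):
    sftp = pool.get()         # borrow a ready connection
    try:
        sftp.put(local_path, remote_path)
    finally:
        pool.put(sftp)        # return it for the next task


files = [('D:/proj/a.py', '/home/ydf/proj/a.py')]  # placeholder file list
with ThreadPoolExecutor(max_workers=8) as executor:
    for local_path, remote_path in files:
        executor.submit(upload, local_path, remote_path)
```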
""" 自動同步文件夾到linux機器 """ import json import os import queue import re import time from collections import OrderedDict from pathlib import Path import paramiko from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler class LinuxSynchronizer(LoggerMixinDefaultWithFileHandler): def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000, path_pattern_exluded_tuple=('/.git/', '/.idea/'), only_upload_within_the_last_modify_time=7 * 24 * 60 * 60, cycle_interval=10, ): """ :param host: :param port: :param username: :param password: :param local_dir: :param remote_dir: :param file_suffix_tuple_exluded: 排除以這些結尾的文件 :param file_volume_limit: 最大文件容量可以限制,若是超過此大小,則該文件不上傳 :param path_pattern_exluded_tuple: 更強大的文件排除功能,比光排除以什麼後綴結尾更強大靈活 :param only_upload_within_the_last_modify_time: 只上傳離當前時間最晚修改時間之後的文件 :param cycle_interval: 每隔多少秒掃描一次須要上傳的文件。 """ self._host = host self._port = port self._username = username self._password = password self._local_dir = str(local_dir).replace('\\', '/') self._remote_dir = remote_dir self._file_suffix_tuple_exluded = file_suffix_tuple_exluded self._path_pattern_exluded_tuple = path_pattern_exluded_tuple self._only_upload_within_the_last_modify_time = only_upload_within_the_last_modify_time self._cycle_interval = cycle_interval self._file_volume_limit = file_volume_limit self.filename__filesize_map = dict() self.filename__st_mtime_map = dict() self.build_connect() # noinspection PyAttributeOutsideInit def build_connect(self): self.logger.warning('創建linux鏈接') # noinspection PyTypeChecker t = paramiko.Transport((self._host, self._port)) t.connect(username=self._username, password=self._password) self.sftp = paramiko.SFTPClient.from_transport(t) ssh = paramiko.SSHClient() ssh.load_system_host_keys() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True) self.ssh = ssh # @decorators.tomorrow_threads(1) def ftp_upload(self, file: str): # file = file.replace('\\', '/') pattern_str = self._local_dir file_remote = file.replace(pattern_str, self._remote_dir) # self.logger.debug((file, file_remote)) for _ in range(10): try: time_start = time.time() self.sftp.put(file, file_remote) self.logger.debug(f'{file_remote} 上傳成功,大小是 {round(os.path.getsize(file) / 1024)} kb,上傳時間是 {round(time.time() - time_start, 2)}') break except FileNotFoundError: cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/') self.logger.info(cmd) tdin, stdout, stderr = self.ssh.exec_command(cmd) stderr_bytes = stderr.read() # self.logger.debug(stderr_bytes) if stderr_bytes != b'': self.logger.debug(stderr_bytes) except OSError as e: self.logger.exception(e) pass self.build_connect() # OSError: Socket is closed def _judge_need_filter_a_file(self, filename: str): ext = filename.split('.')[-1] if '.' 
+ ext in self._file_suffix_tuple_exluded: return True for path_pattern_exluded in self._path_pattern_exluded_tuple: if re.search(path_pattern_exluded, filename): return True return False def find_all_files_meet_the_conditions(self): total_volume = 0 self.filename__filesize_map.clear() for parent, dirnames, filenames in os.walk(self._local_dir): for filename in filenames: file_full_name = os.path.join(parent, filename).replace('\\', '/') if not self._judge_need_filter_a_file(file_full_name): # self.logger.debug(os.stat(file_full_name).st_mtime) file_st_mtime = os.stat(file_full_name).st_mtime volume = os.path.getsize(file_full_name) if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60): self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str} self.filename__st_mtime_map[file_full_name] = file_st_mtime total_volume += volume filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict() for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']): filename__filesize_map_ordered_by_lsat_modify_time[k] = v self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time self.logger.warning(f'須要上傳的全部文件數量是 {len(self.filename__filesize_map)} ,總大小是 {round(total_volume / 1024, 2)} kb ,文件分別是 {json.dumps(self.filename__filesize_map, indent=4)}') @decorators.tomorrow_threads(10) def start_upload_files(self): decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)() def _start_upload_files(self): with decorators.TimerContextManager(): self.find_all_files_meet_the_conditions() for file in self.filename__filesize_map: self.ftp_upload(file) self.logger.warn('完成')
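A minimal usage sketch for the class above; the host, credentials, and directories are placeholders:

```python
# placeholders, not values from this article
synchronizer = LinuxSynchronizer(
    host='192.168.1.10', port=22, username='root', password='secret',
    local_dir='D:/Users/ydf/my_project',
    remote_dir='/home/ydf/my_project',
    cycle_interval=10,
)
synchronizer.start_upload_files()  # rescans and uploads every cycle_interval seconds
```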
""" 自動同步文件夾到linux機器 這個更犀利,採用了鏈接池 加線程池,上傳大量碎片文件的速度大幅提高。 """ import hashlib import json import os from threading import Thread import queue import re import shutil import filecmp import time from collections import OrderedDict from pathlib import Path from typing import Union import paramiko from paramiko import SSHException from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler, nb_print, BoundedThreadPoolExecutor class LocalCopier(LoggerMixinDefaultWithFileHandler): """ 本地的兩個文件夾之間的同步 """ def __init__(self, local_dir, remote_dir, *args, **kwargs): self._local_dir = str(local_dir).replace('\\', '/') self._remote_dir = str(remote_dir).replace('\\', '/') self.logger_extra_suffix = '本地windows間複製' def upload(self, file: str): file_remote = file.replace(self._local_dir, self._remote_dir) if not Path(file_remote).parent.exists(): os.makedirs(str(Path(file_remote).parent)) # if self.get_file_md5(Path(file).open('rb')) != self.get_file_md5(Path(file_remote).open('rb')) : if not Path(file_remote).exists() or not filecmp.cmp(file, file_remote): shutil.copyfile(file, file_remote) self.logger.info(f'從 {file} 複製成功到{file_remote} ,大小是 {round(os.path.getsize(file) / 1024)} kb') else: self.logger.debug(f'{file} 不復制到 {file_remote} 沒有變化。') @staticmethod def get_file_md5(file): m = hashlib.md5() while True: # 若是不用二進制打開文件,則須要先編碼 # data = f.read(1024).encode('utf-8') data = file.read(1024) # 將文件分塊讀取 if not data: break m.update(data) return m.hexdigest() @decorators.flyweight class LinuxConnectionPool(LoggerMixinDefaultWithFileHandler): def __init__(self, host, port, username, password): # 對相同的連接參數作了享元模式保存鏈接池。 self.logger_extra_suffix = host self.logger.warning(f'初始化linux鏈接池{host}') self._host = host self._port = port self._username = username self._password = password self.queue_sftp_free = queue.Queue(100) self.queue_ssh_free = queue.Queue(100) self.build_connect() @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=0) def build_sftp(self): self.logger.warning(f'創建linux sftp鏈接中。。。') t_start = time.time() # noinspection PyTypeChecker t = paramiko.Transport((self._host, self._port)) t.connect(username=self._username, password=self._password) sftp = paramiko.SFTPClient.from_transport(t) self.queue_sftp_free.put(sftp) self.logger.warning(f'創建linux sftp鏈接耗時 {round(time.time() - t_start, 2)}') @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=1) def bulid_ssh(self): self.logger.warning(f'創建linux ssh鏈接中。。。。') t_start = time.time() ssh = paramiko.SSHClient() ssh.load_system_host_keys() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True) self.queue_ssh_free.put(ssh) self.logger.warning(f'創建linux ssh鏈接耗時 {round(time.time() - t_start, 2)}') def build_connect(self): # decorators.tomorrow_threads(10)(self._build_sftp)() # decorators.tomorrow_threads(10)(self.__class__._bulid_ssh)(self) def _inner(): executor = BoundedThreadPoolExecutor(100) for _ in range(10): time.sleep(0.2) executor.submit(self.build_sftp) for _ in range(3): time.sleep(0.5) executor.submit(self.bulid_ssh) Thread(target=_inner).start() def borrow_sftp(self): return self.queue_sftp_free.get() def borrow_ssh(self): return self.queue_ssh_free.get() def back_sftp(self, sftp): self.queue_sftp_free.put(sftp) def back_ssh(self, ssh): self.queue_ssh_free.put(ssh) class LinuxRemoteUploader(LocalCopier): """ windows同步到linux。 """ def 
__init__(self, local_dir, remote_dir, host, port, username, password): super().__init__(local_dir, remote_dir) self.logger_extra_suffix = host self.linux_conn_pool = LinuxConnectionPool(host, port, username, password) def _do_mkdir_operation(self, file_remote): cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/') self.logger.info(cmd) ssh = self.linux_conn_pool.borrow_ssh() try: tdin, stdout, stderr = ssh.exec_command(cmd) except SSHException: self.linux_conn_pool.bulid_ssh() except Exception as e: self.logger.exception(e) else: stderr_bytes = stderr.read() # self.logger.debug(stderr_bytes) if stderr_bytes != b'': self.logger.debug(stderr_bytes) self.linux_conn_pool.back_ssh(ssh) @decorators.tomorrow_threads(19) def upload(self, file: str): self.logger.debug(f'sftp空閒連接數量 {self.linux_conn_pool.queue_sftp_free.qsize()}, ssh空閒連接數量 {self.linux_conn_pool.queue_ssh_free.qsize()}') # file = file.replace('\\', '/') pattern_str = self._local_dir file_remote = file.replace(pattern_str, self._remote_dir) # self.logger.debug((file, file_remote)) for _ in range(10): sftp = self.linux_conn_pool.borrow_sftp() try: time_start = time.time() sftp.put(file, file_remote) self.logger.info(f'{file_remote} 上傳成功,大小是 {round(os.path.getsize(file) / 1024)} kb,上傳時間是 {round(time.time() - time_start, 2)}') self.linux_conn_pool.back_sftp(sftp) # self.linux_conn_pool.logger.debug((self.linux_conn_pool.queue_sftp_free.qsize(),self.linux_conn_pool.queue_ssh_free.qsize())) break except FileNotFoundError: self._do_mkdir_operation(file_remote) self.linux_conn_pool.back_sftp(sftp) except (OSError, SSHException) as e: self.logger.exception(e) self.linux_conn_pool.build_sftp() # OSError: Socket is closed class Synchronizer(LoggerMixinDefaultWithFileHandler): def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000, path_pattern_exluded_tuple=('/.git/', '/.idea/', 'cnbooking_all.json'), only_upload_within_the_last_modify_time='7 * 24 * 60 * 60', cycle_interval=2, just_windows_copy=False): """ :param host: :param port: :param username: :param password: :param local_dir: :param remote_dir: :param file_suffix_tuple_exluded: 排除以這些結尾的文件。 :param file_volume_limit: 最大文件容量可以限制,若是超過此大小,則該文件不上傳 :param path_pattern_exluded_tuple: 更強大的文件排除功能,比光排除以什麼後綴結尾更強大靈活,使用的是python正則表達式。 :param only_upload_within_the_last_modify_time: 只上傳離當前時間最晚修改時間之後的文件。 :param cycle_interval: 每隔多少秒掃描一次須要上傳的文件。 :param just_windows_copy: 執行windows不一樣文件夾之間的複製,不上傳linux。 """ self.logger_extra_suffix = host if not just_windows_copy else '本地' self._local_dir = str(local_dir).replace('\\', '/') self._file_suffix_tuple_exluded = file_suffix_tuple_exluded self._path_pattern_exluded_tuple = path_pattern_exluded_tuple self._only_upload_within_the_last_modify_time = self._compute_result(only_upload_within_the_last_modify_time) self._cycle_interval = cycle_interval self._file_volume_limit = self._compute_result(file_volume_limit) self.filename__filesize_map = dict() self.filename__st_mtime_map = dict() self._just_windows_copy = just_windows_copy self.uploader = LinuxRemoteUploader(local_dir, remote_dir, host, port, username, password) if not just_windows_copy else LocalCopier(local_dir, remote_dir, host, port, username, password) @staticmethod def _compute_result(sth: Union[str, int]): return sth if isinstance(sth, int) else eval(sth) def _judge_need_filter_a_file(self, filename: str): ext = filename.split('.')[-1] if '.' 
+ ext in self._file_suffix_tuple_exluded: return True for path_pattern_exluded in self._path_pattern_exluded_tuple: if re.search(path_pattern_exluded, filename): return True return False def find_all_files_meet_the_conditions(self): t_start = time.time() total_volume = 0 self.filename__filesize_map.clear() for parent, dirnames, filenames in os.walk(self._local_dir): for filename in filenames: file_full_name = os.path.join(parent, filename).replace('\\', '/') if not self._judge_need_filter_a_file(file_full_name): # self.logger.debug(os.stat(file_full_name).st_mtime) file_st_mtime = os.stat(file_full_name).st_mtime volume = os.path.getsize(file_full_name) if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60): if self.filename__st_mtime_map.get(file_full_name, None) != file_st_mtime: self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str} self.filename__st_mtime_map[file_full_name] = file_st_mtime total_volume += volume filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict() for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']): filename__filesize_map_ordered_by_lsat_modify_time[k] = v self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time if len(self.filename__filesize_map) > 0: self.logger.warning(f'須要{"複製" if self._just_windows_copy else "上傳"} 的全部文件數量是 {len(self.filename__filesize_map)} ,總大小是 {round(total_volume / 1024, 2)} kb ,' f'查找文件耗時 {round(time.time() - t_start, 2)} 秒,文件分別是 {json.dumps(self.filename__filesize_map, indent=4)}') # @decorators.tomorrow_threads(10) def start_upload_files(self): Thread(target=decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)).start() def _start_upload_files(self): self.find_all_files_meet_the_conditions() for file in self.filename__filesize_map: self.uploader.upload(file) # noinspection PyPep8 if __name__ == '__main__': """ 配置裏面的內容格式以下,支持同步多個文件夾映射。 [ { "host": "112.90.xx.xx", "port": 10005, "username": "root", "password": "@0^Lc97MewI3i7xxxxxx", "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares", "remote_dir": "/home/ydf/hotelf15", "file_suffix_tuple_exluded": [ ".pyc", ".log", ".gz" ], "path_pattern_exluded_tuple": [ "/.git/", "/.idea/", "cnbooking_cn_all.json" ], "only_upload_within_the_last_modify_time": "365 * 24 * 3600", "file_volume_limit": "2 * 1000 * 1000", "cycle_interval": 10 } ] """ for config_item in json.load(Path('/windows_to_linux_syn_config.json').open()): nb_print(json.dumps(config_item)) Synchronizer(**config_item).start_upload_files() # sc create PythonApp6 binPath= "D:\Users\ydf\Desktop\oschina\coding\hotel_fares\dist\windows_to_linux_syn2\windows_to_linux_syn2.exe" # pyinstaller --distpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --workpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --specpath=D:\Users\ydf\Desktop\oschina\specify_pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py # 可使用pyinstaller打包這個文件。先添加PYTHONPATH變量,在另外的文件夾執行這個命令。 # pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" 
D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py # cd .. # set PYTHONPATH=D:\coding2\hotel_fares # pyinstaller -F --icon="D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py # 測試更新。。。。。。.
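The decorators and helpers (decorators.flyweight, decorators.keep_circulating, decorators.tomorrow_threads, BoundedThreadPoolExecutor, the logger mixin) come from the author's private app.utils_ydf package and are not shown in this article. For readers who want to run the scripts standalone, minimal stand-ins for the two most important ones might look like the sketch below; these are behavioral assumptions inferred from how the code calls them, not the original implementations.

```python
import functools
import time


def flyweight(cls):
    """Assumed behavior: cache instances by constructor arguments, so the same
    (host, port, username, password) always yields the same connection pool."""
    instances = {}

    @functools.wraps(cls)
    def get_instance(*args, **kwargs):
        key = (args, tuple(sorted(kwargs.items())))
        if key not in instances:
            instances[key] = cls(*args, **kwargs)
        return instances[key]

    return get_instance


def keep_circulating(interval, exit_if_function_run_sucsess=False,
                     is_display_detail_exception=True):
    """Assumed behavior: re-run the wrapped function every `interval` seconds;
    optionally stop after the first successful (non-raising) run."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            while True:
                try:
                    result = func(*args, **kwargs)
                    if exit_if_function_run_sucsess:
                        return result
                except Exception as e:
                    if is_display_detail_exception:
                        print(f'{func.__name__} raised: {e!r}')
                    else:
                        print(f'{func.__name__} raised an exception')
                time.sleep(interval)
        return wrapper
    return decorator
```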
The content of the config file is as follows.
[ { "host": "112.xx.89.16", "port": 10033, "username": "root", "password": "xxxx", "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares", "remote_dir": "/home/ydf/hotelf18", "file_suffix_tuple_exluded": [ ".pyc", ".log", ".gz" ], "path_pattern_exluded_tuple": [ "/.git/", "/.idea/", "cnbooking_cn_all.json" ], "only_upload_within_the_last_modify_time": "30 * 24 * 3600", "file_volume_limit": "2 * 1000 * 1000", "cycle_interval": 1 }, { "host": "112.90.xx.16", "port": 10033, "username": "root", "password": "xxxx", "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\movie_data", "remote_dir": "/home/ydf/movie_data2", "file_suffix_tuple_exluded": [ ".pyc", ".log", ".gz" ], "path_pattern_exluded_tuple": [ "/.git/", "/.idea/", "cnbooking_cn_all.json" ], "only_upload_within_the_last_modify_time": "30 * 24 * 3600", "file_volume_limit": "2 * 1000 * 1000", "cycle_interval": 1 } ]
The first run does a full upload of all files modified within the configured time window; after that, it checks every 2 seconds (dynamically configurable via the json file) and uploads any file changed within the last 10 minutes to Linux.