A Windows-to-Linux folder synchronizer written in Python. It provides automatic syncing, much faster transfer of large numbers of small files, and richer file-upload filtering options.

The existing tooling is poor. I had been using PyCharm's automatic sync, but files newly pulled down by git are not uploaded to linux automatically; only files I edited myself or saved manually with Ctrl+S get synced. To make sure nothing was missed, I often had to do a full upload, which is very slow.

Since I often use a linux interpreter directly from PyCharm on Windows and need to test quickly, constantly running git push/pull between the local machine and linux is inconvenient. The test environment does use git, but during development a mapped-folder sync is simply more convenient than git.

Compared with a single thread on a single linux connection uploading tiny fragment files one by one, the connection-pool approach improved upload speed by tens of times.
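The core idea is to keep several SFTP sessions open and fan the uploads out across them from a thread pool. A minimal sketch of that pattern (host, credentials and the file list are placeholders, not the real config; the full implementation follows below):

import queue
from concurrent.futures import ThreadPoolExecutor
import paramiko

HOST, PORT, USER, PASSWORD = 'example-host', 22, 'root', 'secret'  # placeholders

def make_sftp():
    # one independent SFTP session over its own transport
    t = paramiko.Transport((HOST, PORT))
    t.connect(username=USER, password=PASSWORD)
    return paramiko.SFTPClient.from_transport(t)

pool = queue.Queue()
for _ in range(10):  # pre-build ten sessions
    pool.put(make_sftp())

def upload(local_path, remote_path):
    sftp = pool.get()  # borrow a session
    try:
        sftp.put(local_path, remote_path)
    finally:
        pool.put(sftp)  # return it for reuse

with ThreadPoolExecutor(max_workers=20) as executor:
    for local, remote in [('a.py', '/tmp/a.py')]:  # placeholder file list
        executor.submit(upload, local, remote)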

 

First version: upload over a single linux connection.

"""
Automatically sync a folder to a linux machine
"""
import json
import os
import re
import time
from collections import OrderedDict
from pathlib import Path
import paramiko
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler


class LinuxSynchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/'), only_upload_within_the_last_modify_time=7 * 24 * 60 * 60, cycle_interval=10, ):
        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
        :param file_suffix_tuple_exluded: exclude files ending with these suffixes
        :param file_volume_limit: maximum file size; files larger than this are not uploaded
        :param path_pattern_exluded_tuple: more powerful and flexible file exclusion than suffix matching alone
        :param only_upload_within_the_last_modify_time: only upload files whose last modification time is within this many seconds of now
        :param cycle_interval: how many seconds to wait between scans for files that need uploading
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = remote_dir
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = only_upload_within_the_last_modify_time
        self._cycle_interval = cycle_interval
        self._file_volume_limit = file_volume_limit
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self.build_connect()

    # noinspection PyAttributeOutsideInit
    def build_connect(self):
        self.logger.warning('building linux connection')
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        self.sftp = paramiko.SFTPClient.from_transport(t)

        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.ssh = ssh

    # @decorators.tomorrow_threads(1)
    def ftp_upload(self, file: str):
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))
        for _ in range(10):
            try:
                time_start = time.time()
                self.sftp.put(file, file_remote)
                self.logger.debug(f'{file_remote} uploaded successfully, size {round(os.path.getsize(file) / 1024)} kb, upload time {round(time.time() - time_start, 2)}')
                break
            except FileNotFoundError:
                cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
                self.logger.info(cmd)
                stdin, stdout, stderr = self.ssh.exec_command(cmd)
                stderr_bytes = stderr.read()
                # self.logger.debug(stderr_bytes)
                if stderr_bytes != b'':
                    self.logger.debug(stderr_bytes)
            except OSError as e:
                self.logger.exception(e)
                self.build_connect()  # e.g. "OSError: Socket is closed"; rebuild the connection and retry

    def _judge_need_filter_a_file(self, filename: str):
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60):
                        self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                        self.filename__st_mtime_map[file_full_name] = file_st_mtime
                        total_volume += volume
        filename__filesize_map_ordered_by_last_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_last_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_last_modify_time
        self.logger.warning(f'number of files to upload: {len(self.filename__filesize_map)}, total size {round(total_volume / 1024, 2)} kb, files: {json.dumps(self.filename__filesize_map, indent=4)}')

    @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)()

    def _start_upload_files(self):
        with decorators.TimerContextManager():
            self.find_all_files_meet_the_conditions()
            for file in self.filename__filesize_map:
                self.ftp_upload(file)
            self.logger.warning('done')
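A minimal usage sketch (host, credentials and paths are placeholders, not real values):

syncer = LinuxSynchronizer('192.168.1.10', 22, 'root', 'secret',
                           local_dir=r'D:\coding\myproj', remote_dir='/home/ydf/myproj')
syncer.start_upload_files()

One caveat on the filter settings: the entries of path_pattern_exluded_tuple are python regular expressions, so an unescaped dot matches any character, and '/.git/' would also match a hypothetical '/agit/' directory. Escaping the dot keeps the match literal:

import re
assert re.search(r'/.git/', 'D:/proj/agit/x.py')        # unescaped dot also matches 'agit'
assert not re.search(r'/\.git/', 'D:/proj/agit/x.py')   # escaped dot is literal
assert re.search(r'/\.git/', 'D:/proj/.git/config')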

 

 

Second version: connection pool plus multithreaded upload.

 

"""
Automatically sync a folder to a linux machine.
This version is sharper: it uses a connection pool plus a thread pool, so uploading large numbers of small fragment files is much faster.
"""
import hashlib
import json
import os
from threading import Thread
import queue
import re
import shutil
import filecmp
import time
from collections import OrderedDict
from pathlib import Path
from typing import Union
import paramiko
from paramiko import SSHException
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler, nb_print, BoundedThreadPoolExecutor


class LocalCopier(LoggerMixinDefaultWithFileHandler):
    """
    Sync between two local folders
    """

    def __init__(self, local_dir, remote_dir, *args, **kwargs):
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = str(remote_dir).replace('\\', '/')
        self.logger_extra_suffix = 'local windows copy'

    def upload(self, file: str):
        file_remote = file.replace(self._local_dir, self._remote_dir)
        if not Path(file_remote).parent.exists():
            os.makedirs(str(Path(file_remote).parent))
        # if self.get_file_md5(Path(file).open('rb')) != self.get_file_md5(Path(file_remote).open('rb')) :
        if not Path(file_remote).exists() or not filecmp.cmp(file, file_remote):
            shutil.copyfile(file, file_remote)
            self.logger.info(f'copied {file} to {file_remote}, size {round(os.path.getsize(file) / 1024)} kb')
        else:
            self.logger.debug(f'{file} not copied to {file_remote}; nothing changed.')

    @staticmethod
    def get_file_md5(file):
        m = hashlib.md5()
        while True:
            # if the file is not opened in binary mode, the data must be encoded first
            # data = f.read(1024).encode('utf-8')
            data = file.read(1024)  # read the file in chunks
            if not data:
                break
            m.update(data)
        return m.hexdigest()
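# A usage sketch for get_file_md5, kept here as an alternative to filecmp
# for content comparison (the file name is a placeholder):
#     with open('some_file.bin', 'rb') as f:
#         print(LocalCopier.get_file_md5(f))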


@decorators.flyweight
class LinuxConnectionPool(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password):  # the flyweight pattern keeps one pool per identical set of connection parameters.
        self.logger_extra_suffix = host
        self.logger.warning(f'initializing linux connection pool {host}')
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self.queue_sftp_free = queue.Queue(100)
        self.queue_ssh_free = queue.Queue(100)
        self.build_connect()

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=0)
    def build_sftp(self):
        self.logger.warning('building linux sftp connection ...')
        t_start = time.time()
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        sftp = paramiko.SFTPClient.from_transport(t)
        self.queue_sftp_free.put(sftp)
        self.logger.warning(f'building the linux sftp connection took {round(time.time() - t_start, 2)} s')

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=1)
    def build_ssh(self):
        self.logger.warning('building linux ssh connection ...')
        t_start = time.time()
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.queue_ssh_free.put(ssh)
        self.logger.warning(f'building the linux ssh connection took {round(time.time() - t_start, 2)} s')

    def build_connect(self):
        # decorators.tomorrow_threads(10)(self._build_sftp)()
        # decorators.tomorrow_threads(10)(self.__class__._bulid_ssh)(self)
        def _inner():
            executor = BoundedThreadPoolExecutor(100)
            for _ in range(10):
                time.sleep(0.2)
                executor.submit(self.build_sftp)
            for _ in range(3):
                time.sleep(0.5)
                executor.submit(self.build_ssh)

        Thread(target=_inner).start()

    def borrow_sftp(self):
        return self.queue_sftp_free.get()

    def borrow_ssh(self):
        return self.queue_ssh_free.get()

    def back_sftp(self, sftp):
        self.queue_sftp_free.put(sftp)

    def back_ssh(self, ssh):
        self.queue_ssh_free.put(ssh)
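# Because of the flyweight decorator, constructing LinuxConnectionPool twice with
# identical parameters should yield the same instance, so every uploader that
# targets the same host shares one pool (host and credentials below are placeholders):
#     pool_a = LinuxConnectionPool('10.0.0.5', 22, 'root', 'secret')
#     pool_b = LinuxConnectionPool('10.0.0.5', 22, 'root', 'secret')
#     assert pool_a is pool_b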


class LinuxRemoteUploader(LocalCopier):
    """
    Sync from windows to linux.
    """

    def __init__(self, local_dir, remote_dir, host, port, username, password):
        super().__init__(local_dir, remote_dir)
        self.logger_extra_suffix = host
        self.linux_conn_pool = LinuxConnectionPool(host, port, username, password)

    def _do_mkdir_operation(self, file_remote):
        cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
        self.logger.info(cmd)
        ssh = self.linux_conn_pool.borrow_ssh()
        try:
            stdin, stdout, stderr = ssh.exec_command(cmd)
        except SSHException:
            self.linux_conn_pool.build_ssh()
        except Exception as e:
            self.logger.exception(e)
        else:
            stderr_bytes = stderr.read()
            # self.logger.debug(stderr_bytes)
            if stderr_bytes != b'':
                self.logger.debug(stderr_bytes)
            self.linux_conn_pool.back_ssh(ssh)

    @decorators.tomorrow_threads(19)
    def upload(self, file: str):
        self.logger.debug(f'idle sftp connections: {self.linux_conn_pool.queue_sftp_free.qsize()}, idle ssh connections: {self.linux_conn_pool.queue_ssh_free.qsize()}')
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))

        for _ in range(10):
            sftp = self.linux_conn_pool.borrow_sftp()
            try:
                time_start = time.time()
                sftp.put(file, file_remote)
                self.logger.info(f'{file_remote} uploaded successfully, size {round(os.path.getsize(file) / 1024)} kb, upload time {round(time.time() - time_start, 2)}')
                self.linux_conn_pool.back_sftp(sftp)
                # self.linux_conn_pool.logger.debug((self.linux_conn_pool.queue_sftp_free.qsize(),self.linux_conn_pool.queue_ssh_free.qsize()))
                break
            except FileNotFoundError:
                self._do_mkdir_operation(file_remote)
                self.linux_conn_pool.back_sftp(sftp)
            except (OSError, SSHException) as e:
                self.logger.exception(e)
                self.linux_conn_pool.build_sftp()  # e.g. "OSError: Socket is closed"; drop the broken session and replenish the pool
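# Note the asymmetry in upload above: on success (and after the mkdir for
# FileNotFoundError) the borrowed sftp goes back to the pool, but on
# OSError/SSHException it is deliberately dropped, because a session whose
# socket broke must not be reused; build_sftp replenishes the pool instead.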


class Synchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/', 'cnbooking_all.json'), only_upload_within_the_last_modify_time='7 * 24 * 60 * 60', cycle_interval=2, just_windows_copy=False):
        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
        :param file_suffix_tuple_exluded: exclude files ending with these suffixes.
        :param file_volume_limit: maximum file size; files larger than this are not uploaded.
        :param path_pattern_exluded_tuple: more powerful and flexible file exclusion than suffix matching alone, using python regular expressions.
        :param only_upload_within_the_last_modify_time: only upload files whose last modification time is within this many seconds of now.
        :param cycle_interval: how many seconds to wait between scans for files that need uploading.
        :param just_windows_copy: only copy between two windows folders; do not upload to linux.
        """
        self.logger_extra_suffix = host if not just_windows_copy else 'local'
        self._local_dir = str(local_dir).replace('\\', '/')
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = self._compute_result(only_upload_within_the_last_modify_time)
        self._cycle_interval = cycle_interval
        self._file_volume_limit = self._compute_result(file_volume_limit)
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self._just_windows_copy = just_windows_copy
        self.uploader = LinuxRemoteUploader(local_dir, remote_dir, host, port, username, password) if not just_windows_copy else LocalCopier(local_dir, remote_dir, host, port, username, password)

    @staticmethod
    def _compute_result(sth: Union[str, int]):
        # config values may be arithmetic strings such as '7 * 24 * 60 * 60'; a safer alternative is sketched after the sample config below
        return sth if isinstance(sth, int) else eval(sth)

    def _judge_need_filter_a_file(self, filename: str):
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        t_start = time.time()
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    if (time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit
                            and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60)):
                        if self.filename__st_mtime_map.get(file_full_name, None) != file_st_mtime:
                            self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                            self.filename__st_mtime_map[file_full_name] = file_st_mtime
                            total_volume += volume
        filename__filesize_map_ordered_by_last_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_last_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_last_modify_time
        if len(self.filename__filesize_map) > 0:
            self.logger.warning(f'number of files to {"copy" if self._just_windows_copy else "upload"}: {len(self.filename__filesize_map)}, total size {round(total_volume / 1024, 2)} kb, '
                                f'file scan took {round(time.time() - t_start, 2)} s, files: {json.dumps(self.filename__filesize_map, indent=4)}')

    # @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        Thread(target=decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)).start()

    def _start_upload_files(self):
        self.find_all_files_meet_the_conditions()
        for file in self.filename__filesize_map:
            self.uploader.upload(file)


# noinspection PyPep8
if __name__ == '__main__':
    """
    The config file format is shown below; multiple folder mappings can be synced.
    [
      {
        "host": "112.90.xx.xx",
        "port": 10005,
        "username": "root",
        "password": "@0^Lc97MewI3i7xxxxxx",
        "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
        "remote_dir": "/home/ydf/hotelf15",
        "file_suffix_tuple_exluded": [
          ".pyc",
          ".log",
          ".gz"
        ],
        "path_pattern_exluded_tuple": [
          "/.git/",
          "/.idea/",
          "cnbooking_cn_all.json"
        ],
        "only_upload_within_the_last_modify_time": "365 * 24 * 3600",
        "file_volume_limit": "2 * 1000 * 1000",
        "cycle_interval": 10
      }
    ]
    """

    for config_item in json.load(Path('/windows_to_linux_syn_config.json').open()):
        nb_print(json.dumps(config_item))
        Synchronizer(**config_item).start_upload_files()

    # sc create PythonApp6 binPath= "D:\Users\ydf\Desktop\oschina\coding\hotel_fares\dist\windows_to_linux_syn2\windows_to_linux_syn2.exe"
    # pyinstaller --distpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --workpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --specpath=D:\Users\ydf\Desktop\oschina\specify_pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
    # This file can be packaged with pyinstaller. First set the PYTHONPATH variable, then run the command from another folder.
    # pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py

    # cd ..
    # set PYTHONPATH=D:\coding2\hotel_fares
    # pyinstaller -F --icon="D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py


The contents of the config file are as follows:

[
  {
    "host": "112.xx.89.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
    "remote_dir": "/home/ydf/hotelf18",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  },
  {
    "host": "112.90.xx.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\movie_data",
    "remote_dir": "/home/ydf/movie_data2",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  }
]
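
Fields such as "only_upload_within_the_last_modify_time": "30 * 24 * 3600" and "file_volume_limit": "2 * 1000 * 1000" are arithmetic strings; _compute_result hands them to eval, which is convenient but risky if the config file is not fully trusted. A minimal sketch of a restricted evaluator built on the standard ast module, allowing numeric arithmetic only (not part of the original tool):

import ast
import operator

_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
        ast.Mult: operator.mul, ast.Div: operator.truediv}

def safe_arith(expr: str):
    """Evaluate an arithmetic string such as '30 * 24 * 3600' without eval."""
    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
        raise ValueError(f'disallowed expression: {expr!r}')
    return _eval(ast.parse(expr, mode='eval'))

assert safe_arith('30 * 24 * 3600') == 2592000

Dropping this into _compute_result would keep the flexible config format without letting the config file execute arbitrary code.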

The first run does a full upload of every file whose modification time falls within the configured window; after that, it checks every 2 seconds (dynamically configurable via the json file) and uploads any file changed within the last 10 minutes to linux.
