123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- import os
- import sys
- import time
- import hashlib
- import asyncssh
- import asyncio
- from tqdm import tqdm
- from loguru import logger
- from file import get_files_to_upload
- # 配置日志输出到文件和控制台
- logger.add("output.log")
- logger.add(sys.stderr, level="WARNING")
- # 上传文件的函数
- async def upload_file(local_path, local_md5, remote_path, sftp):
- file_size = os.path.getsize(local_path)
- async with await sftp.open(remote_path, 'wb') as file:
- bytes_uploaded = 0
- with open(local_path, 'rb') as local_file:
- with tqdm(total=file_size, unit='B', unit_scale=True, ncols=80,
- desc=os.path.basename(local_path)) as progress_bar:
- while True:
- chunk = local_file.read(65536) # 读取文件块
- if not chunk:
- break
- await file.write(chunk) # 写入文件块
- bytes_uploaded += len(chunk)
- progress_bar.update(len(chunk))
- if bytes_uploaded == file_size:
- remote_md5 = await asyncio.wait_for(calculate_remote_md5(sftp, remote_path), timeout=calculate_timeout) # 设置超时时间为5秒
- if remote_md5 == local_md5:
- await write_remote_meta_md5(remote_path, remote_md5, sftp)
- logger.info("文件上传成功")
- else:
- logger.info("本地与远程MD5不一致,请检查上传过程中是否发生错误!")
- else:
- logger.info("文件上传失败,请检查文件后再试验。")
- # 递归创建远程目录
- async def create_remote_directory_recursive(directory, sftp):
- parent_dir = os.path.dirname(directory)
- try:
- await sftp.stat(parent_dir)
- except asyncssh.SFTPError as e:
- if "No such file" in str(e):
- await create_remote_directory_recursive(parent_dir, sftp)
- else:
- logger.error(f"检查远程目录失败: {parent_dir}. 错误信息: {e}")
- return
- try:
- await sftp.mkdir(directory)
- except asyncssh.SFTPError as e:
- logger.error(f"创建远程目录失败: {directory}. 错误信息: {e}")
- # 创建一个锁对象
- lock = asyncio.Lock()
- # 创建目录并上传文件的函数
- async def create_directory_and_upload(local_path, local_md5, remote_path, sftp):
- directory = os.path.dirname(remote_path)
- # 检查远程目录是否存在,如果不存在则创建
- try:
- await lock.acquire() # 获取锁对象
- attempts = 5 # 尝试次数
- while attempts > 0:
- try:
- await sftp.stat(directory)
- break # 目录已存在,跳出循环
- except asyncssh.SFTPError as e:
- if "No such file" in str(e):
- await create_remote_directory_recursive(directory, sftp)
- break # 目录创建成功,跳出循环
- elif "Failure" in str(e): # 这里可以根据实际情况修改判断条件
- await asyncio.sleep(5) # 等待5秒
- attempts -= 1
- else:
- logger.error(f"检查远程目录失败: {directory}. 错误信息: {e}")
- return
- finally:
- lock.release()
- # 上传文件
- for attempt in range(3): # 最多尝试3次上传操作
- try:
- await upload_file(local_path, local_md5, remote_path, sftp)
- break # 上传成功,退出循环
- except Exception as e:
- logger.error(f"上传文件失败: {os.path.basename(local_path)}。错误信息: {e}")
- if attempt < 2:
- logger.info("重新尝试上传文件...")
- else:
- logger.info("多次上传文件失败.")
- return
- async def compare_md5(local_path, remote_path, sftp):
- logger.info(f"正在比对文件:{os.path.basename(local_path)}中")
- try:
- local_md5 = calculate_md5(local_path)
- # 获取远程meta文件中的md5
- remote_meta_md5 = await get_remote_meta_md5(remote_path, sftp)
- if remote_meta_md5 and local_md5 == remote_meta_md5:
- logger.info("Meta文件校验成功 - 跳过远程md5计算。")
- return
- # 计算远程MD5码,并设置较长的超时时间
- remote_md5 = await asyncio.wait_for(calculate_remote_md5(sftp, remote_path), timeout=calculate_timeout) # 设置超时时间为5秒
- if remote_md5 is None:
- # 执行上传操作
- logger.info(f'准备文件上传 - 文件名为:{os.path.basename(local_path)}')
- await create_directory_and_upload(local_path, local_md5, remote_path, sftp)
- elif local_md5 != remote_md5:
- logger.info(f"MD5校验失败 - 校验结果为:{local_md5} - {remote_md5}")
- # 删除远程文件
- for attempt in range(3): # 最多尝试3次删除操作
- try:
- await sftp.remove(remote_path)
- logger.info(f'删除远程文件成功:{remote_path}')
- break # 删除成功,退出循环
- except Exception as e:
- logger.error(f"删除远程文件失败: {remote_path}. 错误信息: {e}")
- if attempt < 2:
- logger.info("重新尝试删除远程文件...")
- else:
- return
- # 执行上传操作
- logger.info(f'准备文件上传 - 文件名为:{os.path.basename(local_path)}')
- await create_directory_and_upload(local_path, local_md5, remote_path, sftp)
- else:
- logger.info(f"MD5码校验成功 - 校验结果为:{local_md5} - {remote_md5}")
- await write_remote_meta_md5(remote_path, remote_md5, sftp)
- except asyncio.TimeoutError:
- logger.error(f"Timeout occurred while comparing MD5 for {local_path} and {remote_path}")
- return
- async def get_remote_meta_md5(remote_path, sftp):
- meta_path = f"{remote_path}.meta"
- try:
- async with await sftp.open(meta_path, 'r') as meta_file:
- meta_content = await meta_file.read()
- if 'MD5:' in meta_content:
- remote_md5 = meta_content.split('MD5:')[1].strip()
- return remote_md5
- except asyncssh.SFTPNoSuchFile:
- logger.warning(f"Meta文件不存在: {os.path.basename(meta_path)}")
- pass
- return None
- async def write_remote_meta_md5(remote_path, remote_md5, sftp):
- try:
- meta_content = f"MD5: {remote_md5}"
- meta_path = f"{remote_path}.meta"
- async with await sftp.open(meta_path, 'w') as meta_file:
- await meta_file.write(meta_content)
- logger.info(f'Meta文件创建成功:{os.path.basename(meta_path)}')
- except Exception as e:
- logger.error(f"写入远程.meta文件失败:{remote_path}. 错误信息:{e}")
- async def calculate_remote_md5(sftp, remote_path):
- hasher = hashlib.md5()
- try:
- async with sftp.open(remote_path, 'rb') as file:
- while True:
- data = await file.read(65536) # Read the file in chunks of 64KB
- if not data:
- break
- hasher.update(data)
- except asyncssh.SFTPNoSuchFile:
- logger.warning(f"远程文件不存在: {os.path.basename(remote_path)}")
- return None
- return hasher.hexdigest()
- def calculate_md5(file_path):
- hasher = hashlib.md5()
- with open(file_path, 'rb') as file:
- while True:
- data = file.read(65536) # Read the file in chunks of 64KB
- if not data:
- break
- hasher.update(data)
- return hasher.hexdigest()
- # Example usage:
- hostname = '192.168.188.252'
- username = 'neozhang'
- password = '58920912a'
- local_directory = 'E:\\AI\\stable-diffusion-webui-master\\models\\Stable-diffusion'
- remote_directory = '/home/neozhang/work/project/stable-diffusion-webui/models/Stable-diffusion'
- file_extension = '*'
- depth = -1 # 包括多少级子目录下的文件(-1代表包括所有子目录)
- calculate_timeout = 1000 # 计算远程MD5的超时时间
- coroutine_count = 5
- files_to_upload = get_files_to_upload(local_directory, remote_directory, depth, file_extension)
- logger.info(files_to_upload)
- async def main():
- logger.info('======================================================================')
- async with asyncssh.connect(hostname, username=username, password=password, known_hosts=None) as conn:
- sftp = await conn.start_sftp_client()
- sem = asyncio.Semaphore(coroutine_count) # 设置最大并发协程数量为5
- async def limited_compare_md5(local_path, remote_path):
- async with sem:
- await compare_md5(local_path, remote_path, sftp)
- coroutines = [limited_compare_md5(file_info['local_path'], file_info['remote_path'])
- for file_info in files_to_upload]
- await asyncio.gather(*coroutines)
- # Run the main coroutine
- loop = asyncio.get_event_loop()
- loop.run_until_complete(main())
|