断点续传文件到服务器 使用 paramiko 库实现 SFTP 断点续传
2026-02-02
Python
00
bash
展开代码
#!/usr/bin/env python3 """ 断点续传文件到服务器 使用 paramiko 库实现 SFTP 断点续传 """ import os import sys import paramiko from tqdm import tqdm def get_remote_file_size(sftp, remote_path): """获取远程文件大小""" try: return sftp.stat(remote_path).st_size except FileNotFoundError: return 0 def resume_upload(local_file, server, remote_path, username, password=None, key_file=None): """ 断点续传文件到服务器 Args: local_file: 本地文件路径 server: 服务器地址 remote_path: 远程文件路径 username: 用户名 password: 密码(可选) key_file: SSH密钥文件(可选) """ # 获取本地文件大小 local_size = os.path.getsize(local_file) print(f"本地文件大小: {local_size / (1024**3):.2f} GB") # 连接服务器 print(f"连接到服务器 {server}...") ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: if key_file: ssh.connect(server, username=username, key_filename=key_file) elif password: ssh.connect(server, username=username, password=password) else: # 尝试使用默认的 SSH 密钥或 SSH agent ssh.connect(server, username=username) sftp = ssh.open_sftp() # 获取远程已传输的大小 remote_size = get_remote_file_size(sftp, remote_path) print(f"远程文件大小: {remote_size / (1024**3):.2f} GB") if remote_size >= local_size: print("文件已完全传输!") return # 断点续传 print(f"从 {remote_size / (1024**3):.2f} GB 处继续传输...") with open(local_file, 'rb') as local_fp: # 跳到断点位置 local_fp.seek(remote_size) # 以追加模式打开远程文件 with sftp.file(remote_path, 'ab') as remote_fp: # 创建进度条 with tqdm(total=local_size, initial=remote_size, unit='B', unit_scale=True, unit_divisor=1024, desc="上传进度") as pbar: # 分块上传 chunk_size = 1024 * 1024 # 1MB while True: chunk = local_fp.read(chunk_size) if not chunk: break remote_fp.write(chunk) pbar.update(len(chunk)) print("\n✓ 文件传输完成!") # 验证文件大小 final_remote_size = get_remote_file_size(sftp, remote_path) if final_remote_size == local_size: print(f"✓ 验证成功: 远程文件大小 {final_remote_size} == 本地文件大小 {local_size}") else: print(f"✗ 警告: 远程文件大小 {final_remote_size} != 本地文件大小 {local_size}") except Exception as e: print(f"错误: {e}") sys.exit(1) finally: sftp.close() ssh.close() if __name__ == "__main__": # 配置参数 LOCAL_FILE = r"D:\process_mp4.zip" SERVER = "74.48.179.187" REMOTE_PATH = "/root/process_mp4.zip" USERNAME = "root" # 检查本地文件是否存在 if not os.path.exists(LOCAL_FILE): print(f"错误: 本地文件不存在: {LOCAL_FILE}") sys.exit(1) print("=" * 60) print("断点续传工具") print("=" * 60) print(f"本地文件: {LOCAL_FILE}") print(f"服务器: {SERVER}") print(f"远程路径: {REMOTE_PATH}") print("=" * 60) print("\n提示: 需要输入服务器密码(如果没有配置SSH密钥)") # 提示输入密码(可选) import getpass password = getpass.getpass("请输入SSH密码(直接回车跳过,使用SSH密钥): ") if not password: password = None # 执行断点续传 resume_upload(LOCAL_FILE, SERVER, REMOTE_PATH, USERNAME, password=password)
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:Dong

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!