2024-10-09
Linux运维
00

目录

使用 Docker 容器下载 laionaesthetics102433M9.parquet 文件示例
构建带有环境的 Docker 镜像
创建下载脚本 download_parquet.py
创建 Dockerfile
构建镜像
配置文件 config_part00059.json
执行下载

使用 Docker 容器下载 laion_aesthetics_1024_33M_9.parquet 文件示例

我下载了一个示例文件 laion_aesthetics_1024_33M_9.parquet 并存放在 /ssd/xiedong/parquet_test 目录中,接下来,我将通过启动 Docker 容器来完成该文件的数据下载。

核心思路是:每个容器配备一个配置文件,配置文件内定义了下载设置(如输入和输出文件位置等)。容器启动后会依据配置文件执行任务,任务完成后容器自动关闭。

构建带有环境的 Docker 镜像

首先,制作一个包含所需环境的镜像:

bash
docker run -it -v /ssd/xiedong/parquet_test:/ssd/xiedong/parquet_test buluma/python:3.10.4-ubuntu22.04 bash pip install img2dataset apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* pip cache purge docker commit e768addf27ec kevinchina/deeplearning:ubuntu22.python3.10.img2dataset

创建下载脚本 download_parquet.py

此脚本负责根据配置文件下载 Parquet 数据,并处理失败的下载任务重试:

python
import argparse import json import multiprocessing import os import pandas as pd from img2dataset import download import os import time def prepare_retry_download(url_list_dir, output_folder): # 定义重试下载逻辑,生成一个包含失败下载项的 Parquet 文件 src_in = os.path.dirname(url_list_dir) src_in = os.path.join(src_in, "retry_download") os.makedirs(src_in, exist_ok=True) parquet_file = os.path.join(src_in, "retry_download.parquet") if os.path.exists(parquet_file): print("已经存在 retry_download.parquet") return parquet_file # 从输出文件夹中查找需要重试的下载文件 files = [f for f in os.listdir(output_folder) if f.endswith(".parquet")] files_abs = sorted([os.path.join(output_folder, f) for f in files]) df = pd.read_parquet(files_abs[0], engine='pyarrow') df = df[df['status'] == "failed_to_download"] # 处理前1000个文件,合并失败记录 for file in files_abs[1:1000]: file_b = os.path.splitext(os.path.basename(file))[0] + "_stats.json" stats_file = os.path.join(output_folder, file_b) if not os.path.exists(stats_file): continue with open(stats_file, "r") as f: data = json.load(f) if data["successes"] / data["count"] > 0.60: continue df_tmp = pd.read_parquet(file, engine='pyarrow') df = pd.concat([df, df_tmp[df_tmp['status'] == "failed_to_download"]], axis=0) # 排除不需要重试的错误 exclude_errors = ["Max retries exceeded with url", "HTTP Error 400", "HTTP Error 403", "HTTP Error 404", "HTTP Error 415", "HTTP Error 500", "HTTP Error 501", "HTTP Error 502", "HTTP Error 503"] for e in exclude_errors: df = df[~df['error_message'].str.contains(e)] # 保留必要列并重命名 df = df.rename(columns={"caption": "TEXT", "url": "URL", "key": "KEY"})[["TEXT", "URL", "KEY"]] df.to_parquet(parquet_file, engine='pyarrow') return parquet_file def wait_for_env_variable(env_var_name, check_interval=5): # 等待环境变量的设置 while True: env_var_value = os.getenv(env_var_name) if env_var_value: print(f"找到环境变量 '{env_var_name}':{env_var_value}") return env_var_value else: print(f"等待环境变量 '{env_var_name}' 被设置...") time.sleep(check_interval) if __name__ == '__main__': # 等待环境变量 env_var_value = wait_for_env_variable("PFVAR") # 读取配置文件 with open(env_var_value, 'r', encoding='utf-8') as f: config = json.load(f) # 设置下载参数 url_list = config["url_list"] output_folder = config["output_folder"] thread_count = config.get("thread_count", 16) timeout = config.get("timeout", 120) number_sample_per_shard = config.get("number_sample_per_shard", 2000) print("并发请求数量:", multiprocessing.cpu_count() * thread_count) download( processes_count=multiprocessing.cpu_count(), thread_count=thread_count, url_list=url_list, output_folder=output_folder, image_size=1024, resize_mode="no", skip_reencode=True, output_format="webdataset", input_format="parquet", url_col="URL", caption_col="TEXT", enable_wandb=False, number_sample_per_shard=number_sample_per_shard, distributor="multiprocessing", min_image_size=128, max_aspect_ratio=5, incremental_mode="incremental", timeout=timeout, ) # 准备重试下载 parquet_file = prepare_retry_download(url_list, output_folder) retry_output_folder = os.path.join(output_folder, "_retry_download") download( processes_count=multiprocessing.cpu_count(), thread_count=thread_count, url_list=parquet_file, output_folder=retry_output_folder, image_size=1024, resize_mode="no", skip_reencode=True, output_format="webdataset", input_format="parquet", url_col="URL", caption_col="TEXT", save_additional_columns=["KEY"], enable_wandb=False, number_sample_per_shard=number_sample_per_shard, distributor="multiprocessing", min_image_size=128, max_aspect_ratio=5, incremental_mode="incremental", timeout=timeout, ) print("重试成功率可能为0,说明第一次下载已经非常成功。重试时无需关注成功率。") print("程序执行完毕,无需重新运行。")

创建 Dockerfile

bash
FROM kevinchina/deeplearning:ubuntu22.python3.10.img2dataset WORKDIR /app COPY download_parquet.py . ENTRYPOINT ["python", "download_parquet.py"]

构建镜像

bash
docker build -t kevinchina/deeplearning:ubuntu22.python3.10.img2dataset.app .

配置文件 config_part00059.json

配置文件应指定 Parquet 文件位置和数据下载的持久化存储目录:

json
{ "url_list": "/ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9.parquet", "output_folder": "/ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9", "thread_count": 16, "timeout": 120, "number_sample_per_shard": 2000 }

执行下载

最后,使用以下命令启动下载任务:

bash
docker run -d \ -v /ssd/xiedong/parquet_test:/ssd/xiedong/parquet_test \ -e PFVAR=/ssd/xiedong/parquet_test/config_part00059.json \ kevinchina/deeplearning:ubuntu22.python3.10.img2dataset.app | xargs -I {} docker logs -f {}
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:Dong

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!