laion_aesthetics_1024_33M_9.parquet
As a sample, I downloaded one file, laion_aesthetics_1024_33M_9.parquet, and placed it in the /ssd/xiedong/parquet_test directory. Next, I will download the data it lists by launching Docker containers.
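Before wiring up the container, it is worth a quick sanity check that the parquet file really has the URL and TEXT columns the download script below expects; a minimal sketch:

```python
import pandas as pd

# Sanity check: the download script below reads url_col="URL" and caption_col="TEXT".
df = pd.read_parquet(
    "/ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9.parquet",
    engine="pyarrow",
)
print(df.columns.tolist())
print(len(df), "rows")
print(df[["URL", "TEXT"]].head())
```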
The core idea: each container gets its own configuration file, which defines the download settings (input and output file locations, and so on). Once started, the container runs its task according to that configuration file and shuts down automatically when the task finishes.
First, build an image containing the required environment:
```bash
docker run -it -v /ssd/xiedong/parquet_test:/ssd/xiedong/parquet_test buluma/python:3.10.4-ubuntu22.04 bash

# Inside the container: install img2dataset and trim the caches
pip install img2dataset
apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
pip cache purge

# Back on the host: commit the container (your container ID will differ) as a reusable image
docker commit e768addf27ec kevinchina/deeplearning:ubuntu22.python3.10.img2dataset
```
download_parquet.py
This script downloads the data listed in a parquet file according to a configuration file, then retries the downloads that failed:
```python
import json
import multiprocessing
import os
import time

import pandas as pd
from img2dataset import download


def prepare_retry_download(url_list_dir, output_folder):
    """Build a parquet file containing the failed downloads that should be retried."""
    src_in = os.path.dirname(url_list_dir)
    src_in = os.path.join(src_in, "retry_download")
    os.makedirs(src_in, exist_ok=True)
    parquet_file = os.path.join(src_in, "retry_download.parquet")
    if os.path.exists(parquet_file):
        print("retry_download.parquet already exists")
        return parquet_file

    # Collect the per-shard result parquet files from the output folder
    files = [f for f in os.listdir(output_folder) if f.endswith(".parquet")]
    files_abs = sorted([os.path.join(output_folder, f) for f in files])
    df = pd.read_parquet(files_abs[0], engine='pyarrow')
    df = df[df['status'] == "failed_to_download"]

    # Merge failure records from up to the first 1000 shards,
    # skipping shards whose success rate is already above 60%
    for file in files_abs[1:1000]:
        file_b = os.path.splitext(os.path.basename(file))[0] + "_stats.json"
        stats_file = os.path.join(output_folder, file_b)
        if not os.path.exists(stats_file):
            continue
        with open(stats_file, "r") as f:
            data = json.load(f)
        if data["successes"] / data["count"] > 0.60:
            continue
        df_tmp = pd.read_parquet(file, engine='pyarrow')
        df = pd.concat([df, df_tmp[df_tmp['status'] == "failed_to_download"]], axis=0)

    # Drop errors that are not worth retrying
    exclude_errors = ["Max retries exceeded with url", "HTTP Error 400", "HTTP Error 403", "HTTP Error 404",
                      "HTTP Error 415", "HTTP Error 500", "HTTP Error 501", "HTTP Error 502", "HTTP Error 503"]
    for e in exclude_errors:
        # regex=False treats the pattern literally; na=False keeps rows with a null error message out of the mask
        df = df[~df['error_message'].str.contains(e, regex=False, na=False)]

    # Keep only the columns img2dataset needs and rename them
    df = df.rename(columns={"caption": "TEXT", "url": "URL", "key": "KEY"})[["TEXT", "URL", "KEY"]]
    df.to_parquet(parquet_file, engine='pyarrow')
    return parquet_file


def wait_for_env_variable(env_var_name, check_interval=5):
    """Block until the given environment variable is set, then return its value."""
    while True:
        env_var_value = os.getenv(env_var_name)
        if env_var_value:
            print(f"Found environment variable '{env_var_name}': {env_var_value}")
            return env_var_value
        print(f"Waiting for environment variable '{env_var_name}' to be set...")
        time.sleep(check_interval)


if __name__ == '__main__':
    # Wait for the config-file path passed via the PFVAR environment variable
    env_var_value = wait_for_env_variable("PFVAR")

    # Read the configuration file
    with open(env_var_value, 'r', encoding='utf-8') as f:
        config = json.load(f)

    # Download parameters
    url_list = config["url_list"]
    output_folder = config["output_folder"]
    thread_count = config.get("thread_count", 16)
    timeout = config.get("timeout", 120)
    number_sample_per_shard = config.get("number_sample_per_shard", 2000)
    print("Concurrent requests:", multiprocessing.cpu_count() * thread_count)

    # First pass: download everything listed in the input parquet file
    download(
        processes_count=multiprocessing.cpu_count(),
        thread_count=thread_count,
        url_list=url_list,
        output_folder=output_folder,
        image_size=1024,
        resize_mode="no",
        skip_reencode=True,
        output_format="webdataset",
        input_format="parquet",
        url_col="URL",
        caption_col="TEXT",
        enable_wandb=False,
        number_sample_per_shard=number_sample_per_shard,
        distributor="multiprocessing",
        min_image_size=128,
        max_aspect_ratio=5,
        incremental_mode="incremental",
        timeout=timeout,
    )

    # Second pass: retry the failures collected above
    parquet_file = prepare_retry_download(url_list, output_folder)
    retry_output_folder = os.path.join(output_folder, "_retry_download")
    download(
        processes_count=multiprocessing.cpu_count(),
        thread_count=thread_count,
        url_list=parquet_file,
        output_folder=retry_output_folder,
        image_size=1024,
        resize_mode="no",
        skip_reencode=True,
        output_format="webdataset",
        input_format="parquet",
        url_col="URL",
        caption_col="TEXT",
        save_additional_columns=["KEY"],
        enable_wandb=False,
        number_sample_per_shard=number_sample_per_shard,
        distributor="multiprocessing",
        min_image_size=128,
        max_aspect_ratio=5,
        incremental_mode="incremental",
        timeout=timeout,
    )

    print("The retry success rate may be 0; that just means the first pass was already "
          "very successful, so don't worry about the retry pass's success rate.")
    print("Finished; there is no need to run again.")
```
Next, package the script into an application image with a small Dockerfile:

```dockerfile
FROM kevinchina/deeplearning:ubuntu22.python3.10.img2dataset
WORKDIR /app
COPY download_parquet.py .
ENTRYPOINT ["python", "download_parquet.py"]
```
Build the application image:

```bash
docker build -t kevinchina/deeplearning:ubuntu22.python3.10.img2dataset.app .
```
config_part00059.json
The configuration file specifies the location of the parquet file and the persistent storage directory for the downloaded data:
```json
{
    "url_list": "/ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9.parquet",
    "output_folder": "/ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9",
    "thread_count": 16,
    "timeout": 120,
    "number_sample_per_shard": 2000
}
```
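Since each container consumes exactly one configuration file, downloading many parquet parts amounts to generating one such config per part. A minimal sketch of that step, assuming the parts follow the laion_aesthetics_1024_33M_&lt;n&gt;.parquet naming of the sample file (the part count and config numbering here are illustrative, not taken from the original setup):

```python
import json
import os

base = "/ssd/xiedong/parquet_test"

# One config per parquet part; adapt the file names and range
# to however your parts are actually named.
for n in range(10):
    part = os.path.join(base, f"laion_aesthetics_1024_33M_{n}.parquet")
    if not os.path.exists(part):
        continue
    config = {
        "url_list": part,
        "output_folder": os.path.join(base, f"laion_aesthetics_1024_33M_{n}"),
        "thread_count": 16,
        "timeout": 120,
        "number_sample_per_shard": 2000,
    }
    with open(os.path.join(base, f"config_part{n:05d}.json"), "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)
```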
Finally, start the download task with the following command, which pipes the new container's ID into docker logs so you can follow its output:
```bash
docker run -d \
    -v /ssd/xiedong/parquet_test:/ssd/xiedong/parquet_test \
    -e PFVAR=/ssd/xiedong/parquet_test/config_part00059.json \
    kevinchina/deeplearning:ubuntu22.python3.10.img2dataset.app | xargs -I {} docker logs -f {}
```
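Once a container has finished, you can gauge how the run went by aggregating the per-shard *_stats.json files that img2dataset writes into the output folder; the retry logic in download_parquet.py reads the same successes and count fields. A small sketch:

```python
import glob
import json

output_folder = "/ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9"

total = successes = 0
# Each shard gets a <shard>_stats.json with per-shard counters.
for stats_file in glob.glob(f"{output_folder}/*_stats.json"):
    with open(stats_file) as f:
        data = json.load(f)
    total += data["count"]
    successes += data["successes"]

if total:
    print(f"{successes}/{total} downloaded ({successes / total:.1%})")
```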