仓库:https://huggingface.co/google/siglip-so400m-patch14-384
下载仓库:
./hfd.sh google/siglip-so400m-patch14-384 --local-dir ./google/siglip-so400m-patch14-384
懒得装环境,直接docker:
docker run -it --gpus '"device=0"' -v ./google/siglip-so400m-patch14-384:/google/siglip-so400m-patch14-384 -p 8033:8033 --shm-size 32g kevinchina/deeplearning:llamafactory20250311-3 bash
图片直接推理得到特征:
# Stand-alone demo: extract a SigLIP image feature vector for one sample image.
from PIL import Image
import requests
import torch
from transformers import AutoProcessor, AutoModel

# 1. Pick the device: GPU 0 when CUDA is available, otherwise CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 2. Load the model and processor from the mounted checkpoint directory and
#    move the model to the selected device.
model = AutoModel.from_pretrained("/google/siglip-so400m-patch14-384").to(device)
processor = AutoProcessor.from_pretrained("/google/siglip-so400m-patch14-384")

# 3. Download the sample image and preprocess it.
url = "http://images.cocodataset.org/val2017/000000039769.jpg"
# Fail fast on a stalled or unsuccessful download instead of handing an error
# page to the image decoder.
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
image = Image.open(response.raw)
# The preprocessed inputs are moved to the same device as the model.
inputs = processor(images=image, return_tensors="pt", padding=True).to(device)

# 4. Run inference without tracking gradients.
with torch.no_grad():
    outputs = model.get_image_features(**inputs)

# 5. Print the feature tensor (it stays on `device`; call .cpu() to move it).
print("Feature vector shape:", outputs.shape)
print("Sample features (first 10 dims):", outputs[0, :10])
得到结果:
Using device: cuda:0
Feature vector shape: torch.Size([1, 1152])
Sample features (first 10 dims): tensor([ 0.0044, -0.3041, -0.2630, 0.3240, -0.3151, -0.7515, 0.3243, 0.5557, -0.0269, -0.0519], device='cuda:0')
# FastAPI service exposing SigLIP image-feature extraction over HTTP.
import torch
from transformers import AutoProcessor, AutoModel
from PIL import Image
import io
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import uvicorn

app = FastAPI()

# Global variables for model and processor.
# They stay None until the startup hook has loaded them successfully.
model = None
processor = None
device = None
@app.on_event("startup")
async def startup_event():
    """Select the compute device and load the model and processor into the module globals.

    Any loading error is printed and then re-raised.
    """
    global model, processor, device

    # Use the first CUDA device when one is available, otherwise the CPU.
    cuda_ready = torch.cuda.is_available()
    device = torch.device("cuda:0" if cuda_ready else "cpu")
    print(f"Using device: {device}")

    # Must match the container path the checkpoint directory is mounted at.
    model_path = "/google/siglip-so400m-patch14-384"
    try:
        model = AutoModel.from_pretrained(model_path).to(device)
        processor = AutoProcessor.from_pretrained(model_path)
    except Exception as e:
        print(f"Error loading model/processor: {e}")
        # Treat a failed load as fatal rather than serving without a model.
        raise
    else:
        print(f"Model and processor loaded successfully from {model_path}")
@app.post("/extract-features/")
async def extract_features(file: UploadFile = File(...)):
    """Return the image features computed by the loaded model for one uploaded image.

    Args:
        file: The uploaded image (multipart form field named ``file``).

    Returns:
        On success, a dict with ``filename`` and ``features`` (the output of
        ``model.get_image_features`` converted to nested Python lists).
        A 503 JSON response when the model or processor has not been loaded,
        and a 500 JSON response carrying the error text when anything in the
        decode/preprocess/inference path raises.
    """
    if model is None or processor is None:
        return JSONResponse(status_code=503, content={"error": "Model not loaded. Please check server logs."})
    try:
        # Decode the upload from memory.
        contents = await file.read()
        # Normalize to 3-channel RGB so grayscale, palette and RGBA uploads are
        # all presented to the processor in the same mode.
        image = Image.open(io.BytesIO(contents)).convert("RGB")
        # Preprocess and move the tensors to the model's device.
        inputs = processor(images=image, return_tensors="pt", padding=True).to(device)
        # Inference only: no gradients needed.
        with torch.no_grad():
            outputs = model.get_image_features(**inputs)
        # Tensors are not JSON-serializable; convert to nested lists on the CPU.
        features_list = outputs.cpu().tolist()
        return {"filename": file.filename, "features": features_list}
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
if __name__ == "__main__":
    # Programmatic launch for direct execution of this file. The equivalent
    # command line is:
    #   uvicorn main:app --host 0.0.0.0 --port 8033
    server_options = {"host": "0.0.0.0", "port": 8033}
    uvicorn.run(app, **server_options)
# Example client: upload one image to the feature-extraction endpoint and
# print a summary of the returned features.
import requests

# API endpoint URL
url = "http://10.136.19.27:8033/extract-features/"

# Path to the image file to send. '077.jpg' must be in the current working
# directory, or replace this with a full path.
image_path = "077.jpg"

try:
    # The multipart field name ('file') must match the parameter name of the
    # FastAPI endpoint.
    with open(image_path, "rb") as img_file:
        files = {"file": (image_path, img_file, "image/jpeg")}
        print(f"Sending request for image: {image_path}")
        # A timeout keeps the client from hanging forever on a stalled server.
        response = requests.post(url, files=files, timeout=30)

    if response.status_code == 200:
        print("Successfully received features:")
        result = response.json()
        print(f"Filename: {result.get('filename')}")
        # Print only a subset of the features for brevity.
        features = result.get('features')
        if features and isinstance(features, list) and len(features) > 0:
            print(f"Feature vector shape: (1, {len(features[0])})")  # Assuming batch size of 1
            print(f"Sample features (first 10 dims): {features[0][:10]}")
        else:
            print(f"Features data: {features}")
    else:
        print(f"Error: {response.status_code}")
        try:
            print(f"Response content: {response.json()}")
        except ValueError:
            # JSON decode errors derive from ValueError in every requests
            # release; requests.exceptions.JSONDecodeError only exists in
            # newer ones.
            print(f"Response content: {response.text}")
except FileNotFoundError:
    print(f"Error: Image file not found at {image_path}. Please ensure the file exists.")
except requests.exceptions.ConnectionError:
    print(f"Error: Could not connect to the server at {url}. Ensure the FastAPI server is running.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
# 可以得到:
# (base) 三 5月 07 # 21:51:56 # /mnt/workcode/amex-datasets-process # python client.py
# Sending request for image: 077.jpg
# Successfully received features:
# Filename: 077.jpg
# Feature vector shape: (1, 1152)
# Sample features (first 10 dims): [0.10803146660327911, 0.017530322074890137, 0.1551334261894226, -0.0017879307270050049, -0.687953531742096, -0.3093825578689575, 0.8667486906051636, -0.04316258430480957, -0.4580515921115875, 0.49320095777511597]
这段代码实现了一个图像匹配与拼接系统,主要功能包括:1) 从三个不同目录(yuantu/原图、Effect_ox1/效果图1、Effect_pf/效果图2)加载图像;2) 通过SigLIP API提取图像特征向量并实现本地缓存优化;3) 使用余弦相似度计算图像间的距离矩阵;4) 采用迭代贪心策略为原图寻找最匹配的效果图对(每一轮在剩余图片中选出与两张效果图距离之和最小的原图,配对后将这三张图从候选中移除);5) 将匹配成功的原图与两个效果图水平拼接保存。系统通过特征缓存机制减少API调用,中断后重新运行时可直接复用已缓存的特征,并通过上述匹配策略尽量保证每组三张图片在视觉特征上高度相似,最终生成组合图像。
# Image matching and collage script: imports and optional-dependency probe.
import os
import numpy as np
from PIL import Image
import io
from pathlib import Path
# from scipy.fftpack import dct # No longer needed for pHash
from collections import defaultdict
import requests  # Added for API calls
from numpy.linalg import norm  # Added for cosine similarity
import time  # Added for retry backoff
import hashlib  # Added for creating cache keys
import pickle  # Added for feature serialization
from scipy.optimize import linear_sum_assignment  # Added for optimal matching

# tqdm is optional: without it the script falls back to plain progress prints.
try:
    from tqdm import tqdm
    tqdm_available = True
except ImportError:
    tqdm_available = False
    print("提示: 安装 tqdm 库可以获得更好的进度条显示 (pip install tqdm)")
# Helper function to resize images before feature extraction
def resize_image_if_needed(img_path, max_dim=768):
    """Open an image and shrink it so that neither side exceeds ``max_dim``.

    The aspect ratio is kept. Images already within the limit are returned
    as opened, without resizing.

    Args:
        img_path: Path of the image file to open.
        max_dim: Largest allowed width or height, in pixels.

    Returns:
        The opened (and possibly resized) PIL image.
    """
    img = Image.open(img_path)
    width, height = img.size
    if width > max_dim or height > max_dim:
        ratio = min(max_dim / width, max_dim / height)
        # Clamp each side to at least 1 px: for extreme aspect ratios plain
        # truncation gives a zero-length side, which resize() cannot produce.
        new_width = max(1, int(width * ratio))
        new_height = max(1, int(height * ratio))
        img = img.resize((new_width, new_height), Image.LANCZOS)
    return img
# Opens images untouched (no resizing) for the full-resolution collage.
def open_original_image(img_path):
    """Open ``img_path`` without resizing.

    Returns:
        The opened PIL image, or ``None`` (after printing the error) when the
        file cannot be opened.
    """
    try:
        full_res = Image.open(img_path)
    except Exception as e:
        print(f"Error opening image {os.path.basename(img_path)}: {e}")
        return None
    return full_res
# Output directory for the stitched collages.
save_dir = "save"
os.makedirs(save_dir, exist_ok=True)

# File extensions (lower-case) treated as images.
_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')


def _list_images(directory):
    """Return the paths of the image files directly inside ``directory``.

    Names are sorted because os.listdir() returns entries in arbitrary order;
    sorting keeps the later processing order the same from run to run.
    """
    return [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if name.lower().endswith(_IMAGE_EXTENSIONS)
    ]


# Collect the image paths of the three input folders.
yuantu_dir = os.path.join("heji", "yuantu")
Effect_ox1_dir = os.path.join("heji", "Effect_ox1")
Effect_pf_dir = os.path.join("heji", "Effect_pf")
yuantu_images = _list_images(yuantu_dir)
Effect_ox1_images = _list_images(Effect_ox1_dir)
Effect_pf_images = _list_images(Effect_pf_dir)
# --- NEW: API-specific resizing ---
def resize_for_api(img_path, target_size=384):
    """Build the in-memory JPEG that is uploaded to the feature API.

    The image is converted to RGB, resized to ``target_size`` x ``target_size``
    (aspect ratio is not preserved) and encoded as JPEG at quality 95.

    Returns:
        An ``io.BytesIO`` positioned at the start of the JPEG data, or ``None``
        (after printing the error) if any step fails.
    """
    try:
        square = (target_size, target_size)
        # RGB first, so every upload has the same mode before encoding.
        prepared = Image.open(img_path).convert('RGB').resize(square, Image.LANCZOS)
        buffer = io.BytesIO()
        prepared.save(buffer, format='JPEG', quality=95)
        # Rewind so the consumer reads from the first byte.
        buffer.seek(0)
        return buffer
    except Exception as e:
        print(f"Error resizing image {os.path.basename(img_path)}: {e}")
        return None
# --- NEW: Feature caching ---
# Directory that holds one pickle file per cached image; created on import.
cache_dir = "feature_cache"
os.makedirs(cache_dir, exist_ok=True)


def get_cache_path(image_path):
    """Return the cache file path that belongs to ``image_path``.

    The file name is the MD5 hex digest of the UTF-8 encoded path string (not
    of the image contents) with a ``.pkl`` suffix, placed inside ``cache_dir``.
    """
    digest = hashlib.md5(image_path.encode('utf-8')).hexdigest()
    cache_file = digest + ".pkl"
    return os.path.join(cache_dir, cache_file)
def save_features_to_cache(image_path, features):
    """Pickle ``features`` into the cache file that belongs to ``image_path``.

    Returns:
        ``True`` when the file was written; ``False`` when ``features`` is
        ``None`` or when writing failed (a warning is printed in that case).
    """
    if features is None:
        return False
    try:
        with open(get_cache_path(image_path), 'wb') as cache_file:
            pickle.dump(features, cache_file)
    except Exception as e:
        print(f"Warning: Could not cache features for {os.path.basename(image_path)}: {e}")
        return False
    return True
def load_features_from_cache(image_path):
    """Return the cached features for ``image_path``, or ``None`` if unusable.

    ``None`` is returned when no cache file exists, when the image file was
    modified after the cache file was written (a notice is printed), or when
    the cache file cannot be read (a warning is printed).
    """
    cache_path = get_cache_path(image_path)
    if not os.path.exists(cache_path):
        return None

    # A cache entry older than the image is stale: the image changed after its
    # features were stored, so they must be extracted again.
    cache_is_stale = os.path.getmtime(image_path) > os.path.getmtime(cache_path)
    if cache_is_stale:
        print(f"Image {os.path.basename(image_path)} modified after cache, will re-extract features.")
        return None

    try:
        # NOTE(review): pickle.load executes arbitrary code from the file; only
        # use cache directories this script wrote itself.
        with open(cache_path, 'rb') as cache_file:
            return pickle.load(cache_file)
    except Exception as e:
        print(f"Warning: Could not load cached features for {os.path.basename(image_path)}: {e}")
        return None
# --- SigLIP API Configuration ---
SIGLIP_API_URL = "http://10.136.19.27:8033/extract-features/"


# --- Feature Extraction via SigLIP API with retry and caching ---
def compute_siglip_features(image_path, api_url=SIGLIP_API_URL, max_retries=3, use_cache=True):
    """Return the feature vector of an image, using the cache or the HTTP API.

    Args:
        image_path: Path of the image file.
        api_url: URL of the feature-extraction endpoint.
        max_retries: Maximum number of request attempts.
        use_cache: When true, cached features are returned if available and a
            freshly computed vector is written back to the cache.

    Returns:
        A 1-D ``float32`` NumPy array, or ``None`` when the image cannot be
        prepared, the response has no usable features, or every attempt failed.
    """
    if use_cache:
        cached_features = load_features_from_cache(image_path)
        if cached_features is not None:
            return cached_features

    # Not cached (or cache disabled): prepare the resized upload once.
    img_bytes = resize_for_api(image_path)
    if img_bytes is None:
        return None

    image_name = os.path.basename(image_path)
    backoff_time = 1  # Seconds before the next attempt; doubled after each failure.
    for attempt in range(1, max_retries + 1):
        try:
            # Rewind before every attempt: a previous upload has consumed the
            # buffer, and without this a retry would post an empty file.
            img_bytes.seek(0)
            files = {"file": (image_name, img_bytes, "image/jpeg")}
            response = requests.post(api_url, files=files, timeout=30)
            response.raise_for_status()  # Turn HTTP error statuses into exceptions.
            result = response.json()

            features = result.get('features')
            if not (features and isinstance(features, list) and len(features) > 0):
                print(f"Warning: No features returned or unexpected format for {image_name}. API response: {result}")
                return None

            # The API returns a list holding one feature-vector list.
            feature_vector = np.array(features[0], dtype=np.float32)
            if feature_vector.ndim != 1 or feature_vector.size == 0:
                print(f"Warning: Invalid feature vector structure for {image_name}. Shape: {feature_vector.shape}")
                return None

            if use_cache:
                save_features_to_cache(image_path, feature_vector)
            return feature_vector
        except Exception as e:
            # One retry path for all failures; only the message label differs.
            if isinstance(e, requests.exceptions.HTTPError):
                label = "HTTP Error"
            elif isinstance(e, requests.exceptions.RequestException):
                label = "Request error"
            else:
                label = "Unexpected error"

            if attempt < max_retries:
                print(f"{label} ({attempt}/{max_retries}): {e}. Retrying in {backoff_time} seconds...")
                time.sleep(backoff_time)
                backoff_time *= 2  # Exponential backoff
            else:
                print(f"Failed after {max_retries} attempts. {label}: {e}")
                return None
    return None
# --- NEW: Distance Calculation for SigLIP Features ---
def calculate_siglip_distance(vec1, vec2):
    """Return the cosine distance (1 - cosine similarity) of two feature vectors.

    Special cases:
        * either argument is ``None``, the flattened shapes differ, or a vector
          is empty -> ``inf``;
        * both vectors have zero norm -> ``0.0``;
        * exactly one vector has zero norm -> ``1.0``.
    """
    if vec1 is None or vec2 is None:
        return float('inf')

    a = np.asarray(vec1).flatten()
    b = np.asarray(vec2).flatten()
    if a.shape != b.shape or a.size == 0 or b.size == 0:
        return float('inf')

    len_a, len_b = norm(a), norm(b)
    if len_a == 0 or len_b == 0:
        # The cosine is undefined for a zero vector; two zero vectors count as
        # identical, a single one as maximally dissimilar.
        return 0.0 if (len_a == 0 and len_b == 0) else 1.0

    # Clip to absorb floating-point drift just outside [-1, 1].
    cosine = np.clip(np.dot(a, b) / (len_a * len_b), -1.0, 1.0)
    return 1 - cosine
# --- Feature extraction ---
print("正在提取图像特征 (使用 SigLIP API,带缓存)...")


def _extract_features_for(image_paths, label):
    """Collect features for every image of one folder.

    Each image is first looked up in the cache; on a miss the API is queried.
    Images for which no features could be obtained are left out of the result.

    Args:
        image_paths: Paths of the images to process.
        label: Folder label used in the progress and summary messages.

    Returns:
        A dict mapping image path to ``{'siglip': feature_vector}``.
    """
    print(f"处理{label}特征...")
    features_by_path = {}
    cached_count = 0
    api_count = 0
    total = len(image_paths)
    iterator = tqdm(image_paths) if tqdm_available else image_paths
    for position, image_path in enumerate(iterator, start=1):
        if not tqdm_available:
            # Count by position so the display still advances when an image
            # fails and is therefore never added to the result.
            print(f"处理 {position}/{total}: {os.path.basename(image_path)}", end="\r")

        # Look in the cache first so hits and API requests can be counted
        # separately.
        cached_features = load_features_from_cache(image_path)
        if cached_features is not None:
            features_by_path[image_path] = {'siglip': cached_features}
            cached_count += 1
            continue

        features = compute_siglip_features(image_path, use_cache=True)
        if features is not None:
            features_by_path[image_path] = {'siglip': features}
            api_count += 1

    if not tqdm_available:
        print()  # Finish the "\r" progress line.
    print(f"{label}特征: {cached_count} 从缓存加载, {api_count} 从API请求")
    return features_by_path


# Features for the three folders, keyed by image path.
yuantu_features = _extract_features_for(yuantu_images, "原图")
Effect_ox1_features = _extract_features_for(Effect_ox1_images, "Effect_ox1")
Effect_pf_features = _extract_features_for(Effect_pf_images, "Effect_pf")
print("特征提取完成!")
# --- Triplet matching ---
# NOTE: despite the wording of the progress messages this is an iterative
# greedy selection, not a globally optimal assignment: each round picks the
# single best remaining triplet and removes its three images from the pool.
print("开始全局优化匹配...")

# Keep only the images for which features were obtained.
valid_yuantu_images = [img_path for img_path in yuantu_images if img_path in yuantu_features]
valid_Effect_ox1_images = [img_path for img_path in Effect_ox1_images if img_path in Effect_ox1_features]
valid_Effect_pf_images = [img_path for img_path in Effect_pf_images if img_path in Effect_pf_features]
print(f"有效图片数量: 原图={len(valid_yuantu_images)}, Effect_ox1={len(valid_Effect_ox1_images)}, Effect_pf={len(valid_Effect_pf_images)}")
if not valid_yuantu_images or not valid_Effect_ox1_images or not valid_Effect_pf_images:
    print("错误: 至少一个文件夹中没有有效的特征图片,无法进行匹配。")
    # SystemExit works even where the site-provided exit() helper is absent.
    raise SystemExit(1)


def _distance_matrix(row_paths, row_features, col_paths, col_features):
    """Return the matrix of SigLIP distances between two lists of images.

    Entry ``[i, j]`` is the distance between ``row_paths[i]`` and
    ``col_paths[j]``.
    """
    matrix = np.zeros((len(row_paths), len(col_paths)))
    for i, row_path in enumerate(row_paths):
        row_vec = row_features[row_path]['siglip']
        for j, col_path in enumerate(col_paths):
            matrix[i, j] = calculate_siglip_distance(row_vec, col_features[col_path]['siglip'])
    return matrix


def _pick_best_triplet(dist_ox1, dist_pf, rem_yuantu, rem_ox1, rem_pf):
    """Return the best remaining ``(yuantu_idx, ox1_idx, pf_idx)``, or ``None``.

    For every remaining original image the nearest remaining image of each
    effect folder is found; the original whose two nearest distances have the
    smallest sum wins (the first one on ties). ``None`` is returned when no
    candidate has a sum below infinity.
    """
    # Restrict both matrices to the still-unmatched rows and columns.
    sub_ox1 = dist_ox1[np.ix_(rem_yuantu, rem_ox1)]
    sub_pf = dist_pf[np.ix_(rem_yuantu, rem_pf)]

    best_total = float('inf')
    best = None
    for row, yuantu_idx in enumerate(rem_yuantu):
        ox1_col = int(np.argmin(sub_ox1[row]))
        pf_col = int(np.argmin(sub_pf[row]))
        total = sub_ox1[row, ox1_col] + sub_pf[row, pf_col]
        if total < best_total:
            best_total = total
            best = (yuantu_idx, rem_ox1[ox1_col], rem_pf[pf_col])
    return best


print("计算所有图片对之间的距离...")
# Distances from every original image to every image of each effect folder.
yuantu_Effect_ox1_distances = _distance_matrix(
    valid_yuantu_images, yuantu_features, valid_Effect_ox1_images, Effect_ox1_features)
yuantu_Effect_pf_distances = _distance_matrix(
    valid_yuantu_images, yuantu_features, valid_Effect_pf_images, Effect_pf_features)
print("距离计算完成,开始优化匹配...")

matched_triplets = []
# Indices (into the valid_* lists) of the images that are still unmatched.
remaining_yuantu = list(range(len(valid_yuantu_images)))
remaining_Effect_ox1 = list(range(len(valid_Effect_ox1_images)))
remaining_Effect_pf = list(range(len(valid_Effect_pf_images)))

# Each round consumes one image from every folder, so the smallest folder
# bounds the number of rounds.
iteration = 0
max_iterations = min(len(valid_yuantu_images), len(valid_Effect_ox1_images), len(valid_Effect_pf_images))
while iteration < max_iterations and remaining_yuantu and remaining_Effect_ox1 and remaining_Effect_pf:
    iteration += 1
    print(f"匹配迭代 {iteration}/{max_iterations},剩余: 原图={len(remaining_yuantu)}, Effect_ox1={len(remaining_Effect_ox1)}, Effect_pf={len(remaining_Effect_pf)}")

    best_triplet = _pick_best_triplet(
        yuantu_Effect_ox1_distances, yuantu_Effect_pf_distances,
        remaining_yuantu, remaining_Effect_ox1, remaining_Effect_pf)
    if best_triplet is None:
        # No candidate with a finite total distance is left.
        print("无法找到更多的最佳匹配,结束匹配过程。")
        break

    yuantu_idx, Effect_ox1_idx, Effect_pf_idx = best_triplet
    matched_triplets.append((valid_yuantu_images[yuantu_idx],
                             valid_Effect_ox1_images[Effect_ox1_idx],
                             valid_Effect_pf_images[Effect_pf_idx]))
    # Remove the matched images from the pools.
    remaining_yuantu.remove(yuantu_idx)
    remaining_Effect_ox1.remove(Effect_ox1_idx)
    remaining_Effect_pf.remove(Effect_pf_idx)

print(f"成功匹配了 {len(matched_triplets)} 组三元组。")
# Images left over because the folders differ in size are not handled here;
# for simplicity only the matched triplets are used from this point on.
# --- Stitch and save the collages ---
print("开始拼接和保存图像...")
print("使用原始尺寸图像进行拼接,可能会生成较大文件...")
matched_iter = tqdm(enumerate(matched_triplets), total=len(matched_triplets)) if tqdm_available else enumerate(matched_triplets)
for i, (yuantu_path, Effect_ox1_path, Effect_pf_path) in matched_iter:
    if not tqdm_available:
        print(f"拼接图片 {i+1}/{len(matched_triplets)}", end="\r")
    try:
        # Open all three images at their original size (no pre-scaling).
        yuantu_img = open_original_image(yuantu_path)
        Effect_ox1_img = open_original_image(Effect_ox1_path)
        Effect_pf_img = open_original_image(Effect_pf_path)
        if yuantu_img is None or Effect_ox1_img is None or Effect_pf_img is None:
            print(f"无法打开三元组中的一个或多个图像,跳过: {os.path.basename(yuantu_path)}")
            continue

        yuantu_w, yuantu_h = yuantu_img.size
        Effect_ox1_w, Effect_ox1_h = Effect_ox1_img.size
        Effect_pf_w, Effect_pf_h = Effect_pf_img.size

        # The original image's height is the reference height of the collage.
        target_height = yuantu_h

        # Scale both effect images to that height, keeping their aspect ratio.
        new_Effect_ox1_w = int(Effect_ox1_w * target_height / Effect_ox1_h)
        Effect_ox1_img = Effect_ox1_img.resize((new_Effect_ox1_w, target_height), Image.LANCZOS)
        new_Effect_pf_w = int(Effect_pf_w * target_height / Effect_pf_h)
        Effect_pf_img = Effect_pf_img.resize((new_Effect_pf_w, target_height), Image.LANCZOS)

        # Canvas wide enough for the three images side by side.
        total_width = yuantu_w + new_Effect_ox1_w + new_Effect_pf_w
        new_img = Image.new('RGB', (total_width, target_height))

        # Paste left to right: original, Effect_ox1, Effect_pf.
        current_x = 0
        new_img.paste(yuantu_img, (current_x, 0))
        current_x += yuantu_w
        new_img.paste(Effect_ox1_img, (current_x, 0))
        current_x += new_Effect_ox1_w
        new_img.paste(Effect_pf_img, (current_x, 0))

        # Files are numbered by triplet position (1-based, zero-padded).
        save_path = os.path.join(save_dir, f"{i+1:03d}.jpg")
        new_img.save(save_path, quality=95)  # High JPEG quality.
    except Exception as e:
        print(f"拼接图片时出错 ({i+1}): {e}")
        continue
# f-string prefix added: without it the placeholder was printed literally
# instead of the number of triplets.
print(f"\n所有图片拼接完成!共生成 {len(matched_triplets)} 张拼接图像。")
本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC 许可协议。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。转载请注明出处!