Last time I tested a few models:
https://www.dong-blog.fun/post/1741
Using someone else's API code, the GPU memory usage was always frustrating, and there is currently no TensorRT deployment path for Qwen2-VL, so in this post I will serve it directly with transformers.
app.py:
```python
import requests
import torch
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import gc
from PIL import Image
import os
import tempfile
import gradio as gr
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
import json
from starlette.responses import JSONResponse

app = FastAPI()
# Path to the model and processor
model_path = "/Qwen2-VL-Any"
# Load the model
model = Qwen2VLForConditionalGeneration.from_pretrained(
model_path, torch_dtype="auto", device_map="auto"
)
# The default range for the number of visual tokens per image in the model is 4-16384.
# You can set min_pixels and max_pixels according to your needs, such as a token range of 256-1280, to balance performance and cost.
# Read environment variables, falling back to defaults if unset
min_pixels = int(os.getenv("MIN_PIXELS", 4 * 28 * 28))
max_pixels = int(os.getenv("MAX_PIXELS", 1024 * 28 * 28))
print("Min Pixels:", min_pixels)
print("Max Pixels:", max_pixels)
processor = AutoProcessor.from_pretrained(model_path, min_pixels=min_pixels, max_pixels=max_pixels)
class MessagesRequest(BaseModel):
    messages: str  # JSON string of the messages list
@app.post("/api/chat")
def chat_endpoint(request: MessagesRequest):
try:
messages = json.loads(request.messages)
        # Prepare the inference inputs
text = processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to("cuda")
        # Run inference and generate the output
generated_ids = model.generate(**inputs, max_new_tokens=1280)
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
# print(output_text)
        # Free cached memory
gc.collect()
torch.cuda.empty_cache()
return JSONResponse(content={"response": output_text})
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
def process_input(image, promptx):
    # If the input is a string (assumed to be a URL), try to download the image
if isinstance(image, str):
try:
            # Download the image from the URL
response = requests.get(image, timeout=20, verify=False)
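            # verify=False disables TLS certificate verification; convenient for
            # self-signed certificates but insecure on untrusted networks.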
response.raise_for_status()
            # Convert the downloaded bytes into an image
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
tmp.write(response.content)
tmp_image_path = tmp.name
image = Image.open(tmp_image_path)
except Exception as e:
return f"下载图片失败:{e}"
    # Otherwise assume the input is an uploaded image file
else:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
image.save(tmp.name)
tmp_image_path = tmp.name
    # Build the messages structure expected by the chat template
messages = [
{
"role": "user",
"content": [
{
"type": "image",
"image": tmp_image_path,
},
{"type": "text", "text": promptx},
],
}
]
    # Run inference with the locally loaded model
try:
        # Prepare the inference inputs
text = processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to("cuda")
        # Run inference and generate the output
generated_ids = model.generate(**inputs, max_new_tokens=1280)
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
# print(output_text)
        # Free cached memory
gc.collect()
torch.cuda.empty_cache()
except Exception as e:
output_text = f"错误: {e}"
    # Remove the temporary image file
os.remove(tmp_image_path)
return output_text
def create_gradio():
with gr.Blocks() as demo:
gr.Markdown("# 图片和提示词输入")
with gr.Tabs():
with gr.Tab("上传图片"):
with gr.Row():
image_input = gr.Image(label="上传图片", type='pil')
text_input = gr.Textbox(label="输入提示词")
submit_button = gr.Button("提交")
output_text = gr.Textbox(label="模型输出")
submit_button.click(process_input, inputs=[image_input, text_input], outputs=output_text)
with gr.Tab("图片 URL"):
with gr.Row():
url_input = gr.Textbox(label="输入图片 URL")
text_input_url = gr.Textbox(label="输入提示词")
submit_button_url = gr.Button("提交")
output_text_url = gr.Textbox(label="模型输出")
submit_button_url.click(process_input, inputs=[url_input, text_input_url],
outputs=output_text_url)
return demo
import psutil
import subprocess
def get_memory_usage_percentage():
memory_info = psutil.virtual_memory()
    # Memory usage as an integer percentage (0-100)
memory_usage_percentage = int(memory_info.percent)
return memory_usage_percentage
# GPU memory usage percentage
def get_gpu_memory_usage():
# Get GPU memory details using nvidia-smi
result = subprocess.check_output(
['nvidia-smi', '--query-gpu=memory.total,memory.used', '--format=csv,nounits,noheader'], encoding='utf-8')
total_memory, used_memory = map(int, result.strip().split('\n')[0].split(','))
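    # Note: only the first line of the nvidia-smi output is parsed, so on a
    # multi-GPU host this reports GPU 0 only.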
# Calculate memory usage as a percentage
vmemory_usage_percentage = (used_memory / total_memory) * 100
return vmemory_usage_percentage
# CUDA / GPU environment information (reported by the liveness probe)
def get_cuda_info():
try:
        # CUDA version
cuda_version = torch.version.cuda
except Exception as e:
cuda_version = f"Error getting CUDA version: {str(e)}"
try:
        # Number of GPUs
gpu_count = torch.cuda.device_count()
except Exception as e:
gpu_count = f"Error getting GPU count: {str(e)}"
try:
        # GPU names
gpu_names = [torch.cuda.get_device_name(i) for i in range(gpu_count)]
except Exception as e:
gpu_names = f"Error getting GPU names: {str(e)}"
return cuda_version, gpu_count, gpu_names
def get_nvidia_smi_output():
try:
result = subprocess.run(["nvidia-smi"], capture_output=True, text=True)
return result.stdout
except Exception as e:
return f"Error executing nvidia-smi: {str(e)}"
def get_system_info():
try:
        # Memory info
memory_info = psutil.virtual_memory()._asdict()
except Exception as e:
memory_info = f"Error getting memory info: {str(e)}"
try:
        # Disk info
disk_info = {disk.device: disk._asdict() for disk in psutil.disk_partitions(all=False)}
except Exception as e:
disk_info = f"Error getting disk info: {str(e)}"
try:
        # CPU info
cpu_info = {
"cpu_count": psutil.cpu_count(logical=False),
"cpu_count_logical": psutil.cpu_count(logical=True),
"cpu_freq": psutil.cpu_freq()._asdict(),
"cpu_percent": psutil.cpu_percent(interval=1)
}
except Exception as e:
cpu_info = f"Error getting CPU info: {str(e)}"
return memory_info, disk_info, cpu_info
# Liveness probe
@app.get("/liveness")
def liveness_probe():
try:
        # Check whether CUDA is available
cuda_available = torch.cuda.is_available()
memory_usage_percentage = get_memory_usage_percentage()
vmemory_usage_percentage = get_gpu_memory_usage()
if cuda_available and memory_usage_percentage < 98 and vmemory_usage_percentage < 98:
return JSONResponse(content={"status": "alive"}, status_code=200)
else:
cuda_version, gpu_count, gpu_names = get_cuda_info()
nvidia_smi_output = get_nvidia_smi_output()
memory_info, disk_info, cpu_info = get_system_info()
return JSONResponse(content={
"status": "cuda 不可用",
"cuda_available": cuda_available,
"cuda_version": cuda_version,
"gpu_count": gpu_count,
"gpu_names": gpu_names,
"nvidia_smi_output": nvidia_smi_output,
"memory_info": memory_info,
"disk_info": disk_info,
"cpu_info": cpu_info,
"memory_usage_percentage": memory_usage_percentage,
"vmemory_usage_percentage": vmemory_usage_percentage
}, status_code=500)
except Exception as e:
return JSONResponse(content={
"status": "error",
"message": str(e)
}, status_code=500)
# Mount the Gradio app
app = gr.mount_gradio_app(app, create_gradio(), path="/gradio")
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
```
Dockerfile:
```dockerfile
FROM kevinchina/deeplearning:llamafactory20241009
COPY ./app.py /app/app.py
EXPOSE 7860
ENTRYPOINT ["python3", "/app/app.py"]
```
build:
```bash
docker build -t kevinchina/deeplearning:llamafactory20241009qwenapi .
```
run:
```bash
docker run --gpus all -d --shm-size 16G \
    --rm -v /root/xiedong/Qwen2-VL-7B-Instruct:/Qwen2-VL-Any \
    --net host \
    -e MAX_PIXELS=602112 \
    kevinchina/deeplearning:llamafactory20241009qwenapi | xargs -I {} docker logs -f {}
```
Client code:
```python
import requests
import json
# API endpoint URL
url = "http://101.136.22.140:7860/api/chat" # 假设你的FastAPI服务器运行在本地的8000端口
# Prepare the request data
tmp_image_path = 'http://101.136.19.26:8005/16.营业性演出许可证/005_yanchu-5.jpg'
promptx = "描述图片。" # 替换为你的提示文本
messages = [
{
"role": "user",
"content": [
{
"type": "image",
"image": tmp_image_path,
},
{"type": "text", "text": promptx},
],
}
]
# Serialize the messages to a JSON string
messages_json_str = json.dumps(messages)
# Build the request payload
payload = {
"messages": messages_json_str
}
# Send the POST request
response = requests.post(url, json=payload)
# Handle the response
if response.status_code == 200:
    print("Response:", response.json()["response"])
else:
    print("Error:", response.json()["error"])
```
Health / liveness monitoring:
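The /liveness endpoint defined in app.py returns HTTP 200 as long as CUDA is available and both host RAM and GPU memory usage are below 98%; otherwise it responds with a 500 that includes nvidia-smi output and system details. A minimal check, assuming the same host and port as the client code above:

```python
import requests

# Hit the liveness probe exposed by the FastAPI service.
resp = requests.get("http://101.136.22.140:7860/liveness", timeout=10)
print(resp.status_code)   # 200 means healthy
print(resp.json())        # diagnostics are included when the status is 500
```

The same URL can be wired into a container orchestrator's HTTP liveness probe.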