2024-10-14
Deep Learning

Last time I tested a few models:

https://www.dong-blog.fun/post/1741

Serving the model through other people's API code always leaves me unhappy with the GPU memory usage, and Qwen2-VL currently has no TensorRT deployment path, so in this post I will stand the service up directly with transformers.

app.py:

```python
import gc
import json
import os
import subprocess
import tempfile

import gradio as gr
import psutil
import requests
import torch
import uvicorn
from fastapi import FastAPI
from PIL import Image
from pydantic import BaseModel
from qwen_vl_utils import process_vision_info
from starlette.responses import JSONResponse
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor

app = FastAPI()

# Path to the model and processor
model_path = "/Qwen2-VL-Any"

# Load the model
model = Qwen2VLForConditionalGeneration.from_pretrained(
    model_path, torch_dtype="auto", device_map="auto"
)

# The default range for the number of visual tokens per image in the model is 4-16384.
# You can set min_pixels and max_pixels according to your needs, such as a token range
# of 256-1280, to balance performance and cost.
# Read the limits from environment variables, falling back to defaults.
min_pixels = int(os.getenv("MIN_PIXELS", 4 * 28 * 28))
max_pixels = int(os.getenv("MAX_PIXELS", 1024 * 28 * 28))
print("Min Pixels:", min_pixels)
print("Max Pixels:", max_pixels)
processor = AutoProcessor.from_pretrained(model_path, min_pixels=min_pixels, max_pixels=max_pixels)


class MessagesRequest(BaseModel):
    messages: str  # the messages list serialized as a JSON string


@app.post("/api/chat")
def chat_endpoint(request: MessagesRequest):
    try:
        messages = json.loads(request.messages)

        # Prepare the inference inputs
        text = processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to("cuda")

        # Run generation
        generated_ids = model.generate(**inputs, max_new_tokens=1280)
        generated_ids_trimmed = [
            out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        output_text = processor.batch_decode(
            generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )

        # Free cached memory
        gc.collect()
        torch.cuda.empty_cache()

        return JSONResponse(content={"response": output_text})
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)


def process_input(image, promptx):
    # If the input is a string (assumed to be a URL), try to download the image
    if isinstance(image, str):
        try:
            response = requests.get(image, timeout=20, verify=False)
            response.raise_for_status()
            # Save the downloaded content to a temporary file
            with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
                tmp.write(response.content)
                tmp_image_path = tmp.name
            image = Image.open(tmp_image_path)
        except Exception as e:
            return f"Failed to download image: {e}"
    # Otherwise assume an uploaded PIL image
    else:
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            image.save(tmp.name)
            tmp_image_path = tmp.name

    # Build the messages in the expected format
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": tmp_image_path},
                {"type": "text", "text": promptx},
            ],
        }
    ]

    # Run local inference
    try:
        # Prepare the inference inputs
        text = processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to("cuda")

        # Run generation
        generated_ids = model.generate(**inputs, max_new_tokens=1280)
        generated_ids_trimmed = [
            out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        output_text = processor.batch_decode(
            generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )

        # Free cached memory
        gc.collect()
        torch.cuda.empty_cache()
    except Exception as e:
        output_text = f"Error: {e}"

    # Remove the temporary image file
    os.remove(tmp_image_path)
    return output_text


def create_gradio():
    with gr.Blocks() as demo:
        gr.Markdown("# Image and prompt input")
        with gr.Tabs():
            with gr.Tab("Upload image"):
                with gr.Row():
                    image_input = gr.Image(label="Upload image", type="pil")
                    text_input = gr.Textbox(label="Prompt")
                submit_button = gr.Button("Submit")
                output_text = gr.Textbox(label="Model output")
                submit_button.click(process_input, inputs=[image_input, text_input], outputs=output_text)
            with gr.Tab("Image URL"):
                with gr.Row():
                    url_input = gr.Textbox(label="Image URL")
                    text_input_url = gr.Textbox(label="Prompt")
                submit_button_url = gr.Button("Submit")
                output_text_url = gr.Textbox(label="Model output")
                submit_button_url.click(process_input, inputs=[url_input, text_input_url], outputs=output_text_url)
    return demo


def get_memory_usage_percentage():
    # System RAM usage as an integer percentage (0-100)
    memory_info = psutil.virtual_memory()
    return int(memory_info.percent)


def get_gpu_memory_usage():
    # GPU memory usage percentage, queried via nvidia-smi
    result = subprocess.check_output(
        ['nvidia-smi', '--query-gpu=memory.total,memory.used', '--format=csv,nounits,noheader'],
        encoding='utf-8')
    total_memory, used_memory = map(int, result.strip().split('\n')[0].split(','))
    return (used_memory / total_memory) * 100


def get_cuda_info():
    # Collect CUDA version, GPU count and GPU names for diagnostics
    try:
        cuda_version = torch.version.cuda
    except Exception as e:
        cuda_version = f"Error getting CUDA version: {str(e)}"
    try:
        gpu_count = torch.cuda.device_count()
    except Exception as e:
        gpu_count = f"Error getting GPU count: {str(e)}"
    try:
        gpu_names = [torch.cuda.get_device_name(i) for i in range(gpu_count)]
    except Exception as e:
        gpu_names = f"Error getting GPU names: {str(e)}"
    return cuda_version, gpu_count, gpu_names


def get_nvidia_smi_output():
    try:
        result = subprocess.run(["nvidia-smi"], capture_output=True, text=True)
        return result.stdout
    except Exception as e:
        return f"Error executing nvidia-smi: {str(e)}"


def get_system_info():
    # Collect memory, disk and CPU information for diagnostics
    try:
        memory_info = psutil.virtual_memory()._asdict()
    except Exception as e:
        memory_info = f"Error getting memory info: {str(e)}"
    try:
        disk_info = {disk.device: disk._asdict() for disk in psutil.disk_partitions(all=False)}
    except Exception as e:
        disk_info = f"Error getting disk info: {str(e)}"
    try:
        cpu_info = {
            "cpu_count": psutil.cpu_count(logical=False),
            "cpu_count_logical": psutil.cpu_count(logical=True),
            "cpu_freq": psutil.cpu_freq()._asdict(),
            "cpu_percent": psutil.cpu_percent(interval=1),
        }
    except Exception as e:
        cpu_info = f"Error getting CPU info: {str(e)}"
    return memory_info, disk_info, cpu_info


# Liveness probe
@app.get("/liveness")
def liveness_probe():
    try:
        # Check that CUDA is available and RAM/VRAM usage are within bounds
        cuda_available = torch.cuda.is_available()
        memory_usage_percentage = get_memory_usage_percentage()
        vmemory_usage_percentage = get_gpu_memory_usage()
        if cuda_available and memory_usage_percentage < 98 and vmemory_usage_percentage < 98:
            return JSONResponse(content={"status": "alive"}, status_code=200)
        else:
            cuda_version, gpu_count, gpu_names = get_cuda_info()
            nvidia_smi_output = get_nvidia_smi_output()
            memory_info, disk_info, cpu_info = get_system_info()
            return JSONResponse(content={
                "status": "cuda unavailable",
                "cuda_available": cuda_available,
                "cuda_version": cuda_version,
                "gpu_count": gpu_count,
                "gpu_names": gpu_names,
                "nvidia_smi_output": nvidia_smi_output,
                "memory_info": memory_info,
                "disk_info": disk_info,
                "cpu_info": cpu_info,
                "memory_usage_percentage": memory_usage_percentage,
                "vmemory_usage_percentage": vmemory_usage_percentage,
            }, status_code=500)
    except Exception as e:
        return JSONResponse(content={
            "status": "error",
            "message": str(e),
        }, status_code=500)


# Mount the Gradio app
app = gr.mount_gradio_app(app, create_gradio(), path="/gradio")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
```
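
Once the container is running, the Gradio UI defined above is served at the /gradio path and the /liveness route gives a quick health check. Below is a minimal client-side probe, just a sketch that assumes the service is reachable at localhost:7860 (adjust the host to wherever the container actually runs):

```python
import requests

# Probe the /liveness endpoint defined in app.py above.
# Assumption: the service is reachable at localhost:7860; change the host if needed.
resp = requests.get("http://localhost:7860/liveness", timeout=10)

print(resp.status_code)  # 200 when CUDA is usable and RAM/VRAM usage are below 98%
print(resp.json())       # {"status": "alive"} on success, a diagnostic payload otherwise
```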

Dockerfile:

```dockerfile
FROM kevinchina/deeplearning:llamafactory20241009
COPY ./app.py /app/app.py
EXPOSE 7860
ENTRYPOINT ["python3", "/app/app.py"]
```
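
A note on dependencies: the base image kevinchina/deeplearning:llamafactory20241009 is assumed to already ship everything app.py imports (torch, transformers, qwen_vl_utils, gradio, fastapi, uvicorn, psutil, Pillow). If any of these are missing from the base image you use, add a pip install layer before the COPY.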

Build the image:

```bash
docker build -t kevinchina/deeplearning:llamafactory20241009qwenapi .
```

Run the container:

```bash
docker run --gpus all -d --shm-size 16G \
    --rm -v /root/xiedong/Qwen2-VL-7B-Instruct:/Qwen2-VL-Any \
    --net host \
    -e MAX_PIXELS=602112 \
    kevinchina/deeplearning:llamafactory20241009qwenapi | xargs -I {} docker logs -f {}
```
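
The MAX_PIXELS=602112 value caps how many pixels (and therefore visual tokens) each image may consume after resizing. In Qwen2-VL one visual token corresponds to a 28x28 pixel patch, so this setting works out to roughly 768 visual tokens per image; the arithmetic below is my own illustration, not part of the original deployment:

```python
# Rough arithmetic behind MAX_PIXELS=602112 (illustrative calculation only).
pixels_per_visual_token = 28 * 28      # one Qwen2-VL visual token covers a 28x28 patch
max_pixels = 602112                    # value passed via -e MAX_PIXELS=602112

print(max_pixels // pixels_per_visual_token)   # 768 -> at most ~768 visual tokens per image
print(4 * 28 * 28, 1024 * 28 * 28)             # defaults in app.py: 3136 and 802816 pixels
```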

Client code for calling the API:

```python
import requests
import json

# URL of the /api/chat endpoint exposed by the service above (port 7860)
url = "http://101.136.22.140:7860/api/chat"

# Prepare the request data
tmp_image_path = 'http://101.136.19.26:8005/16.营业性演出许可证/005_yanchu-5.jpg'
promptx = "描述图片。"  # "Describe the image." -- replace with your own prompt
messages = [
    {
        "role": "user",
        "content": [
            {"type": "image", "image": tmp_image_path},
            {"type": "text", "text": promptx},
        ],
    }
]

# Serialize the messages list to a JSON string
messages_json_str = json.dumps(messages)

# Build the request payload
payload = {"messages": messages_json_str}

# Send the POST request
response = requests.post(url, json=payload)

# Handle the response
if response.status_code == 200:
    print("Response:", response.json()["response"])
else:
    print("Error:", response.json()["error"])
```
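
Note that the "response" field in the JSON body is a list, because processor.batch_decode on the server returns one string per generated sequence; for a single request the generated text is response.json()["response"][0].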

Liveness check:

http://101.136.22.140:7860/liveness

