Code to Occupy GPU Memory
2025-08-05
Deep Learning
This script keeps every detected GPU busy: a worker thread per GPU allocates roughly 60% of the card's memory as large tensors and runs a small training loop for continuous compute load, while a status thread prints memory usage every 5 seconds. Press Ctrl+C to stop the workers and free the memory.

```python
#!/usr/bin/env python3
import torch
import torch.nn as nn
import time
import threading
from datetime import datetime


def detect_gpus():
    """Detect available GPUs."""
    if not torch.cuda.is_available():
        print("CUDA is not available. No GPUs detected.")
        return []
    gpu_count = torch.cuda.device_count()
    gpus = []
    print(f"Detected {gpu_count} GPU(s):")
    for i in range(gpu_count):
        gpu_name = torch.cuda.get_device_name(i)
        gpu_memory = torch.cuda.get_device_properties(i).total_memory / (1024**3)  # GB
        print(f"  GPU {i}: {gpu_name} ({gpu_memory:.1f} GB)")
        gpus.append(i)
    return gpus


def gpu_worker(gpu_id, running):
    """Worker function for each GPU."""
    device = torch.device(f'cuda:{gpu_id}')
    try:
        # Create large tensors to occupy about 60% of the GPU's memory
        gpu_memory = torch.cuda.get_device_properties(gpu_id).total_memory
        target_memory = int(gpu_memory * 0.6)  # use 60% of GPU memory
        elements_needed = target_memory // 4   # each float32 is 4 bytes

        # Create multiple large tensors
        tensors = []
        remaining = elements_needed
        while remaining > 1000000:  # at least 1M elements per tensor
            size = min(remaining // 3, 5000000)  # at most 5M elements per tensor
            if size < 1000000:
                break
            tensors.append(torch.randn(size, device=device, requires_grad=True))
            remaining -= size

        # A small network plus optimizer to generate compute load
        model = nn.Sequential(
            nn.Linear(1000, 2000),
            nn.ReLU(),
            nn.Linear(2000, 1000),
            nn.ReLU(),
            nn.Linear(1000, 500),
        ).to(device)
        input_data = torch.randn(64, 1000, device=device)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

        print(f"GPU {gpu_id}: Started with {len(tensors)} tensors")

        while running[0]:
            try:
                # Forward and backward pass
                optimizer.zero_grad()
                output = model(input_data)
                loss = output.sum()
                loss.backward()
                optimizer.step()

                # Touch the big tensors so they stay active
                for i, tensor in enumerate(tensors):
                    if i % 2 == 0:
                        tensor.data = tensor.data * 0.99 + torch.randn_like(tensor) * 0.01
                    else:
                        tensor.data = torch.sin(tensor.data)

                time.sleep(0.1)  # small delay
            except Exception as e:
                print(f"GPU {gpu_id}: Error - {e}")
                time.sleep(1)
    except Exception as e:
        print(f"GPU {gpu_id}: Failed to initialize - {e}")


def display_status(gpus, running):
    """Periodically print memory usage and utilization for each GPU."""
    while running[0]:
        try:
            print(f"\n[{datetime.now().strftime('%H:%M:%S')}] GPU Status:")
            print("-" * 40)
            for gpu_id in gpus:
                try:
                    memory_allocated = torch.cuda.memory_allocated(gpu_id) / (1024**3)
                    memory_total = torch.cuda.get_device_properties(gpu_id).total_memory / (1024**3)
                    utilization = torch.cuda.utilization(gpu_id) if hasattr(torch.cuda, 'utilization') else "N/A"
                    print(f"GPU {gpu_id}: {memory_allocated:.1f}GB / {memory_total:.1f}GB "
                          f"({memory_allocated / memory_total * 100:.1f}%) | Util: {utilization}%")
                except Exception:
                    print(f"GPU {gpu_id}: Status unavailable")
            time.sleep(5)  # update every 5 seconds
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(f"Status display error: {e}")
            time.sleep(5)


def main():
    print("GPU Keep-Alive Script")
    print("=" * 30)

    # Detect GPUs
    gpus = detect_gpus()
    if not gpus:
        return

    # Shared flag that all threads poll to know when to stop
    running = [True]

    # Start one worker thread per GPU
    threads = []
    for gpu_id in gpus:
        thread = threading.Thread(target=gpu_worker, args=(gpu_id, running))
        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Start the status display thread
    status_thread = threading.Thread(target=display_status, args=(gpus, running))
    status_thread.daemon = True
    status_thread.start()

    print(f"\nStarted {len(gpus)} GPU workers")
    print("Press Ctrl+C to stop...")

    try:
        # Keep the main thread alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping GPU workers...")
        running[0] = False

        # Wait for threads to finish
        for thread in threads:
            thread.join(timeout=2)

        # Clear GPU memory
        torch.cuda.empty_cache()
        print("GPU workers stopped. Memory cleared.")


if __name__ == "__main__":
    main()
```
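To use it, save the script (e.g. as `gpu_keepalive.py`, a name chosen here for illustration) and run it with `python gpu_keepalive.py`; you can confirm the allocations from another terminal with `nvidia-smi`. If you only need to hold memory and don't care about generating compute load, a much shorter allocation-only variant also works. Below is a minimal sketch under that assumption; the single-tensor approach and the `GPU_ID`/`FRACTION` values are illustrative choices, not part of the script above:

```python
import time
import torch

# Minimal sketch: pin roughly 60% of one GPU's memory in a single tensor.
# Assumes CUDA is available; GPU_ID and FRACTION are illustrative values.
GPU_ID = 0
FRACTION = 0.6

total = torch.cuda.get_device_properties(GPU_ID).total_memory
n_elements = int(total * FRACTION) // 4  # float32 tensors use 4 bytes per element
blocker = torch.empty(n_elements, dtype=torch.float32, device=f"cuda:{GPU_ID}")

print(f"Holding {blocker.numel() * 4 / 1024**3:.1f} GB on GPU {GPU_ID}")
try:
    while True:
        time.sleep(60)  # the allocation lives as long as the process does
except KeyboardInterrupt:
    del blocker
    torch.cuda.empty_cache()
```

Note that memory pinned this way shows up in `nvidia-smi` but produces no GPU utilization, which some cluster watchdogs may still treat as idle; the full script above avoids that by running a training loop.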

Author: Dong

Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC (Creative Commons Attribution-NonCommercial 4.0 International). You may freely repost and adapt them for non-commercial purposes, provided you credit the source and link to the original author. Please credit the source when reposting!