#!/usr/bin/env python3
import torch
import torch.nn as nn
import time
import threading
from datetime import datetime
def detect_gpus():
    """Detect the CUDA GPUs visible to PyTorch and print a summary.

    Returns:
        A list of device indices (``0 .. device_count - 1``), or an empty
        list when ``torch.cuda.is_available()`` is false.
    """
    if not torch.cuda.is_available():
        print("CUDA is not available. No GPUs detected.")
        return []

    gpu_count = torch.cuda.device_count()
    gpus = []
    print(f"Detected {gpu_count} GPU(s):")
    for i in range(gpu_count):
        # One properties query supplies both the name and the memory size.
        props = torch.cuda.get_device_properties(i)
        gpu_memory = props.total_memory / (1024**3)  # bytes -> GiB
        print(f" GPU {i}: {props.name} ({gpu_memory:.1f} GB)")
        gpus.append(i)
    return gpus
def _allocate_ballast(gpu_id, device, memory_fraction):
    """Allocate tensors totalling roughly ``memory_fraction`` of total memory."""
    max_chunk = 5_000_000  # elements per tensor, upper bound
    min_chunk = 1_000_000  # elements per tensor, lower bound

    total_bytes = torch.cuda.get_device_properties(gpu_id).total_memory
    # Element budget, counting 4 bytes per element (float32, which is what
    # torch.randn produces unless the default dtype has been changed).
    remaining = int(total_bytes * memory_fraction) // 4

    tensors = []
    while remaining > min_chunk:
        size = min(remaining // 3, max_chunk)
        if size < min_chunk:
            # Less than ~3M elements of budget left; stop short of the target.
            break
        tensors.append(torch.randn(size, device=device))
        remaining -= size
    return tensors


def _touch_ballast(tensors):
    """Rewrite every ballast tensor in place so its memory stays in use."""
    for index, tensor in enumerate(tensors):
        if index % 2 == 0:
            # In-place blend with fresh noise; only the noise is a temporary.
            tensor.mul_(0.99).add_(torch.randn_like(tensor), alpha=0.01)
        else:
            tensor.sin_()


def gpu_worker(gpu_id, running, *, memory_fraction=0.6):
    """Keep one GPU busy until ``running[0]`` becomes falsy.

    The worker fills part of the device memory with random tensors, then
    repeatedly runs a forward/backward/optimizer step on a small MLP and
    rewrites the ballast tensors, pausing 0.1 s between iterations.

    Args:
        gpu_id: CUDA device index to occupy.
        running: Indexable flag holder; the loop runs while ``running[0]``
            is truthy, so another thread can stop the worker by clearing it.
        memory_fraction: Fraction of the device's *total* memory to target
            for the ballast tensors (default 0.6). Memory already used by
            other processes is not taken into account.

    Returns:
        None. Initialization failures are printed and end the worker; errors
        inside the loop are printed and the loop retries after 1 s.
    """
    device = torch.device(f'cuda:{gpu_id}')

    try:
        tensors = _allocate_ballast(gpu_id, device, memory_fraction)

        # Small MLP that provides the recurring compute load.
        model = nn.Sequential(
            nn.Linear(1000, 2000),
            nn.ReLU(),
            nn.Linear(2000, 1000),
            nn.ReLU(),
            nn.Linear(1000, 500)
        ).to(device)
        input_data = torch.randn(64, 1000, device=device)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    except Exception as e:
        print(f"GPU {gpu_id}: Failed to initialize - {e}")
        return

    print(f"GPU {gpu_id}: Started with {len(tensors)} tensors")

    while running[0]:
        try:
            # Forward pass
            optimizer.zero_grad()
            output = model(input_data)
            loss = output.sum()

            # Backward pass
            loss.backward()
            optimizer.step()

            # Update tensors to keep them active
            _touch_ballast(tensors)

            time.sleep(0.1)  # Small delay
        except Exception as e:
            # Best effort: report the error and keep the worker alive.
            print(f"GPU {gpu_id}: Error - {e}")
            time.sleep(1)
def _query_utilization(gpu_id):
    """Return the GPU utilization as display text, e.g. ``"37%"`` or ``"N/A"``."""
    query = getattr(torch.cuda, "utilization", None)
    if query is None:
        return "N/A"
    try:
        return f"{query(gpu_id)}%"
    except Exception:
        # Utilization is optional information; if the query fails for any
        # reason, fall back to "N/A" instead of hiding the memory figures.
        return "N/A"


def _format_gpu_status(gpu_id):
    """Build the one-line memory/utilization summary for one GPU."""
    try:
        memory_allocated = torch.cuda.memory_allocated(gpu_id) / (1024**3)
        memory_total = torch.cuda.get_device_properties(gpu_id).total_memory / (1024**3)
        percent = (memory_allocated / memory_total) * 100
    except Exception:
        return f"GPU {gpu_id}: Status unavailable"
    return (
        f"GPU {gpu_id}: {memory_allocated:.1f}GB / {memory_total:.1f}GB "
        f"({percent:.1f}%) | Util: {_query_utilization(gpu_id)}"
    )


def display_status(gpus, running, interval=5):
    """Print a status table for ``gpus`` until ``running[0]`` becomes falsy.

    Args:
        gpus: Iterable of CUDA device indices to report on.
        running: Indexable flag holder; the loop runs while ``running[0]``
            is truthy.
        interval: Seconds to sleep between reports (default 5).

    Returns:
        None. A GPU whose memory figures cannot be read is reported as
        "Status unavailable"; any other error is printed and the loop
        continues after ``interval`` seconds.
    """
    while running[0]:
        try:
            print(f"\n[{datetime.now().strftime('%H:%M:%S')}] GPU Status:")
            print("-" * 40)
            for gpu_id in gpus:
                print(_format_gpu_status(gpu_id))
            time.sleep(interval)
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(f"Status display error: {e}")
            time.sleep(interval)
def main():
    """Start one keep-alive worker per detected GPU and run until stopped.

    Spawns a daemon thread running ``gpu_worker`` for every index returned
    by ``detect_gpus`` plus one daemon thread running ``display_status``,
    then waits until Ctrl+C is pressed or every worker thread has exited.
    On the way out it clears the shared flag, gives each worker up to 2 s
    to finish, and releases PyTorch's cached GPU memory.
    """
    print("GPU Keep-Alive Script")
    print("=" * 30)

    # Detect GPUs
    gpus = detect_gpus()
    if not gpus:
        return

    # Shared flag to control all threads (a one-element list so the worker
    # threads observe the change made here).
    running = [True]

    # Start worker threads for each GPU
    threads = []
    for gpu_id in gpus:
        thread = threading.Thread(
            target=gpu_worker, args=(gpu_id, running), daemon=True
        )
        thread.start()
        threads.append(thread)

    # Start status display thread
    status_thread = threading.Thread(
        target=display_status, args=(gpus, running), daemon=True
    )
    status_thread.start()

    print(f"\nStarted {len(gpus)} GPU workers")
    print("Press Ctrl+C to stop...")

    try:
        # Keep the main thread alive only while there is a worker to wait
        # for; if every worker has exited there is nothing left to do.
        while any(thread.is_alive() for thread in threads):
            time.sleep(1)
        print("\nAll GPU workers have exited.")
    except KeyboardInterrupt:
        print("\nStopping GPU workers...")
    finally:
        running[0] = False

    # Wait for threads to finish
    for thread in threads:
        thread.join(timeout=2)
    stragglers = sum(1 for thread in threads if thread.is_alive())

    # Clear GPU memory
    torch.cuda.empty_cache()
    if stragglers:
        print(f"Warning: {stragglers} GPU worker(s) did not stop within the timeout.")
    else:
        print("GPU workers stopped. Memory cleared.")
# Run the keep-alive loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()
# 本文作者:Dong
# 本文链接:
# 版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC 许可协议。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。转载请注明出处!