Files / thrillwiki_django_no_react/shared/scripts/unraid/vm_manager.py
pacnpal d504d41de2 feat: complete monorepo structure with frontend and shared resources
- Add complete backend/ directory with full Django application
- Add frontend/ directory with Vite + TypeScript setup ready for Next.js
- Add comprehensive shared/ directory with:
  - Complete documentation and memory-bank archives
  - Media files and avatars (letters, park/ride images)
  - Deployment scripts and automation tools
  - Shared types and utilities
- Add architecture/ directory with migration guides
- Configure pnpm workspace for monorepo development
- Update .gitignore to exclude .django_tailwind_cli/ build artifacts
- Preserve all historical documentation in shared/docs/memory-bank/
- Set up proper structure for full-stack development with shared resources
2025-08-23 18:40:07 -04:00

571 lines · 20 KiB · Python

#!/usr/bin/env python3
"""
VM Manager for Unraid
Handles VM creation, configuration, and lifecycle management.
"""
import os
import time
import logging
import subprocess
from pathlib import Path
from typing import Optional
import uuid

logger = logging.getLogger(__name__)


class UnraidVMManager:
    """Manages VMs on Unraid server."""

    def __init__(self, vm_name: str, unraid_host: str, unraid_user: str = "root"):
        self.vm_name = vm_name
        self.unraid_host = unraid_host
        self.unraid_user = unraid_user
        self.vm_config_path = f"/mnt/user/domains/{vm_name}"

    def authenticate(self) -> bool:
        """Test SSH connectivity to Unraid server."""
        try:
            result = subprocess.run(
                f"ssh -o ConnectTimeout=10 {self.unraid_user}@{self.unraid_host} 'echo Connected'",
                shell=True,
                capture_output=True,
                text=True,
                timeout=15,
            )
            if result.returncode == 0 and "Connected" in result.stdout:
                logger.info("Successfully connected to Unraid via SSH")
                return True
            else:
                logger.error(f"SSH connection failed: {result.stderr}")
                return False
        except Exception as e:
            logger.error(f"SSH authentication error: {e}")
            return False

    def check_vm_exists(self) -> bool:
        """Check if VM already exists."""
        try:
            result = subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'virsh list --all | grep {self.vm_name}'",
                shell=True,
                capture_output=True,
                text=True,
            )
            return self.vm_name in result.stdout
        except Exception as e:
            logger.error(f"Error checking VM existence: {e}")
            return False

    def _generate_mac_suffix(self, vm_ip: str) -> str:
        """Generate MAC address suffix based on VM IP or name."""
        if vm_ip.lower() != "dhcp" and "." in vm_ip:
            # Use last octet of static IP for MAC generation
            last_octet = int(vm_ip.split(".")[-1])
            return f"{last_octet:02x}:7d:fd"
        else:
            # Use hash of VM name for consistent MAC generation
            import hashlib

            hash_obj = hashlib.md5(self.vm_name.encode())
            hash_bytes = hash_obj.digest()[:3]
            return ":".join([f"{b:02x}" for b in hash_bytes])

    def create_vm_xml(
        self,
        vm_memory: int,
        vm_vcpus: int,
        vm_ip: str,
        existing_uuid: str = None,
    ) -> str:
        """Generate VM XML configuration from template file."""
        vm_uuid = existing_uuid if existing_uuid else str(uuid.uuid4())

        # Read XML template from file
        template_path = Path(__file__).parent / "thrillwiki-vm-template.xml"
        if not template_path.exists():
            raise FileNotFoundError(f"VM XML template not found at {template_path}")
        with open(template_path, "r", encoding="utf-8") as f:
            xml_template = f.read()

        # Calculate CPU topology
        cpu_cores = vm_vcpus // 2 if vm_vcpus > 1 else 1
        cpu_threads = 2 if vm_vcpus > 1 else 1

        # Replace placeholders with actual values
        xml_content = xml_template.format(
            VM_NAME=self.vm_name,
            VM_UUID=vm_uuid,
            VM_MEMORY_KIB=vm_memory * 1024,
            VM_VCPUS=vm_vcpus,
            CPU_CORES=cpu_cores,
            CPU_THREADS=cpu_threads,
            MAC_SUFFIX=self._generate_mac_suffix(vm_ip),
        )
        return xml_content.strip()
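
    # Note on the template contract above (comment only, not original source):
    # because the XML is rendered with str.format(), thrillwiki-vm-template.xml
    # is expected to contain literal placeholders such as {VM_NAME}, {VM_UUID},
    # {VM_MEMORY_KIB}, {VM_VCPUS}, {CPU_CORES}, {CPU_THREADS} and {MAC_SUFFIX};
    # any other literal braces in that file would need to be doubled ("{{", "}}")
    # to survive formatting.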

    def upload_iso_to_unraid(self, local_iso_path: Path) -> str:
        """Upload ISO to Unraid server."""
        remote_iso_path = f"/mnt/user/isos/{self.vm_name}-ubuntu-autoinstall.iso"
        logger.info(f"Uploading ISO to Unraid: {remote_iso_path}")
        try:
            # Remove old ISO if it exists
            subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'rm -f {remote_iso_path}'",
                shell=True,
                check=False,  # Don't fail if file doesn't exist
            )
            # Upload new ISO
            subprocess.run(
                f"scp {local_iso_path} {self.unraid_user}@{self.unraid_host}:{remote_iso_path}",
                shell=True,
                check=True,
            )
            logger.info(f"ISO uploaded successfully: {remote_iso_path}")
            return remote_iso_path
        except Exception as e:
            logger.error(f"Failed to upload ISO: {e}")
            raise

    def create_vm(
        self, vm_memory: int, vm_vcpus: int, vm_disk_size: int, vm_ip: str
    ) -> bool:
        """Create or update the VM on Unraid."""
        try:
            vm_exists = self.check_vm_exists()
            if vm_exists:
                logger.info(f"VM {self.vm_name} already exists, updating configuration...")
                # Always try to stop VM before updating
                current_status = self.vm_status()
                logger.info(f"Current VM status: {current_status}")
                if current_status not in ["shut off", "unknown"]:
                    logger.info(f"Stopping VM {self.vm_name} for configuration update...")
                    self.stop_vm()
                    time.sleep(3)
                else:
                    logger.info(f"VM {self.vm_name} is already stopped")
            else:
                logger.info(f"Creating VM {self.vm_name}...")

            # Ensure VM directory exists
            subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'mkdir -p {self.vm_config_path}'",
                shell=True,
                check=True,
            )

            # Create virtual disk if it doesn't exist
            disk_check = subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'test -f {self.vm_config_path}/vdisk1.qcow2'",
                shell=True,
                capture_output=True,
            )
            if disk_check.returncode != 0:
                logger.info(f"Creating virtual disk for VM {self.vm_name}...")
                disk_cmd = f"ssh {self.unraid_user}@{self.unraid_host} 'qemu-img create -f qcow2 {self.vm_config_path}/vdisk1.qcow2 {vm_disk_size}G'"
                subprocess.run(disk_cmd, shell=True, check=True)
            else:
                logger.info(f"Virtual disk already exists for VM {self.vm_name}")

            existing_uuid = None
            if vm_exists:
                # Get existing VM UUID
                cmd = f'ssh {self.unraid_user}@{self.unraid_host} \'virsh dumpxml {self.vm_name} | grep "<uuid>" | sed "s/<uuid>//g" | sed "s/<\\/uuid>//g" | tr -d " "\''
                result = subprocess.run(
                    cmd,
                    shell=True,
                    capture_output=True,
                    text=True,
                )
                if result.returncode == 0 and result.stdout.strip():
                    existing_uuid = result.stdout.strip()
                    logger.info(f"Found existing VM UUID: {existing_uuid}")

                # Check if VM is persistent or transient
                persistent_check = subprocess.run(
                    f"ssh {self.unraid_user}@{self.unraid_host} 'virsh list --persistent --all | grep {self.vm_name}'",
                    shell=True,
                    capture_output=True,
                    text=True,
                )
                is_persistent = self.vm_name in persistent_check.stdout
                if is_persistent:
                    # Undefine persistent VM with NVRAM flag
                    logger.info(
                        f"VM {self.vm_name} is persistent, undefining with NVRAM for reconfiguration..."
                    )
                    subprocess.run(
                        f"ssh {self.unraid_user}@{self.unraid_host} 'virsh undefine {self.vm_name} --nvram'",
                        shell=True,
                        check=True,
                    )
                    logger.info(f"Persistent VM {self.vm_name} undefined for reconfiguration")
                else:
                    # Handle transient VM - just destroy it
                    logger.info(f"VM {self.vm_name} is transient, destroying for reconfiguration...")
                    if self.vm_status() == "running":
                        subprocess.run(
                            f"ssh {self.unraid_user}@{self.unraid_host} 'virsh destroy {self.vm_name}'",
                            shell=True,
                            check=True,
                        )
                    logger.info(f"Transient VM {self.vm_name} destroyed for reconfiguration")

            # Generate VM XML with appropriate UUID
            vm_xml = self.create_vm_xml(vm_memory, vm_vcpus, vm_ip, existing_uuid)
            xml_file = f"/tmp/{self.vm_name}.xml"
            with open(xml_file, "w", encoding="utf-8") as f:
                f.write(vm_xml)

            # Copy XML to Unraid and define/redefine VM
            subprocess.run(
                f"scp {xml_file} {self.unraid_user}@{self.unraid_host}:/tmp/",
                shell=True,
                check=True,
            )
            # Define VM as persistent domain
            subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'virsh define /tmp/{self.vm_name}.xml'",
                shell=True,
                check=True,
            )
            # Ensure VM is set to autostart for persistent configuration
            subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'virsh autostart {self.vm_name}'",
                shell=True,
                check=False,  # Don't fail if autostart is already enabled
            )

            action = "updated" if vm_exists else "created"
            logger.info(f"VM {self.vm_name} {action} successfully")

            # Cleanup
            os.remove(xml_file)
            return True
        except Exception as e:
            logger.error(f"Failed to create VM: {e}")
            return False

    def create_nvram_file(self, vm_uuid: str) -> bool:
        """Create NVRAM file for UEFI VM."""
        try:
            nvram_path = f"/etc/libvirt/qemu/nvram/{vm_uuid}_VARS-pure-efi.fd"
            # Check if NVRAM file already exists
            result = subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'test -f {nvram_path}'",
                shell=True,
                capture_output=True,
            )
            if result.returncode == 0:
                logger.info(f"NVRAM file already exists: {nvram_path}")
                return True

            # Copy template to create NVRAM file
            logger.info(f"Creating NVRAM file: {nvram_path}")
            result = subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'cp /usr/share/qemu/ovmf-x64/OVMF_VARS-pure-efi.fd {nvram_path}'",
                shell=True,
                capture_output=True,
                text=True,
            )
            if result.returncode == 0:
                logger.info("NVRAM file created successfully")
                return True
            else:
                logger.error(f"Failed to create NVRAM file: {result.stderr}")
                return False
        except Exception as e:
            logger.error(f"Error creating NVRAM file: {e}")
            return False

    def start_vm(self) -> bool:
        """Start the VM if it's not already running."""
        try:
            # Check if VM is already running
            current_status = self.vm_status()
            if current_status == "running":
                logger.info(f"VM {self.vm_name} is already running")
                return True

            logger.info(f"Starting VM {self.vm_name}...")

            # For new VMs, we need to extract the UUID and create the NVRAM file
            vm_exists = self.check_vm_exists()
            if not vm_exists:
                logger.error("Cannot start VM that doesn't exist")
                return False

            # Get VM UUID from XML
            cmd = f'ssh {self.unraid_user}@{self.unraid_host} \'virsh dumpxml {self.vm_name} | grep "<uuid>" | sed "s/<uuid>//g" | sed "s/<\\/uuid>//g" | tr -d " "\''
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True,
            )
            if result.returncode == 0 and result.stdout.strip():
                vm_uuid = result.stdout.strip()
                logger.info(f"VM UUID: {vm_uuid}")
                # Create NVRAM file if it doesn't exist
                if not self.create_nvram_file(vm_uuid):
                    return False

            result = subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'virsh start {self.vm_name}'",
                shell=True,
                capture_output=True,
                text=True,
            )
            if result.returncode == 0:
                logger.info(f"VM {self.vm_name} started successfully")
                return True
            else:
                logger.error(f"Failed to start VM: {result.stderr}")
                return False
        except Exception as e:
            logger.error(f"Error starting VM: {e}")
            return False

    def stop_vm(self) -> bool:
        """Stop the VM with timeout and force destroy if needed."""
        try:
            logger.info(f"Stopping VM {self.vm_name}...")
            # Try graceful shutdown first
            result = subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'virsh shutdown {self.vm_name}'",
                shell=True,
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode == 0:
                # Wait up to 30 seconds for graceful shutdown
                logger.info(f"Waiting for VM {self.vm_name} to shutdown gracefully...")
                for i in range(30):
                    status = self.vm_status()
                    if status in ["shut off", "unknown"]:
                        logger.info(f"VM {self.vm_name} stopped gracefully")
                        return True
                    time.sleep(1)

                # If still running after 30 seconds, force destroy
                logger.warning(
                    f"VM {self.vm_name} didn't shutdown gracefully, forcing destroy..."
                )
                destroy_result = subprocess.run(
                    f"ssh {self.unraid_user}@{self.unraid_host} 'virsh destroy {self.vm_name}'",
                    shell=True,
                    capture_output=True,
                    text=True,
                    timeout=10,
                )
                if destroy_result.returncode == 0:
                    logger.info(f"VM {self.vm_name} forcefully destroyed")
                    return True
                else:
                    logger.error(f"Failed to destroy VM: {destroy_result.stderr}")
                    return False
            else:
                logger.error(f"Failed to initiate VM shutdown: {result.stderr}")
                return False
        except subprocess.TimeoutExpired:
            logger.error(f"Timeout stopping VM {self.vm_name}")
            return False
        except Exception as e:
            logger.error(f"Error stopping VM: {e}")
            return False

    def get_vm_ip(self) -> Optional[str]:
        """Get VM IP address."""
        try:
            # Wait for VM to get an IP - Ubuntu autoinstall can take 20-30 minutes
            max_attempts = 120  # 20 minutes total wait time (120 x 10 s)
            for attempt in range(max_attempts):
                result = subprocess.run(
                    f"ssh {self.unraid_user}@{self.unraid_host} 'virsh domifaddr {self.vm_name}'",
                    shell=True,
                    capture_output=True,
                    text=True,
                )
                if result.returncode == 0 and "ipv4" in result.stdout:
                    # Split on actual newlines (the original split("\\n") matched
                    # a literal backslash-n and never split the virsh output)
                    lines = result.stdout.strip().splitlines()
                    for line in lines:
                        if "ipv4" in line:
                            # Extract IP from a line like:
                            # vnet0  52:54:00:xx:xx:xx  ipv4  192.168.1.100/24
                            parts = line.split()
                            if len(parts) >= 4:
                                ip_with_mask = parts[3]
                                ip = ip_with_mask.split("/")[0]
                                logger.info(f"VM IP address: {ip}")
                                return ip
                logger.info(
                    f"Waiting for VM IP... (attempt {attempt + 1}/{max_attempts}) - Ubuntu autoinstall in progress"
                )
                time.sleep(10)
            logger.error("Failed to get VM IP address")
            return None
        except Exception as e:
            logger.error(f"Error getting VM IP: {e}")
            return None

    def vm_status(self) -> str:
        """Get VM status."""
        try:
            result = subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'virsh domstate {self.vm_name}'",
                shell=True,
                capture_output=True,
                text=True,
            )
            if result.returncode == 0:
                return result.stdout.strip()
            else:
                return "unknown"
        except Exception as e:
            logger.error(f"Error getting VM status: {e}")
            return "error"

    def delete_vm(self) -> bool:
        """Completely remove VM and all associated files."""
        try:
            logger.info(f"Deleting VM {self.vm_name} and all associated files...")
            # Check if VM exists
            if not self.check_vm_exists():
                logger.info(f"VM {self.vm_name} does not exist")
                return True

            # Stop VM if running
            if self.vm_status() == "running":
                logger.info(f"Stopping VM {self.vm_name}...")
                self.stop_vm()
                time.sleep(5)

            # Undefine VM with NVRAM
            logger.info(f"Undefining VM {self.vm_name}...")
            subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'virsh undefine {self.vm_name} --nvram'",
                shell=True,
                check=True,
            )

            # Remove VM directory and all files
            logger.info("Removing VM directory and files...")
            subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'rm -rf {self.vm_config_path}'",
                shell=True,
                check=True,
            )

            # Remove autoinstall ISO
            subprocess.run(
                f"ssh {self.unraid_user}@{self.unraid_host} 'rm -f /mnt/user/isos/{self.vm_name}-ubuntu-autoinstall.iso'",
                shell=True,
                check=False,  # Don't fail if file doesn't exist
            )
            logger.info(f"VM {self.vm_name} completely removed")
            return True
        except Exception as e:
            logger.error(f"Failed to delete VM: {e}")
            return False
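
For orientation, here is a minimal, hypothetical driver showing how the lifecycle methods above are intended to compose: authenticate, upload the autoinstall ISO, create (or reconfigure) the domain, start it, then poll for an IP. It is not part of the repository file; the host address, VM sizing, and ISO path are placeholder assumptions, and it presumes UnraidVMManager is importable from this module and that key-based SSH access to the Unraid host is already configured.

# Hypothetical usage sketch - not part of vm_manager.py. Host, sizing, and
# ISO path are illustrative assumptions, not values from the project.
import logging
from pathlib import Path

logging.basicConfig(level=logging.INFO)

manager = UnraidVMManager(vm_name="thrillwiki-vm", unraid_host="192.168.1.10")

if manager.authenticate():
    # Upload a previously built autoinstall ISO (placeholder path).
    manager.upload_iso_to_unraid(Path("./build/ubuntu-autoinstall.iso"))
    # 8192 MiB RAM, 4 vCPUs, 50 GiB disk, DHCP networking - example sizing only.
    if manager.create_vm(vm_memory=8192, vm_vcpus=4, vm_disk_size=50, vm_ip="dhcp"):
        if manager.start_vm():
            ip = manager.get_vm_ip()  # polls virsh domifaddr; may take many minutes
            print(f"VM reachable at {ip}, libvirt state: {manager.vm_status()}")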