#!/usr/bin/env python3
"""
VM Manager for Unraid
Handles VM creation, configuration, and lifecycle management.
"""

import hashlib
import logging
import os
import shlex
import subprocess
import tempfile
import time
import uuid
from pathlib import Path
from typing import List, Optional, Sequence, Tuple

logger = logging.getLogger(__name__)


class UnraidVMManager:
    """Manages VMs on Unraid server.

    Every operation is carried out by invoking ``ssh``/``scp`` against the
    Unraid host and running ``virsh``/``qemu-img`` there. Commands are run as
    argument lists (no local shell) and every remote argument is quoted with
    ``shlex.quote`` so VM names and paths cannot inject shell syntax.
    """

    def __init__(self, vm_name: str, unraid_host: str, unraid_user: str = "root"):
        """Store connection details and derive the VM's storage directory.

        Args:
            vm_name: libvirt domain name; also used for remote file names.
            unraid_host: Host name or address passed to ``ssh``/``scp``.
            unraid_user: Remote login user.
        """
        self.vm_name = vm_name
        self.unraid_host = unraid_host
        self.unraid_user = unraid_user
        self.vm_config_path = f"/mnt/user/domains/{vm_name}"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _build_remote_command(*args: object) -> str:
        """Join *args* into one shell-safe command string for the remote side.

        ssh hands the command to the remote login shell as a single string,
        so each argument has to be quoted individually.
        """
        return " ".join(shlex.quote(str(arg)) for arg in args)

    @staticmethod
    def _parse_domifaddr(output: str) -> Optional[str]:
        """Return the first IPv4 address in ``virsh domifaddr`` output.

        Rows look like ``vnet0 52:54:00:xx:xx:xx ipv4 192.168.1.100/24``; the
        token following ``ipv4`` is the address with its prefix length.
        Returns ``None`` when no such row is present.
        """
        for line in output.splitlines():
            parts = line.split()
            if "ipv4" not in parts:
                continue
            idx = parts.index("ipv4")
            if idx + 1 < len(parts):
                return parts[idx + 1].split("/")[0]
        return None

    @staticmethod
    def _cpu_topology(vm_vcpus: int) -> Tuple[int, int]:
        """Return ``(cores, threads)`` whose product equals *vm_vcpus*.

        Even counts above one are modelled as two threads per core. Odd counts
        use one thread per core so the product still matches the vCPU count.
        NOTE(review): assumes the XML template uses a single socket - confirm.
        """
        if vm_vcpus > 1 and vm_vcpus % 2 == 0:
            return vm_vcpus // 2, 2
        return max(vm_vcpus, 1), 1

    def _ssh_target(self) -> str:
        """Return the ``user@host`` destination used by ssh and scp."""
        return f"{self.unraid_user}@{self.unraid_host}"

    def _remote_iso_path(self) -> str:
        """Return the path of this VM's autoinstall ISO on the server."""
        return f"/mnt/user/isos/{self.vm_name}-ubuntu-autoinstall.iso"

    def _has_safe_vm_name(self) -> bool:
        """Return True if the VM name cannot escape its own directory."""
        name = self.vm_name
        if not isinstance(name, str) or not name.strip():
            return False
        return name not in (".", "..") and "/" not in name

    def _run_ssh(
        self,
        *remote_args: object,
        ssh_options: Sequence[str] = (),
        check: bool = False,
        capture: bool = True,
        timeout: Optional[float] = None,
    ) -> subprocess.CompletedProcess:
        """Run a command on the Unraid host over ssh.

        Args:
            remote_args: Command and arguments to execute remotely.
            ssh_options: Extra options placed before the destination.
            check: Raise ``CalledProcessError`` on a non-zero exit status.
            capture: Capture stdout/stderr instead of inheriting them.
            timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.
        """
        argv = [
            "ssh",
            *ssh_options,
            self._ssh_target(),
            self._build_remote_command(*remote_args),
        ]
        return subprocess.run(
            argv,
            check=check,
            capture_output=capture,
            text=True,
            timeout=timeout,
        )

    def _list_domain_names(self, *flags: str) -> List[str]:
        """Return domain names reported by ``virsh list --all --name``."""
        result = self._run_ssh("virsh", "list", "--all", "--name", *flags)
        if result.returncode != 0:
            return []
        return [line.strip() for line in result.stdout.splitlines() if line.strip()]

    def _get_vm_uuid(self) -> Optional[str]:
        """Return the domain UUID from ``virsh domuuid``, or None on failure."""
        result = self._run_ssh("virsh", "domuuid", self.vm_name)
        vm_uuid = result.stdout.strip()
        if result.returncode == 0 and vm_uuid:
            return vm_uuid
        return None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def authenticate(self) -> bool:
        """Test SSH connectivity to Unraid server."""
        try:
            result = self._run_ssh(
                "echo",
                "Connected",
                ssh_options=("-o", "ConnectTimeout=10"),
                timeout=15,
            )
            if result.returncode == 0 and "Connected" in result.stdout:
                logger.info("Successfully connected to Unraid via SSH")
                return True
            logger.error(f"SSH connection failed: {result.stderr}")
            return False
        except Exception as e:
            logger.error(f"SSH authentication error: {e}")
            return False

    def check_vm_exists(self) -> bool:
        """Check if VM already exists.

        Compares against the exact domain names so that a VM whose name is a
        prefix of another one is not reported as existing by mistake.
        """
        try:
            return self.vm_name in self._list_domain_names()
        except Exception as e:
            logger.error(f"Error checking VM existence: {e}")
            return False

    def _generate_mac_suffix(self, vm_ip: str) -> str:
        """Generate MAC address suffix based on VM IP or name.

        A dotted static IP yields ``<last octet in hex>:7d:fd``; anything else
        (e.g. ``"dhcp"``) yields the first three bytes of the MD5 of the VM
        name so the suffix is stable across runs.

        Raises:
            ValueError: If the last octet is not an integer in 0-255.
        """
        if vm_ip.lower() != "dhcp" and "." in vm_ip:
            # Use last octet of static IP for MAC generation
            last_octet = int(vm_ip.split(".")[-1])
            if not 0 <= last_octet <= 255:
                raise ValueError(f"Invalid IPv4 last octet in {vm_ip!r}")
            return f"{last_octet:02x}:7d:fd"

        # Use hash of VM name for consistent MAC generation
        hash_bytes = hashlib.md5(self.vm_name.encode()).digest()[:3]
        return ":".join(f"{b:02x}" for b in hash_bytes)

    def create_vm_xml(
        self,
        vm_memory: int,
        vm_vcpus: int,
        vm_ip: str,
        existing_uuid: Optional[str] = None,
    ) -> str:
        """Generate VM XML configuration from template file.

        Args:
            vm_memory: Memory size; multiplied by 1024 for ``VM_MEMORY_KIB``.
            vm_vcpus: Number of virtual CPUs.
            vm_ip: Static IP or ``"dhcp"``; only used to derive the MAC suffix.
            existing_uuid: UUID to reuse; a fresh one is generated if omitted.

        Raises:
            FileNotFoundError: If the template next to this module is missing.
        """
        vm_uuid = existing_uuid if existing_uuid else str(uuid.uuid4())

        # Read XML template from file
        template_path = Path(__file__).parent / "thrillwiki-vm-template.xml"
        if not template_path.exists():
            raise FileNotFoundError(f"VM XML template not found at {template_path}")

        xml_template = template_path.read_text(encoding="utf-8")

        cpu_cores, cpu_threads = self._cpu_topology(vm_vcpus)

        # Replace placeholders with actual values
        xml_content = xml_template.format(
            VM_NAME=self.vm_name,
            VM_UUID=vm_uuid,
            VM_MEMORY_KIB=vm_memory * 1024,
            VM_VCPUS=vm_vcpus,
            CPU_CORES=cpu_cores,
            CPU_THREADS=cpu_threads,
            MAC_SUFFIX=self._generate_mac_suffix(vm_ip),
        )
        return xml_content.strip()

    def upload_iso_to_unraid(self, local_iso_path: Path) -> str:
        """Upload ISO to Unraid server.

        Returns:
            The remote path the ISO was copied to.

        Raises:
            Exception: Any failure from ``scp`` is logged and re-raised.
        """
        remote_iso_path = self._remote_iso_path()
        logger.info(f"Uploading ISO to Unraid: {remote_iso_path}")

        try:
            # Remove old ISO if exists; don't fail if file doesn't exist
            self._run_ssh("rm", "-f", remote_iso_path, check=False, capture=False)

            # Upload new ISO
            subprocess.run(
                [
                    "scp",
                    str(local_iso_path),
                    f"{self._ssh_target()}:{remote_iso_path}",
                ],
                check=True,
            )

            logger.info(f"ISO uploaded successfully: {remote_iso_path}")
            return remote_iso_path
        except Exception as e:
            logger.error(f"Failed to upload ISO: {e}")
            raise

    def _stop_for_update(self) -> None:
        """Stop an existing VM (if it is not already off) before redefining it."""
        current_status = self.vm_status()
        logger.info(f"Current VM status: {current_status}")

        if current_status not in ["shut off", "unknown"]:
            logger.info(f"Stopping VM {self.vm_name} for configuration update...")
            self.stop_vm()
            time.sleep(3)
        else:
            logger.info(f"VM {self.vm_name} is already stopped")

    def _ensure_storage(self, vm_disk_size: int) -> None:
        """Create the VM directory and its qcow2 disk if they are missing."""
        self._run_ssh("mkdir", "-p", self.vm_config_path, check=True, capture=False)

        disk_path = f"{self.vm_config_path}/vdisk1.qcow2"
        disk_check = self._run_ssh("test", "-f", disk_path)
        if disk_check.returncode != 0:
            logger.info(f"Creating virtual disk for VM {self.vm_name}...")
            self._run_ssh(
                "qemu-img",
                "create",
                "-f",
                "qcow2",
                disk_path,
                f"{vm_disk_size}G",
                check=True,
                capture=False,
            )
        else:
            logger.info(f"Virtual disk already exists for VM {self.vm_name}")

    def _remove_existing_definition(self) -> None:
        """Undefine a persistent domain, or destroy a running transient one."""
        is_persistent = self.vm_name in self._list_domain_names("--persistent")

        if is_persistent:
            logger.info(
                f"VM {self.vm_name} is persistent, undefining with NVRAM for reconfiguration..."
            )
            self._run_ssh(
                "virsh", "undefine", self.vm_name, "--nvram",
                check=True, capture=False,
            )
            logger.info(f"Persistent VM {self.vm_name} undefined for reconfiguration")
            return

        # Handle transient VM - just destroy it
        logger.info(f"VM {self.vm_name} is transient, destroying for reconfiguration...")
        if self.vm_status() == "running":
            self._run_ssh(
                "virsh", "destroy", self.vm_name, check=True, capture=False
            )
        logger.info(f"Transient VM {self.vm_name} destroyed for reconfiguration")

    def _define_vm(self, vm_xml: str) -> None:
        """Copy *vm_xml* to the server, define the domain and enable autostart."""
        remote_xml = f"/tmp/{self.vm_name}.xml"
        # A private temp file avoids a predictable local /tmp name and is
        # removed even if the copy or define step fails.
        fd, local_xml = tempfile.mkstemp(prefix="unraid-vm-", suffix=".xml")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                f.write(vm_xml)

            subprocess.run(
                ["scp", local_xml, f"{self._ssh_target()}:{remote_xml}"],
                check=True,
            )

            # Define VM as persistent domain
            self._run_ssh("virsh", "define", remote_xml, check=True, capture=False)

            # Don't fail if autostart is already enabled
            self._run_ssh(
                "virsh", "autostart", self.vm_name, check=False, capture=False
            )
        finally:
            os.remove(local_xml)

    def create_vm(
        self, vm_memory: int, vm_vcpus: int, vm_disk_size: int, vm_ip: str
    ) -> bool:
        """Create or update the VM on Unraid.

        The new XML is rendered before the old definition is removed, so a
        template problem cannot leave an existing VM undefined.

        Returns:
            True on success; False if any step failed (the error is logged).
        """
        try:
            vm_exists = self.check_vm_exists()

            if vm_exists:
                logger.info(
                    f"VM {self.vm_name} already exists, updating configuration..."
                )
                self._stop_for_update()
            else:
                logger.info(f"Creating VM {self.vm_name}...")

            self._ensure_storage(vm_disk_size)

            existing_uuid = None
            if vm_exists:
                existing_uuid = self._get_vm_uuid()
                if existing_uuid:
                    logger.info(f"Found existing VM UUID: {existing_uuid}")

            # Generate VM XML with appropriate UUID
            vm_xml = self.create_vm_xml(vm_memory, vm_vcpus, vm_ip, existing_uuid)

            if vm_exists:
                self._remove_existing_definition()

            self._define_vm(vm_xml)

            action = "updated" if vm_exists else "created"
            logger.info(f"VM {self.vm_name} {action} successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to create VM: {e}")
            return False

    def create_nvram_file(self, vm_uuid: str) -> bool:
        """Create NVRAM file for UEFI VM.

        Copies the OVMF variables template to the per-VM NVRAM path unless
        that file already exists. Returns True if the file is in place.
        """
        try:
            nvram_path = f"/etc/libvirt/qemu/nvram/{vm_uuid}_VARS-pure-efi.fd"

            # Check if NVRAM file already exists
            result = self._run_ssh("test", "-f", nvram_path)
            if result.returncode == 0:
                logger.info(f"NVRAM file already exists: {nvram_path}")
                return True

            # Copy template to create NVRAM file
            logger.info(f"Creating NVRAM file: {nvram_path}")
            result = self._run_ssh(
                "cp",
                "/usr/share/qemu/ovmf-x64/OVMF_VARS-pure-efi.fd",
                nvram_path,
            )
            if result.returncode == 0:
                logger.info("NVRAM file created successfully")
                return True

            logger.error(f"Failed to create NVRAM file: {result.stderr}")
            return False
        except Exception as e:
            logger.error(f"Error creating NVRAM file: {e}")
            return False

    def start_vm(self) -> bool:
        """Start the VM if it's not already running."""
        try:
            # Check if VM is already running
            if self.vm_status() == "running":
                logger.info(f"VM {self.vm_name} is already running")
                return True

            logger.info(f"Starting VM {self.vm_name}...")

            if not self.check_vm_exists():
                logger.error("Cannot start VM that doesn't exist")
                return False

            # The NVRAM file is named after the domain UUID
            vm_uuid = self._get_vm_uuid()
            if vm_uuid:
                logger.info(f"VM UUID: {vm_uuid}")
                # Create NVRAM file if it doesn't exist
                if not self.create_nvram_file(vm_uuid):
                    return False

            result = self._run_ssh("virsh", "start", self.vm_name)
            if result.returncode == 0:
                logger.info(f"VM {self.vm_name} started successfully")
                return True

            logger.error(f"Failed to start VM: {result.stderr}")
            return False
        except Exception as e:
            logger.error(f"Error starting VM: {e}")
            return False

    def stop_vm(self) -> bool:
        """Stop the VM with timeout and force destroy if needed."""
        try:
            logger.info(f"Stopping VM {self.vm_name}...")

            # Try graceful shutdown first
            result = self._run_ssh("virsh", "shutdown", self.vm_name, timeout=10)
            if result.returncode != 0:
                logger.error(f"Failed to initiate VM shutdown: {result.stderr}")
                return False

            # Poll the state up to 30 times, one second apart
            logger.info(f"Waiting for VM {self.vm_name} to shutdown gracefully...")
            for _ in range(30):
                if self.vm_status() in ["shut off", "unknown"]:
                    logger.info(f"VM {self.vm_name} stopped gracefully")
                    return True
                time.sleep(1)

            # Still running after the polling window: force destroy
            logger.warning(
                f"VM {self.vm_name} didn't shutdown gracefully, forcing destroy..."
            )
            destroy_result = self._run_ssh(
                "virsh", "destroy", self.vm_name, timeout=10
            )
            if destroy_result.returncode == 0:
                logger.info(f"VM {self.vm_name} forcefully destroyed")
                return True

            logger.error(f"Failed to destroy VM: {destroy_result.stderr}")
            return False
        except subprocess.TimeoutExpired:
            logger.error(f"Timeout stopping VM {self.vm_name}")
            return False
        except Exception as e:
            logger.error(f"Error stopping VM: {e}")
            return False

    def get_vm_ip(
        self, max_attempts: int = 120, poll_interval: float = 10
    ) -> Optional[str]:
        """Get VM IP address.

        Polls ``virsh domifaddr`` until an IPv4 address shows up.

        Args:
            max_attempts: Number of polls before giving up (default 120).
            poll_interval: Seconds to sleep between polls (default 10, i.e.
                about 20 minutes in total with the default attempt count).

        Returns:
            The IPv4 address without its prefix length, or None if none was
            reported in time or an error occurred.
        """
        try:
            for attempt in range(max_attempts):
                result = self._run_ssh("virsh", "domifaddr", self.vm_name)

                if result.returncode == 0:
                    ip = self._parse_domifaddr(result.stdout)
                    if ip:
                        logger.info(f"VM IP address: {ip}")
                        return ip

                logger.info(
                    f"Waiting for VM IP... (attempt {attempt + 1}/{max_attempts}) - Ubuntu autoinstall in progress"
                )
                # No point sleeping after the final attempt
                if attempt + 1 < max_attempts:
                    time.sleep(poll_interval)

            logger.error("Failed to get VM IP address")
            return None
        except Exception as e:
            logger.error(f"Error getting VM IP: {e}")
            return None

    def vm_status(self) -> str:
        """Get VM status.

        Returns the output of ``virsh domstate``, ``"unknown"`` if the command
        exits non-zero, or ``"error"`` if running it raised.
        """
        try:
            result = self._run_ssh("virsh", "domstate", self.vm_name)
            if result.returncode == 0:
                return result.stdout.strip()
            return "unknown"
        except Exception as e:
            logger.error(f"Error getting VM status: {e}")
            return "error"

    def delete_vm(self) -> bool:
        """Completely remove VM and all associated files.

        Returns:
            True if the VM is gone (or never existed); False on failure or if
            the VM name is unsafe to use in a recursive delete.
        """
        try:
            # An empty or path-like name would make the rm -rf below target
            # the shared domains directory (or something outside it).
            if not self._has_safe_vm_name():
                logger.error(f"Refusing to delete VM with unsafe name: {self.vm_name!r}")
                return False

            logger.info(f"Deleting VM {self.vm_name} and all associated files...")

            # Check if VM exists
            if not self.check_vm_exists():
                logger.info(f"VM {self.vm_name} does not exist")
                return True

            # Stop VM if running
            if self.vm_status() == "running":
                logger.info(f"Stopping VM {self.vm_name}...")
                self.stop_vm()
                time.sleep(5)

            # Undefine VM with NVRAM
            logger.info(f"Undefining VM {self.vm_name}...")
            self._run_ssh(
                "virsh", "undefine", self.vm_name, "--nvram",
                check=True, capture=False,
            )

            # Remove VM directory and all files
            logger.info("Removing VM directory and files...")
            self._run_ssh(
                "rm", "-rf", "--", self.vm_config_path, check=True, capture=False
            )

            # Remove autoinstall ISO; don't fail if file doesn't exist
            self._run_ssh(
                "rm", "-f", "--", self._remote_iso_path(),
                check=False, capture=False,
            )

            logger.info(f"VM {self.vm_name} completely removed")
            return True
        except Exception as e:
            logger.error(f"Failed to delete VM: {e}")
            return False