#!/usr/bin/env python3 """ Template VM Manager for ThrillWiki Handles copying template VM disks and managing template-based deployments. """ import os import sys import time import logging import subprocess from typing import Dict logger = logging.getLogger(__name__) class TemplateVMManager: """Manages template-based VM deployment on Unraid.""" def __init__(self, unraid_host: str, unraid_user: str = "root"): self.unraid_host = unraid_host self.unraid_user = unraid_user self.template_vm_name = "thrillwiki-template-ubuntu" self.template_path = f"/mnt/user/domains/{self.template_vm_name}" def authenticate(self) -> bool: """Test SSH connectivity to Unraid server.""" try: result = subprocess.run( f"ssh -o ConnectTimeout=10 {self.unraid_user}@{self.unraid_host} 'echo Connected'", shell=True, capture_output=True, text=True, timeout=15, ) if result.returncode == 0 and "Connected" in result.stdout: logger.info("Successfully connected to Unraid via SSH") return True else: logger.error(f"SSH connection failed: {result.stderr}") return False except Exception as e: logger.error(f"SSH authentication error: {e}") return False def check_template_exists(self) -> bool: """Check if template VM disk exists.""" try: result = subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'test -f {self.template_path}/vdisk1.qcow2'", shell=True, capture_output=True, text=True, ) if result.returncode == 0: logger.info( f"Template VM disk found at { self.template_path}/vdisk1.qcow2" ) return True else: logger.error( f"Template VM disk not found at { self.template_path}/vdisk1.qcow2" ) return False except Exception as e: logger.error(f"Error checking template existence: {e}") return False def get_template_info(self) -> Dict[str, str]: """Get information about the template VM.""" try: # Get disk size size_result = subprocess.run( f"ssh { self.unraid_user}@{ self.unraid_host} 'qemu-img info { self.template_path}/vdisk1.qcow2 | grep \"virtual size\"'", shell=True, capture_output=True, text=True, ) # Get file size file_size_result = subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'ls -lh {self.template_path}/vdisk1.qcow2'", shell=True, capture_output=True, text=True, ) # Get last modification time mod_time_result = subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'stat -c \"%y\" {self.template_path}/vdisk1.qcow2'", shell=True, capture_output=True, text=True, ) info = { "template_path": f"{ self.template_path}/vdisk1.qcow2", "virtual_size": ( size_result.stdout.strip() if size_result.returncode == 0 else "Unknown" ), "file_size": ( file_size_result.stdout.split()[4] if file_size_result.returncode == 0 else "Unknown" ), "last_modified": ( mod_time_result.stdout.strip() if mod_time_result.returncode == 0 else "Unknown" ), } return info except Exception as e: logger.error(f"Error getting template info: {e}") return {} def copy_template_disk(self, target_vm_name: str) -> bool: """Copy template VM disk to a new VM instance.""" try: if not self.check_template_exists(): logger.error("Template VM disk not found. Cannot proceed with copy.") return False target_path = f"/mnt/user/domains/{target_vm_name}" target_disk = f"{target_path}/vdisk1.qcow2" logger.info(f"Copying template disk to new VM: {target_vm_name}") # Create target directory subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'mkdir -p {target_path}'", shell=True, check=True, ) # Check if target disk already exists disk_check = subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'test -f {target_disk}'", shell=True, capture_output=True, ) if disk_check.returncode == 0: logger.warning(f"Target disk already exists: {target_disk}") logger.info( "Removing existing disk to replace with fresh template copy..." ) subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'rm -f {target_disk}'", shell=True, check=True, ) # Copy template disk with rsync progress display logger.info("🚀 Copying template disk with rsync progress display...") start_time = time.time() # First, get the size of the template disk for progress calculation size_result = subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'stat -c%s {self.template_path}/vdisk1.qcow2'", shell=True, capture_output=True, text=True, ) template_size = "unknown size" if size_result.returncode == 0: size_bytes = int(size_result.stdout.strip()) if size_bytes > 1024 * 1024 * 1024: # GB template_size = f"{size_bytes / (1024 * 1024 * 1024):.1f}GB" elif size_bytes > 1024 * 1024: # MB template_size = f"{size_bytes / (1024 * 1024):.1f}MB" else: template_size = f"{size_bytes / 1024:.1f}KB" logger.info(f"📊 Template disk size: {template_size}") # Use rsync with progress display logger.info("📈 Using rsync for real-time progress display...") # Force rsync to output progress to stderr and capture it copy_cmd = f"ssh { self.unraid_user}@{ self.unraid_host} 'rsync -av --progress --stats { self.template_path}/vdisk1.qcow2 {target_disk}'" # Run with real-time output, unbuffered process = subprocess.Popen( copy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=0, # Unbuffered universal_newlines=True, ) import select # Read both stdout and stderr for progress with real-time display while True: # Check if process is still running if process.poll() is not None: # Process finished, read any remaining output remaining_out = process.stdout.read() remaining_err = process.stderr.read() if remaining_out: print(f"📊 {remaining_out.strip()}", flush=True) logger.info(f"📊 {remaining_out.strip()}") if remaining_err: for line in remaining_err.strip().split("\n"): if line.strip(): print(f"⚡ {line.strip()}", flush=True) logger.info(f"⚡ {line.strip()}") break # Use select to check for available data try: ready, _, _ = select.select( [process.stdout, process.stderr], [], [], 0.1 ) for stream in ready: line = stream.readline() if line: line = line.strip() if line: if stream == process.stdout: print(f"📊 {line}", flush=True) logger.info(f"📊 {line}") else: # stderr # rsync progress goes to stderr if any( keyword in line for keyword in [ "%", "bytes/sec", "to-check=", "xfr#", ] ): print(f"⚡ {line}", flush=True) logger.info(f"⚡ {line}") else: print(f"📋 {line}", flush=True) logger.info(f"📋 {line}") except select.error: # Fallback for systems without select (like some Windows # environments) print( "⚠️ select() not available, using fallback method...", flush=True, ) logger.info("⚠️ select() not available, using fallback method...") # Simple fallback - just wait and read what's available time.sleep(0.5) try: # Try to read non-blocking import fcntl import os # Make stdout/stderr non-blocking fd_out = process.stdout.fileno() fd_err = process.stderr.fileno() fl_out = fcntl.fcntl(fd_out, fcntl.F_GETFL) fl_err = fcntl.fcntl(fd_err, fcntl.F_GETFL) fcntl.fcntl(fd_out, fcntl.F_SETFL, fl_out | os.O_NONBLOCK) fcntl.fcntl(fd_err, fcntl.F_SETFL, fl_err | os.O_NONBLOCK) try: out_line = process.stdout.readline() if out_line: print(f"📊 {out_line.strip()}", flush=True) logger.info(f"📊 {out_line.strip()}") except BaseException: pass try: err_line = process.stderr.readline() if err_line: if any( keyword in err_line for keyword in [ "%", "bytes/sec", "to-check=", "xfr#", ] ): print(f"⚡ {err_line.strip()}", flush=True) logger.info(f"⚡ {err_line.strip()}") else: print(f"📋 {err_line.strip()}", flush=True) logger.info(f"📋 {err_line.strip()}") except BaseException: pass except ImportError: # If fcntl not available, just continue print( "📊 Progress display limited - continuing copy...", flush=True, ) logger.info("📊 Progress display limited - continuing copy...") break copy_result_code = process.wait() end_time = time.time() copy_time = end_time - start_time if copy_result_code == 0: logger.info( f"✅ Template disk copied successfully in { copy_time:.1f} seconds" ) logger.info(f"🎯 New VM disk created: {target_disk}") # Verify the copy by checking file size verify_result = subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'ls -lh {target_disk}'", shell=True, capture_output=True, text=True, ) if verify_result.returncode == 0: file_info = verify_result.stdout.strip().split() if len(file_info) >= 5: copied_size = file_info[4] logger.info(f"📋 Copied disk size: {copied_size}") return True else: logger.error( f"❌ Failed to copy template disk (exit code: {copy_result_code})" ) logger.error("Check Unraid server disk space and permissions") return False except Exception as e: logger.error(f"Error copying template disk: {e}") return False def prepare_vm_from_template( self, target_vm_name: str, vm_memory: int, vm_vcpus: int, vm_ip: str ) -> bool: """Complete template-based VM preparation.""" try: logger.info(f"Preparing VM '{target_vm_name}' from template...") # Step 1: Copy template disk if not self.copy_template_disk(target_vm_name): return False logger.info(f"VM '{target_vm_name}' prepared successfully from template") logger.info("The VM disk is ready with Ubuntu pre-installed") logger.info("You can now create the VM configuration and start it") return True except Exception as e: logger.error(f"Error preparing VM from template: {e}") return False def update_template(self) -> bool: """Update the template VM with latest changes.""" try: logger.info("Updating template VM...") logger.info("Note: This should be done manually by:") logger.info("1. Starting the template VM") logger.info("2. Updating Ubuntu packages") logger.info("3. Updating ThrillWiki dependencies") logger.info("4. Stopping the template VM") logger.info("5. The disk will automatically be the new template") # Check template VM status template_status = subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'virsh domstate {self.template_vm_name}'", shell=True, capture_output=True, text=True, ) if template_status.returncode == 0: status = template_status.stdout.strip() logger.info( f"Template VM '{ self.template_vm_name}' status: {status}" ) if status == "running": logger.warning("Template VM is currently running!") logger.warning("Stop the template VM when updates are complete") logger.warning("Running VMs should not be used as templates") return False elif status in ["shut off", "shutoff"]: logger.info( "Template VM is properly stopped and ready to use as template" ) return True else: logger.warning(f"Template VM in unexpected state: {status}") return False else: logger.error("Could not check template VM status") return False except Exception as e: logger.error(f"Error updating template: {e}") return False def list_template_instances(self) -> list: """List all VMs that were created from the template.""" try: # Get all domains result = subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'virsh list --all --name'", shell=True, capture_output=True, text=True, ) if result.returncode != 0: logger.error("Failed to list VMs") return [] all_vms = result.stdout.strip().split("\n") # Filter for thrillwiki VMs (excluding template) template_instances = [] for vm in all_vms: vm = vm.strip() if vm and "thrillwiki" in vm.lower() and vm != self.template_vm_name: # Get VM status status_result = subprocess.run( f"ssh {self.unraid_user}@{self.unraid_host} 'virsh domstate {vm}'", shell=True, capture_output=True, text=True, ) status = ( status_result.stdout.strip() if status_result.returncode == 0 else "unknown" ) template_instances.append({"name": vm, "status": status}) return template_instances except Exception as e: logger.error(f"Error listing template instances: {e}") return [] def main(): """Main entry point for template manager.""" import argparse parser = argparse.ArgumentParser( description="ThrillWiki Template VM Manager", epilog=""" Examples: python template_manager.py info # Show template info python template_manager.py copy my-vm # Copy template to new VM python template_manager.py list # List template instances python template_manager.py update # Update template VM """, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "action", choices=["info", "copy", "list", "update", "check"], help="Action to perform", ) parser.add_argument("vm_name", nargs="?", help="VM name (required for copy action)") args = parser.parse_args() # Get Unraid connection details from environment unraid_host = os.environ.get("UNRAID_HOST") unraid_user = os.environ.get("UNRAID_USER", "root") if not unraid_host: logger.error("UNRAID_HOST environment variable is required") sys.exit(1) # Create template manager template_manager = TemplateVMManager(unraid_host, unraid_user) # Authenticate if not template_manager.authenticate(): logger.error("Failed to connect to Unraid server") sys.exit(1) if args.action == "info": logger.info("📋 Template VM Information") info = template_manager.get_template_info() if info: print(f"Template Path: {info['template_path']}") print(f"Virtual Size: {info['virtual_size']}") print(f"File Size: {info['file_size']}") print(f"Last Modified: {info['last_modified']}") else: print("❌ Failed to get template information") sys.exit(1) elif args.action == "check": if template_manager.check_template_exists(): logger.info("✅ Template VM disk exists and is ready to use") sys.exit(0) else: logger.error("❌ Template VM disk not found") sys.exit(1) elif args.action == "copy": if not args.vm_name: logger.error("VM name is required for copy action") sys.exit(1) success = template_manager.copy_template_disk(args.vm_name) sys.exit(0 if success else 1) elif args.action == "list": logger.info("📋 Template-based VM Instances") instances = template_manager.list_template_instances() if instances: for instance in instances: status_emoji = ( "🟢" if instance["status"] == "running" else "🔴" if instance["status"] == "shut off" else "🟡" ) print( f"{status_emoji} { instance['name']} ({ instance['status']})" ) else: print("No template instances found") elif args.action == "update": success = template_manager.update_template() sys.exit(0 if success else 1) if __name__ == "__main__": # Setup logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler()], ) main()