Files
thrillwiki_django_no_react/scripts/unraid/template_manager.py
pacnpal c26414ff74 Add comprehensive tests for Parks API and models
- Implemented extensive test cases for the Parks API, covering endpoints for listing, retrieving, creating, updating, and deleting parks.
- Added tests for filtering, searching, and ordering parks in the API.
- Created tests for error handling in the API, including malformed JSON and unsupported methods.
- Developed model tests for Park, ParkArea, Company, and ParkReview models, ensuring validation and constraints are enforced.
- Introduced utility mixins for API and model testing to streamline assertions and enhance test readability.
- Included integration tests to validate complete workflows involving park creation, retrieval, updating, and deletion.
2025-08-17 19:36:20 -04:00

500 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Template VM Manager for ThrillWiki
Handles copying template VM disks and managing template-based deployments.
"""
import logging
import os
import shlex
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Optional, Dict
# Module-level logger shared by TemplateVMManager and main(); handlers and
# level are configured by the __main__ guard at the bottom of this file.
logger = logging.getLogger(__name__)
class TemplateVMManager:
    """Manages template-based VM deployment on Unraid.

    Every operation runs a command on the Unraid host through the local
    ``ssh`` client. Commands are passed to ``subprocess`` as argument lists
    (no local shell), and every word of the remote command line is quoted
    with ``shlex.quote`` so that VM names and paths cannot inject extra
    shell syntax on the remote side.

    Attributes:
        unraid_host: Host name or address handed to ``ssh``.
        unraid_user: Remote user name handed to ``ssh``.
        template_vm_name: Name of the template VM domain.
        template_path: Remote directory that holds the template's disk.
    """

    # Substrings that mark a stderr line as an rsync progress update; such
    # lines are echoed without the "📋" prefix.
    _PROGRESS_MARKERS = ("%", "bytes/sec", "to-check=", "xfr#")

    def __init__(self, unraid_host: str, unraid_user: str = "root"):
        """Store connection details and derive the template location.

        Args:
            unraid_host: Host name or address of the Unraid server.
            unraid_user: SSH user name (defaults to ``"root"``).
        """
        self.unraid_host = unraid_host
        self.unraid_user = unraid_user
        self.template_vm_name = "thrillwiki-template-ubuntu"
        self.template_path = f"/mnt/user/domains/{self.template_vm_name}"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _template_disk(self) -> str:
        """Return the remote path of the template's qcow2 disk."""
        return f"{self.template_path}/vdisk1.qcow2"

    @staticmethod
    def _remote_command(*words: str) -> str:
        """Join *words* into one remote command line, shell-quoting each word."""
        return " ".join(shlex.quote(word) for word in words)

    def _ssh_argv(self, remote_command: str, *ssh_options: str) -> list:
        """Build the local argv that runs *remote_command* on the Unraid host."""
        return [
            "ssh",
            *ssh_options,
            f"{self.unraid_user}@{self.unraid_host}",
            remote_command,
        ]

    def _run_remote(self, remote_command: str, *, ssh_options=(), capture: bool = True,
                    **run_kwargs) -> subprocess.CompletedProcess:
        """Run *remote_command* over SSH and return the completed process.

        Args:
            remote_command: Already-quoted command line for the remote shell.
            ssh_options: Extra options placed before the ssh destination.
            capture: Capture stdout/stderr instead of inheriting them.
            **run_kwargs: Passed through to ``subprocess.run`` (for example
                ``timeout`` or ``check``).
        """
        return subprocess.run(
            self._ssh_argv(remote_command, *ssh_options),
            capture_output=capture,
            text=True,
            **run_kwargs,
        )

    @staticmethod
    def _format_size(size_bytes: int) -> str:
        """Render a byte count as a one-decimal KB/MB/GB label."""
        gib = 1024 * 1024 * 1024
        mib = 1024 * 1024
        if size_bytes > gib:
            return f"{size_bytes / gib:.1f}GB"
        if size_bytes > mib:
            return f"{size_bytes / mib:.1f}MB"
        return f"{size_bytes / 1024:.1f}KB"

    def _template_size_label(self) -> str:
        """Return a human-readable template disk size, or ``"unknown size"``."""
        size_result = self._run_remote(
            self._remote_command("stat", "-c%s", self._template_disk())
        )
        if size_result.returncode != 0:
            return "unknown size"
        try:
            return self._format_size(int(size_result.stdout.strip()))
        except ValueError:
            # The size is only used for a log line; never abort a copy over it.
            return "unknown size"

    def _relay_stream(self, stream, from_stderr: bool) -> None:
        """Echo every non-blank line of *stream* to the console and the log."""
        for raw_line in stream:
            text = raw_line.strip()
            if not text:
                continue
            if not from_stderr:
                message = f"📊 {text}"
            elif any(marker in text for marker in self._PROGRESS_MARKERS):
                message = text
            else:
                message = f"📋 {text}"
            print(message, flush=True)
            logger.info(message)

    def _stream_copy_output(self, process: subprocess.Popen) -> None:
        """Drain stdout and stderr of *process* concurrently until both close.

        One reader thread per pipe keeps either pipe from filling up and
        blocking the child, and works on platforms where ``select`` cannot
        be used on pipes.
        """
        readers = [
            threading.Thread(
                target=self._relay_stream, args=(process.stdout, False), daemon=True
            ),
            threading.Thread(
                target=self._relay_stream, args=(process.stderr, True), daemon=True
            ),
        ]
        for reader in readers:
            reader.start()
        for reader in readers:
            reader.join()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def authenticate(self) -> bool:
        """Test SSH connectivity to Unraid server.

        Returns:
            True when the remote ``echo`` succeeds and its output is seen;
            False on any failure (including a timeout after 15 seconds).
        """
        try:
            result = self._run_remote(
                self._remote_command("echo", "Connected"),
                ssh_options=("-o", "ConnectTimeout=10"),
                timeout=15,
            )
            if result.returncode == 0 and "Connected" in result.stdout:
                logger.info("Successfully connected to Unraid via SSH")
                return True
            logger.error(f"SSH connection failed: {result.stderr}")
            return False
        except Exception as e:
            logger.error(f"SSH authentication error: {e}")
            return False

    def check_template_exists(self) -> bool:
        """Check if template VM disk exists.

        Returns:
            True when ``test -f`` on the template disk succeeds remotely.
        """
        try:
            template_disk = self._template_disk()
            result = self._run_remote(self._remote_command("test", "-f", template_disk))
            if result.returncode == 0:
                logger.info(f"Template VM disk found at {template_disk}")
                return True
            logger.error(f"Template VM disk not found at {template_disk}")
            return False
        except Exception as e:
            logger.error(f"Error checking template existence: {e}")
            return False

    def get_template_info(self) -> Dict[str, str]:
        """Get information about the template VM.

        Returns:
            A dict with the keys ``template_path``, ``virtual_size``,
            ``file_size`` and ``last_modified``. A value is ``"Unknown"``
            when its remote query fails. An empty dict is returned when the
            queries cannot be run at all.
        """
        try:
            template_disk = self._template_disk()
            # Get disk size (the pipe must stay unquoted, so join two commands)
            size_result = self._run_remote(
                self._remote_command("qemu-img", "info", template_disk)
                + " | "
                + self._remote_command("grep", "virtual size")
            )
            # Get file size
            file_size_result = self._run_remote(
                self._remote_command("ls", "-lh", template_disk)
            )
            # Get last modification time
            mod_time_result = self._run_remote(
                self._remote_command("stat", "-c", "%y", template_disk)
            )

            # The fifth column of `ls -lh` is the size; tolerate short output
            # instead of discarding the other fields.
            ls_fields = file_size_result.stdout.split()
            if file_size_result.returncode == 0 and len(ls_fields) > 4:
                file_size = ls_fields[4]
            else:
                file_size = "Unknown"

            return {
                "template_path": template_disk,
                "virtual_size": size_result.stdout.strip() if size_result.returncode == 0 else "Unknown",
                "file_size": file_size,
                "last_modified": mod_time_result.stdout.strip() if mod_time_result.returncode == 0 else "Unknown",
            }
        except Exception as e:
            logger.error(f"Error getting template info: {e}")
            return {}

    def copy_template_disk(self, target_vm_name: str) -> bool:
        """Copy template VM disk to a new VM instance.

        An existing target disk is removed and replaced by a fresh copy made
        with ``rsync`` on the Unraid host; rsync output is echoed live.

        Args:
            target_vm_name: Name of the new VM; used as the directory name
                under ``/mnt/user/domains``.

        Returns:
            True when rsync exits with status 0, False otherwise.
        """
        try:
            # The name becomes a directory component of a path that is later
            # passed to `rm -f`, so refuse anything that escapes the domains
            # directory or that points back at the template itself.
            if (not target_vm_name or "/" in target_vm_name
                    or target_vm_name in (".", "..")):
                logger.error(f"Invalid target VM name: {target_vm_name!r}")
                return False
            if target_vm_name == self.template_vm_name:
                logger.error("Target VM name must differ from the template VM name")
                return False

            if not self.check_template_exists():
                logger.error("Template VM disk not found. Cannot proceed with copy.")
                return False

            target_path = f"/mnt/user/domains/{target_vm_name}"
            target_disk = f"{target_path}/vdisk1.qcow2"
            logger.info(f"Copying template disk to new VM: {target_vm_name}")

            # Create target directory
            self._run_remote(
                self._remote_command("mkdir", "-p", target_path),
                capture=False,
                check=True,
            )

            # Check if target disk already exists
            disk_check = self._run_remote(self._remote_command("test", "-f", target_disk))
            if disk_check.returncode == 0:
                logger.warning(f"Target disk already exists: {target_disk}")
                logger.info("Removing existing disk to replace with fresh template copy...")
                self._run_remote(
                    self._remote_command("rm", "-f", target_disk),
                    capture=False,
                    check=True,
                )

            # Copy template disk with rsync progress display
            logger.info("🚀 Copying template disk with rsync progress display...")
            start_time = time.time()
            logger.info(f"📊 Template disk size: {self._template_size_label()}")

            logger.info("📈 Using rsync for real-time progress display...")
            copy_argv = self._ssh_argv(
                self._remote_command(
                    "rsync", "-av", "--progress", "--stats",
                    self._template_disk(), target_disk,
                )
            )
            # The context manager closes both pipes once the copy is done.
            with subprocess.Popen(
                copy_argv,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                errors="replace",  # a stray byte must not kill a reader thread
                bufsize=1,  # line buffered
            ) as process:
                self._stream_copy_output(process)
                copy_result_code = process.wait()

            copy_time = time.time() - start_time

            if copy_result_code != 0:
                logger.error(f"❌ Failed to copy template disk (exit code: {copy_result_code})")
                logger.error("Check Unraid server disk space and permissions")
                return False

            logger.info(f"✅ Template disk copied successfully in {copy_time:.1f} seconds")
            logger.info(f"🎯 New VM disk created: {target_disk}")

            # Verify the copy by checking file size
            verify_result = self._run_remote(self._remote_command("ls", "-lh", target_disk))
            if verify_result.returncode == 0:
                file_info = verify_result.stdout.strip().split()
                if len(file_info) >= 5:
                    logger.info(f"📋 Copied disk size: {file_info[4]}")
            return True
        except Exception as e:
            logger.error(f"Error copying template disk: {e}")
            return False

    def prepare_vm_from_template(self, target_vm_name: str, vm_memory: int,
                                 vm_vcpus: int, vm_ip: str) -> bool:
        """Complete template-based VM preparation.

        Currently this only copies the template disk; ``vm_memory``,
        ``vm_vcpus`` and ``vm_ip`` are accepted but not used here.

        Returns:
            True when the disk copy succeeds, False otherwise.
        """
        try:
            logger.info(f"Preparing VM '{target_vm_name}' from template...")
            # Step 1: Copy template disk
            if not self.copy_template_disk(target_vm_name):
                return False
            logger.info(f"VM '{target_vm_name}' prepared successfully from template")
            logger.info("The VM disk is ready with Ubuntu pre-installed")
            logger.info("You can now create the VM configuration and start it")
            return True
        except Exception as e:
            logger.error(f"Error preparing VM from template: {e}")
            return False

    def update_template(self) -> bool:
        """Update the template VM with latest changes.

        Logs the manual update procedure, then queries the template VM state
        with ``virsh domstate``.

        Returns:
            True only when the template VM is shut off; False when it is
            running, in any other state, or its state cannot be queried.
        """
        try:
            logger.info("Updating template VM...")
            logger.info("Note: This should be done manually by:")
            logger.info("1. Starting the template VM")
            logger.info("2. Updating Ubuntu packages")
            logger.info("3. Updating ThrillWiki dependencies")
            logger.info("4. Stopping the template VM")
            logger.info("5. The disk will automatically be the new template")

            # Check template VM status
            template_status = self._run_remote(
                self._remote_command("virsh", "domstate", self.template_vm_name)
            )
            if template_status.returncode != 0:
                logger.error("Could not check template VM status")
                return False

            status = template_status.stdout.strip()
            logger.info(f"Template VM '{self.template_vm_name}' status: {status}")
            if status == "running":
                logger.warning("Template VM is currently running!")
                logger.warning("Stop the template VM when updates are complete")
                logger.warning("Running VMs should not be used as templates")
                return False
            if status in ["shut off", "shutoff"]:
                logger.info("Template VM is properly stopped and ready to use as template")
                return True
            logger.warning(f"Template VM in unexpected state: {status}")
            return False
        except Exception as e:
            logger.error(f"Error updating template: {e}")
            return False

    def list_template_instances(self) -> list:
        """List all VMs that were created from the template.

        A VM is reported when its name contains ``thrillwiki``
        (case-insensitive) and is not the template itself.

        Returns:
            A list of ``{"name": ..., "status": ...}`` dicts; ``status`` is
            ``"unknown"`` when ``virsh domstate`` fails for that VM. An empty
            list is returned on error.
        """
        try:
            # Get all domains
            result = self._run_remote(
                self._remote_command("virsh", "list", "--all", "--name")
            )
            if result.returncode != 0:
                logger.error("Failed to list VMs")
                return []

            # Filter for thrillwiki VMs (excluding template)
            template_instances = []
            for raw_name in result.stdout.splitlines():
                vm = raw_name.strip()
                if not vm or "thrillwiki" not in vm.lower() or vm == self.template_vm_name:
                    continue
                # Get VM status
                status_result = self._run_remote(
                    self._remote_command("virsh", "domstate", vm)
                )
                status = status_result.stdout.strip() if status_result.returncode == 0 else "unknown"
                template_instances.append({"name": vm, "status": status})
            return template_instances
        except Exception as e:
            logger.error(f"Error listing template instances: {e}")
            return []
def main():
    """Main entry point for template manager.

    Parses the command line, reads the Unraid connection details from the
    ``UNRAID_HOST`` (required) and ``UNRAID_USER`` (default ``"root"``)
    environment variables, verifies SSH connectivity and then dispatches
    to the requested action.

    Exits with status 1 when the host is not configured, the connection
    test fails, a required VM name is missing, or the action reports
    failure. The ``check``, ``copy`` and ``update`` actions always end
    with an explicit ``sys.exit``.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="ThrillWiki Template VM Manager",
        epilog="""
Examples:
python template_manager.py info # Show template info
python template_manager.py copy my-vm # Copy template to new VM
python template_manager.py list # List template instances
python template_manager.py update # Update template VM
""",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "action",
        choices=["info", "copy", "list", "update", "check"],
        help="Action to perform"
    )
    parser.add_argument(
        "vm_name",
        nargs="?",
        help="VM name (required for copy action)"
    )
    args = parser.parse_args()

    # Get Unraid connection details from environment
    unraid_host = os.environ.get("UNRAID_HOST")
    unraid_user = os.environ.get("UNRAID_USER", "root")
    if not unraid_host:
        logger.error("UNRAID_HOST environment variable is required")
        sys.exit(1)

    # Create template manager
    template_manager = TemplateVMManager(unraid_host, unraid_user)

    # Authenticate
    if not template_manager.authenticate():
        logger.error("Failed to connect to Unraid server")
        sys.exit(1)

    if args.action == "info":
        logger.info("📋 Template VM Information")
        info = template_manager.get_template_info()
        if info:
            print(f"Template Path: {info['template_path']}")
            print(f"Virtual Size: {info['virtual_size']}")
            print(f"File Size: {info['file_size']}")
            print(f"Last Modified: {info['last_modified']}")
        else:
            print("❌ Failed to get template information")
            sys.exit(1)
    elif args.action == "check":
        if template_manager.check_template_exists():
            logger.info("✅ Template VM disk exists and is ready to use")
            sys.exit(0)
        else:
            logger.error("❌ Template VM disk not found")
            sys.exit(1)
    elif args.action == "copy":
        if not args.vm_name:
            logger.error("VM name is required for copy action")
            sys.exit(1)
        success = template_manager.copy_template_disk(args.vm_name)
        sys.exit(0 if success else 1)
    elif args.action == "list":
        logger.info("📋 Template-based VM Instances")
        instances = template_manager.list_template_instances()
        if instances:
            # Green for running, red for shut off, yellow for anything else.
            status_emojis = {"running": "🟢", "shut off": "🔴"}
            for instance in instances:
                status_emoji = status_emojis.get(instance["status"], "🟡")
                print(f"{status_emoji} {instance['name']} ({instance['status']})")
        else:
            print("No template instances found")
    elif args.action == "update":
        success = template_manager.update_template()
        sys.exit(0 if success else 1)
if __name__ == "__main__":
    # Send INFO-and-above records to the console with a timestamped format
    # before handing control to the command-line entry point.
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[console_handler],
        level=logging.INFO,
    )
    main()