Files
thrillwiki_django_no_react/shared/scripts/unraid/template_manager.py
pacnpal d504d41de2 feat: complete monorepo structure with frontend and shared resources
- Add complete backend/ directory with full Django application
- Add frontend/ directory with Vite + TypeScript setup ready for Next.js
- Add comprehensive shared/ directory with:
  - Complete documentation and memory-bank archives
  - Media files and avatars (letters, park/ride images)
  - Deployment scripts and automation tools
  - Shared types and utilities
- Add architecture/ directory with migration guides
- Configure pnpm workspace for monorepo development
- Update .gitignore to exclude .django_tailwind_cli/ build artifacts
- Preserve all historical documentation in shared/docs/memory-bank/
- Set up proper structure for full-stack development with shared resources
2025-08-23 18:40:07 -04:00

572 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Template VM Manager for ThrillWiki
Handles copying template VM disks and managing template-based deployments.
"""
import logging
import os
import select
import shlex
import subprocess
import sys
import time
from typing import Dict, List
logger = logging.getLogger(__name__)


class TemplateVMManager:
    """Manage template-based VM deployment on an Unraid server.

    Every operation runs a command on the Unraid host through the local
    ``ssh`` client. Public methods report failure through their return value
    (``False``, ``{}`` or ``[]``) and log the cause instead of raising.
    """

    # Substrings that mark an rsync output line as a transfer-progress update.
    _PROGRESS_MARKERS = ("%", "bytes/sec", "to-check=", "xfr#")

    def __init__(self, unraid_host: str, unraid_user: str = "root"):
        """Store the SSH destination and derive the template location.

        Args:
            unraid_host: Host name or address passed to ``ssh``.
            unraid_user: Login user on the Unraid host.
        """
        self.unraid_host = unraid_host
        self.unraid_user = unraid_user
        self.template_vm_name = "thrillwiki-template-ubuntu"
        self.template_path = f"/mnt/user/domains/{self.template_vm_name}"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _ssh_argv(self, remote_command: str, *ssh_options: str) -> List[str]:
        """Build the local argv that runs ``remote_command`` on the host.

        The command is handed to ``ssh`` as one argument and no local shell
        is involved, so only the remote shell ever parses it. Callers must
        ``shlex.quote`` any variable text they embed in ``remote_command``.
        """
        destination = f"{self.unraid_user}@{self.unraid_host}"
        return ["ssh", *ssh_options, destination, remote_command]

    def _run_remote(
        self,
        remote_command: str,
        *ssh_options: str,
        capture: bool = True,
        check: bool = False,
        timeout=None,
    ) -> subprocess.CompletedProcess:
        """Run ``remote_command`` over SSH and return the completed process.

        Args:
            remote_command: Shell command line for the remote host.
            *ssh_options: Extra arguments placed before the destination.
            capture: Capture stdout/stderr as text instead of inheriting them.
            check: Raise ``subprocess.CalledProcessError`` on non-zero exit.
            timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.
        """
        return subprocess.run(
            self._ssh_argv(remote_command, *ssh_options),
            capture_output=capture,
            text=True,
            check=check,
            timeout=timeout,
        )

    def _template_disk_path(self) -> str:
        """Return the path of the template's disk image on the host."""
        return f"{self.template_path}/vdisk1.qcow2"

    def _target_name_problem(self, target_vm_name: str) -> str:
        """Return why ``target_vm_name`` is unusable, or ``""`` if it is fine.

        The name becomes a directory below ``/mnt/user/domains`` whose disk
        image may be deleted and rewritten, so it must be a single path
        component and must not resolve to the template itself.
        """
        if not isinstance(target_vm_name, str) or not target_vm_name.strip():
            return "VM name must be a non-empty string"
        if (
            target_vm_name in (".", "..")
            or "/" in target_vm_name
            or "\0" in target_vm_name
        ):
            return f"VM name must be a single directory name, got {target_vm_name!r}"
        target_path = f"/mnt/user/domains/{target_vm_name}"
        if target_vm_name == self.template_vm_name or target_path == self.template_path:
            return "target is the template itself; copying would delete the template disk"
        return ""

    @staticmethod
    def _format_size(size_bytes: int) -> str:
        """Format a byte count as GB, MB or KB with one decimal place."""
        for unit, scale in (("GB", 1024 ** 3), ("MB", 1024 ** 2)):
            if size_bytes > scale:
                return f"{size_bytes / scale:.1f}{unit}"
        return f"{size_bytes / 1024:.1f}KB"

    def _describe_template_size(self) -> str:
        """Return the template disk size for display, or ``"unknown size"``."""
        disk = shlex.quote(self._template_disk_path())
        size_result = self._run_remote(f"stat -c%s {disk}")
        raw_size = size_result.stdout.strip()
        # A non-numeric answer must not abort the copy; the size is cosmetic.
        if size_result.returncode == 0 and raw_size.isdigit():
            return self._format_size(int(raw_size))
        return "unknown size"

    def _emit_copy_line(self, line: str, from_stderr: bool) -> None:
        """Print and log one line of rsync output with its display prefix."""
        text = line.strip()
        if not text:
            return
        if not from_stderr:
            text = f"📊 {text}"
        elif not any(marker in text for marker in self._PROGRESS_MARKERS):
            text = f"📋 {text}"
        print(text, flush=True)
        logger.info(text)

    def _emit_copy_output(self, out, err) -> None:
        """Emit every line of already-collected stdout and stderr text."""
        for line in (out or "").splitlines():
            self._emit_copy_line(line, from_stderr=False)
        for line in (err or "").splitlines():
            self._emit_copy_line(line, from_stderr=True)

    def _stream_copy_output(self, process: subprocess.Popen) -> None:
        """Relay the copy process's output until the process exits.

        Uses ``select`` to poll both pipes. Where ``select`` cannot poll
        pipes it raises ``OSError``; in that case the output is collected
        with ``communicate()``, which drains both pipes safely, and is
        emitted once the copy has finished.
        """
        streams = [process.stdout, process.stderr]
        while process.poll() is None:
            try:
                ready, _, _ = select.select(streams, [], [], 0.1)
            except OSError:
                print(
                    "⚠️ select() not available, using fallback method...",
                    flush=True,
                )
                logger.info("⚠️ select() not available, using fallback method...")
                print(
                    "📊 Progress display limited - continuing copy...",
                    flush=True,
                )
                logger.info("📊 Progress display limited - continuing copy...")
                out, err = process.communicate()
                self._emit_copy_output(out, err)
                return
            for stream in ready:
                self._emit_copy_line(
                    stream.readline(), from_stderr=stream is process.stderr
                )
        # The process has exited: flush whatever is still sitting in the pipes.
        self._emit_copy_output(process.stdout.read(), process.stderr.read())

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def authenticate(self) -> bool:
        """Test SSH connectivity to the Unraid server.

        Returns:
            True when a remote ``echo`` succeeds within the timeout.
        """
        try:
            result = self._run_remote(
                "echo Connected", "-o", "ConnectTimeout=10", timeout=15
            )
            if result.returncode == 0 and "Connected" in result.stdout:
                logger.info("Successfully connected to Unraid via SSH")
                return True
            logger.error(f"SSH connection failed: {result.stderr}")
            return False
        except Exception as e:
            logger.error(f"SSH authentication error: {e}")
            return False

    def check_template_exists(self) -> bool:
        """Check whether the template VM disk exists on the host.

        Returns:
            True when the remote ``test -f`` on the disk image succeeds.
        """
        disk = self._template_disk_path()
        try:
            result = self._run_remote(f"test -f {shlex.quote(disk)}")
            if result.returncode == 0:
                logger.info(f"Template VM disk found at {disk}")
                return True
            logger.error(f"Template VM disk not found at {disk}")
            return False
        except Exception as e:
            logger.error(f"Error checking template existence: {e}")
            return False

    def get_template_info(self) -> Dict[str, str]:
        """Collect size and modification details of the template disk.

        Returns:
            A dict with keys ``template_path``, ``virtual_size``,
            ``file_size`` and ``last_modified``. A value is ``"Unknown"``
            when its remote query fails or cannot be parsed. An empty dict is
            returned when the queries could not be run at all.
        """
        disk = self._template_disk_path()
        quoted_disk = shlex.quote(disk)
        try:
            size_result = self._run_remote(
                f'qemu-img info {quoted_disk} | grep "virtual size"'
            )
            file_size_result = self._run_remote(f"ls -lh {quoted_disk}")
            mod_time_result = self._run_remote(f'stat -c "%y" {quoted_disk}')

            # The size is taken from the fifth whitespace-separated field
            # of the ``ls -lh`` output.
            ls_fields = file_size_result.stdout.split()
            listing_ok = file_size_result.returncode == 0 and len(ls_fields) > 4

            return {
                "template_path": disk,
                "virtual_size": (
                    size_result.stdout.strip()
                    if size_result.returncode == 0
                    else "Unknown"
                ),
                "file_size": ls_fields[4] if listing_ok else "Unknown",
                "last_modified": (
                    mod_time_result.stdout.strip()
                    if mod_time_result.returncode == 0
                    else "Unknown"
                ),
            }
        except Exception as e:
            logger.error(f"Error getting template info: {e}")
            return {}

    def copy_template_disk(self, target_vm_name: str) -> bool:
        """Copy the template VM disk into a new VM's domain directory.

        An existing disk at the target is removed and replaced. The target
        name is validated first so that it cannot point outside the domains
        directory or at the template itself.

        Args:
            target_vm_name: Name of the VM (and directory) to create.

        Returns:
            True when rsync exits with status 0, False otherwise.
        """
        try:
            problem = self._target_name_problem(target_vm_name)
            if problem:
                logger.error(f"Refusing to copy template disk: {problem}")
                return False

            if not self.check_template_exists():
                logger.error("Template VM disk not found. Cannot proceed with copy.")
                return False

            source_disk = self._template_disk_path()
            target_path = f"/mnt/user/domains/{target_vm_name}"
            target_disk = f"{target_path}/vdisk1.qcow2"
            quoted_target_disk = shlex.quote(target_disk)
            logger.info(f"Copying template disk to new VM: {target_vm_name}")

            # Create target directory
            self._run_remote(
                f"mkdir -p {shlex.quote(target_path)}", capture=False, check=True
            )

            # Replace any disk left over from a previous deployment
            if self._run_remote(f"test -f {quoted_target_disk}").returncode == 0:
                logger.warning(f"Target disk already exists: {target_disk}")
                logger.info(
                    "Removing existing disk to replace with fresh template copy..."
                )
                self._run_remote(
                    f"rm -f {quoted_target_disk}", capture=False, check=True
                )

            logger.info("🚀 Copying template disk with rsync progress display...")
            logger.info(f"📊 Template disk size: {self._describe_template_size()}")
            logger.info("📈 Using rsync for real-time progress display...")

            copy_cmd = "rsync -av --progress --stats {} {}".format(
                shlex.quote(source_disk), quoted_target_disk
            )
            start_time = time.monotonic()
            # The context manager closes both pipes and reaps the process.
            with subprocess.Popen(
                self._ssh_argv(copy_cmd),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=0,  # Unbuffered
            ) as process:
                self._stream_copy_output(process)
                copy_result_code = process.wait()
            copy_time = time.monotonic() - start_time

            if copy_result_code != 0:
                logger.error(
                    f"❌ Failed to copy template disk (exit code: {copy_result_code})"
                )
                logger.error("Check Unraid server disk space and permissions")
                return False

            logger.info(
                f"✅ Template disk copied successfully in {copy_time:.1f} seconds"
            )
            logger.info(f"🎯 New VM disk created: {target_disk}")

            # Report the size of the copy; a failure here does not undo success.
            verify_result = self._run_remote(f"ls -lh {quoted_target_disk}")
            if verify_result.returncode == 0:
                file_info = verify_result.stdout.split()
                if len(file_info) >= 5:
                    logger.info(f"📋 Copied disk size: {file_info[4]}")
            return True
        except Exception as e:
            logger.error(f"Error copying template disk: {e}")
            return False

    def prepare_vm_from_template(
        self, target_vm_name: str, vm_memory: int, vm_vcpus: int, vm_ip: str
    ) -> bool:
        """Prepare a new VM from the template by copying its disk.

        Args:
            target_vm_name: Name of the VM to create.
            vm_memory: Accepted for interface compatibility; not used here.
            vm_vcpus: Accepted for interface compatibility; not used here.
            vm_ip: Accepted for interface compatibility; not used here.

        Returns:
            True when the template disk was copied successfully.
        """
        try:
            logger.info(f"Preparing VM '{target_vm_name}' from template...")
            # Step 1: Copy template disk
            if not self.copy_template_disk(target_vm_name):
                return False
            logger.info(f"VM '{target_vm_name}' prepared successfully from template")
            logger.info("The VM disk is ready with Ubuntu pre-installed")
            logger.info("You can now create the VM configuration and start it")
            return True
        except Exception as e:
            logger.error(f"Error preparing VM from template: {e}")
            return False

    def update_template(self) -> bool:
        """Log the manual update procedure and check the template VM state.

        No update is performed; the method only logs the manual steps and
        queries ``virsh domstate`` for the template VM.

        Returns:
            True only when the template VM is shut off. False when it is
            running, in any other state, or when its state cannot be read.
        """
        try:
            logger.info("Updating template VM...")
            logger.info("Note: This should be done manually by:")
            logger.info("1. Starting the template VM")
            logger.info("2. Updating Ubuntu packages")
            logger.info("3. Updating ThrillWiki dependencies")
            logger.info("4. Stopping the template VM")
            logger.info("5. The disk will automatically be the new template")

            # Check template VM status
            template_status = self._run_remote(
                f"virsh domstate {shlex.quote(self.template_vm_name)}"
            )
            if template_status.returncode != 0:
                logger.error("Could not check template VM status")
                return False

            status = template_status.stdout.strip()
            logger.info(f"Template VM '{self.template_vm_name}' status: {status}")
            if status == "running":
                logger.warning("Template VM is currently running!")
                logger.warning("Stop the template VM when updates are complete")
                logger.warning("Running VMs should not be used as templates")
                return False
            if status in ("shut off", "shutoff"):
                logger.info(
                    "Template VM is properly stopped and ready to use as template"
                )
                return True
            logger.warning(f"Template VM in unexpected state: {status}")
            return False
        except Exception as e:
            logger.error(f"Error updating template: {e}")
            return False

    def list_template_instances(self) -> list:
        """List the VMs on the host that look like template instances.

        A VM is included when its name contains ``thrillwiki`` (case
        insensitive) and it is not the template VM itself.

        Returns:
            A list of ``{"name": ..., "status": ...}`` dicts, where status is
            the ``virsh domstate`` output or ``"unknown"``. The list is empty
            when the VMs cannot be listed.
        """
        try:
            # Get all domains
            result = self._run_remote("virsh list --all --name")
            if result.returncode != 0:
                logger.error("Failed to list VMs")
                return []

            template_instances = []
            for raw_name in result.stdout.splitlines():
                vm = raw_name.strip()
                if not vm or vm == self.template_vm_name:
                    continue
                if "thrillwiki" not in vm.lower():
                    continue
                status_result = self._run_remote(
                    f"virsh domstate {shlex.quote(vm)}"
                )
                status = (
                    status_result.stdout.strip()
                    if status_result.returncode == 0
                    else "unknown"
                )
                template_instances.append({"name": vm, "status": status})
            return template_instances
        except Exception as e:
            logger.error(f"Error listing template instances: {e}")
            return []
def main():
    """Command-line entry point for the template manager.

    Parses the requested action, connects to the server named by the
    ``UNRAID_HOST`` environment variable (logging in as ``UNRAID_USER``,
    default ``root``) and dispatches to ``TemplateVMManager``. The process
    exits with status 1 on any failure; the ``check``, ``copy`` and
    ``update`` actions also exit explicitly with status 0 on success.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="ThrillWiki Template VM Manager",
        epilog="""
Examples:
python template_manager.py info # Show template info
python template_manager.py copy my-vm # Copy template to new VM
python template_manager.py list # List template instances
python template_manager.py update # Update template VM
""",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "action",
        choices=["info", "copy", "list", "update", "check"],
        help="Action to perform",
    )
    parser.add_argument("vm_name", nargs="?", help="VM name (required for copy action)")
    args = parser.parse_args()

    # Reject incomplete arguments before opening an SSH connection.
    if args.action == "copy" and not args.vm_name:
        logger.error("VM name is required for copy action")
        sys.exit(1)

    # Get Unraid connection details from environment
    unraid_host = os.environ.get("UNRAID_HOST")
    unraid_user = os.environ.get("UNRAID_USER", "root")
    if not unraid_host:
        logger.error("UNRAID_HOST environment variable is required")
        sys.exit(1)

    template_manager = TemplateVMManager(unraid_host, unraid_user)
    if not template_manager.authenticate():
        logger.error("Failed to connect to Unraid server")
        sys.exit(1)

    if args.action == "info":
        logger.info("📋 Template VM Information")
        info = template_manager.get_template_info()
        if not info:
            print("❌ Failed to get template information")
            sys.exit(1)
        print(f"Template Path: {info['template_path']}")
        print(f"Virtual Size: {info['virtual_size']}")
        print(f"File Size: {info['file_size']}")
        print(f"Last Modified: {info['last_modified']}")
    elif args.action == "check":
        if template_manager.check_template_exists():
            logger.info("✅ Template VM disk exists and is ready to use")
            sys.exit(0)
        logger.error("❌ Template VM disk not found")
        sys.exit(1)
    elif args.action == "copy":
        success = template_manager.copy_template_disk(args.vm_name)
        sys.exit(0 if success else 1)
    elif args.action == "list":
        logger.info("📋 Template-based VM Instances")
        instances = template_manager.list_template_instances()
        if not instances:
            print("No template instances found")
        # Green for running, red for shut off, yellow for any other state.
        status_emojis = {"running": "🟢", "shut off": "🔴"}
        for instance in instances:
            status_emoji = status_emojis.get(instance["status"], "🟡")
            print(f"{status_emoji} {instance['name']} ({instance['status']})")
    elif args.action == "update":
        success = template_manager.update_template()
        sys.exit(0 if success else 1)
if __name__ == "__main__":
    # Send INFO-and-above records to a stream handler, then run the CLI.
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        handlers=[console_handler],
        format="%(asctime)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )
    main()