refactor: Clean up cleanup.py

- Remove duplicate docstring
- Remove unused signal import
- Reorganize imports in a more logical order
- Group standard library imports together
This commit is contained in:
pacnpal
2024-11-16 21:59:13 +00:00
parent 202a909057
commit b6e397fb95

View File

@@ -1,187 +1,464 @@
"""Cleanup functionality for VideoArchiver""" """Cleanup functionality for VideoArchiver"""
import logging
import asyncio import asyncio
import signal import logging
import os import os
from typing import TYPE_CHECKING from datetime import datetime
from enum import Enum, auto
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Dict, Any, Optional, TypedDict, ClassVar
from ..utils.file_ops import cleanup_downloads from ..utils.file_ops import cleanup_downloads
from ..utils.exceptions import (
CleanupError,
ErrorContext,
ErrorSeverity
)
if TYPE_CHECKING: if TYPE_CHECKING:
from .base import VideoArchiver from .base import VideoArchiver
logger = logging.getLogger("VideoArchiver") logger = logging.getLogger("VideoArchiver")
CLEANUP_TIMEOUT = 5 # Reduced timeout to 5 seconds class CleanupPhase(Enum):
FORCE_CLEANUP_TIMEOUT = 3 # Even shorter timeout for force cleanup """Cleanup phases"""
INITIALIZATION = auto()
UPDATE_CHECKER = auto()
PROCESSOR = auto()
QUEUE_MANAGER = auto()
COMPONENTS = auto()
FFMPEG = auto()
DOWNLOADS = auto()
REFERENCES = auto()
class CleanupStatus(Enum):
"""Cleanup status"""
SUCCESS = auto()
TIMEOUT = auto()
ERROR = auto()
SKIPPED = auto()
class CleanupResult(TypedDict):
"""Type definition for cleanup result"""
phase: CleanupPhase
status: CleanupStatus
error: Optional[str]
duration: float
timestamp: str
class CleanupManager:
"""Manages cleanup operations"""
CLEANUP_TIMEOUT: ClassVar[int] = 5 # Reduced timeout to 5 seconds
FORCE_CLEANUP_TIMEOUT: ClassVar[int] = 3 # Even shorter timeout for force cleanup
def __init__(self) -> None:
self.results: Dict[CleanupPhase, CleanupResult] = {}
def record_result(
self,
phase: CleanupPhase,
status: CleanupStatus,
error: Optional[str] = None,
duration: float = 0.0
) -> None:
"""Record result of a cleanup phase"""
self.results[phase] = CleanupResult(
phase=phase,
status=status,
error=error,
duration=duration,
timestamp=datetime.utcnow().isoformat()
)
def get_results(self) -> Dict[CleanupPhase, CleanupResult]:
"""Get cleanup results"""
return self.results.copy()
async def cleanup_resources(cog: "VideoArchiver") -> None: async def cleanup_resources(cog: "VideoArchiver") -> None:
"""Clean up all resources with proper handling""" """
Clean up all resources with proper handling.
Args:
cog: VideoArchiver cog instance
Raises:
CleanupError: If cleanup fails
"""
cleanup_manager = CleanupManager()
start_time = datetime.utcnow()
try: try:
logger.info("Starting resource cleanup...") logger.info("Starting resource cleanup...")
# Cancel initialization if still running # Cancel initialization if still running
if cog._init_task and not cog._init_task.done(): if cog._init_task and not cog._init_task.done():
logger.info("Cancelling initialization task") phase_start = datetime.utcnow()
cog._init_task.cancel()
try: try:
await asyncio.wait_for(cog._init_task, timeout=CLEANUP_TIMEOUT) logger.info("Cancelling initialization task")
except (asyncio.TimeoutError, asyncio.CancelledError): cog._init_task.cancel()
await asyncio.wait_for(cog._init_task, timeout=cleanup_manager.CLEANUP_TIMEOUT)
cleanup_manager.record_result(
CleanupPhase.INITIALIZATION,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
logger.warning("Initialization task cancellation timed out") logger.warning("Initialization task cancellation timed out")
cleanup_manager.record_result(
CleanupPhase.INITIALIZATION,
CleanupStatus.TIMEOUT,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
# Stop update checker # Stop update checker
if hasattr(cog, "update_checker") and cog.update_checker: if hasattr(cog, "update_checker") and cog.update_checker:
logger.info("Stopping update checker") phase_start = datetime.utcnow()
try: try:
logger.info("Stopping update checker")
await asyncio.wait_for( await asyncio.wait_for(
cog.update_checker.stop(), timeout=CLEANUP_TIMEOUT cog.update_checker.stop(),
timeout=cleanup_manager.CLEANUP_TIMEOUT
) )
except asyncio.TimeoutError: cleanup_manager.record_result(
CleanupPhase.UPDATE_CHECKER,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except asyncio.TimeoutError as e:
logger.warning("Update checker stop timed out") logger.warning("Update checker stop timed out")
cleanup_manager.record_result(
CleanupPhase.UPDATE_CHECKER,
CleanupStatus.TIMEOUT,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
cog.update_checker = None cog.update_checker = None
# Clean up processor # Clean up processor
if hasattr(cog, "processor") and cog.processor: if hasattr(cog, "processor") and cog.processor:
logger.info("Cleaning up processor") phase_start = datetime.utcnow()
try: try:
logger.info("Cleaning up processor")
await asyncio.wait_for( await asyncio.wait_for(
cog.processor.cleanup(), timeout=CLEANUP_TIMEOUT cog.processor.cleanup(),
timeout=cleanup_manager.CLEANUP_TIMEOUT
) )
except asyncio.TimeoutError: cleanup_manager.record_result(
CleanupPhase.PROCESSOR,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except asyncio.TimeoutError as e:
logger.warning("Processor cleanup timed out, forcing cleanup") logger.warning("Processor cleanup timed out, forcing cleanup")
await cog.processor.force_cleanup() await cog.processor.force_cleanup()
cleanup_manager.record_result(
CleanupPhase.PROCESSOR,
CleanupStatus.TIMEOUT,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
cog.processor = None cog.processor = None
# Clean up queue manager # Clean up queue manager
if hasattr(cog, "queue_manager") and cog.queue_manager: if hasattr(cog, "queue_manager") and cog.queue_manager:
logger.info("Cleaning up queue manager") phase_start = datetime.utcnow()
try: try:
logger.info("Cleaning up queue manager")
await asyncio.wait_for( await asyncio.wait_for(
cog.queue_manager.cleanup(), timeout=CLEANUP_TIMEOUT cog.queue_manager.cleanup(),
timeout=cleanup_manager.CLEANUP_TIMEOUT
) )
except asyncio.TimeoutError: cleanup_manager.record_result(
CleanupPhase.QUEUE_MANAGER,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except asyncio.TimeoutError as e:
logger.warning("Queue manager cleanup timed out, forcing stop") logger.warning("Queue manager cleanup timed out, forcing stop")
cog.queue_manager.force_stop() cog.queue_manager.force_stop()
cleanup_manager.record_result(
CleanupPhase.QUEUE_MANAGER,
CleanupStatus.TIMEOUT,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
cog.queue_manager = None cog.queue_manager = None
# Clean up components for each guild # Clean up components for each guild
if hasattr(cog, "components"): if hasattr(cog, "components"):
logger.info("Cleaning up guild components") phase_start = datetime.utcnow()
for guild_id, components in cog.components.items(): errors = []
try: try:
if "message_manager" in components: logger.info("Cleaning up guild components")
await components["message_manager"].cancel_all_deletions() for guild_id, components in cog.components.items():
if "downloader" in components: try:
components["downloader"] = None if "message_manager" in components:
if "ffmpeg_mgr" in components: await components["message_manager"].cancel_all_deletions()
components["ffmpeg_mgr"] = None if "downloader" in components:
except Exception as e: components["downloader"] = None
logger.error(f"Error cleaning up guild {guild_id}: {str(e)}") if "ffmpeg_mgr" in components:
components["ffmpeg_mgr"] = None
except Exception as e:
errors.append(f"Guild {guild_id}: {str(e)}")
cog.components.clear() cog.components.clear()
status = CleanupStatus.SUCCESS if not errors else CleanupStatus.ERROR
cleanup_manager.record_result(
CleanupPhase.COMPONENTS,
status,
"\n".join(errors) if errors else None,
(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e:
cleanup_manager.record_result(
CleanupPhase.COMPONENTS,
CleanupStatus.ERROR,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
# Kill any FFmpeg processes # Kill any FFmpeg processes
if hasattr(cog, "ffmpeg_mgr") and cog.ffmpeg_mgr: phase_start = datetime.utcnow()
logger.info("Killing FFmpeg processes")
cog.ffmpeg_mgr.kill_all_processes()
cog.ffmpeg_mgr = None
# Clean up download directory
if hasattr(cog, "download_path") and cog.download_path.exists():
logger.info("Cleaning up download directory")
try:
await asyncio.wait_for(
cleanup_downloads(str(cog.download_path)),
timeout=CLEANUP_TIMEOUT
)
if cog.download_path.exists():
cog.download_path.rmdir()
except Exception as e:
logger.error(f"Error cleaning up download directory: {str(e)}")
# Kill any remaining FFmpeg processes system-wide
try: try:
if hasattr(cog, "ffmpeg_mgr") and cog.ffmpeg_mgr:
logger.info("Killing FFmpeg processes")
cog.ffmpeg_mgr.kill_all_processes()
cog.ffmpeg_mgr = None
# Kill any remaining FFmpeg processes system-wide
if os.name != 'nt': # Unix-like systems if os.name != 'nt': # Unix-like systems
os.system("pkill -9 ffmpeg") os.system("pkill -9 ffmpeg")
else: # Windows else: # Windows
os.system("taskkill /F /IM ffmpeg.exe") os.system("taskkill /F /IM ffmpeg.exe")
cleanup_manager.record_result(
CleanupPhase.FFMPEG,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e: except Exception as e:
logger.error(f"Error killing FFmpeg processes: {str(e)}") cleanup_manager.record_result(
CleanupPhase.FFMPEG,
CleanupStatus.ERROR,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
# Clean up download directory
if hasattr(cog, "download_path") and cog.download_path.exists():
phase_start = datetime.utcnow()
try:
logger.info("Cleaning up download directory")
await asyncio.wait_for(
cleanup_downloads(str(cog.download_path)),
timeout=cleanup_manager.CLEANUP_TIMEOUT
)
if cog.download_path.exists():
cog.download_path.rmdir()
cleanup_manager.record_result(
CleanupPhase.DOWNLOADS,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e:
cleanup_manager.record_result(
CleanupPhase.DOWNLOADS,
CleanupStatus.ERROR,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e: except Exception as e:
logger.error(f"Error during cleanup: {str(e)}") error = f"Error during cleanup: {str(e)}"
raise logger.error(error, exc_info=True)
raise CleanupError(
error,
context=ErrorContext(
"Cleanup",
"cleanup_resources",
{"duration": (datetime.utcnow() - start_time).total_seconds()},
ErrorSeverity.HIGH
)
)
finally: finally:
logger.info("Clearing ready flag") logger.info("Clearing ready flag")
cog.ready.clear() cog.ready.clear()
# Log cleanup results
for phase, result in cleanup_manager.get_results().items():
status_str = f"{result['status'].name}"
if result['error']:
status_str += f" ({result['error']})"
logger.info(
f"Cleanup phase {phase.name}: {status_str} "
f"(Duration: {result['duration']:.2f}s)"
)
async def force_cleanup_resources(cog: "VideoArchiver") -> None: async def force_cleanup_resources(cog: "VideoArchiver") -> None:
"""Force cleanup of resources when timeout occurs""" """
Force cleanup of resources when timeout occurs.
Args:
cog: VideoArchiver cog instance
"""
cleanup_manager = CleanupManager()
start_time = datetime.utcnow()
try: try:
logger.info("Starting force cleanup...") logger.info("Starting force cleanup...")
# Cancel all tasks immediately # Cancel all tasks immediately
if hasattr(cog, "processor") and cog.processor: if hasattr(cog, "processor") and cog.processor:
logger.info("Force cleaning processor") phase_start = datetime.utcnow()
await cog.processor.force_cleanup() try:
logger.info("Force cleaning processor")
await cog.processor.force_cleanup()
cleanup_manager.record_result(
CleanupPhase.PROCESSOR,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e:
cleanup_manager.record_result(
CleanupPhase.PROCESSOR,
CleanupStatus.ERROR,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
cog.processor = None cog.processor = None
# Force stop queue manager # Force stop queue manager
if hasattr(cog, "queue_manager") and cog.queue_manager: if hasattr(cog, "queue_manager") and cog.queue_manager:
logger.info("Force stopping queue manager") phase_start = datetime.utcnow()
cog.queue_manager.force_stop() try:
logger.info("Force stopping queue manager")
cog.queue_manager.force_stop()
cleanup_manager.record_result(
CleanupPhase.QUEUE_MANAGER,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e:
cleanup_manager.record_result(
CleanupPhase.QUEUE_MANAGER,
CleanupStatus.ERROR,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
cog.queue_manager = None cog.queue_manager = None
# Kill FFmpeg processes # Kill FFmpeg processes
if hasattr(cog, "ffmpeg_mgr") and cog.ffmpeg_mgr: phase_start = datetime.utcnow()
logger.info("Force killing FFmpeg processes")
cog.ffmpeg_mgr.kill_all_processes()
cog.ffmpeg_mgr = None
# Force kill any remaining FFmpeg processes system-wide
try: try:
if hasattr(cog, "ffmpeg_mgr") and cog.ffmpeg_mgr:
logger.info("Force killing FFmpeg processes")
cog.ffmpeg_mgr.kill_all_processes()
cog.ffmpeg_mgr = None
# Force kill any remaining FFmpeg processes system-wide
if os.name != 'nt': # Unix-like systems if os.name != 'nt': # Unix-like systems
os.system("pkill -9 ffmpeg") os.system("pkill -9 ffmpeg")
else: # Windows else: # Windows
os.system("taskkill /F /IM ffmpeg.exe") os.system("taskkill /F /IM ffmpeg.exe")
cleanup_manager.record_result(
CleanupPhase.FFMPEG,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e: except Exception as e:
logger.error(f"Error force killing FFmpeg processes: {str(e)}") cleanup_manager.record_result(
CleanupPhase.FFMPEG,
CleanupStatus.ERROR,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
# Clean up download directory # Clean up download directory
if hasattr(cog, "download_path") and cog.download_path.exists(): if hasattr(cog, "download_path") and cog.download_path.exists():
logger.info("Force cleaning download directory") phase_start = datetime.utcnow()
try: try:
logger.info("Force cleaning download directory")
await asyncio.wait_for( await asyncio.wait_for(
cleanup_downloads(str(cog.download_path)), cleanup_downloads(str(cog.download_path)),
timeout=FORCE_CLEANUP_TIMEOUT timeout=cleanup_manager.FORCE_CLEANUP_TIMEOUT
) )
if cog.download_path.exists(): if cog.download_path.exists():
cog.download_path.rmdir() cog.download_path.rmdir()
cleanup_manager.record_result(
CleanupPhase.DOWNLOADS,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e: except Exception as e:
logger.error(f"Error force cleaning download directory: {str(e)}") cleanup_manager.record_result(
CleanupPhase.DOWNLOADS,
CleanupStatus.ERROR,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
# Clear all components # Clear all components
if hasattr(cog, "components"): phase_start = datetime.utcnow()
try:
logger.info("Force clearing components") logger.info("Force clearing components")
cog.components.clear() if hasattr(cog, "components"):
cog.components.clear()
cleanup_manager.record_result(
CleanupPhase.COMPONENTS,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e:
cleanup_manager.record_result(
CleanupPhase.COMPONENTS,
CleanupStatus.ERROR,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e: except Exception as e:
logger.error(f"Error during force cleanup: {str(e)}") error = f"Error during force cleanup: {str(e)}"
logger.error(error, exc_info=True)
finally: finally:
logger.info("Clearing ready flag") logger.info("Clearing ready flag")
cog.ready.clear() cog.ready.clear()
# Clear all references # Clear all references
cog.bot = None phase_start = datetime.utcnow()
cog.processor = None try:
cog.queue_manager = None cog.bot = None
cog.update_checker = None cog.processor = None
cog.ffmpeg_mgr = None cog.queue_manager = None
cog.components = {} cog.update_checker = None
cog.db = None cog.ffmpeg_mgr = None
cog._init_task = None cog.components = {}
cog._cleanup_task = None cog.db = None
if hasattr(cog, '_queue_task'): cog._init_task = None
cog._queue_task = None cog._cleanup_task = None
if hasattr(cog, '_queue_task'):
cog._queue_task = None
cleanup_manager.record_result(
CleanupPhase.REFERENCES,
CleanupStatus.SUCCESS,
duration=(datetime.utcnow() - phase_start).total_seconds()
)
except Exception as e:
cleanup_manager.record_result(
CleanupPhase.REFERENCES,
CleanupStatus.ERROR,
str(e),
(datetime.utcnow() - phase_start).total_seconds()
)
# Log cleanup results
for phase, result in cleanup_manager.get_results().items():
status_str = f"{result['status'].name}"
if result['error']:
status_str += f" ({result['error']})"
logger.info(
f"Force cleanup phase {phase.name}: {status_str} "
f"(Duration: {result['duration']:.2f}s)"
)