Files
Pac-cogs/videoarchiver/update_checker.py
2024-11-18 05:17:37 +00:00

334 lines
12 KiB
Python

"""Update checker for yt-dlp"""
import logging
from importlib.metadata import version as get_package_version
from datetime import datetime, timedelta
import aiohttp
from packaging import version
import discord # type: ignore
from typing import Optional, Tuple, Dict, Any
import asyncio
import sys
import json
from pathlib import Path
import subprocess
import tempfile
import os
import shutil
# try:
# Try relative imports first
from utils.exceptions import UpdateError
# except ImportError:
# Fall back to absolute imports if relative imports fail
# from videoarchiver.utils.exceptions import UpdateError
logger = logging.getLogger("VideoArchiver")
class UpdateChecker:
"""Handles checking for yt-dlp updates"""
GITHUB_API_URL = "https://api.github.com/repos/yt-dlp/yt-dlp/releases/latest"
UPDATE_CHECK_INTERVAL = 21600 # 6 hours in seconds
MAX_RETRIES = 3
RETRY_DELAY = 5
REQUEST_TIMEOUT = 30
SUBPROCESS_TIMEOUT = 300 # 5 minutes
def __init__(self, bot, config_manager):
self.bot = bot
self.config_manager = config_manager
self._check_task = None
self._session: Optional[aiohttp.ClientSession] = None
self._rate_limit_reset = 0
self._remaining_requests = 60
self._last_version_check: Dict[int, datetime] = {}
self._shutdown = False
async def _init_session(self) -> None:
"""Initialize aiohttp session with proper headers"""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
headers={
"Accept": "application/vnd.github.v3+json",
"User-Agent": "VideoArchiver-Bot",
},
timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT),
)
async def start(self) -> None:
"""Start the update checker task"""
if self._check_task is None:
await self._init_session()
self._check_task = asyncio.create_task(self._check_loop())
logger.info("Update checker task started")
async def stop(self) -> None:
"""Stop the update checker task and cleanup"""
self._shutdown = True
if self._check_task and not self._check_task.done():
self._check_task.cancel()
try:
await self._check_task
except asyncio.CancelledError:
pass
self._check_task = None
if self._session and not self._session.closed:
await self._session.close()
self._session = None
logger.info("Update checker task stopped")
async def _check_loop(self) -> None:
"""Periodic update check loop with improved error handling"""
await self.bot.wait_until_ready()
while not self._shutdown:
try:
for guild in self.bot.guilds:
try:
settings = await self.config_manager.get_guild_settings(
guild.id
)
if settings.get("disable_update_check", False):
continue
current_time = datetime.utcnow()
# Check if we've checked recently
last_check = self._last_version_check.get(guild.id)
if (
last_check
and (current_time - last_check).total_seconds()
< self.UPDATE_CHECK_INTERVAL
):
continue
# Check rate limits
if self._remaining_requests <= 0:
if current_time.timestamp() < self._rate_limit_reset:
continue
# Reset rate limit counters
self._remaining_requests = 60
self._rate_limit_reset = 0
await self._check_guild(guild, settings)
self._last_version_check[guild.id] = current_time
except Exception as e:
logger.error(
f"Error checking updates for guild {guild.id}: {str(e)}"
)
continue
except asyncio.CancelledError:
logger.info("Update check loop cancelled")
break
except Exception as e:
logger.error(f"Error in update check task: {str(e)}")
await asyncio.sleep(self.UPDATE_CHECK_INTERVAL)
async def _check_guild(self, guild: discord.Guild, settings: dict) -> None:
"""Check updates for a specific guild with improved error handling"""
try:
current_version = await self._get_current_version()
if not current_version:
await self._log_error(
guild,
UpdateError("Could not determine current yt-dlp version"),
"checking current version",
)
return
latest_version = await self._get_latest_version()
if not latest_version:
return # Error already logged in _get_latest_version
# Update last check time
await self.config_manager.update_setting(
guild.id, "last_update_check", datetime.utcnow().isoformat()
)
# Compare versions
if version.parse(current_version) < version.parse(latest_version):
await self._notify_update(
guild, current_version, latest_version, settings
)
except Exception as e:
await self._log_error(guild, e, "checking for updates")
async def _get_current_version(self) -> Optional[str]:
"""Get current yt-dlp version with error handling"""
try:
return get_package_version("yt-dlp")
except Exception as e:
logger.error(f"Error getting current version: {str(e)}")
return None
async def _get_latest_version(self) -> Optional[str]:
"""Get the latest version from GitHub with retries and rate limit handling"""
await self._init_session()
for attempt in range(self.MAX_RETRIES):
try:
async with self._session.get(self.GITHUB_API_URL) as response:
# Update rate limit info
self._remaining_requests = int(
response.headers.get("X-RateLimit-Remaining", 0)
)
self._rate_limit_reset = int(
response.headers.get("X-RateLimit-Reset", 0)
)
if response.status == 200:
data = await response.json()
return data["tag_name"].lstrip("v")
elif (
response.status == 403
and "X-RateLimit-Remaining" in response.headers
):
logger.warning("GitHub API rate limit reached")
return None
elif response.status == 404:
raise UpdateError("GitHub API endpoint not found")
else:
raise UpdateError(
f"GitHub API returned status {response.status}"
)
except asyncio.TimeoutError:
logger.error(
f"Timeout getting latest version (attempt {attempt + 1}/{self.MAX_RETRIES})"
)
if attempt == self.MAX_RETRIES - 1:
return None
except Exception as e:
logger.error(
f"Error getting latest version (attempt {attempt + 1}/{self.MAX_RETRIES}): {str(e)}"
)
if attempt == self.MAX_RETRIES - 1:
return None
await asyncio.sleep(self.RETRY_DELAY * (attempt + 1))
return None
async def _notify_update(
self,
guild: discord.Guild,
current_version: str,
latest_version: str,
settings: dict,
) -> None:
"""Notify about available updates with retry mechanism"""
owner = self.bot.get_user(self.bot.owner_id)
if not owner:
await self._log_error(
guild,
UpdateError("Could not find bot owner"),
"sending update notification",
)
return
message = (
f"⚠️ A new version of yt-dlp is available!\n"
f"Current: {current_version}\n"
f"Latest: {latest_version}\n"
f"Use `[p]videoarchiver updateytdlp` to update."
)
for attempt in range(settings.get("discord_retry_attempts", 3)):
try:
await owner.send(message)
return
except discord.HTTPException as e:
if attempt == settings["discord_retry_attempts"] - 1:
await self._log_error(
guild,
UpdateError(f"Failed to send update notification: {str(e)}"),
"sending update notification",
)
else:
await asyncio.sleep(settings.get("discord_retry_delay", 5))
async def _log_error(
self, guild: discord.Guild, error: Exception, context: str
) -> None:
"""Log an error to the guild's log channel with enhanced formatting"""
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
error_message = f"[{timestamp}] Error {context}: {str(error)}"
log_channel = await self.config_manager.get_channel(guild, "log")
if log_channel:
try:
await log_channel.send(f"```\n{error_message}\n```")
except discord.HTTPException as e:
logger.error(f"Failed to send error to log channel: {str(e)}")
logger.error(f"Guild {guild.id} - {error_message}")
async def update_yt_dlp(self) -> Tuple[bool, str]:
"""Update yt-dlp to the latest version with improved error handling"""
temp_dir = None
try:
# Create temporary directory for pip output
temp_dir = tempfile.mkdtemp(prefix="ytdlp_update_")
log_file = Path(temp_dir) / "pip_log.txt"
# Prepare pip command
cmd = [
sys.executable,
"-m",
"pip",
"install",
"--upgrade",
"yt-dlp",
"--log",
str(log_file),
]
# Run pip in subprocess with timeout
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=self.SUBPROCESS_TIMEOUT
)
except asyncio.TimeoutError:
process.kill()
raise UpdateError("Update process timed out")
if process.returncode == 0:
new_version = await self._get_current_version()
if new_version:
return True, f"Successfully updated to version {new_version}"
return True, "Successfully updated (version unknown)"
else:
# Read detailed error log
error_details = "Unknown error"
if log_file.exists():
try:
error_details = log_file.read_text(errors="ignore")
except Exception:
pass
return False, f"Failed to update: {error_details}"
except Exception as e:
return False, f"Error updating: {str(e)}"
finally:
# Cleanup temporary directory
if temp_dir and os.path.exists(temp_dir):
try:
shutil.rmtree(temp_dir)
except Exception as e:
logger.error(f"Failed to cleanup temporary directory: {str(e)}")