fix imports

pacnpal
2024-11-14 23:45:36 +00:00
parent d8ca4e5b8e
commit f3d28231e8
8 changed files with 43 additions and 276 deletions


@@ -19,16 +19,30 @@ A Red-DiscordBot cog for automatically archiving videos from monitored Discord c
The cog is organized into several modules for better maintainability:
### Core Files
- `video_archiver.py`: Main cog class and entry point
- `commands.py`: Discord command handlers
- `config_manager.py`: Guild configuration management
- `processor.py`: Video processing logic
- `enhanced_queue.py`: Advanced queue management system
- `update_checker.py`: yt-dlp update management
- `utils.py`: Utility functions and classes
- `ffmpeg_manager.py`: FFmpeg configuration and hardware acceleration
- `exceptions.py`: Custom exception classes
### Utils Package
- `utils/video_downloader.py`: Video download and processing
- `utils/message_manager.py`: Message handling and cleanup
- `utils/file_ops.py`: File operations and secure deletion
- `utils/path_manager.py`: Path management utilities
- `utils/exceptions.py`: Utility-specific exceptions
### FFmpeg Package
- `ffmpeg/ffmpeg_manager.py`: FFmpeg configuration and management
- `ffmpeg/gpu_detector.py`: GPU capability detection
- `ffmpeg/video_analyzer.py`: Video analysis utilities
- `ffmpeg/encoder_params.py`: Encoding parameter optimization
- `ffmpeg/ffmpeg_downloader.py`: FFmpeg binary management
- `ffmpeg/exceptions.py`: FFmpeg-specific exceptions
## Installation
1. Install the cog using Red's cog manager:
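Installing a third-party cog through Red's Downloader typically looks like the following; the repository name and URL below are placeholders for illustration, not taken from this commit:

    [p]repo add pacnpal-cogs <repository-url>
    [p]cog install pacnpal-cogs videoarchiver
    [p]load videoarchiver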


@@ -3,9 +3,9 @@
from redbot.core.bot import Red
from redbot.core.utils import get_end_user_data_statement
from .video_archiver import VideoArchiver
from .ffmpeg.ffmpeg_manager import FFmpegManager
from .ffmpeg.exceptions import FFmpegError, GPUError, DownloadError
from videoarchiver.video_archiver import VideoArchiver
from videoarchiver.ffmpeg.ffmpeg_manager import FFmpegManager
from videoarchiver.ffmpeg.exceptions import FFmpegError, GPUError, DownloadError
__red_end_user_data_statement__ = get_end_user_data_statement(__file__)


@@ -1,6 +1,6 @@
"""FFmpeg management package"""
from .exceptions import FFmpegError, GPUError, DownloadError
from .ffmpeg_manager import FFmpegManager
from videoarchiver.ffmpeg.exceptions import FFmpegError, GPUError, DownloadError
from videoarchiver.ffmpeg.ffmpeg_manager import FFmpegManager
__all__ = ['FFmpegManager', 'FFmpegError', 'GPUError', 'DownloadError']


@@ -7,11 +7,11 @@ import logging
from pathlib import Path
from typing import Dict, Any, Optional
from .exceptions import FFmpegError
from .gpu_detector import GPUDetector
from .video_analyzer import VideoAnalyzer
from .encoder_params import EncoderParams
from .ffmpeg_downloader import FFmpegDownloader
from videoarchiver.ffmpeg.exceptions import FFmpegError
from videoarchiver.ffmpeg.gpu_detector import GPUDetector
from videoarchiver.ffmpeg.video_analyzer import VideoAnalyzer
from videoarchiver.ffmpeg.encoder_params import EncoderParams
from videoarchiver.ffmpeg.ffmpeg_downloader import FFmpegDownloader
logger = logging.getLogger("VideoArchiver")


@@ -9,10 +9,10 @@ import asyncio
import traceback
from datetime import datetime
from .utils.video_downloader import VideoDownloader
from .utils.file_ops import secure_delete_file, cleanup_downloads
from .exceptions import ProcessingError, DiscordAPIError
from .enhanced_queue import EnhancedVideoQueueManager
from videoarchiver.utils.video_downloader import VideoDownloader
from videoarchiver.utils.file_ops import secure_delete_file, cleanup_downloads
from videoarchiver.exceptions import ProcessingError, DiscordAPIError
from videoarchiver.enhanced_queue import EnhancedVideoQueueManager
logger = logging.getLogger('VideoArchiver')


@@ -1,247 +0,0 @@
import asyncio
import logging
from typing import Dict, Optional, Set, Tuple, Callable, Any
from datetime import datetime
import traceback
from dataclasses import dataclass
import weakref

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('QueueManager')


@dataclass
class QueueItem:
    """Represents a video processing task in the queue"""
    url: str
    message_id: int
    channel_id: int
    guild_id: int
    author_id: int
    added_at: datetime
    callback: Callable[[str, bool, str], Any]
    status: str = "pending"  # pending, processing, completed, failed
    error: Optional[str] = None
    attempt: int = 0


class VideoQueueManager:
    """Manages a queue of videos to be processed, ensuring sequential processing"""

    def __init__(self, max_retries: int = 3, retry_delay: int = 5):
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        # Queue storage
        self._queue: asyncio.Queue[QueueItem] = asyncio.Queue()
        self._processing: Dict[str, QueueItem] = {}
        self._failed: Dict[str, QueueItem] = {}
        self._completed: Dict[str, QueueItem] = {}

        # Track active tasks
        self._active_tasks: Set[asyncio.Task] = set()
        self._processing_lock = asyncio.Lock()

        # Status tracking
        self._guild_queues: Dict[int, Set[str]] = {}
        self._channel_queues: Dict[int, Set[str]] = {}

        # Cleanup references
        self._weak_refs: Set[weakref.ref] = set()

        # Start queue processor
        self._processor_task = asyncio.create_task(self._process_queue())
        self._active_tasks.add(self._processor_task)

    async def add_to_queue(
        self,
        url: str,
        message_id: int,
        channel_id: int,
        guild_id: int,
        author_id: int,
        callback: Callable[[str, bool, str], Any]
    ) -> bool:
        """Add a video to the processing queue"""
        try:
            # Create queue item
            item = QueueItem(
                url=url,
                message_id=message_id,
                channel_id=channel_id,
                guild_id=guild_id,
                author_id=author_id,
                added_at=datetime.utcnow(),
                callback=callback
            )

            # Add to tracking collections
            if guild_id not in self._guild_queues:
                self._guild_queues[guild_id] = set()
            self._guild_queues[guild_id].add(url)

            if channel_id not in self._channel_queues:
                self._channel_queues[channel_id] = set()
            self._channel_queues[channel_id].add(url)

            # Add to queue
            await self._queue.put(item)

            # Create weak reference for cleanup
            self._weak_refs.add(weakref.ref(item))

            logger.info(f"Added video to queue: {url}")
            return True

        except Exception as e:
            logger.error(f"Error adding video to queue: {str(e)}")
            return False

    async def _process_queue(self):
        """Process videos in the queue sequentially"""
        while True:
            try:
                # Get next item from queue
                item = await self._queue.get()

                async with self._processing_lock:
                    self._processing[item.url] = item
                    item.status = "processing"

                    try:
                        # Execute callback with the URL
                        success = await item.callback(item.url, True, "")

                        if success:
                            item.status = "completed"
                            self._completed[item.url] = item
                            logger.info(f"Successfully processed video: {item.url}")
                        else:
                            # Handle retry logic
                            item.attempt += 1
                            if item.attempt < self.max_retries:
                                # Re-queue with delay
                                await asyncio.sleep(self.retry_delay * item.attempt)
                                await self._queue.put(item)
                                logger.info(f"Retrying video processing: {item.url} (Attempt {item.attempt + 1})")
                            else:
                                item.status = "failed"
                                item.error = "Max retries exceeded"
                                self._failed[item.url] = item
                                logger.error(f"Failed to process video after {self.max_retries} attempts: {item.url}")
                                # Notify callback of failure
                                await item.callback(item.url, False, item.error)

                    except Exception as e:
                        logger.error(f"Error processing video: {str(e)}\n{traceback.format_exc()}")
                        item.status = "failed"
                        item.error = str(e)
                        self._failed[item.url] = item
                        # Notify callback of failure
                        await item.callback(item.url, False, str(e))

                    finally:
                        # Clean up tracking
                        self._processing.pop(item.url, None)
                        if item.guild_id in self._guild_queues:
                            self._guild_queues[item.guild_id].discard(item.url)
                        if item.channel_id in self._channel_queues:
                            self._channel_queues[item.channel_id].discard(item.url)
                        # Mark queue item as done
                        self._queue.task_done()

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Queue processor error: {str(e)}\n{traceback.format_exc()}")
                await asyncio.sleep(1)  # Prevent tight error loop

    def get_queue_status(self, guild_id: Optional[int] = None) -> Dict[str, int]:
        """Get current queue status, optionally filtered by guild"""
        if guild_id is not None:
            guild_urls = self._guild_queues.get(guild_id, set())
            return {
                "pending": sum(1 for _ in self._queue._queue if _.url in guild_urls),
                "processing": sum(1 for url in self._processing if url in guild_urls),
                "completed": sum(1 for url in self._completed if url in guild_urls),
                "failed": sum(1 for url in self._failed if url in guild_urls)
            }
        else:
            return {
                "pending": self._queue.qsize(),
                "processing": len(self._processing),
                "completed": len(self._completed),
                "failed": len(self._failed)
            }

    def get_channel_queue_size(self, channel_id: int) -> int:
        """Get number of items queued for a specific channel"""
        return len(self._channel_queues.get(channel_id, set()))

    async def clear_guild_queue(self, guild_id: int) -> int:
        """Clear all queued items for a specific guild"""
        if guild_id not in self._guild_queues:
            return 0

        cleared = 0
        guild_urls = self._guild_queues[guild_id].copy()

        # Remove from main queue
        new_queue = asyncio.Queue()
        while not self._queue.empty():
            item = await self._queue.get()
            if item.guild_id != guild_id:
                await new_queue.put(item)
            else:
                cleared += 1
        self._queue = new_queue

        # Clean up tracking
        for url in guild_urls:
            self._processing.pop(url, None)
            self._completed.pop(url, None)
            self._failed.pop(url, None)
        self._guild_queues.pop(guild_id, None)

        # Clean up channel queues
        for channel_id, urls in list(self._channel_queues.items()):
            urls.difference_update(guild_urls)
            if not urls:
                self._channel_queues.pop(channel_id, None)

        return cleared

    async def cleanup(self):
        """Clean up resources and stop queue processing"""
        # Cancel processor task
        if self._processor_task and not self._processor_task.done():
            self._processor_task.cancel()
            try:
                await self._processor_task
            except asyncio.CancelledError:
                pass

        # Cancel all active tasks
        for task in self._active_tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*self._active_tasks, return_exceptions=True)

        # Clear all collections
        self._queue = asyncio.Queue()
        self._processing.clear()
        self._completed.clear()
        self._failed.clear()
        self._guild_queues.clear()
        self._channel_queues.clear()

        # Clear weak references
        self._weak_refs.clear()
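For context on how this now-removed queue manager was driven, here is a minimal usage sketch with the class above in scope. The callback, URL, and IDs are illustrative placeholders, not values from the cog:

import asyncio

async def handle_video(url: str, success: bool, error: str) -> bool:
    # The queue calls this with (url, True, "") to perform the actual work;
    # a truthy return marks the item completed. On permanent failure it is
    # called again with (url, False, <error message>) purely as a notification.
    print(f"{url}: success={success}, error={error!r}")
    return True

async def main():
    # VideoQueueManager starts its background processor task in __init__,
    # so it must be constructed while an event loop is already running.
    manager = VideoQueueManager(max_retries=2, retry_delay=1)
    await manager.add_to_queue(
        url="https://example.com/video.mp4",
        message_id=1, channel_id=2, guild_id=3, author_id=4,
        callback=handle_video,
    )
    await asyncio.sleep(2)  # give the processor a moment to drain the queue
    print(manager.get_queue_status())
    await manager.cleanup()

asyncio.run(main())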


@@ -9,10 +9,10 @@ from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Tuple
from pathlib import Path
from ..ffmpeg.ffmpeg_manager import FFmpegManager
from .exceptions import VideoVerificationError
from .file_ops import secure_delete_file
from .path_manager import temp_path_context
from videoarchiver.ffmpeg.ffmpeg_manager import FFmpegManager
from videoarchiver.utils.exceptions import VideoVerificationError
from videoarchiver.utils.file_ops import secure_delete_file
from videoarchiver.utils.path_manager import temp_path_context
logger = logging.getLogger("VideoArchiver")


@@ -11,15 +11,15 @@ from datetime import datetime
import sys
import traceback
from .config_manager import ConfigManager
from .update_checker import UpdateChecker
from .processor import VideoProcessor
from .commands import VideoArchiverCommands
from .utils.video_downloader import VideoDownloader
from .utils.message_manager import MessageManager
from .utils.file_ops import cleanup_downloads
from .enhanced_queue import EnhancedVideoQueueManager
from .exceptions import (
from videoarchiver.config_manager import ConfigManager
from videoarchiver.update_checker import UpdateChecker
from videoarchiver.processor import VideoProcessor
from videoarchiver.commands import VideoArchiverCommands
from videoarchiver.utils.video_downloader import VideoDownloader
from videoarchiver.utils.message_manager import MessageManager
from videoarchiver.utils.file_ops import cleanup_downloads
from videoarchiver.enhanced_queue import EnhancedVideoQueueManager
from videoarchiver.exceptions import (
ProcessingError,
ConfigError,
UpdateError,