Pac-cogs/videoarchiver/processor/url_extractor.py

"""URL extraction functionality for video processing"""
import logging
import re
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set, ClassVar
from datetime import datetime, timezone
import discord # type: ignore
from urllib.parse import urlparse, parse_qs, ParseResult
logger = logging.getLogger("VideoArchiver")
@dataclass
class URLPattern:
"""Defines a URL pattern for a video site"""
site: str
    pattern: re.Pattern
requires_api: bool = False
supports_timestamp: bool = False
supports_playlist: bool = False
def __post_init__(self) -> None:
"""Validate pattern after initialization"""
        # typing.Pattern is deprecated (removed in Python 3.13) and unreliable
        # with isinstance; re.Pattern is the correct runtime check
        if not isinstance(self.pattern, re.Pattern):
            raise ValueError("Pattern must be a compiled regular expression")
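# A minimal usage sketch (not part of the original module): __post_init__ rejects
# anything that is not a compiled regex. The site name and regex are illustrative.
def _demo_url_pattern() -> None:
    ok = URLPattern(site="example", pattern=re.compile(r"https?://example\.com/v/(\d+)"))
    assert ok.pattern.match("https://example.com/v/42")
    try:
        URLPattern(site="bad", pattern=r"not-compiled")  # type: ignore[arg-type]
    except ValueError:
        pass  # __post_init__ raised as expected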
@dataclass
class URLMetadata:
"""Metadata about an extracted URL"""
url: str
site: str
timestamp: Optional[int] = None
playlist_id: Optional[str] = None
video_id: Optional[str] = None
quality: Optional[str] = None
    # datetime.utcnow() is deprecated; use an explicit UTC-aware timestamp
    extraction_time: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
class URLType(Enum):
"""Types of video URLs"""
DIRECT = "direct"
PLATFORM = "platform"
UNKNOWN = "unknown"
class URLPatternManager:
"""Manages URL patterns for different video sites"""
    YOUTUBE_PATTERN: ClassVar[re.Pattern] = re.compile(
        r'(?:https?://)?(?:www\.)?'
        r'(?:youtube\.com/watch\?v=|youtu\.be/)'
        r'([a-zA-Z0-9_-]{11})'
    )
    VIMEO_PATTERN: ClassVar[re.Pattern] = re.compile(
        r'(?:https?://)?(?:www\.)?'
        r'vimeo\.com/(?:channels/(?:\w+/)?|groups/(?:[^/]*/)*)?'
        r'(\d+)(?:/\w+)*'
    )
    TWITTER_PATTERN: ClassVar[re.Pattern] = re.compile(
        r'(?:https?://)?(?:www\.)?'
        r'(?:twitter\.com|x\.com)/\w+/status/(\d+)'
    )
def __init__(self) -> None:
self.patterns: Dict[str, URLPattern] = {
"youtube": URLPattern(
site="youtube",
pattern=self.YOUTUBE_PATTERN,
supports_timestamp=True,
supports_playlist=True
),
"vimeo": URLPattern(
site="vimeo",
pattern=self.VIMEO_PATTERN,
supports_timestamp=True
),
"twitter": URLPattern(
site="twitter",
pattern=self.TWITTER_PATTERN,
requires_api=True
)
}
self.direct_extensions: Set[str] = {'.mp4', '.mov', '.avi', '.webm', '.mkv'}
def get_pattern(self, site: str) -> Optional[URLPattern]:
"""
Get pattern for a site.
Args:
site: Site identifier
Returns:
URLPattern for the site or None if not found
"""
return self.patterns.get(site.lower())
def is_supported_site(self, url: str, enabled_sites: Optional[List[str]]) -> bool:
"""
Check if URL is from a supported site.
Args:
url: URL to check
enabled_sites: List of enabled site identifiers
Returns:
True if site is supported, False otherwise
"""
if not enabled_sites:
return True
try:
            parsed = urlparse(url.lower())
            # Plain substring match against the domain; note that short domains
            # such as youtu.be will not match an enabled site named "youtube"
            domain = parsed.netloc.removeprefix('www.')
            return any(site.lower() in domain for site in enabled_sites)
except Exception as e:
logger.error(f"Error checking site support for {url}: {e}")
return False
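# Hedged usage sketch (illustrative, not in the original module): the site filter
# is a substring check against the URL's domain, with no filter meaning allow-all.
def _demo_site_filter() -> None:
    manager = URLPatternManager()
    assert manager.is_supported_site("https://www.youtube.com/watch?v=abc", ["youtube"])
    assert not manager.is_supported_site("https://vimeo.com/123", ["youtube"])
    assert manager.is_supported_site("https://vimeo.com/123", None)  # no filter set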
class URLValidator:
"""Validates extracted URLs"""
def __init__(self, pattern_manager: URLPatternManager) -> None:
self.pattern_manager = pattern_manager
def get_url_type(self, url: str) -> URLType:
"""
Determine URL type.
Args:
url: URL to check
Returns:
URLType indicating the type of URL
"""
try:
parsed = urlparse(url)
if any(parsed.path.lower().endswith(ext) for ext in self.pattern_manager.direct_extensions):
return URLType.DIRECT
if any(pattern.pattern.match(url) for pattern in self.pattern_manager.patterns.values()):
return URLType.PLATFORM
return URLType.UNKNOWN
except Exception as e:
logger.error(f"Error determining URL type for {url}: {e}")
return URLType.UNKNOWN
def is_valid_url(self, url: str) -> bool:
"""
Validate URL format.
Args:
url: URL to validate
Returns:
True if URL is valid, False otherwise
"""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception as e:
logger.error(f"Error validating URL {url}: {e}")
return False
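# Illustrative sketch of URLValidator classification (inputs are placeholders):
# direct file extensions win over platform patterns, and anything unmatched
# falls through to UNKNOWN.
def _demo_url_types() -> None:
    validator = URLValidator(URLPatternManager())
    assert validator.get_url_type("https://cdn.example.com/clip.mp4") is URLType.DIRECT
    assert validator.get_url_type("https://youtu.be/abcdefghijk") is URLType.PLATFORM
    assert validator.get_url_type("https://example.com/page") is URLType.UNKNOWN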
class URLMetadataExtractor:
"""Extracts metadata from URLs"""
def __init__(self, pattern_manager: URLPatternManager) -> None:
self.pattern_manager = pattern_manager
def extract_metadata(self, url: str) -> Optional[URLMetadata]:
"""
Extract metadata from URL.
Args:
url: URL to extract metadata from
Returns:
URLMetadata object or None if extraction fails
"""
try:
parsed = urlparse(url)
# Handle direct video URLs
if any(parsed.path.lower().endswith(ext) for ext in self.pattern_manager.direct_extensions):
return URLMetadata(url=url, site="direct")
# Handle platform URLs
for site, pattern in self.pattern_manager.patterns.items():
if match := pattern.pattern.match(url):
metadata = URLMetadata(
url=url,
site=site,
video_id=match.group(1)
)
# Extract additional metadata
if pattern.supports_timestamp:
metadata.timestamp = self._extract_timestamp(parsed)
if pattern.supports_playlist:
metadata.playlist_id = self._extract_playlist_id(parsed)
return metadata
return None
except Exception as e:
logger.error(f"Error extracting metadata from URL {url}: {e}", exc_info=True)
return None
def _extract_timestamp(self, parsed_url: ParseResult) -> Optional[int]:
"""Extract timestamp from URL"""
try:
params = parse_qs(parsed_url.query)
            # Only plain-second values (e.g. t=90) parse; formats like t=1h2m
            # fall through to the ValueError handler below
            if 't' in params:
                return int(params['t'][0])
return None
except (ValueError, IndexError) as e:
logger.debug(f"Error extracting timestamp: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error extracting timestamp: {e}")
return None
def _extract_playlist_id(self, parsed_url: ParseResult) -> Optional[str]:
"""Extract playlist ID from URL"""
try:
params = parse_qs(parsed_url.query)
if 'list' in params:
return params['list'][0]
return None
except (KeyError, IndexError) as e:
logger.debug(f"Error extracting playlist ID: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error extracting playlist ID: {e}")
return None
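# Sketch of metadata extraction for a YouTube watch URL with a ?t= parameter
# (the 11-character video id below is a placeholder):
def _demo_metadata() -> None:
    extractor = URLMetadataExtractor(URLPatternManager())
    meta = extractor.extract_metadata("https://www.youtube.com/watch?v=abcdefghijk&t=90")
    assert meta is not None
    assert (meta.site, meta.video_id, meta.timestamp) == ("youtube", "abcdefghijk", 90)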
class URLExtractor:
"""Handles extraction of video URLs from messages"""
def __init__(self) -> None:
self.pattern_manager = URLPatternManager()
self.validator = URLValidator(self.pattern_manager)
self.metadata_extractor = URLMetadataExtractor(self.pattern_manager)
        # Per-message cache of validated URLs; grows until clear_cache() is called
        self._url_cache: Dict[str, Set[str]] = {}
async def extract_urls(
self,
message: discord.Message,
enabled_sites: Optional[List[str]] = None
) -> List[URLMetadata]:
"""
Extract video URLs from message content and attachments.
Args:
message: Discord message to extract URLs from
enabled_sites: Optional list of enabled site identifiers
Returns:
List of URLMetadata objects for extracted URLs
"""
urls: List[URLMetadata] = []
try:
# Check cache
            cache_key = f"{message.id}_{'-'.join(sorted(enabled_sites)) if enabled_sites else 'all'}"
if cache_key in self._url_cache:
return [
metadata for url in self._url_cache[cache_key]
if (metadata := self.metadata_extractor.extract_metadata(url))
]
# Extract URLs
content_urls = await self._extract_from_content(message.content, enabled_sites)
attachment_urls = await self._extract_from_attachments(message.attachments)
# Process all URLs
all_urls = content_urls + attachment_urls
valid_urls: Set[str] = set()
for url in all_urls:
if not self.validator.is_valid_url(url):
logger.debug(f"Invalid URL format: {url}")
continue
if not self.pattern_manager.is_supported_site(url, enabled_sites):
logger.debug(f"URL {url} doesn't match any enabled sites")
continue
metadata = self.metadata_extractor.extract_metadata(url)
if metadata:
urls.append(metadata)
valid_urls.add(url)
else:
logger.debug(f"Could not extract metadata from URL: {url}")
# Update cache
self._url_cache[cache_key] = valid_urls
return urls
except Exception as e:
logger.error(f"Error extracting URLs from message {message.id}: {e}", exc_info=True)
return []
async def _extract_from_content(
self,
content: Optional[str],
enabled_sites: Optional[List[str]]
) -> List[str]:
"""Extract video URLs from message content"""
if not content:
return []
try:
            urls = []
            # Naive whitespace split: URLs wrapped in punctuation (e.g. <https://...>)
            # will not be recognized
            for word in content.split():
                if self.validator.get_url_type(word) != URLType.UNKNOWN:
                    urls.append(word)
return urls
except Exception as e:
logger.error(f"Error extracting URLs from content: {e}", exc_info=True)
return []
async def _extract_from_attachments(
self,
attachments: List[discord.Attachment]
) -> List[str]:
"""Extract video URLs from message attachments"""
try:
return [
attachment.url
for attachment in attachments
if any(
attachment.filename.lower().endswith(ext)
for ext in self.pattern_manager.direct_extensions
)
]
except Exception as e:
logger.error(f"Error extracting URLs from attachments: {e}", exc_info=True)
return []
def clear_cache(self, message_id: Optional[int] = None) -> None:
"""
Clear URL cache.
Args:
message_id: Optional message ID to clear cache for. If None, clears all cache.
"""
try:
            if message_id is not None:
keys_to_remove = [
key for key in self._url_cache
if key.startswith(f"{message_id}_")
]
for key in keys_to_remove:
self._url_cache.pop(key, None)
else:
self._url_cache.clear()
except Exception as e:
logger.error(f"Error clearing URL cache: {e}", exc_info=True)