mirror of
https://github.com/pacnpal/Pac-cogs.git
synced 2025-12-20 02:41:06 -05:00
361 lines
12 KiB
Python
361 lines
12 KiB
Python
"""URL extraction functionality for video processing"""
|
|
|
|
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List, Dict, Optional, Set, Pattern, ClassVar
from urllib.parse import urlparse, parse_qs, ParseResult

import discord  # type: ignore
|
|
|
|
# Module-wide logger shared by all URL-extraction classes below.
logger = logging.getLogger("VideoArchiver")
|
|
|
|
@dataclass
class URLPattern:
    """Defines a URL pattern for a video site.

    Attributes:
        site: Site identifier string (e.g. "youtube").
        pattern: Compiled regex; its first capture group is used by callers
            as the video ID.
        requires_api: Flag consumed by callers; set for sites whose content
            cannot be fetched from the URL alone.
        supports_timestamp: Whether a start-time query parameter may be
            extracted from the URL.
        supports_playlist: Whether a playlist ID query parameter may be
            extracted from the URL.
    """

    site: str
    # re.Pattern, not typing.Pattern: typing.Pattern was deprecated in 3.8
    # and removed in Python 3.12, and isinstance() against it is unreliable.
    pattern: re.Pattern
    requires_api: bool = False
    supports_timestamp: bool = False
    supports_playlist: bool = False

    def __post_init__(self) -> None:
        """Validate pattern after initialization.

        Raises:
            ValueError: If ``pattern`` is not a compiled regular expression.
        """
        if not isinstance(self.pattern, re.Pattern):
            raise ValueError("Pattern must be a compiled regular expression")
|
|
|
|
@dataclass
class URLMetadata:
    """Metadata about an extracted URL."""

    url: str
    site: str  # site identifier, or "direct" for raw video-file links
    timestamp: Optional[int] = None  # start time from the 't' query param
    playlist_id: Optional[str] = None  # playlist ID from the 'list' query param
    video_id: Optional[str] = None  # first capture group of the site pattern
    quality: Optional[str] = None  # not populated here; reserved for callers
    # Timezone-aware UTC ISO timestamp. datetime.utcnow() is deprecated
    # (Python 3.12) and returned a naive datetime; now(timezone.utc) is the
    # recommended replacement.
    extraction_time: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
|
|
|
|
class URLType(Enum):
    """Types of video URLs"""

    # Direct link to a video file, recognized by its file extension.
    DIRECT = "direct"
    # URL matching one of the configured platform patterns.
    PLATFORM = "platform"
    # URL matching neither a video extension nor any platform pattern.
    UNKNOWN = "unknown"
|
|
|
|
class URLPatternManager:
    """Manages URL patterns for different video sites.

    Holds one compiled regex per supported platform (the first capture
    group is the video ID) plus the set of file extensions that mark a
    direct video link.
    """

    # Matches youtube.com/watch?v=... and youtu.be/...; group(1) is the
    # 11-character video ID.
    YOUTUBE_PATTERN: ClassVar[re.Pattern] = re.compile(
        r'(?:https?://)?(?:www\.)?'
        r'(?:youtube\.com/watch\?v=|youtu\.be/)'
        r'([a-zA-Z0-9_-]{11})'
    )
    # Matches vimeo.com video pages (optionally under channels/ or groups/);
    # group(1) is the numeric video ID.
    VIMEO_PATTERN: ClassVar[re.Pattern] = re.compile(
        r'(?:https?://)?(?:www\.)?'
        r'vimeo\.com/(?:channels/(?:\w+/)?|groups/(?:[^/]*/)*|)'
        r'(\d+)(?:|/\w+)*'
    )
    # Matches twitter.com/x.com status URLs; group(1) is the tweet ID.
    TWITTER_PATTERN: ClassVar[re.Pattern] = re.compile(
        r'(?:https?://)?(?:www\.)?'
        r'(?:twitter\.com|x\.com)/\w+/status/(\d+)'
    )

    def __init__(self) -> None:
        self.patterns: Dict[str, URLPattern] = {
            "youtube": URLPattern(
                site="youtube",
                pattern=self.YOUTUBE_PATTERN,
                supports_timestamp=True,
                supports_playlist=True
            ),
            "vimeo": URLPattern(
                site="vimeo",
                pattern=self.VIMEO_PATTERN,
                supports_timestamp=True
            ),
            "twitter": URLPattern(
                site="twitter",
                pattern=self.TWITTER_PATTERN,
                requires_api=True
            )
        }

        # File extensions that identify a direct video link.
        self.direct_extensions: Set[str] = {'.mp4', '.mov', '.avi', '.webm', '.mkv'}

    def get_pattern(self, site: str) -> "Optional[URLPattern]":
        """
        Get pattern for a site.

        Args:
            site: Site identifier (case-insensitive)

        Returns:
            URLPattern for the site or None if not found
        """
        return self.patterns.get(site.lower())

    def is_supported_site(self, url: str, enabled_sites: Optional[List[str]]) -> bool:
        """
        Check if URL is from a supported site.

        Args:
            url: URL to check
            enabled_sites: List of enabled site identifiers; an empty or
                None list means every site is allowed

        Returns:
            True if site is supported, False otherwise
        """
        if not enabled_sites:
            return True

        try:
            domain = urlparse(url.lower()).netloc
            # Strip only a leading "www." prefix. The previous
            # str.replace('www.', '') removed the substring anywhere in the
            # domain, so e.g. "awww.example.com" collapsed to
            # "aexample.com" and could falsely match an enabled site.
            if domain.startswith('www.'):
                domain = domain[4:]
            return any(site.lower() in domain for site in enabled_sites)
        except Exception as e:
            logger.error(f"Error checking site support for {url}: {e}")
            return False
|
|
|
|
class URLValidator:
    """Validates extracted URLs against the configured patterns."""

    def __init__(self, pattern_manager: "URLPatternManager") -> None:
        self.pattern_manager = pattern_manager

    def get_url_type(self, url: str) -> "URLType":
        """
        Determine URL type.

        Args:
            url: URL to check

        Returns:
            URLType.DIRECT for video-file links, URLType.PLATFORM for URLs
            matching a known site pattern, URLType.UNKNOWN otherwise (and
            on any parsing error)
        """
        try:
            lowered_path = urlparse(url).path.lower()
            extensions = self.pattern_manager.direct_extensions
            if any(lowered_path.endswith(ext) for ext in extensions):
                return URLType.DIRECT
            site_patterns = self.pattern_manager.patterns.values()
            if any(entry.pattern.match(url) for entry in site_patterns):
                return URLType.PLATFORM
        except Exception as e:
            logger.error(f"Error determining URL type for {url}: {e}")
        return URLType.UNKNOWN

    def is_valid_url(self, url: str) -> bool:
        """
        Validate URL format.

        Args:
            url: URL to validate

        Returns:
            True when the URL has both a scheme and a network location,
            False otherwise
        """
        try:
            parts = urlparse(url)
        except Exception as e:
            logger.error(f"Error validating URL {url}: {e}")
            return False
        return bool(parts.scheme) and bool(parts.netloc)
|
|
|
|
class URLMetadataExtractor:
    """Builds URLMetadata records from raw URLs."""

    def __init__(self, pattern_manager: "URLPatternManager") -> None:
        self.pattern_manager = pattern_manager

    def extract_metadata(self, url: str) -> "Optional[URLMetadata]":
        """
        Extract metadata from URL.

        Args:
            url: URL to extract metadata from

        Returns:
            URLMetadata object or None if extraction fails
        """
        try:
            parsed = urlparse(url)
            lowered_path = parsed.path.lower()

            # Direct file links carry no site-specific metadata.
            for ext in self.pattern_manager.direct_extensions:
                if lowered_path.endswith(ext):
                    return URLMetadata(url=url, site="direct")

            # Try each registered platform pattern in turn; group(1) of a
            # matching pattern is the video ID.
            for site, site_pattern in self.pattern_manager.patterns.items():
                match = site_pattern.pattern.match(url)
                if match is None:
                    continue
                meta = URLMetadata(url=url, site=site, video_id=match.group(1))
                if site_pattern.supports_timestamp:
                    meta.timestamp = self._extract_timestamp(parsed)
                if site_pattern.supports_playlist:
                    meta.playlist_id = self._extract_playlist_id(parsed)
                return meta

            return None

        except Exception as e:
            logger.error(f"Error extracting metadata from URL {url}: {e}", exc_info=True)
            return None

    def _extract_timestamp(self, parsed_url: ParseResult) -> Optional[int]:
        """Return the integer 't' query parameter, or None when absent/invalid."""
        try:
            values = parse_qs(parsed_url.query).get('t')
            return int(values[0]) if values else None
        except (ValueError, IndexError) as e:
            logger.debug(f"Error extracting timestamp: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error extracting timestamp: {e}")
            return None

    def _extract_playlist_id(self, parsed_url: ParseResult) -> Optional[str]:
        """Return the 'list' query parameter, or None when absent."""
        try:
            values = parse_qs(parsed_url.query).get('list')
            return values[0] if values else None
        except (KeyError, IndexError) as e:
            logger.debug(f"Error extracting playlist ID: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error extracting playlist ID: {e}")
            return None
|
|
|
|
class URLExtractor:
    """Handles extraction of video URLs from messages.

    Combines URLPatternManager (site patterns), URLValidator (format/type
    checks) and URLMetadataExtractor (per-URL metadata) behind a single
    entry point, extract_urls().
    """

    def __init__(self) -> None:
        self.pattern_manager = URLPatternManager()
        self.validator = URLValidator(self.pattern_manager)
        self.metadata_extractor = URLMetadataExtractor(self.pattern_manager)
        # Maps "<message_id>_<sorted-sites|all>" -> set of validated URLs.
        # NOTE(review): unbounded; relies on callers invoking clear_cache().
        self._url_cache: Dict[str, Set[str]] = {}

    async def extract_urls(
        self,
        message: "discord.Message",
        enabled_sites: Optional[List[str]] = None
    ) -> "List[URLMetadata]":
        """
        Extract video URLs from message content and attachments.

        Args:
            message: Discord message to extract URLs from
            enabled_sites: Optional list of enabled site identifiers

        Returns:
            List of URLMetadata objects for extracted URLs
        """
        urls: List[URLMetadata] = []

        try:
            # Sort the sites so equal site lists in different order share
            # one cache entry instead of missing the cache.
            site_key = '-'.join(sorted(enabled_sites)) if enabled_sites else 'all'
            cache_key = f"{message.id}_{site_key}"
            if cache_key in self._url_cache:
                return [
                    metadata for url in self._url_cache[cache_key]
                    if (metadata := self.metadata_extractor.extract_metadata(url))
                ]

            # Gather candidates from both the text body and attachments.
            content_urls = await self._extract_from_content(message.content, enabled_sites)
            attachment_urls = await self._extract_from_attachments(message.attachments)

            all_urls = content_urls + attachment_urls
            valid_urls: Set[str] = set()

            for url in all_urls:
                if not self.validator.is_valid_url(url):
                    logger.debug(f"Invalid URL format: {url}")
                    continue

                if not self.pattern_manager.is_supported_site(url, enabled_sites):
                    logger.debug(f"URL {url} doesn't match any enabled sites")
                    continue

                metadata = self.metadata_extractor.extract_metadata(url)
                if metadata:
                    urls.append(metadata)
                    valid_urls.add(url)
                else:
                    logger.debug(f"Could not extract metadata from URL: {url}")

            self._url_cache[cache_key] = valid_urls
            return urls

        except Exception as e:
            logger.error(f"Error extracting URLs from message {message.id}: {e}", exc_info=True)
            return []

    async def _extract_from_content(
        self,
        content: Optional[str],
        enabled_sites: Optional[List[str]]
    ) -> List[str]:
        """Extract video URL candidates from message text.

        enabled_sites is accepted for signature compatibility; site
        filtering happens later in extract_urls().
        """
        if not content:
            return []

        try:
            return [
                word for word in content.split()
                if self.validator.get_url_type(word) != URLType.UNKNOWN
            ]
        except Exception as e:
            logger.error(f"Error extracting URLs from content: {e}", exc_info=True)
            return []

    async def _extract_from_attachments(
        self,
        attachments: "List[discord.Attachment]"
    ) -> List[str]:
        """Return URLs of attachments whose filename has a video extension."""
        try:
            return [
                attachment.url
                for attachment in attachments
                if any(
                    attachment.filename.lower().endswith(ext)
                    for ext in self.pattern_manager.direct_extensions
                )
            ]
        except Exception as e:
            logger.error(f"Error extracting URLs from attachments: {e}", exc_info=True)
            return []

    def clear_cache(self, message_id: Optional[int] = None) -> None:
        """
        Clear URL cache.

        Args:
            message_id: Optional message ID to clear cache for. If None, clears all cache.
        """
        try:
            # Compare against None explicitly: with the old truthiness
            # check a message_id of 0 wiped the entire cache instead of
            # just that message's entries.
            if message_id is not None:
                prefix = f"{message_id}_"
                stale_keys = [key for key in self._url_cache if key.startswith(prefix)]
                for key in stale_keys:
                    self._url_cache.pop(key, None)
            else:
                self._url_cache.clear()
        except Exception as e:
            logger.error(f"Error clearing URL cache: {e}", exc_info=True)
|