Files
Pac-cogs/videoarchiver/utils/permission_manager.py
pacnpal a4ca6e8ea6 Core Systems:
Component-based architecture with lifecycle management
Enhanced error handling and recovery mechanisms
Comprehensive state management and tracking
Event-driven architecture with monitoring
Queue Management:

Multiple processing strategies for different scenarios
Advanced state management with recovery
Comprehensive metrics and health monitoring
Sophisticated cleanup system with multiple strategies
Processing Pipeline:

Enhanced message handling with validation
Improved URL extraction and processing
Better queue management and monitoring
Advanced cleanup mechanisms
Overall Benefits:

Better code organization and maintainability
Improved error handling and recovery
Enhanced monitoring and reporting
More robust and reliable system
2024-11-16 05:01:29 +00:00

203 lines
6.6 KiB
Python

"""Module for managing file and directory permissions"""
import os
import stat
import logging
from pathlib import Path
from typing import Optional, Union, List
from .exceptions import FileCleanupError
# Module-level logger shared by PermissionManager; registered under the fixed
# name "PermissionManager" rather than the module's __name__.
logger = logging.getLogger("PermissionManager")
class PermissionManager:
    """Handles file and directory permission operations.

    Every public method is a coroutine, but the work underneath is done with
    ordinary blocking calls (``os.chmod``, ``os.access``, ``Path.stat``,
    ``Path.rglob``); nothing here yields to the event loop while touching the
    filesystem.

    Platform handling is decided once in ``__init__`` from ``os.name``:
    on Windows the "make writable" helpers set fixed read/write(/exec) flags,
    and ``set_permissions`` does nothing at all.
    """

    DEFAULT_FILE_MODE = 0o644  # rw-r--r--
    DEFAULT_DIR_MODE = 0o755  # rwxr-xr-x
    FULL_ACCESS_MODE = 0o777  # rwxrwxrwx (not referenced by the methods below)

    def __init__(self):
        # Cached so each operation can branch without re-reading os.name.
        self._is_windows = os.name == 'nt'

    async def ensure_writable(
        self,
        path: Union[str, Path],
        recursive: bool = False
    ) -> None:
        """Ensure a path is writable.

        A path that does not exist is silently ignored. Files get the owner
        write bit added; directories get owner write and execute added.

        Args:
            path: Path to make writable
            recursive: Whether to apply recursively to directories

        Raises:
            FileCleanupError: If permissions cannot be modified. The
                underlying exception is attached as ``__cause__``.
        """
        try:
            path = Path(path)
            if not path.exists():
                return

            if path.is_file():
                await self._make_file_writable(path)
            elif path.is_dir():
                await self._make_directory_writable(path, recursive)

        except Exception as e:
            logger.error(f"Error ensuring writable permissions for {path}: {e}")
            # Chain explicitly so the original OSError stays visible as the cause.
            raise FileCleanupError(
                f"Failed to set writable permissions: {str(e)}"
            ) from e

    async def _make_file_writable(self, path: Path) -> None:
        """Make a file writable.

        On Windows the mode is replaced with read+write; elsewhere the owner
        write bit is OR-ed into the file's current mode. Any failure is
        logged and re-raised unchanged.
        """
        try:
            if self._is_windows:
                os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            else:
                # Only the POSIX branch needs the existing mode bits.
                current_mode = path.stat().st_mode
                os.chmod(path, current_mode | stat.S_IWRITE)

        except Exception as e:
            logger.error(f"Failed to make file {path} writable: {e}")
            raise

    async def _make_directory_writable(
        self,
        path: Path,
        recursive: bool
    ) -> None:
        """Make a directory writable.

        Adds owner write and execute to ``path`` (fixed read/write/exec flags
        on Windows). With ``recursive`` set, every entry found by
        ``path.rglob('*')`` is processed the same way: files via
        ``_make_file_writable``, directories via a non-recursive call to this
        method. Any failure is logged and re-raised, aborting the walk.
        """
        try:
            if self._is_windows:
                os.chmod(path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
            else:
                current_mode = path.stat().st_mode
                os.chmod(path, current_mode | stat.S_IWRITE | stat.S_IEXEC)

            if recursive:
                # NOTE(review): is_file()/is_dir() follow symlinks, so for a
                # symlinked entry it is the link target that gets chmod-ed.
                for item in path.rglob('*'):
                    if item.is_file():
                        await self._make_file_writable(item)
                    elif item.is_dir():
                        # rglob already visits nested entries; do not recurse again.
                        await self._make_directory_writable(item, False)

        except Exception as e:
            logger.error(f"Failed to make directory {path} writable: {e}")
            raise

    async def set_permissions(
        self,
        path: Union[str, Path],
        mode: int,
        recursive: bool = False
    ) -> None:
        """Set specific permissions on a path.

        A path that does not exist is silently ignored, and on Windows the
        call does nothing. When applied recursively to a directory, nested
        directories receive ``mode`` and nested files receive ``mode`` with
        all three execute bits cleared.

        Args:
            path: Path to set permissions on
            mode: Permission mode (e.g., 0o755)
            recursive: Whether to apply recursively

        Raises:
            FileCleanupError: If permissions cannot be set. The underlying
                exception is attached as ``__cause__``.
        """
        try:
            path = Path(path)
            if not path.exists():
                return

            if not self._is_windows:  # Skip on Windows
                os.chmod(path, mode)

                if recursive and path.is_dir():
                    # Files inherit the requested mode minus execute bits.
                    file_mode = mode & ~stat.S_IXUSR & ~stat.S_IXGRP & ~stat.S_IXOTH
                    for item in path.rglob('*'):
                        if item.is_file():
                            os.chmod(item, file_mode)
                        elif item.is_dir():
                            os.chmod(item, mode)

        except Exception as e:
            logger.error(f"Error setting permissions for {path}: {e}")
            # Chain explicitly so the original OSError stays visible as the cause.
            raise FileCleanupError(f"Failed to set permissions: {str(e)}") from e

    async def check_permissions(
        self,
        path: Union[str, Path],
        require_writable: bool = True,
        require_readable: bool = True,
        require_executable: bool = False
    ) -> bool:
        """Check if a path has required permissions.

        Each requested permission is tested with ``os.access``. A missing
        path, or any exception raised while checking, yields ``False``.

        Args:
            path: Path to check
            require_writable: Whether write permission is required
            require_readable: Whether read permission is required
            require_executable: Whether execute permission is required

        Returns:
            bool: True if path has required permissions
        """
        try:
            path = Path(path)
            if not path.exists():
                return False

            if require_readable and not os.access(path, os.R_OK):
                return False
            if require_writable and not os.access(path, os.W_OK):
                return False
            if require_executable and not os.access(path, os.X_OK):
                return False

            return True

        except Exception as e:
            logger.error(f"Error checking permissions for {path}: {e}")
            return False

    async def fix_permissions(
        self,
        path: Union[str, Path],
        recursive: bool = False
    ) -> List[str]:
        """Fix common permission issues on a path.

        Files are set to ``DEFAULT_FILE_MODE`` and directories to
        ``DEFAULT_DIR_MODE`` through ``set_permissions``. This method does
        not raise: failures are collected as messages and returned. A
        missing path returns an empty list.

        Args:
            path: Path to fix permissions on
            recursive: Whether to apply recursively

        Returns:
            List[str]: List of errors encountered
        """
        errors: List[str] = []
        try:
            path = Path(path)
            if not path.exists():
                return errors

            if path.is_file():
                try:
                    await self.set_permissions(path, self.DEFAULT_FILE_MODE)
                except Exception as e:
                    errors.append(f"Error fixing file permissions for {path}: {str(e)}")

            elif path.is_dir():
                try:
                    await self.set_permissions(path, self.DEFAULT_DIR_MODE)
                    if recursive:
                        for item in path.rglob('*'):
                            # Per-item try: one bad entry must not stop the walk.
                            try:
                                if item.is_file():
                                    await self.set_permissions(item, self.DEFAULT_FILE_MODE)
                                elif item.is_dir():
                                    await self.set_permissions(item, self.DEFAULT_DIR_MODE)
                            except Exception as e:
                                errors.append(f"Error fixing permissions for {item}: {str(e)}")
                except Exception as e:
                    errors.append(f"Error fixing directory permissions for {path}: {str(e)}")

        except Exception as e:
            errors.append(f"Error during permission fix: {str(e)}")

        return errors