Files
Pac-cogs/videoarchiver/utils/permission_manager.py
2024-11-18 05:17:37 +00:00

203 lines
6.6 KiB
Python

"""Module for managing file and directory permissions"""
import os
import stat
import logging
from pathlib import Path
from typing import Optional, Union, List
from utils.exceptions import FileCleanupError
# Module-level logger used by PermissionManager to report permission errors.
logger = logging.getLogger("PermissionManager")
class PermissionManager:
    """Handles file and directory permission operations.

    The public methods are coroutines so they can be awaited by async
    callers; the underlying ``os.chmod`` / ``os.access`` calls themselves
    are ordinary synchronous calls.
    """

    DEFAULT_FILE_MODE = 0o644  # rw-r--r--
    DEFAULT_DIR_MODE = 0o755  # rwxr-xr-x
    FULL_ACCESS_MODE = 0o777  # rwxrwxrwx

    def __init__(self):
        # Windows is special-cased throughout: os.chmod can only toggle the
        # read-only flag there, so POSIX mode arithmetic is skipped.
        self._is_windows = os.name == 'nt'

    async def ensure_writable(
        self,
        path: Union[str, Path],
        recursive: bool = False
    ) -> None:
        """Ensure a path is writable.

        A path that does not exist is silently ignored.

        Args:
            path: Path to make writable.
            recursive: Whether to apply recursively to directories.

        Raises:
            FileCleanupError: If permissions cannot be modified.
        """
        try:
            path = Path(path)
            if not path.exists():
                return

            if path.is_file():
                await self._make_file_writable(path)
            elif path.is_dir():
                await self._make_directory_writable(path, recursive)
        except Exception as e:
            logger.error(
                "Error ensuring writable permissions for %s: %s", path, e
            )
            # Chain the original error so the root cause stays attached.
            raise FileCleanupError(
                f"Failed to set writable permissions: {e}"
            ) from e

    async def _make_file_writable(self, path: Path) -> None:
        """Add owner-write permission to a single file.

        Any failure is logged and re-raised unchanged.
        """
        try:
            if self._is_windows:
                os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            else:
                # Only stat where the result is used, and pass chmod the
                # permission bits only (no file-type bits).
                current_mode = stat.S_IMODE(path.stat().st_mode)
                os.chmod(path, current_mode | stat.S_IWRITE)
        except Exception as e:
            logger.error("Failed to make file %s writable: %s", path, e)
            raise

    async def _make_directory_writable(
        self,
        path: Path,
        recursive: bool
    ) -> None:
        """Add owner write and execute permission to a directory.

        With ``recursive`` set, every file and directory found beneath
        ``path`` is made writable as well. Any failure is logged and
        re-raised unchanged.
        """
        try:
            if self._is_windows:
                os.chmod(path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
            else:
                current_mode = stat.S_IMODE(path.stat().st_mode)
                os.chmod(path, current_mode | stat.S_IWRITE | stat.S_IEXEC)

            if recursive:
                for item in path.rglob('*'):
                    if item.is_file():
                        await self._make_file_writable(item)
                    elif item.is_dir():
                        # rglob already walks the whole tree, so each
                        # directory is handled non-recursively.
                        await self._make_directory_writable(item, False)
        except Exception as e:
            logger.error("Failed to make directory %s writable: %s", path, e)
            raise

    async def set_permissions(
        self,
        path: Union[str, Path],
        mode: int,
        recursive: bool = False
    ) -> None:
        """Set specific permissions on a path.

        Does nothing on Windows or when the path does not exist.

        Args:
            path: Path to set permissions on.
            mode: Permission mode (e.g., 0o755).
            recursive: Whether to apply recursively. Directories below
                ``path`` receive ``mode``; files below it receive ``mode``
                with all execute bits cleared.

        Raises:
            FileCleanupError: If permissions cannot be set.
        """
        try:
            path = Path(path)
            if not path.exists():
                return

            if not self._is_windows:  # Skip on Windows
                os.chmod(path, mode)

                if recursive and path.is_dir():
                    # Files in the tree never get execute bits from here.
                    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
                    file_mode = mode & ~exec_bits
                    for item in path.rglob('*'):
                        if item.is_file():
                            os.chmod(item, file_mode)
                        elif item.is_dir():
                            os.chmod(item, mode)
        except Exception as e:
            logger.error("Error setting permissions for %s: %s", path, e)
            # Chain the original error so the root cause stays attached.
            raise FileCleanupError(f"Failed to set permissions: {e}") from e

    async def check_permissions(
        self,
        path: Union[str, Path],
        require_writable: bool = True,
        require_readable: bool = True,
        require_executable: bool = False
    ) -> bool:
        """Check if a path has required permissions.

        Args:
            path: Path to check.
            require_writable: Whether write permission is required.
            require_readable: Whether read permission is required.
            require_executable: Whether execute permission is required.

        Returns:
            bool: True if the path exists and has every required
            permission; False otherwise, including when the check itself
            raises.
        """
        try:
            path = Path(path)
            if not path.exists():
                return False

            if require_readable and not os.access(path, os.R_OK):
                return False
            if require_writable and not os.access(path, os.W_OK):
                return False
            if require_executable and not os.access(path, os.X_OK):
                return False
            return True
        except Exception as e:
            logger.error("Error checking permissions for %s: %s", path, e)
            return False

    async def fix_permissions(
        self,
        path: Union[str, Path],
        recursive: bool = False
    ) -> List[str]:
        """Fix common permission issues on a path.

        Files are reset to ``DEFAULT_FILE_MODE`` and directories to
        ``DEFAULT_DIR_MODE``. Failures are collected rather than raised, so
        one bad entry does not stop the rest of the tree being processed.

        Args:
            path: Path to fix permissions on.
            recursive: Whether to apply recursively.

        Returns:
            List[str]: List of errors encountered (empty on full success
            or when the path does not exist).
        """
        errors: List[str] = []
        try:
            path = Path(path)
            if not path.exists():
                return errors

            if path.is_file():
                try:
                    await self.set_permissions(path, self.DEFAULT_FILE_MODE)
                except Exception as e:
                    errors.append(
                        f"Error fixing file permissions for {path}: {e}"
                    )
            elif path.is_dir():
                try:
                    await self.set_permissions(path, self.DEFAULT_DIR_MODE)
                    if recursive:
                        for item in path.rglob('*'):
                            # Per-item handling keeps the walk going after
                            # an individual failure.
                            try:
                                if item.is_file():
                                    await self.set_permissions(
                                        item, self.DEFAULT_FILE_MODE
                                    )
                                elif item.is_dir():
                                    await self.set_permissions(
                                        item, self.DEFAULT_DIR_MODE
                                    )
                            except Exception as e:
                                errors.append(
                                    f"Error fixing permissions for {item}: {e}"
                                )
                except Exception as e:
                    errors.append(
                        f"Error fixing directory permissions for {path}: {e}"
                    )
        except Exception as e:
            errors.append(f"Error during permission fix: {e}")
        return errors