mirror of
https://github.com/pacnpal/Pac-cogs.git
synced 2025-12-20 10:51:05 -05:00
Component-based architecture with lifecycle management Enhanced error handling and recovery mechanisms Comprehensive state management and tracking Event-driven architecture with monitoring Queue Management: Multiple processing strategies for different scenarios Advanced state management with recovery Comprehensive metrics and health monitoring Sophisticated cleanup system with multiple strategies Processing Pipeline: Enhanced message handling with validation Improved URL extraction and processing Better queue management and monitoring Advanced cleanup mechanisms Overall Benefits: Better code organization and maintainability Improved error handling and recovery Enhanced monitoring and reporting More robust and reliable system
203 lines
6.6 KiB
Python
203 lines
6.6 KiB
Python
"""Module for managing file and directory permissions"""
|
|
|
|
import os
|
|
import stat
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional, Union, List
|
|
|
|
from .exceptions import FileCleanupError
|
|
|
|
logger = logging.getLogger("PermissionManager")
|
|
|
|
class PermissionManager:
|
|
"""Handles file and directory permission operations"""
|
|
|
|
DEFAULT_FILE_MODE = 0o644 # rw-r--r--
|
|
DEFAULT_DIR_MODE = 0o755 # rwxr-xr-x
|
|
FULL_ACCESS_MODE = 0o777 # rwxrwxrwx
|
|
|
|
def __init__(self):
|
|
self._is_windows = os.name == 'nt'
|
|
|
|
async def ensure_writable(
|
|
self,
|
|
path: Union[str, Path],
|
|
recursive: bool = False
|
|
) -> None:
|
|
"""Ensure a path is writable
|
|
|
|
Args:
|
|
path: Path to make writable
|
|
recursive: Whether to apply recursively to directories
|
|
|
|
Raises:
|
|
FileCleanupError: If permissions cannot be modified
|
|
"""
|
|
try:
|
|
path = Path(path)
|
|
if not path.exists():
|
|
return
|
|
|
|
if path.is_file():
|
|
await self._make_file_writable(path)
|
|
elif path.is_dir():
|
|
await self._make_directory_writable(path, recursive)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error ensuring writable permissions for {path}: {e}")
|
|
raise FileCleanupError(f"Failed to set writable permissions: {str(e)}")
|
|
|
|
async def _make_file_writable(self, path: Path) -> None:
|
|
"""Make a file writable"""
|
|
try:
|
|
current_mode = path.stat().st_mode
|
|
if self._is_windows:
|
|
os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
|
|
else:
|
|
os.chmod(path, current_mode | stat.S_IWRITE)
|
|
except Exception as e:
|
|
logger.error(f"Failed to make file {path} writable: {e}")
|
|
raise
|
|
|
|
async def _make_directory_writable(
|
|
self,
|
|
path: Path,
|
|
recursive: bool
|
|
) -> None:
|
|
"""Make a directory writable"""
|
|
try:
|
|
if self._is_windows:
|
|
os.chmod(path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
|
|
else:
|
|
current_mode = path.stat().st_mode
|
|
os.chmod(path, current_mode | stat.S_IWRITE | stat.S_IEXEC)
|
|
|
|
if recursive:
|
|
for item in path.rglob('*'):
|
|
if item.is_file():
|
|
await self._make_file_writable(item)
|
|
elif item.is_dir():
|
|
await self._make_directory_writable(item, False)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to make directory {path} writable: {e}")
|
|
raise
|
|
|
|
async def set_permissions(
|
|
self,
|
|
path: Union[str, Path],
|
|
mode: int,
|
|
recursive: bool = False
|
|
) -> None:
|
|
"""Set specific permissions on a path
|
|
|
|
Args:
|
|
path: Path to set permissions on
|
|
mode: Permission mode (e.g., 0o755)
|
|
recursive: Whether to apply recursively
|
|
|
|
Raises:
|
|
FileCleanupError: If permissions cannot be set
|
|
"""
|
|
try:
|
|
path = Path(path)
|
|
if not path.exists():
|
|
return
|
|
|
|
if not self._is_windows: # Skip on Windows
|
|
os.chmod(path, mode)
|
|
|
|
if recursive and path.is_dir():
|
|
file_mode = mode & ~stat.S_IXUSR & ~stat.S_IXGRP & ~stat.S_IXOTH
|
|
for item in path.rglob('*'):
|
|
if item.is_file():
|
|
os.chmod(item, file_mode)
|
|
elif item.is_dir():
|
|
os.chmod(item, mode)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error setting permissions for {path}: {e}")
|
|
raise FileCleanupError(f"Failed to set permissions: {str(e)}")
|
|
|
|
async def check_permissions(
|
|
self,
|
|
path: Union[str, Path],
|
|
require_writable: bool = True,
|
|
require_readable: bool = True,
|
|
require_executable: bool = False
|
|
) -> bool:
|
|
"""Check if a path has required permissions
|
|
|
|
Args:
|
|
path: Path to check
|
|
require_writable: Whether write permission is required
|
|
require_readable: Whether read permission is required
|
|
require_executable: Whether execute permission is required
|
|
|
|
Returns:
|
|
bool: True if path has required permissions
|
|
"""
|
|
try:
|
|
path = Path(path)
|
|
if not path.exists():
|
|
return False
|
|
|
|
if require_readable and not os.access(path, os.R_OK):
|
|
return False
|
|
if require_writable and not os.access(path, os.W_OK):
|
|
return False
|
|
if require_executable and not os.access(path, os.X_OK):
|
|
return False
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking permissions for {path}: {e}")
|
|
return False
|
|
|
|
async def fix_permissions(
|
|
self,
|
|
path: Union[str, Path],
|
|
recursive: bool = False
|
|
) -> List[str]:
|
|
"""Fix common permission issues on a path
|
|
|
|
Args:
|
|
path: Path to fix permissions on
|
|
recursive: Whether to apply recursively
|
|
|
|
Returns:
|
|
List[str]: List of errors encountered
|
|
"""
|
|
errors = []
|
|
try:
|
|
path = Path(path)
|
|
if not path.exists():
|
|
return errors
|
|
|
|
if path.is_file():
|
|
try:
|
|
await self.set_permissions(path, self.DEFAULT_FILE_MODE)
|
|
except Exception as e:
|
|
errors.append(f"Error fixing file permissions for {path}: {str(e)}")
|
|
elif path.is_dir():
|
|
try:
|
|
await self.set_permissions(path, self.DEFAULT_DIR_MODE)
|
|
if recursive:
|
|
for item in path.rglob('*'):
|
|
try:
|
|
if item.is_file():
|
|
await self.set_permissions(item, self.DEFAULT_FILE_MODE)
|
|
elif item.is_dir():
|
|
await self.set_permissions(item, self.DEFAULT_DIR_MODE)
|
|
except Exception as e:
|
|
errors.append(f"Error fixing permissions for {item}: {str(e)}")
|
|
except Exception as e:
|
|
errors.append(f"Error fixing directory permissions for {path}: {str(e)}")
|
|
|
|
except Exception as e:
|
|
errors.append(f"Error during permission fix: {str(e)}")
|
|
|
|
return errors
|