mirror of
https://github.com/pacnpal/Pac-cogs.git
synced 2025-12-20 10:51:05 -05:00
Added chunked file processing and size limits to prevent blocking. Updated video_downloader.py to properly await secure_delete_file. Updated video_archiver.py to properly await cleanup_downloads. Simplified the secure deletion process while maintaining security. Added proper error handling and logging throughout.
136 lines
4.8 KiB
Python
136 lines
4.8 KiB
Python
"""File operation utilities"""
|
|
|
|
import os
|
|
import stat
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from .exceptions import FileCleanupError
|
|
|
|
logger = logging.getLogger("VideoArchiver")
|
|
|
|
async def secure_delete_file(file_path: str, max_size: int = 100 * 1024 * 1024) -> bool:
|
|
"""Delete a file securely
|
|
|
|
Args:
|
|
file_path: Path to the file to delete
|
|
max_size: Maximum file size in bytes to attempt secure deletion (default: 100MB)
|
|
|
|
Returns:
|
|
bool: True if file was successfully deleted, False otherwise
|
|
|
|
Raises:
|
|
FileCleanupError: If file deletion fails after all attempts
|
|
"""
|
|
if not os.path.exists(file_path):
|
|
return True
|
|
|
|
try:
|
|
# Get file size
|
|
try:
|
|
file_size = os.path.getsize(file_path)
|
|
except OSError as e:
|
|
logger.warning(f"Could not get size of {file_path}: {e}")
|
|
file_size = 0
|
|
|
|
# For large files, skip secure deletion and just remove
|
|
if file_size > max_size:
|
|
logger.debug(f"File {file_path} exceeds max size for secure deletion, performing direct removal")
|
|
try:
|
|
os.remove(file_path)
|
|
return True
|
|
except OSError as e:
|
|
logger.error(f"Failed to remove large file {file_path}: {e}")
|
|
return False
|
|
|
|
# Ensure file is writable
|
|
try:
|
|
current_mode = os.stat(file_path).st_mode
|
|
os.chmod(file_path, current_mode | stat.S_IWRITE)
|
|
except OSError as e:
|
|
logger.warning(f"Could not modify permissions of {file_path}: {e}")
|
|
raise FileCleanupError(f"Permission error: {str(e)}")
|
|
|
|
# Zero out file content in chunks to avoid memory issues
|
|
if file_size > 0:
|
|
try:
|
|
chunk_size = min(1024 * 1024, file_size) # 1MB chunks or file size if smaller
|
|
with open(file_path, "wb") as f:
|
|
for offset in range(0, file_size, chunk_size):
|
|
write_size = min(chunk_size, file_size - offset)
|
|
f.write(b'\0' * write_size)
|
|
# Allow other tasks to run
|
|
await asyncio.sleep(0)
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
except OSError as e:
|
|
logger.warning(f"Error zeroing file {file_path}: {e}")
|
|
|
|
# Delete the file
|
|
try:
|
|
Path(file_path).unlink(missing_ok=True)
|
|
return True
|
|
except OSError as e:
|
|
logger.error(f"Failed to delete file {file_path}: {e}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during deletion of {file_path}: {e}")
|
|
# Last resort: try force delete
|
|
try:
|
|
if os.path.exists(file_path):
|
|
os.chmod(file_path, stat.S_IWRITE | stat.S_IREAD)
|
|
Path(file_path).unlink(missing_ok=True)
|
|
except Exception as e2:
|
|
logger.error(f"Force delete failed for {file_path}: {e2}")
|
|
raise FileCleanupError(f"Force delete failed: {str(e2)}")
|
|
return not os.path.exists(file_path)
|
|
|
|
async def cleanup_downloads(download_path: str) -> None:
    """Clean up the downloads directory

    Recursively secure-deletes every file under download_path (previously
    only top-level files were deleted, so files inside subdirectories
    survived cleanup), then removes any directories left empty. Errors are
    accumulated so one bad entry does not abort the rest of the cleanup.

    Args:
        download_path: Path to the downloads directory to clean

    Raises:
        FileCleanupError: If cleanup fails
    """
    try:
        if not os.path.exists(download_path):
            return

        errors = []

        # Snapshot the tree in a worker thread (os.walk is blocking I/O).
        # Bottom-up order guarantees files are removed before their parent
        # directories are checked for emptiness.
        tree = await asyncio.to_thread(
            lambda: list(os.walk(download_path, topdown=False))
        )

        for root, dirs, files in tree:
            # Securely delete every file at this level
            for name in files:
                file_path = os.path.join(root, name)
                try:
                    if not await secure_delete_file(file_path):
                        errors.append(f"Failed to delete file: {file_path}")
                except Exception as e:
                    errors.append(f"Error processing {file_path}: {str(e)}")

            # Remove subdirectories that are now empty
            for name in dirs:
                dir_path = os.path.join(root, name)
                try:
                    if not os.listdir(dir_path):  # Check if directory is empty
                        await asyncio.to_thread(os.rmdir, dir_path)
                except Exception as e:
                    errors.append(f"Error removing directory {name}: {str(e)}")

        if errors:
            raise FileCleanupError("\n".join(errors))

    except FileCleanupError:
        raise
    except Exception as e:
        logger.error(f"Error during cleanup of {download_path}: {e}")
        raise FileCleanupError(f"Cleanup failed: {str(e)}")
|