import os
import re
from typing import Any, Dict, Optional, Union

from django.conf import settings
from django.core.exceptions import SuspiciousFileOperation
from django.core.files.base import File
from django.core.files.move import file_move_safe
from django.core.files.storage import FileSystemStorage
from django.core.files.uploadedfile import TemporaryUploadedFile, UploadedFile

# A trailing "_<digits>" group on a file root (e.g. the "_3" in "photo_3").
# Only this numeric suffix is stripped, so "my_photo" keeps its full name.
_NUMERIC_SUFFIX_RE = re.compile(r"_\d+$")


class MediaStorage(FileSystemStorage):
    """File-system storage rooted at ``settings.MEDIA_ROOT``.

    Every stored file is given a ``<base>_<n><ext>`` name, where ``n`` comes
    from a per-process counter keyed by directory and base name. Saved files
    are chmod-ed to 0o644 and their parent directory to 0o755.
    """

    _instance = None
    # Last counter value handed out for each "<directory>/<base name>" key.
    # This is a class attribute, so it is shared by every instance in the
    # process; it is not persisted anywhere.
    _counters: Dict[str, int] = {}

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the storage, forcing the media location and base URL.

        Any ``location`` / ``base_url`` passed by the caller is overridden
        with ``settings.MEDIA_ROOT`` / ``settings.MEDIA_URL``.
        """
        kwargs['location'] = settings.MEDIA_ROOT
        kwargs['base_url'] = settings.MEDIA_URL
        super().__init__(*args, **kwargs)

    @classmethod
    def reset_counters(cls):
        """Reset all counters - useful for testing"""
        cls._counters = {}

    def get_available_name(self, name: str, max_length: Optional[int] = None) -> str:
        """Return a numbered file name that is free on the storage.

        The name is built as ``<directory>/<base>_<counter><ext>`` where
        ``<base>`` is the original file root with any trailing ``_<digits>``
        removed. The counter continues from the last value used for that
        directory/base pair in this process and skips over names that
        already exist on disk, so an existing file is never handed out again
        (for example after a restart or ``reset_counters()``).

        Args:
            name: Requested path, relative to the storage root.
            max_length: Optional upper bound on the length of the returned
                name. The base part is shortened as needed to fit.

        Returns:
            The chosen relative path.

        Raises:
            SuspiciousFileOperation: If the name cannot be made to fit in
                ``max_length``, or (raised by ``self.path``) if the path
                falls outside the storage root.
        """
        directory, filename = os.path.split(name)

        # Create the target directory up front. Resolving it through
        # self.path() keeps the directory inside the storage root.
        os.makedirs(self.path(directory), exist_ok=True)

        file_root, file_ext = os.path.splitext(filename)

        # Drop an existing numeric suffix only ("photo_2" -> "photo");
        # underscores that are part of the real name are left alone.
        base_root = _NUMERIC_SUFFIX_RE.sub("", file_root)

        dir_key = os.path.join(directory, base_root)
        counter = self._counters.get(dir_key, 0)
        root = base_root

        while True:
            counter += 1
            candidate = os.path.join(directory, f"{root}_{counter}{file_ext}")

            if max_length is not None:
                excess = len(candidate) - max_length
                if excess > 0:
                    # Shorten the base part by exactly the overflow.
                    root = root[:-excess]
                    if not root:
                        raise SuspiciousFileOperation(
                            f'Storage can not find an available filename for "{name}". '
                            "Please make sure that the corresponding file field "
                            'allows sufficient "max_length".'
                        )
                    candidate = os.path.join(
                        directory, f"{root}_{counter}{file_ext}"
                    )

            if not self.exists(candidate):
                break

        # Record the counter only once a usable name was found, so a failed
        # lookup does not consume numbers.
        self._counters[dir_key] = counter
        return candidate

    def _save(self, name: str, content: Union[File, UploadedFile]) -> str:
        """Write ``content`` to ``name`` and set file/directory permissions.

        A ``TemporaryUploadedFile`` is moved into place with
        ``file_move_safe`` (called without ``allow_overwrite``); any other
        content is written to a file opened in ``'wb'`` mode, chunk by chunk
        when ``content.chunks`` is available and with a single ``read()``
        otherwise. Afterwards the file is chmod-ed to 0o644 and its parent
        directory to 0o755.

        Args:
            name: Path relative to the storage root.
            content: The file object to store.

        Returns:
            The stored name, using forward slashes as separators.
        """
        full_path = self.path(name)
        directory = os.path.dirname(full_path)

        # Create the directory if it doesn't exist
        os.makedirs(directory, exist_ok=True)

        if isinstance(content, TemporaryUploadedFile):
            # Already on disk: move it instead of copying the bytes.
            file_move_safe(content.temporary_file_path(), full_path)
        else:
            # InMemoryUploadedFile or similar: stream it to the destination.
            with open(full_path, 'wb') as destination:
                if hasattr(content, 'chunks'):
                    for chunk in content.chunks():
                        destination.write(chunk)
                else:
                    destination.write(content.read())

        # Set proper permissions
        os.chmod(full_path, 0o644)
        os.chmod(directory, 0o755)

        # Names are stored with forward slashes even when the OS separator
        # is a backslash, so the stored value is portable.
        return str(name).replace("\\", "/")