Identified cyclic dependency between processor and utils packages

Created new shared package with centralized progress tracking:

- videoarchiver/shared/progress.py
- videoarchiver/shared/__init__.py

Refactored compression_manager.py to use the shared progress module instead of importing from processor

Architecture improvements:

- Better separation of concerns
- Eliminated cyclic dependencies
- Centralized progress tracking
- Improved maintainability
pacnpal
2024-11-17 21:40:50 +00:00
parent e8e5e5804d
commit d9fc0f360f
4 changed files with 118 additions and 45 deletions
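For orientation, a minimal sketch of the dependency direction this commit establishes, assuming only that the videoarchiver package is importable; the old import quoted in the comment is taken from the compression_manager.py diff further down.

```python
# Before: compression_manager reached into the processor package for a
# module-level progress dict (see the diff below):
#
#     from ..processor import _compression_progress
#
# which the commit identifies as the cycle between processor and utils.
# After: progress state lives in videoarchiver/shared, a leaf package that
# imports nothing from processor or utils, so both sides can depend on it
# without depending on each other.
from videoarchiver.shared.progress import update_compression_progress

# The writer hands its data to the shared updater instead of mutating
# another package's dict ("example.mp4" is an illustrative key).
update_compression_progress("example.mp4", {"active": True, "percent": 0.0})
```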

View File

@@ -2,37 +2,34 @@
## Current Focus
-Completed investigation of cyclic dependencies in the videoarchiver module, particularly in the processor directory.
+Cyclic dependency between processor and utils packages has been resolved
-## Active Files
+## Changes Made
-- videoarchiver/processor/core.py
-- videoarchiver/processor/message_handler.py
-- videoarchiver/processor/queue_handler.py
-- videoarchiver/processor/cleanup_manager.py
+1. Created new shared module for progress tracking:
+   - Created videoarchiver/shared/progress.py
+   - Created videoarchiver/shared/__init__.py
+   - Implemented centralized progress tracking functionality
-## Recent Changes
+2. Updated dependencies:
+   - Removed processor import from compression_manager.py
+   - Updated compression_manager.py to use shared.progress
+   - Verified no remaining circular imports
-Analysis completed:
+## Architecture Improvements
-- Identified and documented dependency patterns
-- Verified TYPE_CHECKING usage
-- Confirmed effective circular dependency management
+- Better separation of concerns with shared functionality in dedicated module
+- Eliminated cyclic dependencies between packages
+- Centralized progress tracking for better maintainability
+## Current Status
+- ✅ Cyclic dependency resolved
+- ✅ Code structure improved
+- ✅ No remaining circular imports
+- ✅ Functionality maintained
## Next Steps
-1. ✓ Analyzed imports in processor directory
-2. ✓ Mapped dependencies between components
-3. ✓ Identified circular import patterns
-4. ✓ Documented findings and recommendations
-## Conclusion
-The codebase effectively manages potential circular dependencies through:
-1. Strategic use of TYPE_CHECKING
-2. Late initialization
-3. Forward references
-4. Clear component boundaries
-No immediate refactoring needed as current implementation follows best practices.
+- Monitor for any new cyclic dependencies
+- Consider moving other shared functionality to the shared package if needed
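The status notes above state that no circular imports remain. A minimal smoke test along those lines simply imports the affected modules in a fresh interpreter, since Python raises an ImportError or a partially-initialized-module error when a cycle actually bites. The module list below is an assumption based on the files touched in this commit; the real path of compression_manager.py is not shown here.

```python
# check_imports.py -- hedged sketch: run with `python check_imports.py`
# in an environment where the videoarchiver package is importable.
import importlib

MODULES = [
    "videoarchiver.shared",
    "videoarchiver.shared.progress",
    "videoarchiver.processor",  # assumed importable as a package
    # "videoarchiver.<pkg>.compression_manager",  # real path not shown here
]

for name in MODULES:
    importlib.import_module(name)
    print(f"OK: {name}")
```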

videoarchiver/shared/__init__.py

@@ -0,0 +1,31 @@
"""Shared functionality for the videoarchiver package"""
from .progress import (
    compression_progress,
    download_progress,
    processing_progress,
    get_compression_progress,
    update_compression_progress,
    clear_compression_progress,
    get_download_progress,
    update_download_progress,
    clear_download_progress,
    get_processing_progress,
    update_processing_progress,
    clear_processing_progress,
)

__all__ = [
    'compression_progress',
    'download_progress',
    'processing_progress',
    'get_compression_progress',
    'update_compression_progress',
    'clear_compression_progress',
    'get_download_progress',
    'update_download_progress',
    'clear_download_progress',
    'get_processing_progress',
    'update_processing_progress',
    'clear_processing_progress',
]

videoarchiver/shared/progress.py

@@ -0,0 +1,53 @@
"""Shared progress tracking functionality"""
from typing import Dict, Any

# Global progress tracking
compression_progress: Dict[str, Dict[str, Any]] = {}
download_progress: Dict[str, Dict[str, Any]] = {}
processing_progress: Dict[str, Dict[str, Any]] = {}

def get_compression_progress(file_id: str) -> Dict[str, Any]:
    """Get compression progress for a file"""
    return compression_progress.get(file_id, {})

def update_compression_progress(file_id: str, progress_data: Dict[str, Any]) -> None:
    """Update compression progress for a file"""
    if file_id in compression_progress:
        compression_progress[file_id].update(progress_data)
    else:
        compression_progress[file_id] = progress_data

def clear_compression_progress(file_id: str) -> None:
    """Clear compression progress for a file"""
    compression_progress.pop(file_id, None)

def get_download_progress(url: str) -> Dict[str, Any]:
    """Get download progress for a URL"""
    return download_progress.get(url, {})

def update_download_progress(url: str, progress_data: Dict[str, Any]) -> None:
    """Update download progress for a URL"""
    if url in download_progress:
        download_progress[url].update(progress_data)
    else:
        download_progress[url] = progress_data

def clear_download_progress(url: str) -> None:
    """Clear download progress for a URL"""
    download_progress.pop(url, None)

def get_processing_progress(item_id: str) -> Dict[str, Any]:
    """Get processing progress for an item"""
    return processing_progress.get(item_id, {})

def update_processing_progress(item_id: str, progress_data: Dict[str, Any]) -> None:
    """Update processing progress for an item"""
    if item_id in processing_progress:
        processing_progress[item_id].update(progress_data)
    else:
        processing_progress[item_id] = progress_data

def clear_processing_progress(item_id: str) -> None:
    """Clear processing progress for an item"""
    processing_progress.pop(item_id, None)
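As the functions above show, an update merges into any existing entry, a get falls back to an empty dict for unknown keys, and a clear is a no-op when nothing is tracked. A minimal usage sketch, assuming the videoarchiver package is importable (the file name is illustrative):

```python
from videoarchiver.shared import (
    get_compression_progress,
    update_compression_progress,
    clear_compression_progress,
)

# First update creates the entry; later updates merge into it
update_compression_progress("clip.mp4", {"active": True, "percent": 0.0})
update_compression_progress("clip.mp4", {"percent": 42.5})
print(get_compression_progress("clip.mp4"))   # {'active': True, 'percent': 42.5}

# Unknown keys return an empty dict rather than raising
print(get_compression_progress("missing.mp4"))  # {}

# Clearing uses dict.pop(..., None), so clearing twice is safe
clear_compression_progress("clip.mp4")
clear_compression_progress("clip.mp4")
```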

compression_manager.py

@@ -7,7 +7,7 @@ import subprocess
from datetime import datetime
from typing import Dict, Any, Optional, Callable, List, Set, Tuple
-from ..processor import _compression_progress
+from ..shared.progress import update_compression_progress
from ..utils.compression_handler import CompressionHandler
from ..utils.progress_handler import ProgressHandler
from ..utils.file_operations import FileOperations
@@ -266,7 +266,7 @@ class CompressionManager:
        duration: float,
    ) -> None:
        """Initialize compression progress tracking"""
-        _compression_progress[input_file] = {
+        progress_data = {
            "active": True,
            "filename": os.path.basename(input_file),
            "start_time": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
@@ -285,6 +285,7 @@ class CompressionManager:
"audio_bitrate": params.get("b:a", "unknown"),
"last_update": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
}
update_compression_progress(input_file, progress_data)
async def _update_progress(
self,
@@ -301,23 +302,14 @@ class CompressionManager:
            if duration > 0:
                progress = min(100, (current_time / duration) * 100)
-                if input_file in _compression_progress:
-                    elapsed = datetime.utcnow() - start_time
-                    _compression_progress[input_file].update(
-                        {
-                            "percent": progress,
-                            "elapsed_time": str(elapsed).split(".")[0],
-                            "current_size": (
-                                os.path.getsize(output_file)
-                                if os.path.exists(output_file)
-                                else 0
-                            ),
-                            "current_time": current_time,
-                            "last_update": datetime.utcnow().strftime(
-                                "%Y-%m-%d %H:%M:%S"
-                            ),
-                        }
-                    )
+                progress_data = {
+                    "percent": progress,
+                    "elapsed_time": str(datetime.utcnow() - start_time).split(".")[0],
+                    "current_size": os.path.getsize(output_file) if os.path.exists(output_file) else 0,
+                    "current_time": current_time,
+                    "last_update": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
+                }
+                update_compression_progress(input_file, progress_data)

                if progress_callback:
                    progress_callback(progress)
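The diff above covers the writer side; the reader side can now consume the same data without importing compression_manager or processor at all. A hedged sketch of such a consumer follows; the helper name and call site are illustrative and not part of this commit, while the dictionary keys are the ones written in the diff above.

```python
from videoarchiver.shared.progress import get_compression_progress


def format_compression_status(input_file: str) -> str:
    """Hypothetical status helper that only depends on the shared package."""
    data = get_compression_progress(input_file)
    if not data.get("active"):
        return f"{input_file}: no active compression"
    return (
        f"{data.get('filename', input_file)}: "
        f"{data.get('percent', 0):.1f}% "
        f"(elapsed {data.get('elapsed_time', '0:00:00')})"
    )


print(format_compression_status("/videos/input.mp4"))
```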