feat: Implement initial schema and add various API, service, and management command enhancements across the application.

This commit is contained in:
pacnpal
2026-01-01 15:13:01 -05:00
parent c95f99ca10
commit b243b17af7
413 changed files with 11164 additions and 17433 deletions

View File

@@ -16,36 +16,25 @@ from django_fsm_log.models import StateLog
class Command(BaseCommand):
help = 'Analyze state transition patterns and generate statistics'
help = "Analyze state transition patterns and generate statistics"
def add_arguments(self, parser):
parser.add_argument("--days", type=int, default=30, help="Number of days to analyze (default: 30)")
parser.add_argument("--model", type=str, help="Specific model to analyze (e.g., editsubmission)")
parser.add_argument(
'--days',
type=int,
default=30,
help='Number of days to analyze (default: 30)'
)
parser.add_argument(
'--model',
"--output",
type=str,
help='Specific model to analyze (e.g., editsubmission)'
)
parser.add_argument(
'--output',
type=str,
choices=['console', 'json', 'csv'],
default='console',
help='Output format (default: console)'
choices=["console", "json", "csv"],
default="console",
help="Output format (default: console)",
)
def handle(self, *args, **options):
days = options['days']
model_filter = options['model']
output_format = options['output']
days = options["days"]
model_filter = options["model"]
output_format = options["output"]
self.stdout.write(
self.style.SUCCESS(f'\n=== State Transition Analysis (Last {days} days) ===\n')
)
self.stdout.write(self.style.SUCCESS(f"\n=== State Transition Analysis (Last {days} days) ===\n"))
# Filter by date range
start_date = timezone.now() - timedelta(days=days)
@@ -56,173 +45,134 @@ class Command(BaseCommand):
try:
content_type = ContentType.objects.get(model=model_filter.lower())
queryset = queryset.filter(content_type=content_type)
self.stdout.write(f'Filtering for model: {model_filter}\n')
self.stdout.write(f"Filtering for model: {model_filter}\n")
except ContentType.DoesNotExist:
self.stdout.write(
self.style.ERROR(f'Model "{model_filter}" not found')
)
self.stdout.write(self.style.ERROR(f'Model "{model_filter}" not found'))
return
# Total transitions
total_transitions = queryset.count()
self.stdout.write(
self.style.SUCCESS(f'Total Transitions: {total_transitions}\n')
)
self.stdout.write(self.style.SUCCESS(f"Total Transitions: {total_transitions}\n"))
if total_transitions == 0:
self.stdout.write(
self.style.WARNING('No transitions found in the specified period.')
)
self.stdout.write(self.style.WARNING("No transitions found in the specified period."))
return
# Most common transitions
self.stdout.write(self.style.SUCCESS('\n--- Most Common Transitions ---'))
self.stdout.write(self.style.SUCCESS("\n--- Most Common Transitions ---"))
common_transitions = (
queryset.values('transition', 'content_type__model')
.annotate(count=Count('id'))
.order_by('-count')[:10]
queryset.values("transition", "content_type__model").annotate(count=Count("id")).order_by("-count")[:10]
)
for t in common_transitions:
model_name = t['content_type__model']
transition_name = t['transition'] or 'N/A'
count = t['count']
model_name = t["content_type__model"]
transition_name = t["transition"] or "N/A"
count = t["count"]
percentage = (count / total_transitions) * 100
self.stdout.write(
f" {model_name}.{transition_name}: {count} ({percentage:.1f}%)"
)
self.stdout.write(f" {model_name}.{transition_name}: {count} ({percentage:.1f}%)")
# Transitions by model
self.stdout.write(self.style.SUCCESS('\n--- Transitions by Model ---'))
by_model = (
queryset.values('content_type__model')
.annotate(count=Count('id'))
.order_by('-count')
)
self.stdout.write(self.style.SUCCESS("\n--- Transitions by Model ---"))
by_model = queryset.values("content_type__model").annotate(count=Count("id")).order_by("-count")
for m in by_model:
model_name = m['content_type__model']
count = m['count']
model_name = m["content_type__model"]
count = m["count"]
percentage = (count / total_transitions) * 100
self.stdout.write(
f" {model_name}: {count} ({percentage:.1f}%)"
)
self.stdout.write(f" {model_name}: {count} ({percentage:.1f}%)")
# Transitions by state
self.stdout.write(self.style.SUCCESS('\n--- Final States Distribution ---'))
by_state = (
queryset.values('state')
.annotate(count=Count('id'))
.order_by('-count')
)
self.stdout.write(self.style.SUCCESS("\n--- Final States Distribution ---"))
by_state = queryset.values("state").annotate(count=Count("id")).order_by("-count")
for s in by_state:
state_name = s['state']
count = s['count']
state_name = s["state"]
count = s["count"]
percentage = (count / total_transitions) * 100
self.stdout.write(
f" {state_name}: {count} ({percentage:.1f}%)"
)
self.stdout.write(f" {state_name}: {count} ({percentage:.1f}%)")
# Most active users
self.stdout.write(self.style.SUCCESS('\n--- Most Active Users ---'))
self.stdout.write(self.style.SUCCESS("\n--- Most Active Users ---"))
active_users = (
queryset.exclude(by__isnull=True)
.values('by__username', 'by__id')
.annotate(count=Count('id'))
.order_by('-count')[:10]
.values("by__username", "by__id")
.annotate(count=Count("id"))
.order_by("-count")[:10]
)
for u in active_users:
username = u['by__username']
user_id = u['by__id']
count = u['count']
self.stdout.write(
f" {username} (ID: {user_id}): {count} transitions"
)
username = u["by__username"]
user_id = u["by__id"]
count = u["count"]
self.stdout.write(f" {username} (ID: {user_id}): {count} transitions")
# System vs User transitions
system_count = queryset.filter(by__isnull=True).count()
user_count = queryset.exclude(by__isnull=True).count()
self.stdout.write(self.style.SUCCESS('\n--- Transition Attribution ---'))
self.stdout.write(self.style.SUCCESS("\n--- Transition Attribution ---"))
self.stdout.write(f" User-initiated: {user_count} ({(user_count/total_transitions)*100:.1f}%)")
self.stdout.write(f" System-initiated: {system_count} ({(system_count/total_transitions)*100:.1f}%)")
# Daily transition volume
# Security: Using Django ORM functions instead of raw SQL .extra() to prevent SQL injection
self.stdout.write(self.style.SUCCESS('\n--- Daily Transition Volume ---'))
self.stdout.write(self.style.SUCCESS("\n--- Daily Transition Volume ---"))
daily_stats = (
queryset.annotate(day=TruncDate('timestamp'))
.values('day')
.annotate(count=Count('id'))
.order_by('-day')[:7]
queryset.annotate(day=TruncDate("timestamp")).values("day").annotate(count=Count("id")).order_by("-day")[:7]
)
for day in daily_stats:
date = day['day']
count = day['count']
date = day["day"]
count = day["count"]
self.stdout.write(f" {date}: {count} transitions")
# Busiest hours
# Security: Using Django ORM functions instead of raw SQL .extra() to prevent SQL injection
self.stdout.write(self.style.SUCCESS('\n--- Busiest Hours (UTC) ---'))
self.stdout.write(self.style.SUCCESS("\n--- Busiest Hours (UTC) ---"))
hourly_stats = (
queryset.annotate(hour=ExtractHour('timestamp'))
.values('hour')
.annotate(count=Count('id'))
.order_by('-count')[:5]
queryset.annotate(hour=ExtractHour("timestamp"))
.values("hour")
.annotate(count=Count("id"))
.order_by("-count")[:5]
)
for hour in hourly_stats:
hour_val = int(hour['hour'])
count = hour['count']
hour_val = int(hour["hour"])
count = hour["count"]
self.stdout.write(f" Hour {hour_val:02d}:00: {count} transitions")
# Transition patterns (common sequences)
self.stdout.write(self.style.SUCCESS('\n--- Common Transition Patterns ---'))
self.stdout.write(' Analyzing transition sequences...')
self.stdout.write(self.style.SUCCESS("\n--- Common Transition Patterns ---"))
self.stdout.write(" Analyzing transition sequences...")
# Get recent objects and their transition sequences
recent_objects = (
queryset.values('content_type', 'object_id')
.distinct()[:100]
)
recent_objects = queryset.values("content_type", "object_id").distinct()[:100]
pattern_counts = {}
for obj in recent_objects:
transitions = list(
StateLog.objects.filter(
content_type=obj['content_type'],
object_id=obj['object_id']
)
.order_by('timestamp')
.values_list('transition', flat=True)
StateLog.objects.filter(content_type=obj["content_type"], object_id=obj["object_id"])
.order_by("timestamp")
.values_list("transition", flat=True)
)
# Create pattern from consecutive transitions
if len(transitions) >= 2:
pattern = ''.join([t or 'N/A' for t in transitions[:3]])
pattern = "".join([t or "N/A" for t in transitions[:3]])
pattern_counts[pattern] = pattern_counts.get(pattern, 0) + 1
# Display top patterns
sorted_patterns = sorted(
pattern_counts.items(),
key=lambda x: x[1],
reverse=True
)[:5]
sorted_patterns = sorted(pattern_counts.items(), key=lambda x: x[1], reverse=True)[:5]
for pattern, count in sorted_patterns:
self.stdout.write(f" {pattern}: {count} occurrences")
self.stdout.write(
self.style.SUCCESS('\n=== Analysis Complete ===\n')
)
self.stdout.write(self.style.SUCCESS("\n=== Analysis Complete ===\n"))
# Export options
if output_format == 'json':
if output_format == "json":
self._export_json(queryset, days)
elif output_format == 'csv':
elif output_format == "csv":
self._export_csv(queryset, days)
def _export_json(self, queryset, days):
@@ -231,24 +181,21 @@ class Command(BaseCommand):
from datetime import datetime
data = {
'analysis_date': datetime.now().isoformat(),
'period_days': days,
'total_transitions': queryset.count(),
'transitions': list(
"analysis_date": datetime.now().isoformat(),
"period_days": days,
"total_transitions": queryset.count(),
"transitions": list(
queryset.values(
'id', 'timestamp', 'state', 'transition',
'content_type__model', 'object_id', 'by__username'
"id", "timestamp", "state", "transition", "content_type__model", "object_id", "by__username"
)
)
),
}
filename = f'transition_analysis_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
with open(filename, 'w') as f:
with open(filename, "w") as f:
json.dump(data, f, indent=2, default=str)
self.stdout.write(
self.style.SUCCESS(f'Exported to {filename}')
)
self.stdout.write(self.style.SUCCESS(f"Exported to {filename}"))
def _export_csv(self, queryset, days):
"""Export analysis results as CSV."""
@@ -257,24 +204,21 @@ class Command(BaseCommand):
filename = f'transition_analysis_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
with open(filename, 'w', newline='') as f:
with open(filename, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow([
'ID', 'Timestamp', 'Model', 'Object ID',
'State', 'Transition', 'User'
])
writer.writerow(["ID", "Timestamp", "Model", "Object ID", "State", "Transition", "User"])
for log in queryset.select_related('content_type', 'by'):
writer.writerow([
log.id,
log.timestamp,
log.content_type.model,
log.object_id,
log.state,
log.transition or 'N/A',
log.by.username if log.by else 'System'
])
for log in queryset.select_related("content_type", "by"):
writer.writerow(
[
log.id,
log.timestamp,
log.content_type.model,
log.object_id,
log.state,
log.transition or "N/A",
log.by.username if log.by else "System",
]
)
self.stdout.write(
self.style.SUCCESS(f'Exported to {filename}')
)
self.stdout.write(self.style.SUCCESS(f"Exported to {filename}"))

View File

@@ -17,9 +17,7 @@ class Command(BaseCommand):
def handle(self, *args, **kwargs):
# Ensure we have a test user
user, created = User.objects.get_or_create(
username="test_user", email="test@example.com"
)
user, created = User.objects.get_or_create(username="test_user", email="test@example.com")
if created:
user.set_password("testpass123")
user.save()
@@ -215,9 +213,7 @@ class Command(BaseCommand):
"audio system, and increased capacity due to improved loading efficiency."
),
source=(
"Park operations manual\n"
"Maintenance records\n"
"Personal observation and timing of new ride cycle"
"Park operations manual\n" "Maintenance records\n" "Personal observation and timing of new ride cycle"
),
status="PENDING",
)
@@ -225,10 +221,10 @@ class Command(BaseCommand):
# Create PhotoSubmissions with detailed captions
# Park photo submission
image_data = b"GIF87a\x01\x00\x01\x00\x80\x01\x00\x00\x00\x00ccc,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;"
dummy_image = SimpleUploadedFile(
"park_entrance.gif", image_data, content_type="image/gif"
image_data = (
b"GIF87a\x01\x00\x01\x00\x80\x01\x00\x00\x00\x00ccc,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;"
)
dummy_image = SimpleUploadedFile("park_entrance.gif", image_data, content_type="image/gif")
PhotoSubmission.objects.create(
user=user,
@@ -244,9 +240,7 @@ class Command(BaseCommand):
)
# Ride photo submission
dummy_image2 = SimpleUploadedFile(
"coaster_track.gif", image_data, content_type="image/gif"
)
dummy_image2 = SimpleUploadedFile("coaster_track.gif", image_data, content_type="image/gif")
PhotoSubmission.objects.create(
user=user,
content_type=ride_ct,

View File

@@ -1,4 +1,5 @@
"""Management command to validate state machine configurations for moderation models."""
from django.core.management import CommandError
from django.core.management.base import BaseCommand
@@ -76,18 +77,15 @@ class Command(BaseCommand):
model_key = model_name.lower()
if model_key not in models_to_validate:
raise CommandError(
f"Unknown model: {model_name}. "
f"Valid options: {', '.join(models_to_validate.keys())}"
f"Unknown model: {model_name}. " f"Valid options: {', '.join(models_to_validate.keys())}"
)
models_to_validate = {model_key: models_to_validate[model_key]}
self.stdout.write(
self.style.SUCCESS("\nValidating State Machine Configurations\n")
)
self.stdout.write(self.style.SUCCESS("\nValidating State Machine Configurations\n"))
self.stdout.write("=" * 60 + "\n")
all_valid = True
for model_key, (
for _model_key, (
model_class,
choice_group,
domain,
@@ -101,61 +99,34 @@ class Command(BaseCommand):
result = validator.validate_choice_group()
if result.is_valid:
self.stdout.write(
self.style.SUCCESS(
f"{model_class.__name__} validation passed"
)
)
self.stdout.write(self.style.SUCCESS(f"{model_class.__name__} validation passed"))
if verbose:
self._show_transition_graph(choice_group, domain)
else:
all_valid = False
self.stdout.write(
self.style.ERROR(
f"{model_class.__name__} validation failed"
)
)
self.stdout.write(self.style.ERROR(f"{model_class.__name__} validation failed"))
for error in result.errors:
self.stdout.write(
self.style.ERROR(f" - {error.message}")
)
self.stdout.write(self.style.ERROR(f" - {error.message}"))
# Check FSM field
if not self._check_fsm_field(model_class):
all_valid = False
self.stdout.write(
self.style.ERROR(
f" - FSM field 'status' not found on "
f"{model_class.__name__}"
)
)
self.stdout.write(self.style.ERROR(f" - FSM field 'status' not found on " f"{model_class.__name__}"))
# Check mixin
if not self._check_state_machine_mixin(model_class):
all_valid = False
self.stdout.write(
self.style.WARNING(
f" - StateMachineMixin not found on "
f"{model_class.__name__}"
)
self.style.WARNING(f" - StateMachineMixin not found on " f"{model_class.__name__}")
)
self.stdout.write("\n" + "=" * 60)
if all_valid:
self.stdout.write(
self.style.SUCCESS(
"\n✓ All validations passed successfully!\n"
)
)
self.stdout.write(self.style.SUCCESS("\n✓ All validations passed successfully!\n"))
else:
self.stdout.write(
self.style.ERROR(
"\n✗ Some validations failed. "
"Please review the errors above.\n"
)
)
self.stdout.write(self.style.ERROR("\n✗ Some validations failed. " "Please review the errors above.\n"))
raise CommandError("State machine validation failed")
def _check_fsm_field(self, model_class):
@@ -177,9 +148,7 @@ class Command(BaseCommand):
self.stdout.write("\n Transition Graph:")
graph = registry_instance.export_transition_graph(
choice_group, domain
)
graph = registry_instance.export_transition_graph(choice_group, domain)
for source, targets in sorted(graph.items()):
if targets: